Access Data From an Observable

On this page

  • Overview
  • How to Process an Observable
  • Sample Data
  • Use Callbacks to Process Results
  • Access Read Operation Results
  • Access Write Operation Results
  • Use Lambda Functions to Process Results
  • Example
  • Use Futures to Retrieve All Results
  • Example
  • API Documentation

Overview

In this guide, you can learn how to access the results of MongoDB operations from an Observable instance.

An Observable represents a stream of data emitted by an operation over time. To access this data, you can create an Observer instance that subscribes to the Observable.

Note

The Scala driver is built upon the MongoDB Java Reactive Streams driver. The Observable class extends the Publisher class from Java Reactive Streams and includes additional convenience methods to help process results.

How to Process an Observable

To run a MongoDB operation and process its data, you must request the operation results from an Observable. The driver provides the Observable interface for operations that return any number of results. Operations that produce no results or one result, such as the countDocuments() method, return a SingleObservable[T]. The [T] parameterization corresponds to the data type that the SingleObservable handles.

Operations that can produce an unbounded number of results return an instance of the Observable[T] type. Some operations return specific Observable types that provide additional methods to filter and process results before subscribing to them. The following list describes some types that allow you to chain operation-specific methods to the Observable:

  • FindObservable[T]: Returned by the find() method

  • DistinctObservable[T]: Returned by the distinct() method

  • AggregateObservable[T]: Returned by the aggregate() method
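
As a rough sketch of this kind of chaining, the following snippet narrows a find() query before anything subscribes to it. The connection string is a placeholder, and the sort field and limit are illustrative choices rather than values from this guide. Building the FindObservable does not run the query; the operation starts only when an Observer subscribes.

```scala
import org.mongodb.scala._
import org.mongodb.scala.model.Filters.equal
import org.mongodb.scala.model.Sorts.ascending

// Placeholder connection string (assumption): replace with your own URI
val mongoClient: MongoClient = MongoClient("mongodb://localhost:27017")
val collection: MongoCollection[Document] =
  mongoClient.getDatabase("sample_restaurants").getCollection("restaurants")

// find() returns a FindObservable, so sort() and limit() can be chained
// before subscribing. No query has run at this point.
val firstFive: FindObservable[Document] = collection
  .find(equal("cuisine", "Czech"))
  .sort(ascending("name"))
  .limit(5)

// The query runs only when something subscribes, for example:
// firstFive.subscribe((doc: Document) => println(doc.toJson()))

mongoClient.close()
```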

You can request operation results by calling the subscribe() method on the operation's Observable. Pass an instance of the Observer class as a parameter to the subscribe() method. This Observer receives the operation results from the Observable. Then, you can use methods provided by the Observer class to print results, handle errors, and perform additional data processing.

To learn more about processing results, see the following API documentation:

  • Observable

  • Subscription

  • Observer
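
The Subscription that an Observer receives can also be used to control how many results are requested at a time. The following is a minimal sketch, assuming an in-memory Observable created with Observable(...) as a stand-in for a real operation so that it runs without a cluster. The names numbers, received, and done are illustrative.

```scala
import org.mongodb.scala._
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// In-memory stand-in for an operation's Observable (assumption)
val numbers: Observable[Int] = Observable(1 to 5)

val received = ListBuffer.empty[Int] // items seen by onNext
val done = Promise[Unit]()           // completed when the stream ends

numbers.subscribe(new Observer[Int] {
  private var subscription: Option[Subscription] = None

  // Request a single item instead of the default unbounded demand
  override def onSubscribe(s: Subscription): Unit = {
    subscription = Some(s)
    s.request(1)
  }

  // Record the item, then ask for the next one
  override def onNext(result: Int): Unit = {
    received += result
    subscription.foreach(_.request(1))
  }

  override def onError(e: Throwable): Unit = done.tryFailure(e)
  override def onComplete(): Unit = done.trySuccess(())
})

// Wait for the stream to finish before reading the collected items
Await.result(done.future, 10.seconds)
println(received.mkString(", "))
```

Requesting one item at a time is only for illustration. A larger batch size usually makes more sense when each result is cheap to process.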

Sample Data

The examples in this guide use the restaurants collection in the sample_restaurants database from the Atlas sample datasets. To access this collection from your Scala application, create a MongoClient that connects to an Atlas cluster and assign the following values to your database and collection variables:

import org.mongodb.scala._

val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the Get Started with Atlas guide.

Use Callbacks to Process Results

After subscribing to an Observable[T], you can use the following callback methods provided by the Observer class to access operation results or handle errors:

  • onNext(result: T): Called each time the Observer receives a new result. You can define logic for processing results by overriding this method.

  • onError(e: Throwable): Called when the operation generates an error, which prevents the Observer from receiving more data from the Observable. You can define error handling logic by overriding this method.

  • onComplete(): Called when the Observer has consumed all results from the Observable. You can perform any final data processing by overriding this method.
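
The order in which these callbacks fire can be seen in a small sketch. It assumes an in-memory Observable created with Observable(...) in place of a real operation, so it runs without a cluster. The names source, events, and finished are illustrative.

```scala
import org.mongodb.scala._
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// In-memory stand-in for an operation's Observable (assumption)
val source: Observable[String] = Observable(Seq("first", "second"))

val events = ListBuffer.empty[String] // records each callback invocation
val finished = Promise[Unit]()        // completed when the stream ends

source.subscribe(new Observer[String] {
  // Called once per emitted item
  override def onNext(result: String): Unit = events += s"onNext($result)"

  // Called if the stream fails; no further items arrive afterward
  override def onError(e: Throwable): Unit = {
    events += s"onError(${e.getMessage})"
    finished.tryFailure(e)
  }

  // Called after the last item has been delivered
  override def onComplete(): Unit = {
    events += "onComplete"
    finished.trySuccess(())
  }
})

// Wait for the stream to finish, then print the recorded callbacks
Await.result(finished.future, 10.seconds)
events.foreach(println)
```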

The following sections show how to customize these methods to process read and write operation results from an Observable.

Access Read Operation Results

To access data retrieved by a read operation, create an Observable[T] to store the operation results. Then, subscribe to the observable and override the Observer class callback methods to process the results.

This example queries the restaurants collection for documents in which the cuisine value is "Czech". To retrieve and process results, the example assigns an Observable[Document] to the operation and performs the following actions:

  • Calls the subscribe() method to subscribe to the Observable and passes an Observer as a parameter

  • Overrides the onNext() method to print each retrieved document, which is a Document instance

  • Overrides the onError() method to print any errors

  • Overrides the onComplete() method to print a message after all the results from the Observable are processed

import org.mongodb.scala.model.Filters.equal

// Match documents whose cuisine field is "Czech"
val filter = equal("cuisine", "Czech")
val findObservable: Observable[Document] = collection.find(filter)
findObservable.subscribe(new Observer[Document] {
  // Print each matching document as it arrives
  override def onNext(result: Document): Unit = println(result)
  override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
  override def onComplete(): Unit = println("Processed all results")
})

Iterable((_id,...), ..., (name,BsonString{value='Koliba Restaurant'}),
(restaurant_id,BsonString{value='40812870'}))
Iterable((_id,...), ..., (name,BsonString{value='Bohemian Beer Garden'}),
(restaurant_id,BsonString{value='41485121'}))
Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}),
(restaurant_id,BsonString{value='41569184'}))
Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}),
(restaurant_id,BsonString{value='41711983'}))
Processed all results

Access Write Operation Results

To access data returned by a write operation, create an Observable[T] to store the operation results. Then, subscribe to the Observable and override the Observer class callback methods to process the results.

This example inserts documents in which the cuisine value is "Nepalese" into the restaurants collection. To retrieve and process results, the example assigns an Observable[InsertManyResult] to the operation and performs the following actions:

  • Calls the subscribe() method to subscribe to the Observable and passes an Observer as a parameter

  • Overrides the onNext() method to print the result of the insert operation, returned as an InsertManyResult

  • Overrides the onError() method to print any errors

  • Overrides the onComplete() method to print a message after all the results from the Observable are processed

import org.mongodb.scala.result.InsertManyResult

// Two new restaurant documents to insert
val docs: Seq[Document] = Seq(
  Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"),
  Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese")
)
val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs)
insertObservable.subscribe(new Observer[InsertManyResult] {
  override def onNext(result: InsertManyResult): Unit = println(result)
  override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
  override def onComplete(): Unit = println("Processed all results")
})

AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...},
1=BsonObjectId{value=...}}}
Processed all results
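
Rather than printing the whole InsertManyResult, an onNext() implementation might pull out individual fields. The sketch below assumes Scala 2.13 or later (for scala.jdk.CollectionConverters) and uses a hypothetical helper named insertedIdStrings. The result is built by hand so that the snippet runs without a cluster.

```scala
import com.mongodb.client.result.InsertManyResult
import org.bson.{BsonObjectId, BsonValue}
import scala.jdk.CollectionConverters._

// Hypothetical helper: lists the inserted _id values in insertion order.
// An unacknowledged result carries no IDs, so it yields an empty Seq.
def insertedIdStrings(result: InsertManyResult): Seq[String] =
  if (!result.wasAcknowledged()) Seq.empty
  else
    result.getInsertedIds.asScala.toSeq
      .sortBy { case (index, _) => index.intValue }
      .map { case (_, id) =>
        // Driver-generated IDs are ObjectIds; fall back to toString otherwise
        if (id.isObjectId) id.asObjectId().getValue.toHexString else id.toString
      }

// Stand-in result built by hand (assumption), keyed by document position
val ids = new java.util.HashMap[Integer, BsonValue]()
ids.put(0, new BsonObjectId())
ids.put(1, new BsonObjectId())
val sample: InsertManyResult = InsertManyResult.acknowledged(ids)

insertedIdStrings(sample).foreach(println)
```

Inside an Observer, the same helper could be called from onNext(), for example insertedIdStrings(result).foreach(println).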

Use Lambda Functions to Process Results

Instead of explicitly overriding the callback methods from the Observer class, you can use lambda functions to concisely process operation results. These functions allow you to use the => arrow notation to customize the implementation of onNext(), onError(), and onComplete().

Tip

To learn more about lambda functions, also known as anonymous functions, see the Anonymous function Wikipedia entry.

Example

This example queries the restaurants collection for each distinct value of the borough field. The code subscribes to the Observable returned by the distinct() method, then uses lambda functions to print results and handle errors:

collection.distinct[String]("borough")
  .subscribe(
    (value: String) => println(value),
    (e: Throwable) => println(s"Failed: $e"),
    () => println("Processed all results")
  )

Bronx
Brooklyn
Manhattan
Missing
Queens
Staten Island
Processed all results

Use Futures to Retrieve All Results

You can subscribe to an Observable implicitly and aggregate its results by calling the toFuture() method. When you call toFuture() on an Observable, the driver performs the following actions:

  • Subscribes to the Observable

  • Collects the items emitted by the Observable into a sequence and returns that sequence in a Future instance

After the Future completes, you can iterate through the sequence to retrieve the operation results.

Important

If your Observable contains a large number of documents, calling the toFuture() method will consume significant memory. If you expect a large result set, consider using callback or lambda functions to access results.

Example

This example queries the restaurants collection for documents in which the value of the name field is "The Halal Guys". To access the operation results, the code converts the Observable to a Future, waits until the Future collects each result, and iterates through the results:

import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration.Duration

val observable = collection.find(equal("name", "The Halal Guys"))
val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS))
results.foreach(println)

Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}),
(restaurant_id,BsonString{value='50012258'}))
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}),
(restaurant_id,BsonString{value='50017823'}))
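
Blocking with Await.result() is convenient in a short script, but the Future returned by toFuture() can also be handled without blocking. The following is a minimal sketch, assuming an in-memory Observable created with Observable(...) in place of a real query so that it runs without a cluster. The names names, allNames, count, and total are illustrative.

```scala
import org.mongodb.scala._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// In-memory stand-in for a query's Observable (assumption)
val names: Observable[String] = Observable(Seq("The Halal Guys", "The Halal Guys"))

// toFuture() subscribes and resolves to a Seq holding every emitted item
val allNames: Future[Seq[String]] = names.toFuture()

// Transform the results without blocking the current thread
val count: Future[Int] = allNames.map(_.size)

// Register callbacks that run when the Future completes
count.onComplete {
  case Success(n) => println(s"Retrieved $n results")
  case Failure(e) => println(s"Failed: ${e.getMessage}")
}

// Block only at the edge of the script so that it does not exit early
val total: Int = Await.result(count, 10.seconds)
```

In a long-running application, the final Await.result() call is typically unnecessary because the callbacks run on their own once the Future completes.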

API Documentation

To learn more about any of the methods or types discussed in this guide, see the driver's API documentation.
