Access Data From an Observable
Overview
In this guide, you can learn how to access the results of MongoDB operations from an Observable instance.
An Observable represents a stream of data emitted by an operation over time. To access this data, you can create an Observer instance that subscribes to the Observable.
Note
The Scala driver is built upon the MongoDB Java Reactive Streams driver. The Observable trait extends the Publisher interface from Reactive Streams and includes additional convenience methods to help process results.
How to Process an Observable
To run a MongoDB operation and process its data, you must request the operation results from an Observable. The driver provides the Observable type for operations that return any number of results. Operations that produce no results or one result, such as the countDocuments() method, return a SingleObservable[T]. The [T] parameterization corresponds to the data type that the SingleObservable handles.
Operations that can produce an unbounded number of results return an instance of the Observable[T] type. Some operations return specific Observable types that provide additional methods to filter and process results before you subscribe to them.
The following list describes some types that allow you to chain operation-specific methods to the Observable:
FindObservable[T]: Returned by the find() method
DistinctObservable[T]: Returned by the distinct() method
AggregateObservable[T]: Returned by the aggregate() method
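As a minimal sketch of this kind of chaining, the following code applies the sort() and limit() methods to a FindObservable before anything subscribes to it. The names ChainedFindSketch and firstCzechByName, along with the filter, sort field, and limit chosen here, are assumptions made for illustration only.

```scala
import org.mongodb.scala._
import org.mongodb.scala.model.Filters.equal
import org.mongodb.scala.model.Sorts.ascending

// Hypothetical helper object; not part of the driver.
object ChainedFindSketch {

  // Builds a query for Czech restaurants, sorted by name and capped at
  // three documents. The method only returns the configured
  // FindObservable; it does not subscribe to it.
  def firstCzechByName(collection: MongoCollection[Document]): FindObservable[Document] =
    collection
      .find(equal("cuisine", "Czech"))
      .sort(ascending("name"))
      .limit(3)
}
```

To retrieve the documents, subscribe to the returned FindObservable as you would to any other Observable.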
You can request operation results by calling the subscribe() method on the operation's Observable. Pass an instance of the Observer class as a parameter to the subscribe() method. This Observer receives the operation results from the Observable. Then, you can use methods provided by the Observer class to print results, handle errors, and perform additional data processing.
To learn more about processing results, see the following API documentation:
Sample Data
The examples in this guide use the restaurants collection in the sample_restaurants database from the Atlas sample datasets. To access this collection from your Scala application, create a MongoClient that connects to an Atlas cluster and assign the following values to your database and collection variables:

val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see the Get Started with Atlas guide.
Use Callbacks to Process Results
After subscribing to an Observable[T], you can use the following callback methods provided by the Observer class to access operation results or handle errors:
onNext(result: T): Called each time the Observer receives a new result. You can define logic for processing results by overriding this method.
onError(e: Throwable): Called when the operation generates an error and prevents the Observer from receiving more data from the Observable. You can define error handling logic by overriding this method.
onComplete(): Called when the Observer has consumed all results from the Observable. You can perform any final data processing by overriding this method.
The following sections show how to customize these methods to process read and write operation results from an Observable.
Access Read Operation Results
To access data retrieved by a read operation, create an Observable[T] to store the operation results. Then, subscribe to the Observable and override the Observer class callback methods to process the results.
This example queries the restaurants collection for documents in which the cuisine value is "Czech". To retrieve and process results, the example assigns an Observable[Document] to the operation and performs the following actions:
Calls the subscribe() method to subscribe to the Observable and passes an Observer as a parameter
Overrides the onNext() method to print each retrieved document, which is a Document instance
Overrides the onError() method to print any errors
Overrides the onComplete() method to print a message after all the results from the Observable are processed

import org.mongodb.scala._
import org.mongodb.scala.model.Filters.equal

val filter = equal("cuisine", "Czech")
val findObservable: Observable[Document] = collection.find(filter)

findObservable.subscribe(new Observer[Document] {
  override def onNext(result: Document): Unit = println(result)
  override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
  override def onComplete(): Unit = println("Processed all results")
})

Iterable((_id,...), ..., (name,BsonString{value='Koliba Restaurant'}), (restaurant_id,BsonString{value='40812870'}))
Iterable((_id,...), ..., (name,BsonString{value='Bohemian Beer Garden'}), (restaurant_id,BsonString{value='41485121'}))
Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}), (restaurant_id,BsonString{value='41569184'}))
Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}), (restaurant_id,BsonString{value='41711983'}))
Processed all results

Access Write Operation Results
To access the result returned by a write operation, create an Observable[T] to store the operation results. Then, subscribe to the Observable and override the Observer class callback methods to process the results.
This example inserts two documents in which the cuisine value is "Nepalese" into the restaurants collection. To retrieve and process results, the example assigns an Observable[InsertManyResult] to the operation and performs the following actions:
Calls the subscribe() method to subscribe to the Observable and passes an Observer as a parameter
Overrides the onNext() method to print the result of the insert operation, returned as an InsertManyResult
Overrides the onError() method to print any errors
Overrides the onComplete() method to print a message after all the results from the Observable are processed

import org.mongodb.scala._
import org.mongodb.scala.result.InsertManyResult

val docs: Seq[Document] = Seq(
  Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"),
  Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese")
)
val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs)

insertObservable.subscribe(new Observer[InsertManyResult] {
  override def onNext(result: InsertManyResult): Unit = println(result)
  override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
  override def onComplete(): Unit = println("Processed all results")
})

AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...}, 1=BsonObjectId{value=...}}}
Processed all results

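The read and write examples override the same three callbacks, so the Observer logic can be factored into a reusable class. The following is a minimal sketch under that assumption: CollectingObserver, its label parameter, and its results and isDone members are hypothetical names introduced here and are not part of the driver.

```scala
import org.mongodb.scala.Observer
import scala.collection.mutable.ListBuffer

// Hypothetical reusable Observer that stores every result it receives
// and prints a labeled message when the stream ends or fails.
class CollectingObserver[T](label: String) extends Observer[T] {
  private val buffer = ListBuffer.empty[T]
  @volatile private var done = false

  // Snapshot of the results received so far.
  def results: List[T] = buffer.synchronized(buffer.toList)

  // True after onComplete() or onError() has been called.
  def isDone: Boolean = done

  override def onNext(result: T): Unit =
    buffer.synchronized { buffer += result; () }

  override def onError(e: Throwable): Unit = {
    println(s"$label failed: ${e.getMessage}")
    done = true
  }

  override def onComplete(): Unit = {
    println(s"$label processed ${results.size} result(s)")
    done = true
  }
}
```

You could then pass an instance to either operation, for example collection.find(filter).subscribe(new CollectingObserver[Document]("find")).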
Use Lambda Functions to Process Results
Instead of explicitly overriding the callback functions from the Observer class, you can use lambda functions to concisely process operation results. These functions allow you to use the => arrow notation to customize the implementation of onNext(), onError(), and onComplete().
Tip
To learn more about lambda functions, also known as anonymous functions, see the Anonymous function Wikipedia entry.
Example
This example queries the restaurants collection for each distinct value of the borough field. The code subscribes to the Observable returned by the distinct() method, then uses lambda functions to print results and handle errors:

collection.distinct[String]("borough")
  .subscribe(
    (value: String) => println(value),
    (e: Throwable) => println(s"Failed: $e"),
    () => println("Processed all results")
  )

Bronx
Brooklyn
Manhattan
Missing
Queens
Staten Island
Processed all results

Use Futures to Retrieve All Results
You can subscribe to an Observable implicitly and aggregate its results by calling the toFuture() method. When you call toFuture() on an Observable, the driver performs the following actions:
Subscribes to the Observable
Collects the items emitted by the Observable and stores them in a Future instance
Then, after the Future completes, you can iterate through the collected results to retrieve the operation results.
Example
This example queries the restaurants collection for documents in which the value of the name field is "The Halal Guys". To access the operation results, the code converts the Observable to a Future, waits until the Future collects each result, and iterates through the results:

import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration.Duration

val observable = collection.find(equal("name", "The Halal Guys"))
val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS))
results.foreach(println)

Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50012258'}))
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50017823'}))

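Await.result() blocks the calling thread until the Future completes. As an alternative, the following minimal sketch transforms the Future with map() and recover() instead of waiting on it, assuming an ExecutionContext is available. FutureSketch, summarize, and summarizeByName are hypothetical names introduced for illustration, and the message formats are arbitrary.

```scala
import org.mongodb.scala._
import org.mongodb.scala.model.Filters.equal
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

// Hypothetical helper object; not part of the driver.
object FutureSketch {

  // Turns a Future of collected results into a one-line summary and
  // maps a failed Future to an error message instead of rethrowing.
  def summarize[A](results: Future[Seq[A]])(implicit ec: ExecutionContext): Future[String] =
    results
      .map(items => s"Found ${items.size} result(s)")
      .recover { case NonFatal(e) => s"Failed: ${e.getMessage}" }

  // Applies summarize() to the Future produced by toFuture().
  def summarizeByName(collection: MongoCollection[Document], name: String)(
      implicit ec: ExecutionContext
  ): Future[String] =
    summarize(collection.find(equal("name", name)).toFuture())
}
```

A caller could run FutureSketch.summarizeByName(collection, "The Halal Guys").foreach(println) with an implicit ExecutionContext in scope, which prints the summary when the query finishes without blocking the calling thread.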
API Documentation
To learn more about any of the methods or types discussed in this guide, see the following API documentation: