Perform a Transaction
Overview
In this guide, you can learn how to use the Scala driver to perform transactions. Transactions allow you to perform a series of operations that change data only if the entire transaction is committed. If any operation in the transaction does not succeed, the driver stops the transaction and discards all data changes before they ever become visible. This feature is called atomicity.
In MongoDB, transactions run within logical sessions. A session is a grouping of related read or write operations that you want to run sequentially. Sessions enable causal consistency for a group of operations and allow you to run operations in an ACID-compliant transaction, which is a transaction that meets an expectation of atomicity, consistency, isolation, and durability. MongoDB guarantees that the data involved in your transaction operations remains consistent, even if the operations encounter unexpected errors.
When using the Scala driver, you can start a ClientSession
by
calling the startSession()
method on your client. Then, you can
perform transactions within the session.
Warning
Use a ClientSession
only in operations running on the
MongoClient
that created it. Using a ClientSession
with a
different MongoClient
results in operation errors.
Methods
After calling the startSession()
method to start a session, you can
use methods from the ClientSession
class to modify the session state.
The following table describes the methods you can use to manage a
transaction:
Method | Description |
---|---|
| Starts a new transaction on this session. You cannot start a
transaction if there's already an active transaction running in
the session. You can set transaction options by passing a TransactionOptions
instance as a parameter. |
| Commits the active transaction for this session. This method returns an
error if there is no active transaction for the session, the
transaction was previously ended, or if there is a write conflict. |
| Ends the active transaction for this session. This method returns an
error if there is no active transaction for the session or if the
transaction was committed or ended. |
Transaction Example
This example defines a runTransaction()
method that
modifies data in the collections of the sample_mflix
database.
The code performs the following actions:
Creates
MongoCollection
instances to access themovies
andusers
collectionsSpecifies the read and write concern for the transaction
Starts the transaction
Inserts a document into the
movies
collection and prints the resultsUpdates a document in the
users
collection and prints the results
def runTransaction( database: MongoDatabase, observable: SingleObservable[ClientSession] ): SingleObservable[ClientSession] = { observable.map(clientSession => { val moviesCollection = database.getCollection("movies") val usersCollection = database.getCollection("users") val transactionOptions = TransactionOptions .builder() .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build() // Starts the transaction with specified options clientSession.startTransaction(transactionOptions) // Inserts a document into the "movies" collection val insertObservable = moviesCollection.insertOne( clientSession, Document("name" -> "The Menu", "runtime" -> 107) ) val insertResult = Await.result(insertObservable.toFuture(), Duration(10, TimeUnit.SECONDS)) println(s"Insert completed: $insertResult") // Updates a document in the "users" collection val updateObservable = usersCollection.updateOne( clientSession, equal("name", "Amy Phillips"), set("name", "Amy Ryan") ) val updateResult = Await.result(updateObservable.toFuture(), Duration(10, TimeUnit.SECONDS)) println(s"Update completed: $updateResult") clientSession }) }
Note
Within a transaction, operations must run in sequence. The preceding code awaits the result of each write operation to ensure that the operations do not run concurrently.
Then, run the following code to perform the transaction. This code completes the following actions:
Creates a session from the client by using the
startSession()
methodCalls the
runTransaction()
method defined in the preceding example, passing the database and the session as parametersCommits the transaction by calling the
commitTransaction()
method and waits for the operations to complete
val client = MongoClient("<connection string>") val database = client.getDatabase("sample_mflix") val session = client.startSession(); val transactionObservable: SingleObservable[ClientSession] = runTransaction(database, session) val commitTransactionObservable: SingleObservable[Unit] = transactionObservable.flatMap(clientSession => clientSession.commitTransaction()) Await.result(commitTransactionObservable.toFuture(), Duration(10, TimeUnit.SECONDS))
Insert completed: AcknowledgedInsertOneResult{insertedId=BsonObjectId{value=...}} Update completed: AcknowledgedUpdateResult{matchedCount=1, modifiedCount=1, upsertedId=null}
Note
Parallel Operations Not Supported
The Scala driver does not support running parallel operations within a single transaction.
If you're using MongoDB Server v8.0 or later, you can perform
write operations on multiple namespaces within a single transaction by using
the bulkWrite()
method. For more information, see the Bulk Write Operations
guide.
Additional Information
To learn more about the concepts mentioned in this guide, see the following pages in the MongoDB Server manual:
To learn more about ACID compliance, see the A Guide to ACID Properties in Database Management Systems article on the MongoDB website.
To learn more about insert operations, see the Insert Documents guide.
API Documentation
To learn more about the methods and types mentioned in this guide, see the following API documentation: