I’m looking at the example given here (for which a snippet is given here). Some points of confusion:
- The session is never closed. Is this safe? What are the semantics for this?
- The updateOne and insertOne Observables doesn’t appear to be connected to the transaction commit Observable in any way (e.g., via a flatMap). So:
2a. Does this mean that the observed results of those commands may not actually get applied (e.g., on rollback)?
2b. What prevents the transaction from being committed before the commands have finished? Similarly, if I chain further commands off of them, how does the txn commit know to wait for them?
I’m sure I’m missing something fundamental, but I’m not sure what it is. Here’s a snippet of code showing what I’m trying to do:
object MongoHelper {
def runTxn[A](client: MongoClient)(txn: ClientSession => A): Observable[Unit] = {
client
.startSession()
.map { session =>
session.startTransaction()
txn(session)
session
}.flatMap { session =>
session.commitTransaction().map { _ =>
println("Committed transaction")
session.close()
}
}
}
}
...
def txnTest = {
val obs = MongoHelper.runTxn(client) { session =>
println("Inserting")
col.insertOne(session, Foo(5)).subscribe(res => println(s"Inserted: $res"))
}
Await.result(obs.head, 5.seconds)
I never get the “Inserted” message.
Thanks!