Okay, so it works if I write it more intuitively, like this:
def runTxn[A](client: MongoClient)(txn: ClientSession => Observable[A]): Observable[ClientSession] = {
  for {
    session <- client.startSession()
    _ = session.startTransaction()
    _ <- txn(session)
    _ <- session.commitTransaction()
  } yield {
    session.close()
    session
  }
}
But the question remains: why is the example written the way it is? If you look at its code, you’ll see that it boils down to this:
def runTxn[A](client: MongoClient)(txn: ClientSession => A): Observable[Unit] = {
  client
    .startSession()
    .map { session =>
      session.startTransaction()
      txn(session)
      session
    }.flatMap { session =>
      session.commitTransaction().map { _ =>
        println("Committed transaction")
        session.close()
      }
    }
}
In particular:
- The Observable(s) generated by the txn (which are an updateOne / insertOne in the example) are “thrown away” instead of being (flat)mapped over like in my example above, so that the transaction commit is disconnected from them.
- The session is never closed.
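To make the first point concrete, here is a toy sketch of what I mean. It assumes the driver's Observables are cold, i.e. nothing executes until something subscribes. `Cold`, `step` and `ColdDemo` are made-up stand-ins I wrote for illustration, not driver types; the only thing they model is "an effect that runs when you ask it to".

```scala
// Hypothetical stand-in for a cold Observable (NOT the driver's API):
// the wrapped effect runs only when run() is called.
final class Cold[A](thunk: () => A) {
  def run(): A = thunk()
  def map[B](f: A => B): Cold[B] = new Cold(() => f(thunk()))
  def flatMap[B](f: A => Cold[B]): Cold[B] = new Cold(() => f(thunk()).run())
}

object ColdDemo {
  // Records which steps actually executed, in order.
  val log = scala.collection.mutable.ListBuffer.empty[String]

  def step(name: String): Cold[Unit] = new Cold(() => { log += name; () })

  // Same shape as the example: the inner Cold is built inside map and dropped,
  // so it is never run.
  def discarded(): Cold[Unit] =
    step("start").map { _ => step("insert"); () }.flatMap(_ => step("commit"))

  // Same shape as my for-comprehension: every step is chained via flatMap.
  def chained(): Cold[Unit] =
    for {
      _ <- step("start")
      _ <- step("insert")
      _ <- step("commit")
    } yield ()

  def main(args: Array[String]): Unit = {
    discarded().run()
    println(log.mkString(", ")) // start, commit
    log.clear()
    chained().run()
    println(log.mkString(", ")) // start, insert, commit
  }
}
```

In the `discarded` variant the "insert" step never shows up in the log, which is exactly what I would expect to happen to the updateOne / insertOne in the example, if the real Observables behave like this toy one.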
I just don’t see how the provided example can possibly work. Can someone from the team please set me straight?