Okay, so it works if I write it more intuitively, like this:

  def runTxn[A](client: MongoClient)(txn: ClientSession => Observable[A]): Observable[ClientSession] = {
    for {
      session <- client.startSession()
      _ = session.startTransaction()
      _ <- txn(session)
      _ <- session.commitTransaction()
    } yield {
      session.close()
      session
    }
  }

But the question remains: why is the example written the way it is? If you look at the code, you’ll see that it boils down to my code above:

  def runTxn[A](client: MongoClient)(txn: ClientSession => A): Observable[Unit] = {
    client
      .startSession()
      .map { session =>
        session.startTransaction()
        txn(session)
        session
      }.flatMap { session =>
        session.commitTransaction().map { _ =>
          println("Committed transaction")
          session.close()
        }
      }
  }

In particular:

  • The Observable(s) generated by the txn (which are an updateOne / insertOne in the example) are “thrown away” instead of being (flat)mapped over like in my example above, so that the transaction commit is disconnected from them.
  • The session is never closed.

I just don’t see how the provided example can possibly work. Can someone from the team please set me straight?