Sep 2021

I’m trying to set up the following background service:

  • realm db to store reports
  • observe realm for closed reports and send them to server
  • this should all be done on bg thread (e.g. DispatchQueue(qos: .background))
func sendReports() {
    observe(RealmReport.self, predicate: .init(format: "state == %@", .closed))
        .receive(on: DispatchQueue(label: "report-sender"))
        .sink { report in
            self.send(report: report) // makes API call
        }
        .store(in: &subscriptions) // keep observing as long as app is alive
}

func observe(_ type: Object.Type, predicate: NSPredicate) -> AnyPublisher<[Report], Error> {
    Just((type, predicate))
        .receive(on: DispatchQueue.main) // should be on bg queue, but realm publisher needs runloop, I guess?
        .flatMap { objectType, predicate in
            self.realm().objects(objectType) // Realm instance is kept alive til app is killed
                .filter(predicate)
                .collectionPublisher
                // .threadSafeReference() // send live objects, heavy
                // .freeze() // lightweight; snapshot of live objects to pass to other threads, but version is cached in file until Realm instance is gone?
                .map(\.elements)
        }
        .tryMap { $0.map { Report(from: $0) } }
        .mapError { _ in Error.dbError }
        .eraseToAnyPublisher()
}

I found some problems:

  • .freeze() is the preferred way to use Realm with Combine, but does it increase the number of active versions?
  • I saw:
      • an assertion failure because the maximum number of active versions was exceeded
      • Realm being accessed from the wrong thread (sometimes)

You could simplify this all to something like:

struct Response {
    // dummy struct to mock a response from a server
}

func doNetworkTask(_ results: Results<Movie>) -> AnyPublisher<Result<Response, Error>, Never> {
    // do network things and return a publisher
}

let queue = DispatchQueue(label: "my-queue")
let objects = try! Realm().objects(Movie.self).filter("name == 'Foo'")

objects
    .collectionPublisher
    .receive(on: queue)
    .assertNoFailure()
    .flatMap(doNetworkTask)
    .sink { response in
        print(response)
    }.store(in: &cancellable)

Your example is causing issues because it opens a Realm and creates a publisher inside another publisher, which leads to a lot of threading problems. Open the Realm outside the publisher chain, then call .collectionPublisher on the resulting collection.

Thanks, got it.
I used Just().receive(on:) only to make sure that Realm() is called on main, but I could drop it and always call func observe(..) from .main instead.

It’s correct that it needs to be on .main, right? It’s the only queue with a run loop; otherwise the Realm does not auto-refresh and the publisher would not send any updates. Right?

You will still get changes on a serial queue with Realm. You shouldn’t have to receive on the main queue unless you are updating the UI.
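As a minimal sketch of observing on a serial queue without Combine, assuming a recent Realm Swift version where observe accepts an on: queue parameter: the RealmReport model, the "closed" state string, and the observeClosedReports helper below are hypothetical stand-ins, since the thread does not show the real ones.

```swift
import Foundation
import RealmSwift

// Hypothetical model; the thread's actual RealmReport is not shown.
final class RealmReport: Object {
    @Persisted(primaryKey: true) var id: String
    @Persisted var state: String
}

// Observes closed reports and delivers every notification on `queue`.
// The caller must keep the returned token alive, or observation stops.
func observeClosedReports(
    configuration: Realm.Configuration = .defaultConfiguration,
    on queue: DispatchQueue,
    handler: @escaping ([String]) -> Void
) throws -> NotificationToken {
    let realm = try Realm(configuration: configuration)
    let closed = realm.objects(RealmReport.self).filter("state == %@", "closed")
    return closed.observe(on: queue) { change in
        switch change {
        case .initial(let reports), .update(let reports, _, _, _):
            // `reports` is confined to `queue`; copy out plain values
            // before handing them to code that may hop threads.
            handler(reports.map(\.id))
        case .error(let error):
            print("Observation failed: \(error)")
        }
    }
}
```

Copying plain values (here, the ids) out of the live results is one way to avoid both frozen objects and thread-safe references; whether that fits depends on what the API call needs.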

Observations:

try Realm(configuration: configuration) // when called from any other queue than .main, I get this exception after some writes: 'Expected failure: Number of active versions () in the Realm exceeded the limit of (RLMException)'
    .objects(objectType)
    .filter(predicate)
    .collectionPublisher
    .subscribe(on: DispatchQueue(label: "")) // Exception NSException * "Can only add notification blocks from within runloops."
    .receive(on: /*some queue*/) // calls .threadSafeReference() under the hood

To keep downstream publishers from moving the subscription onto a queue other than main, I need to:

... .collectionPublisher .subscribe(on: DispatchQueue.main)

and call try Realm() from .main
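The observations above hinge on the difference between where a subscription is set up and where values are delivered. A minimal plain-Combine sketch (no Realm involved; the queue names and the observeWhere helper are made up for illustration) shows that subscribe(on:) controls the former and receive(on:) the latter:

```swift
import Combine
import Dispatch
import Foundation

let subscribeQueue = DispatchQueue(label: "subscribe-queue")
let deliveryQueue = DispatchQueue(label: "delivery-queue")

// Tag each queue so code can tell which one it is running on.
let queueNameKey = DispatchSpecificKey<String>()
subscribeQueue.setSpecific(key: queueNameKey, value: "subscribe-queue")
deliveryQueue.setSpecific(key: queueNameKey, value: "delivery-queue")

func currentQueueName() -> String {
    DispatchQueue.getSpecific(key: queueNameKey) ?? "other"
}

// Reports where the upstream was subscribed to and where the value arrived.
func observeWhere() -> (subscribed: String, delivered: String) {
    var subscribedOn = ""
    var deliveredOn = ""
    let done = DispatchSemaphore(value: 0)

    let cancellable = Deferred { () -> Just<Int> in
        // Runs when the subscription is made, i.e. on the subscribe(on:) queue.
        subscribedOn = currentQueueName()
        return Just(1)
    }
    .subscribe(on: subscribeQueue)
    .receive(on: deliveryQueue)
    .sink { _ in
        // Runs where receive(on:) delivers values.
        deliveredOn = currentQueueName()
        done.signal()
    }

    done.wait()
    cancellable.cancel()
    return (subscribedOn, deliveredOn)
}
```

In the Realm case, the subscription step is where the notification block gets added, which would explain why that step is sensitive to the queue it runs on while delivery is not.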

3 years later

Is there a faster way to observe for any changes (without needing to get the changed objects) than to use collectionPublisher? collectionPublisher triggers DeepChangeChecker behaviors which are expensive…

Realm observers are fairly lightweight and responsive. Your use case was not outlined but you don’t really need to worry about getting “changed objects” as they are lazily loaded and memory efficient.

Like this:

let realm = try! Realm()

// Observe realm notifications. Keep a strong reference to the notification token
// or the observation will stop.
let token = realm.observe { notification, realm in
    // `notification` is an enum specifying what kind of notification was emitted
    viewController.updateUI()
}

// ...

// Later, explicitly stop observing.
token.invalidate()

As you can see, we don’t get any objects here; it just updates the UI when something within that Realm changes.

You can do something similar with a collection; just don’t process the results returned in the closure:

self.notificationToken = results.observe { [weak self] changes in
    // update ui
}

If that doesn’t answer the question, perhaps a new thread indicating your use case (with brief code) would help.

Thanks for the explanation, Jay.

After profiling more, I discovered that the bottleneck was check_outgoing_links.

The solution was to remove all usage of relations from my schema, and just store IDs and traverse relations manually. This eliminated check_outgoing_links usage and fixed the terrible performance issues I had with collectionPublisher or anything else using DeepChangeChecker (including results.observe and changesetPublisher). (For reference I typically have hundreds of thousands / millions of records)
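A minimal sketch of what storing IDs instead of relations could look like. The Author and Book models and both helper functions are hypothetical, since the actual schema is not shown in the thread:

```swift
import Foundation
import RealmSwift

// Hypothetical models: Book stores the author's primary key as a plain
// string instead of declaring a link (`@Persisted var author: Author?`).
final class Author: Object {
    @Persisted(primaryKey: true) var id: String
    @Persisted var name: String
}

final class Book: Object {
    @Persisted(primaryKey: true) var id: String
    @Persisted var title: String
    @Persisted(indexed: true) var authorID: String
}

// Forward traversal: a primary-key lookup replaces following a link.
func author(of book: Book, in realm: Realm) -> Author? {
    realm.object(ofType: Author.self, forPrimaryKey: book.authorID)
}

// Reverse traversal: a query on the indexed ID column replaces LinkingObjects.
func books(by authorID: String, in realm: Realm) -> Results<Book> {
    realm.objects(Book.self).filter("authorID == %@", authorID)
}
```

The trade-off is that Realm no longer knows about the relationship, so a change to an Author does not notify observers of Book results, and referential integrity becomes the app's responsibility.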

Sorry for the tangent in this thread