Многопоточность RxJava с доступом Realm-Realm из некорректного потока


Фон

Я использую Realm в своем приложении. Когда данные загружаются, они затем подвергаются интенсивной обработке, поэтому обработка происходит в фоновом потоке.

Используемый шаблон кодирования является единицей рабочего шаблона, и область существует только в репозитории под DataManager. Идея здесь заключается в том, что каждый репозиторий может иметь различное решение для хранения баз данных/файлов.

Что я пробовал

Ниже приведен пример некоторого аналогичного кода для то, что у меня есть в классе Фореспозиции.

Идея здесь заключается в том, что получается экземпляр области, используемый для запроса области на предмет объектов интереса, возврата их и закрытия экземпляра области. Обратите внимание, что это синхронно и в конце копирует объекты из области в неуправляемое состояние.
public Observable<List<Foo>> getFoosById(List<String> fooIds) {

    Realm realm = Realm.getInstance(fooRealmConfiguration);

    RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);

    for(String id : fooIds) {

        findFoosByIdQuery.equalTo(Foo.FOO_ID_FIELD_NAME, id);
        findFoosByIdQuery.or();
    }

    return findFoosByIdQuery
            .findAll()
            .asObservable()
            .doOnUnsubscribe(realm::close)
            .filter(RealmResults::isLoaded)
            .flatMap(foos -> Observable.just(new ArrayList<>(realm.copyFromRealm(foos))));
}

Этот код позже используется в сочетании с кодом тяжелой обработки через RxJava:

dataManager.getFoosById(foo)
            .flatMap(this::processtheFoosInALongRunningProcess)
            .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
            .subscribe(tileChannelSubscriber);

После прочтения документов, я считаю, что вышеизложенное должно работать, так как это не так асинхронный и поэтому не нуждается в циклическом потоке. Я получаю экземпляр realm в том же потоке, поэтому он не передается между потоками, как и объекты.

Задача

Когда вышеописанное выполняется, я получаю

Доступ к области из неправильного потока. Объекты области могут быть доступны только на нити они были созданы.

Это кажется неправильным. Единственное, о чем я могу думать, - это о бассейне царства. instances-это получение существующего экземпляра, созданного из другого процесса с использованием основного потока.

2 7

2 ответа:

Кей со

return findFoosByIdQuery
        .findAll()
        .asObservable()

Это происходит в потоке пользовательского интерфейса, потому что именно оттуда вы вызываете его изначально

.subscribeOn(Schedulers.io())

Аааа, а потом ты возишься с ними на Schedulers.io().

Нет, это не та же самая нить!

Как бы мне не нравился подход копирования из базы данных нулевого копирования , ваш текущий подход пронизан проблемами из-за неправильного использования realmResults.asObservable(), поэтому вот спойлер для того, каким должен быть ваш код:

public Observable<List<Foo>> getFoosById(List<String> fooIds) {
   return Observable.defer(() -> {
       try(Realm realm = Realm.getInstance(fooRealmConfiguration)) { //try-finally also works
           RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
           for(String id : fooIds) {
               findFoosByIdQuery.equalTo(FooFields.ID, id);
               findFoosByIdQuery.or(); // please guarantee this works?
           }
           RealmResults<Foo> results = findFoosByIdQuery.findAll();
           return Observable.just(realm.copyFromRealm(results));
       }
   }).subscribeOn(Schedulers.io());
}

Обратите внимание, что вы создаете экземпляр вне всего вашего конвейера обработки RxJava. Таким образом, в главном потоке (или в любом другом потоке, на котором вы находитесь, при вызове getFoosById().

Только потому, что метод возвращает Observable, не означает, что он выполняется в другом потоке. Только конвейер обработки наблюдаемого, созданный последним оператором вашего метода getFoosById(), работает на правильном потоке (filter(), flatMap() и вся обработка, выполняемая вызывающим).

Таким образом, Вы должны убедитесь, что вызов getFoosById()уже выполнен в потоке, используемом Schedulers.io().

Один из способов достичь этого-использовать Observable.defer():
Observable.defer(() -> dataManager.getFoosById(foo))
            .flatMap(this::processtheFoosInALongRunningProcess)
            .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
            .subscribe(tileChannelSubscriber);