Многопоточность RxJava с доступом Realm-Realm из некорректного потока
Фон
Я использую Realm в своем приложении. Когда данные загружаются, они затем подвергаются интенсивной обработке, поэтому обработка происходит в фоновом потоке.
Используемый шаблон кодирования является единицей рабочего шаблона, и область существует только в репозитории под DataManager. Идея здесь заключается в том, что каждый репозиторий может иметь различное решение для хранения баз данных/файлов.Что я пробовал
Ниже приведен пример некоторого аналогичного кода для то, что у меня есть в классе Фореспозиции.
Идея здесь заключается в том, что получается экземпляр области, используемый для запроса области на предмет объектов интереса, возврата их и закрытия экземпляра области. Обратите внимание, что это синхронно и в конце копирует объекты из области в неуправляемое состояние.public Observable<List<Foo>> getFoosById(List<String> fooIds) {
Realm realm = Realm.getInstance(fooRealmConfiguration);
RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
for(String id : fooIds) {
findFoosByIdQuery.equalTo(Foo.FOO_ID_FIELD_NAME, id);
findFoosByIdQuery.or();
}
return findFoosByIdQuery
.findAll()
.asObservable()
.doOnUnsubscribe(realm::close)
.filter(RealmResults::isLoaded)
.flatMap(foos -> Observable.just(new ArrayList<>(realm.copyFromRealm(foos))));
}
Этот код позже используется в сочетании с кодом тяжелой обработки через RxJava:
dataManager.getFoosById(foo)
.flatMap(this::processtheFoosInALongRunningProcess)
.subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
.subscribe(tileChannelSubscriber);
После прочтения документов, я считаю, что вышеизложенное должно работать, так как это не так асинхронный и поэтому не нуждается в циклическом потоке. Я получаю экземпляр realm в том же потоке, поэтому он не передается между потоками, как и объекты.
Задача
Когда вышеописанное выполняется, я получаю
Доступ к области из неправильного потока. Объекты области могут быть доступны только на нити они были созданы.
Это кажется неправильным. Единственное, о чем я могу думать, - это о бассейне царства. instances-это получение существующего экземпляра, созданного из другого процесса с использованием основного потока.
2 ответа:
Кей со
return findFoosByIdQuery .findAll() .asObservable()Это происходит в потоке пользовательского интерфейса, потому что именно оттуда вы вызываете его изначально
.subscribeOn(Schedulers.io())Аааа, а потом ты возишься с ними на
Schedulers.io().Нет, это не та же самая нить!
Как бы мне не нравился подход копирования из базы данных нулевого копирования , ваш текущий подход пронизан проблемами из-за неправильного использования
realmResults.asObservable(), поэтому вот спойлер для того, каким должен быть ваш код:public Observable<List<Foo>> getFoosById(List<String> fooIds) { return Observable.defer(() -> { try(Realm realm = Realm.getInstance(fooRealmConfiguration)) { //try-finally also works RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class); for(String id : fooIds) { findFoosByIdQuery.equalTo(FooFields.ID, id); findFoosByIdQuery.or(); // please guarantee this works? } RealmResults<Foo> results = findFoosByIdQuery.findAll(); return Observable.just(realm.copyFromRealm(results)); } }).subscribeOn(Schedulers.io()); }
Обратите внимание, что вы создаете экземпляр вне всего вашего конвейера обработки RxJava. Таким образом, в главном потоке (или в любом другом потоке, на котором вы находитесь, при вызове
getFoosById().Только потому, что метод возвращает Observable, не означает, что он выполняется в другом потоке. Только конвейер обработки наблюдаемого, созданный последним оператором вашего метода
getFoosById(), работает на правильном потоке (filter(),flatMap()и вся обработка, выполняемая вызывающим).Таким образом, Вы должны убедитесь, что вызов
Один из способов достичь этого-использоватьgetFoosById()уже выполнен в потоке, используемомSchedulers.io().Observable.defer():Observable.defer(() -> dataManager.getFoosById(foo)) .flatMap(this::processtheFoosInALongRunningProcess) .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc .subscribe(tileChannelSubscriber);