Многопоточность RxJava с доступом Realm-Realm из некорректного потока
Фон
Я использую Realm в своем приложении. Когда данные загружаются, они затем подвергаются интенсивной обработке, поэтому обработка происходит в фоновом потоке.
Используемый шаблон кодирования является единицей рабочего шаблона, и область существует только в репозитории под DataManager. Идея здесь заключается в том, что каждый репозиторий может иметь различное решение для хранения баз данных/файлов.Что я пробовал
Ниже приведен пример некоторого аналогичного кода для то, что у меня есть в классе Фореспозиции.
Идея здесь заключается в том, что получается экземпляр области, используемый для запроса области на предмет объектов интереса, возврата их и закрытия экземпляра области. Обратите внимание, что это синхронно и в конце копирует объекты из области в неуправляемое состояние.public Observable<List<Foo>> getFoosById(List<String> fooIds) {
Realm realm = Realm.getInstance(fooRealmConfiguration);
RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
for(String id : fooIds) {
findFoosByIdQuery.equalTo(Foo.FOO_ID_FIELD_NAME, id);
findFoosByIdQuery.or();
}
return findFoosByIdQuery
.findAll()
.asObservable()
.doOnUnsubscribe(realm::close)
.filter(RealmResults::isLoaded)
.flatMap(foos -> Observable.just(new ArrayList<>(realm.copyFromRealm(foos))));
}
Этот код позже используется в сочетании с кодом тяжелой обработки через RxJava:
dataManager.getFoosById(foo)
.flatMap(this::processtheFoosInALongRunningProcess)
.subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
.subscribe(tileChannelSubscriber);
После прочтения документов, я считаю, что вышеизложенное должно работать, так как это не так асинхронный и поэтому не нуждается в циклическом потоке. Я получаю экземпляр realm в том же потоке, поэтому он не передается между потоками, как и объекты.
Задача
Когда вышеописанное выполняется, я получаю
Доступ к области из неправильного потока. Объекты области могут быть доступны только на нити они были созданы.
Это кажется неправильным. Единственное, о чем я могу думать, - это о бассейне царства. instances-это получение существующего экземпляра, созданного из другого процесса с использованием основного потока.
2 ответа:
Кей со
return findFoosByIdQuery .findAll() .asObservable()
Это происходит в потоке пользовательского интерфейса, потому что именно оттуда вы вызываете его изначально
.subscribeOn(Schedulers.io())
Аааа, а потом ты возишься с ними на
Schedulers.io()
.Нет, это не та же самая нить!
Как бы мне не нравился подход копирования из базы данных нулевого копирования , ваш текущий подход пронизан проблемами из-за неправильного использования
realmResults.asObservable()
, поэтому вот спойлер для того, каким должен быть ваш код:public Observable<List<Foo>> getFoosById(List<String> fooIds) { return Observable.defer(() -> { try(Realm realm = Realm.getInstance(fooRealmConfiguration)) { //try-finally also works RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class); for(String id : fooIds) { findFoosByIdQuery.equalTo(FooFields.ID, id); findFoosByIdQuery.or(); // please guarantee this works? } RealmResults<Foo> results = findFoosByIdQuery.findAll(); return Observable.just(realm.copyFromRealm(results)); } }).subscribeOn(Schedulers.io()); }
Обратите внимание, что вы создаете экземпляр вне всего вашего конвейера обработки RxJava. Таким образом, в главном потоке (или в любом другом потоке, на котором вы находитесь, при вызове
getFoosById()
.Только потому, что метод возвращает Observable, не означает, что он выполняется в другом потоке. Только конвейер обработки наблюдаемого, созданный последним оператором вашего метода
getFoosById()
, работает на правильном потоке (filter()
,flatMap()
и вся обработка, выполняемая вызывающим).Таким образом, Вы должны убедитесь, что вызов
Один из способов достичь этого-использоватьgetFoosById()
уже выполнен в потоке, используемомSchedulers.io()
.Observable.defer()
:Observable.defer(() -> dataManager.getFoosById(foo)) .flatMap(this::processtheFoosInALongRunningProcess) .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc .subscribe(tileChannelSubscriber);