RxJava: observeOn, subscribeOn, и doFinally, переключение между Ио и потока пользовательского интерфейса
Я сталкиваюсь с проблемой, когда мой observable подписывается на поток ввода-вывода и наблюдается в главном потоке android (UI), но оператор doFinally
выполняется в потоке ввода-вывода, и он должен быть запущен в потоке пользовательского интерфейса.
Usecase почти точно такой же, как этотсредний артикль .
Я, по существу, хочу показатьProgressBar
, когда наблюдаемое подписано, и скрыть ProgressBar
, когда наблюдаемое прекращено или закончено.
Ошибка, которую я получаю, такова: java.яз..IllegalStateException: текущий поток должен иметь петлитель!
Может ли кто-нибудь помочь мне переместить действие doFinally
обратно в поток пользовательского интерфейса, который имеет петлитель? Или я упустил какую-то другую информацию?
EDIT Рабочий процесс usecase:
- > Запуск Активности
- > инициализировать
- > выполнить наблюдаемый поток
- > начать новую деятельность и закончить текущую деятельность
- > новая деятельность
- > Начало оригинала деятельность и завершение
- > повторить инициализацию
Большое вам спасибо.
Подробности:
- RxJava 2.0.7
- RxAndroid 2.0.1
- Android sdk min 14 и target 25
Пример Кода
listUseCase.execute(null)
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
getView().showLoading(true);
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
getView().showLoading(false);
}
})
.subscribeOn(schedulerProvider.io())
.observeOn(schedulerProvider.main())
.subscribe(
new Consumer<List<AccountEntity>>() {
@Override
public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
getView().setAccounts(accountEntities);
}
},
new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
if (isViewAttached()) {
getView().showError(throwable.getMessage());
}
}
}
);
Трассировка Стека:
Фатальное исключение: RxCachedThreadScheduler-1 Процесс: com.образец.андроид.демонстрация.клиент первый.Альфа, PID: 16685 Ява.яз..IllegalStateException: текущий поток должен иметь петлитель! на Андроиде.вид.Хореограф$1.начальное значение (хореограф.java: 96) около андроид.вид.Хореограф$1.начальное значение (хореограф.java: 91) на Яве.яз..ThreadLocal $ Values.getAfterMiss (ThreadLocal.java: 430) на Яве.яз..Нитевидный.get (ThreadLocal.Ява: 65) около андроид.вид.Хореограф.getInstance (хореограф.java: 192) на Андроиде.анимация.ValueAnimator$AnimationHandler.(Реаниматор ценностей.java: 600) на Андроиде.анимация.ValueAnimator$AnimationHandler.(Реаниматор ценностей.java: 575) на Андроиде.анимация.Реаниматор ценностей.getOrCreateAnimationHandler (ValueAnimator.Ява: 1366) на Андроиде.анимация.Реаниматор ценностей.конец (ValueAnimator.java: 998) на Андроиде.графика.вытягиваемый.AnimatedVectorDrawable.стоп (AnimatedVectorDrawable.java: 439) на Андроиде.штучка.Прогрессбар.stopAnimation (ProgressBar.Ява: 1523) на Андроиде.штучка.Прогрессбар.onVisibilityChanged (ProgressBar.Ява: 1583) на Андроиде.вид.Смотреть.dispatchVisibilityChanged (вид.java: 8643) на Андроиде.вид.Смотреть.setFlags (вид.java: 9686) на Андроиде.вид.Смотреть.setVisibility (вид.java: 6663) на Андроиде.штучка.Прогрессбар.setVisibility (ProgressBar.Ява: 1563) на ком.образец.андроид.демонстрация.клиент первый.Особенности магазина.список.Продуктовая активность.showLoading (ProductListActivity.java: 121) на ком.образец.андроид.демонстрация.клиент первый.Особенности магазина.список.ProductListPresenterMediator$3.run (ProductListPresenterMediator.Ява: 56) на Ио.реактивекс.внутренний.операторы.заметный.ObservableDoFinally$DoFinallyObserver.runFinally(ObservableDoFinally.java: 144) на Ио.реактивекс.внутренний.операторы.заметный.ObservableDoFinally$DoFinallyObserver.onComplete (наблюдаемо).java: 94) около ио.реактивекс.внутренний.наблюдатели.DisposableLambdaObserver.onComplete (DisposableLambdaObserver.Ява: 73) на Ио.реактивекс.внутренний.наблюдатели.DeferredScalarDisposable.полный (DeferredScalarDisposable.Ява: 84) около ио.реактивекс.внутренний.операторы.заметный.ObservableFromCallable.subscribeActual (ObservableFromCallable.Ява: 52) на Ио.реактивекс.Заметный.подписаться(наблюдаемо.java: 10700) на Ио.реактивекс.внутренний.операторы.заметный.Наблюдаемый лунный цикл.subscribeActual (ObservableDoOnLifecycle.Ява: 33) на Ио.реактивекс.Заметный.подписаться(наблюдаемо.java: 10700) на Ио.реактивекс.внутренний.операторы.заметный.Наблюдаемо, конечно.subscribeActual (ObservableDoFinally.java: 45) около ио.реактивекс.Заметный.подписаться(наблюдаемо.java: 10700) на Ио.реактивекс.внутренний.операторы.заметный.ObservableSubscribeOn$1.run (ObservableSubscribeOn.Ява: 39) на Ио.реактивекс.Планировщик$1.выполнить (планировщик.java: 138) на Ио.реактивекс.внутренний.планировщики.ScheduledRunnable.run (ScheduledRunnable.Ява: 59) на Ио.реактивекс.внутренний.планировщики.ScheduledRunnable.вызов (ScheduledRunnable.Ява: 51) на Яве.утиль.параллельный.FutureTask.run (FutureTask.java: 237) на Яве.утиль.параллельный.ScheduledThreadPoolExecutor$ScheduledFutureTask.доступ$201 (ScheduledThreadPoolExecutor.java: 152) на Яве.утиль.параллельный.ScheduledThreadPoolExecutor$ScheduledFutureTask.выполнить (ScheduledThreadPoolExecutor.java: 265) на Яве.утиль.параллельный.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java: 1112) на Яве.утиль.параллельный.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java: 587) на Яве.яз..Нить.выполнения(резьба.java: 818)
2 ответа:
Все, что вам нужно сделать, это переместить
observeOn
вверх по цепочке. МетодobserveOn
изменяет поток, которыйonNext
,onError
, иonCompleted
вызываются на который внутренне, как работают операции и побочные эффекты (через лифт)listUseCase.execute(null) .subscribeOn(schedulerProvider.io()) // Move subscribe on here .observeOn(schedulerProvider.main()) // Change threads here .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { getView().showLoading(true); // This should be on the main thread also } }) .doFinally(new Action() { @Override public void run() throws Exception { getView().showLoading(false); } }) .subscribe( new Consumer<List<AccountEntity>>() { @Override public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception { getView().setAccounts(accountEntities); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { if (isViewAttached()) { getView().showError(throwable.getMessage()); } } } );
Проблема возникла из-за того, что я не избавлялся от подписки, когда действие было завершено/уничтожено.
Теперь каждое действие/представление сообщает докладчику, когда они остановлены или уничтожены, и докладчик избавляется от подписки.
Похоже, это решило мою проблему.
@Override public void initialize() { if (!isViewAttached()) { throw new ViewNotAttachedException(); } disposable = listUseCase.execute(null) .subscribeOn(schedulerProvider.io()) // Move subscribe on here .observeOn(schedulerProvider.main()) // Change threads here .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { getView().showLoading(true); // This should be on the main thread also } }) .doFinally(new Action() { @Override public void run() throws Exception { getView().showLoading(false); } }) .subscribe( new Consumer<List<AccountEntity>>() { @Override public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception { getView().setAccounts(accountEntities); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { if (isViewAttached()) { getView().showError(throwable.getMessage()); } } } ); } @Override public void dispose() { if (disposable != null) { disposable.dispose(); } }