RxJava: observeOn, subscribeOn, и doFinally, переключение между Ио и потока пользовательского интерфейса


Я сталкиваюсь с проблемой, когда мой observable подписывается на поток ввода-вывода и наблюдается в главном потоке android (UI), но оператор doFinally выполняется в потоке ввода-вывода, и он должен быть запущен в потоке пользовательского интерфейса.

Usecase почти точно такой же, как этотсредний артикль .

Я, по существу, хочу показать ProgressBar, когда наблюдаемое подписано, и скрыть ProgressBar, когда наблюдаемое прекращено или закончено.

Ошибка, которую я получаю, такова: java.яз..IllegalStateException: текущий поток должен иметь петлитель!

Может ли кто-нибудь помочь мне переместить действие doFinally обратно в поток пользовательского интерфейса, который имеет петлитель? Или я упустил какую-то другую информацию?

EDIT Рабочий процесс usecase:

- > Запуск Активности

- > инициализировать

- > выполнить наблюдаемый поток

- > начать новую деятельность и закончить текущую деятельность

- > новая деятельность

- > Начало оригинала деятельность и завершение

- > повторить инициализацию

Большое вам спасибо.

Подробности:

  • RxJava 2.0.7
  • RxAndroid 2.0.1
  • Android sdk min 14 и target 25

Пример Кода

listUseCase.execute(null)
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true);
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.main())
            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );

Трассировка Стека:

Фатальное исключение: RxCachedThreadScheduler-1 Процесс: com.образец.андроид.демонстрация.клиент первый.Альфа, PID: 16685 Ява.яз..IllegalStateException: текущий поток должен иметь петлитель! на Андроиде.вид.Хореограф$1.начальное значение (хореограф.java: 96) около андроид.вид.Хореограф$1.начальное значение (хореограф.java: 91) на Яве.яз..ThreadLocal $ Values.getAfterMiss (ThreadLocal.java: 430) на Яве.яз..Нитевидный.get (ThreadLocal.Ява: 65) около андроид.вид.Хореограф.getInstance (хореограф.java: 192) на Андроиде.анимация.ValueAnimator$AnimationHandler.(Реаниматор ценностей.java: 600) на Андроиде.анимация.ValueAnimator$AnimationHandler.(Реаниматор ценностей.java: 575) на Андроиде.анимация.Реаниматор ценностей.getOrCreateAnimationHandler (ValueAnimator.Ява: 1366) на Андроиде.анимация.Реаниматор ценностей.конец (ValueAnimator.java: 998) на Андроиде.графика.вытягиваемый.AnimatedVectorDrawable.стоп (AnimatedVectorDrawable.java: 439) на Андроиде.штучка.Прогрессбар.stopAnimation (ProgressBar.Ява: 1523) на Андроиде.штучка.Прогрессбар.onVisibilityChanged (ProgressBar.Ява: 1583) на Андроиде.вид.Смотреть.dispatchVisibilityChanged (вид.java: 8643) на Андроиде.вид.Смотреть.setFlags (вид.java: 9686) на Андроиде.вид.Смотреть.setVisibility (вид.java: 6663) на Андроиде.штучка.Прогрессбар.setVisibility (ProgressBar.Ява: 1563) на ком.образец.андроид.демонстрация.клиент первый.Особенности магазина.список.Продуктовая активность.showLoading (ProductListActivity.java: 121) на ком.образец.андроид.демонстрация.клиент первый.Особенности магазина.список.ProductListPresenterMediator$3.run (ProductListPresenterMediator.Ява: 56) на Ио.реактивекс.внутренний.операторы.заметный.ObservableDoFinally$DoFinallyObserver.runFinally(ObservableDoFinally.java: 144) на Ио.реактивекс.внутренний.операторы.заметный.ObservableDoFinally$DoFinallyObserver.onComplete (наблюдаемо).java: 94) около ио.реактивекс.внутренний.наблюдатели.DisposableLambdaObserver.onComplete (DisposableLambdaObserver.Ява: 73) на Ио.реактивекс.внутренний.наблюдатели.DeferredScalarDisposable.полный (DeferredScalarDisposable.Ява: 84) около ио.реактивекс.внутренний.операторы.заметный.ObservableFromCallable.subscribeActual (ObservableFromCallable.Ява: 52) на Ио.реактивекс.Заметный.подписаться(наблюдаемо.java: 10700) на Ио.реактивекс.внутренний.операторы.заметный.Наблюдаемый лунный цикл.subscribeActual (ObservableDoOnLifecycle.Ява: 33) на Ио.реактивекс.Заметный.подписаться(наблюдаемо.java: 10700) на Ио.реактивекс.внутренний.операторы.заметный.Наблюдаемо, конечно.subscribeActual (ObservableDoFinally.java: 45) около ио.реактивекс.Заметный.подписаться(наблюдаемо.java: 10700) на Ио.реактивекс.внутренний.операторы.заметный.ObservableSubscribeOn$1.run (ObservableSubscribeOn.Ява: 39) на Ио.реактивекс.Планировщик$1.выполнить (планировщик.java: 138) на Ио.реактивекс.внутренний.планировщики.ScheduledRunnable.run (ScheduledRunnable.Ява: 59) на Ио.реактивекс.внутренний.планировщики.ScheduledRunnable.вызов (ScheduledRunnable.Ява: 51) на Яве.утиль.параллельный.FutureTask.run (FutureTask.java: 237) на Яве.утиль.параллельный.ScheduledThreadPoolExecutor$ScheduledFutureTask.доступ$201 (ScheduledThreadPoolExecutor.java: 152) на Яве.утиль.параллельный.ScheduledThreadPoolExecutor$ScheduledFutureTask.выполнить (ScheduledThreadPoolExecutor.java: 265) на Яве.утиль.параллельный.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java: 1112) на Яве.утиль.параллельный.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java: 587) на Яве.яз..Нить.выполнения(резьба.java: 818)

2 4

2 ответа:

Все, что вам нужно сделать, это переместить observeOn вверх по цепочке. Метод observeOn изменяет поток, который onNext, onError, и onCompleted вызываются на который внутренне, как работают операции и побочные эффекты (через лифт)

listUseCase.execute(null)
            .subscribeOn(schedulerProvider.io()) // Move subscribe on here
            .observeOn(schedulerProvider.main()) // Change threads here
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true); // This should be on the main thread also
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })

            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );

Проблема возникла из-за того, что я не избавлялся от подписки, когда действие было завершено/уничтожено.

Теперь каждое действие/представление сообщает докладчику, когда они остановлены или уничтожены, и докладчик избавляется от подписки.

Похоже, это решило мою проблему.

 @Override
public void initialize() {
    if (!isViewAttached()) {
        throw new ViewNotAttachedException();
    }
    disposable = listUseCase.execute(null)
            .subscribeOn(schedulerProvider.io()) // Move subscribe on here
            .observeOn(schedulerProvider.main()) // Change threads here
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true); // This should be on the main thread also
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })
            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );
}

@Override
public void dispose() {
    if (disposable != null) {
        disposable.dispose();
    }
}