Наследующей моего подписчика и функции onComplete не работать, когда я звоню наследующей в моем классе FlowableOnSubscribe
В проекте Android, использующем RxJava 2, я создаю Flowable
Вот так в onCreate
моей первоначальной деятельности:
Flowable.create(new MyFlowableOnSubscribe1(), BackpressureStrategy.BUFFER)
.doOnComplete(new MyAction())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MySubscriber());
Реализация FlowableOnSubscribe такова:
public class MyFlowableOnSubscribe1 implements FlowableOnSubscribe<String> {
public static final String TAG = "XX MyFlOnSub1";
@Override
public void subscribe(FlowableEmitter<String> emitter) {
Log.i(TAG, "subscribe");
emitter.onNext("hello");
emitter.onComplete();
}
}
Это реализация подписчика:
public class MySubscriber implements Subscriber<String> {
public static final String TAG = "XX MySubscriber";
@Override
public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: " + s);
}
}
А реализация действия такова:
public class MyAction implements Action {
public static final String TAG = "XX MyAction";
@Override
public void run() {
Log.i(TAG, "run");
}
}
В моем выводе я ожидаю лог-оператор от onNext
, но я его не вижу. Вместо этого, это весь мой вывод:
02-23 17:56:31.334 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
Это означает, что onNext
никогда не работает, и onComplete
тоже даже не бегает. Но MyAction
работает успешно.
Вот что происходит, когда я комментирую вызов onNext
:
02-23 17:58:31.572 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
02-23 17:58:31.652 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onComplete
В этом случае onNext
конечно не бежит, но по крайней мере onComplete
бежит.
Я ожидал, что увижу onComplete
запуск в обоих случаях, и onNext
запуск при вызове emitter.onNext
. Что я здесь делаю не так?
2 ответа:
Вам нужно вручную выдать запрос, иначе никакие данные не будут выдаваться при расширении
Subscriber
напрямую:@Override public void onSubscribe(Subscription s) { Log.i(TAG, "onSubscribe"); s.request(Long.MAX_VALUE); }
В качестве альтернативы можно расширить
DisposableSubscriber
илиResourceSubscriber
.
Как вы это проверяете? Возможно ли, что ваш основной поток завершает работу до того, как наблюдаемый получит возможность выдать результат, потому что вы используете
Schedulers.IO
для потока подписки. Кроме того, вашobserveOn
не будет ничего делать, так как он используется только для дальнейших операторов нисходящего потока.