Наследующей моего подписчика и функции onComplete не работать, когда я звоню наследующей в моем классе FlowableOnSubscribe


В проекте Android, использующем RxJava 2, я создаю Flowable Вот так в onCreate моей первоначальной деятельности:

Flowable.create(new MyFlowableOnSubscribe1(), BackpressureStrategy.BUFFER)
        .doOnComplete(new MyAction())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new MySubscriber());

Реализация FlowableOnSubscribe такова:

public class MyFlowableOnSubscribe1 implements FlowableOnSubscribe<String> {
    public static final String TAG = "XX MyFlOnSub1";

    @Override
    public void subscribe(FlowableEmitter<String> emitter) {
        Log.i(TAG, "subscribe");

        emitter.onNext("hello");
        emitter.onComplete();
    }
}

Это реализация подписчика:

public class MySubscriber implements Subscriber<String> {
    public static final String TAG = "XX MySubscriber";

    @Override
    public void onSubscribe(Subscription s) {
        Log.i(TAG, "onSubscribe");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: " + s);
    }
}

А реализация действия такова:

public class MyAction implements Action {
    public static final String TAG = "XX MyAction";

    @Override
    public void run() {
        Log.i(TAG, "run");
    }
}

В моем выводе я ожидаю лог-оператор от onNext, но я его не вижу. Вместо этого, это весь мой вывод:

02-23 17:56:31.334 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run

Это означает, что onNext никогда не работает, и onComplete тоже даже не бегает. Но MyAction работает успешно.

Вот что происходит, когда я комментирую вызов onNext:

02-23 17:58:31.572 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
02-23 17:58:31.652 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onComplete

В этом случае onNext конечно не бежит, но по крайней мере onComplete бежит.

Я ожидал, что увижу onComplete запуск в обоих случаях, и onNext запуск при вызове emitter.onNext. Что я здесь делаю не так?

2 3

2 ответа:

Вам нужно вручную выдать запрос, иначе никакие данные не будут выдаваться при расширении Subscriber напрямую:

@Override
public void onSubscribe(Subscription s) {
    Log.i(TAG, "onSubscribe");
    s.request(Long.MAX_VALUE);
}

В качестве альтернативы можно расширить DisposableSubscriber или ResourceSubscriber.

Как вы это проверяете? Возможно ли, что ваш основной поток завершает работу до того, как наблюдаемый получит возможность выдать результат, потому что вы используете Schedulers.IO для потока подписки. Кроме того, ваш observeOn не будет ничего делать, так как он используется только для дальнейших операторов нисходящего потока.