Почему мой RxJava наблюдаемых не излучает и полной, если она блокирует?


Фон

У меня есть ряд наблюдаемых объектов RxJava (либо генерируемых из клиентов Джерси, либо заглушек с использованием Observable.just(someObject)). Все они должны излучать ровно одно значение. У меня есть компонентный тест, который высмеивает всех клиентов Jersey и использует Observable.just(someObject), и я вижу там то же самое поведение, что и при запуске производственного кода.

У меня есть несколько классов, которые воздействуют на эти наблюдаемые объекты, выполняют некоторые вычисления (и некоторые побочные эффекты - я мог бы сделать их прямыми возвращаемыми значениями позже) и возвращайте пустые наблюдаемые пустоты.

В какой - то момент, в одном из таких классов, я пытаюсь застегнуть несколько моих исходных наблюдаемых объектов и затем сопоставить их-что-то вроде следующего:

public Observable<Void> doCalculation() {
    return Observable.zip(
        getObservable1(),
        getObservable2(),
        getObservable3(),
        UnifyingObject::new
    ).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
}

// in Unifying Object
public Observable<Void> processToNewObservable() {
    // ... do some calculation ...
    return Observable.empty();
}

Вычислительные классы затем все объединяются и ждут:

// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
        .toBlocking().lastOrDefault(null);

Проблема

Беда в том, что processToNewObservable() никогда не исполняется. По процессу исключения я вижу, что это getObservable1() вот в чем беда - если я заменю его на Observable.just(null), все выполняется так, как я себе представляю (но с помощью нулевое значение, где я хочу реальное).

Повторяю, getObservable1() возвращает наблюдаемое от клиента Джерси в производственном коде, но этот клиент является Mockito mock returning Observable.just(someValue) в моем тесте.

Расследование

Если я преобразую getObservable1() в блокировку, затем оберну первое значение в just(), опять же, все выполняется так, как я себе представляю (но я не хочу вводить шаг блокировки):

Observable.zip(
    Observable.just(getObservable1().toBlocking().first()),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())

Моей первой мыслью было, что, возможно, что-то другое поглощает излучаемую ценность. из моего наблюдаемого, и zip видел, что он уже был завершен, таким образом определяя, что результат их застегивания должен быть пустым наблюдаемым. Я попытался добавить .cache() к каждому наблюдаемому источнику, который я могу считать связанным, но это не изменило поведение.

Я также пытался добавить обработчики next / error / complete / finally на getObservable1 (без преобразования его в blocking) непосредственно перед zip, но ни один из них не был выполнен:

getObservable1()
    .doOnNext(...)
    .doOnCompleted(...)
    .doOnError(...)
    .finallyDo(...);

Observable.zip(
    getObservable1(),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())

В вопрос

Я очень новичок в RxJava, поэтому я почти уверен, что упускаю что-то фундаментальное. Вопрос в том, какую глупость я мог совершить? Если это не очевидно из того, что я сказал до сих пор, что я могу сделать, чтобы помочь диагностировать проблему?
3 8

3 ответа:

Наблюдаемое должно испускать, чтобы запустить цепочку. Вы должны думать о своем конвейере как о декларации того, что произойдет, когда наблюдаемое излучает.

Вы не делились тем, что на самом деле наблюдалось, но наблюдаемо.just () заставляет наблюдаемое немедленно испускать обернутый объект.

Основываясь на ответе в комментарии, либо один из getObservable не возвращает никакого значения, а просто завершает, либо Mockito mocking делает что-то неправильно. Следующий автономный пример работает для меня. Не могли бы вы проверить его и начать медленно мутировать, чтобы увидеть, где все ломается?

Observable.zip(
        Observable.just(1),
        Observable.just(2),
        Observable.just(3),
        (a, b, c) -> new Integer[] { a, b, c })
 .concatMap(a -> Observable.from(a))
 .subscribe(System.out::println)
 ;

Примечание : я не нашел свой ответ здесь очень удовлетворительным, поэтому я копнул немного дальше и нашел гораздо меньший случай воспроизведения, поэтому я задал здесь новый вопрос: Почему мой RxJava Observable излучает только первому потребителю?


Я выяснил, по крайней мере, часть моих проблем (и, приношу извинения всем, кто пытался ответить, я не думаю, что у вас было много шансов, учитывая мое объяснение).

Различные классы, которые выполняют эти вычисления, были все возвращая Observable.empty() (согласно processToNewObservable() в моем исходном примере). Насколько я могу судить, Observable.zip() не подписывается на N-е наблюдаемое, пока N-1-е наблюдаемое не испустит значение.

Мой первоначальный пример утверждал, что это было getObservable1(), что было неправильным - это было на самом деле немного неточно, это было позже замечено в списке параметров. Как я понимаю, причина, по которой он блокирует, а затем снова превращает это значение в наблюдаемое, заключается в том, что он блокирует и вызывает сначала принудили его к исполнению, и я получил побочные эффекты, которые хотел.

Если я изменю все мои вычислительные классы на return Observable.just(null) вместо этого, все работает: конечный zip() из всех наблюдаемых вычислительных классов работает через них всех, поэтому все ожидаемые побочные эффекты происходят.

Возвращение null Void кажется, что я определенно делаю что-то неправильно с точки зрения дизайна, но, по крайней мере, на этот конкретный вопрос дан ответ.