Почему мой RxJava наблюдаемых не излучает и полной, если она блокирует?
Фон
У меня есть ряд наблюдаемых объектов RxJava (либо генерируемых из клиентов Джерси, либо заглушек с использованием Observable.just(someObject)
). Все они должны излучать ровно одно значение. У меня есть компонентный тест, который высмеивает всех клиентов Jersey и использует Observable.just(someObject)
, и я вижу там то же самое поведение, что и при запуске производственного кода.
У меня есть несколько классов, которые воздействуют на эти наблюдаемые объекты, выполняют некоторые вычисления (и некоторые побочные эффекты - я мог бы сделать их прямыми возвращаемыми значениями позже) и возвращайте пустые наблюдаемые пустоты.
В какой - то момент, в одном из таких классов, я пытаюсь застегнуть несколько моих исходных наблюдаемых объектов и затем сопоставить их-что-то вроде следующего:
public Observable<Void> doCalculation() {
return Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
}
// in Unifying Object
public Observable<Void> processToNewObservable() {
// ... do some calculation ...
return Observable.empty();
}
Вычислительные классы затем все объединяются и ждут:
// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
.toBlocking().lastOrDefault(null);
Проблема
Беда в том, что processToNewObservable()
никогда не исполняется. По процессу исключения я вижу, что это getObservable1()
вот в чем беда - если я заменю его на Observable.just(null)
, все выполняется так, как я себе представляю (но с помощью нулевое значение, где я хочу реальное).
Повторяю, getObservable1()
возвращает наблюдаемое от клиента Джерси в производственном коде, но этот клиент является Mockito mock returning Observable.just(someValue)
в моем тесте.
Расследование
Если я преобразую getObservable1()
в блокировку, затем оберну первое значение в just()
, опять же, все выполняется так, как я себе представляю (но я не хочу вводить шаг блокировки):
Observable.zip(
Observable.just(getObservable1().toBlocking().first()),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
Моей первой мыслью было, что, возможно, что-то другое поглощает излучаемую ценность. из моего наблюдаемого, и zip
видел, что он уже был завершен, таким образом определяя, что результат их застегивания должен быть пустым наблюдаемым. Я попытался добавить .cache()
к каждому наблюдаемому источнику, который я могу считать связанным, но это не изменило поведение.
Я также пытался добавить обработчики next / error / complete / finally на getObservable1 (без преобразования его в blocking) непосредственно перед zip, но ни один из них не был выполнен:
getObservable1()
.doOnNext(...)
.doOnCompleted(...)
.doOnError(...)
.finallyDo(...);
Observable.zip(
getObservable1(),
getObservable2(),
getObservable3(),
UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable())
В вопрос
Я очень новичок в RxJava, поэтому я почти уверен, что упускаю что-то фундаментальное. Вопрос в том, какую глупость я мог совершить? Если это не очевидно из того, что я сказал до сих пор, что я могу сделать, чтобы помочь диагностировать проблему?3 ответа:
Наблюдаемое должно испускать, чтобы запустить цепочку. Вы должны думать о своем конвейере как о декларации того, что произойдет, когда наблюдаемое излучает.
Вы не делились тем, что на самом деле наблюдалось, но наблюдаемо.just () заставляет наблюдаемое немедленно испускать обернутый объект.
Основываясь на ответе в комментарии, либо один из
getObservable
не возвращает никакого значения, а просто завершает, либо Mockito mocking делает что-то неправильно. Следующий автономный пример работает для меня. Не могли бы вы проверить его и начать медленно мутировать, чтобы увидеть, где все ломается?Observable.zip( Observable.just(1), Observable.just(2), Observable.just(3), (a, b, c) -> new Integer[] { a, b, c }) .concatMap(a -> Observable.from(a)) .subscribe(System.out::println) ;
Примечание : я не нашел свой ответ здесь очень удовлетворительным, поэтому я копнул немного дальше и нашел гораздо меньший случай воспроизведения, поэтому я задал здесь новый вопрос: Почему мой RxJava Observable излучает только первому потребителю?
Я выяснил, по крайней мере, часть моих проблем (и, приношу извинения всем, кто пытался ответить, я не думаю, что у вас было много шансов, учитывая мое объяснение).Различные классы, которые выполняют эти вычисления, были все возвращая
Observable.empty()
(согласноprocessToNewObservable()
в моем исходном примере). Насколько я могу судить,Observable.zip()
не подписывается на N-е наблюдаемое, пока N-1-е наблюдаемое не испустит значение.Мой первоначальный пример утверждал, что это было
getObservable1()
, что было неправильным - это было на самом деле немного неточно, это было позже замечено в списке параметров. Как я понимаю, причина, по которой он блокирует, а затем снова превращает это значение в наблюдаемое, заключается в том, что он блокирует и вызывает сначала принудили его к исполнению, и я получил побочные эффекты, которые хотел.Если я изменю все мои вычислительные классы на return
Observable.just(null)
вместо этого, все работает: конечныйzip()
из всех наблюдаемых вычислительных классов работает через них всех, поэтому все ожидаемые побочные эффекты происходят.Возвращение null Void кажется, что я определенно делаю что-то неправильно с точки зрения дизайна, но, по крайней мере, на этот конкретный вопрос дан ответ.