Наблюдаемые против текучих rxJava2


я смотрел на новый rx java 2, и я не совсем уверен, что понимаю идею

3 91

3 ответа:

то, что обратное давление проявляется на практике, является ограниченными буферами,Flowable.observeOn имеет буфер из 128 элементов, который сливается так быстро, как dowstream может его принять. Вы можете увеличить этот размер буфера по отдельности для обработки импульсного источника, и все методы управления обратным давлением по-прежнему применяются с 1.x.Observable.observeOn имеет неограниченный буфер, который продолжает собирать элементы, и ваше приложение может запустить из памяти.

вы можете использовать Observable например:

  • обработка GUI события
  • работа с короткими последовательностями (всего менее 1000 элементов)

вы можете использовать Flowable например:

  • холодные и несвоевременные источники
  • генератор как источник
  • доступ к сети и базе данных

обратное давление-это когда ваш наблюдаемый (издатель) создает больше событий, чем может обработать ваш подписчик. Таким образом, вы можете получить подписчиков отсутствующих событий, или вы можете получить огромную очередь событий, которые просто приводит к из памяти в конечном итоге. Flowable противодавления принимает во внимание. Observable нет. Вот и все.

это напоминает мне воронку, которая, когда она имеет слишком много жидкости переполняется. Flowable может помочь с тем, чтобы этого не произошло:

С огромное противодавление:

enter image description here

но с использованием flowable, гораздо меньше противодавления:

enter image description here

Rxjava2 имеет несколько стратегий противодавления, которые вы можете использовать в зависимости от вашего usecase. под стратегией я имею в виду Rxjava2 предоставляет способ обработки объектов, которые не могут быть обработаны из-за переполнения (противодавления).

вот стратегии. Я не пойду до конца их все, но, например, если вы хотите не беспокоиться о предметах, которые переполнены вы можете использовать стратегию падения, как это:

наблюдаема.toFlowable (BackpressureStrategy.DROP)

насколько я знаю, в очереди должно быть ограничение 128 элементов, после чего может возникнуть переполнение (противодавление). Даже если его не 128 его близко к этому числу. Надеюсь, это кому-то поможет.

Если вам нужно изменить размер буфера от 128, похоже, это можно сделать как это (но смотреть любые ограничения памяти:

myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.  

дело в том, что ваш Flowable разбился после испускания 128 значений без обработки противодавления не означает, что он всегда будет разбиваться после ровно 128 значений: иногда он будет разбиваться после 10, а иногда он вообще не будет разбиваться. Я считаю, что это то, что произошло, когда вы попробовали пример с Observable - не было никакого противодавления, так что ваш код работал нормально, в следующий раз это может не. Разница в RxJava 2 заключается в том, что нет понятия противодавления в Observables больше, и нет способа справиться с этим. Если вы разрабатываете реактивную последовательность, которая, вероятно, потребует явной обработки противодавления-тогда Flowable ваш самый лучший выбор.