Читатель RXJava2 наблюдаемый с одновременными подписчиками
Я пытаюсь реализовать RXJava2 Observable<String>
от BufferedReader
. Пока все хорошо:
public class Launcher {
public static void main(String[] args) throws Exception {
Process process = Runtime.getRuntime().exec("ls /tmp");
BufferedReader reader = new BufferedReader (new InputStreamReader(process.getInputStream()));
Observable source = Observable.create(emitter -> {
String line = "";
while (line != null) {
line = reader.readLine();
if (line == null)
break;
System.out.println("Engine " + Thread.currentThread().getName() + " - " + line); //process line
emitter.onNext(line);
}
emitter.onComplete();
}).subscribeOn(Schedulers.newThread());
source.subscribe(line -> System.out.println("UI 1 " + Thread.currentThread().getName() + " - " + line));
source.subscribe(line -> System.out.println("UI 2 " + Thread.currentThread().getName() + " - " + line));
TimeUnit.SECONDS.sleep(10);
}
}
onSubscribe
заставляет подписчиков получать уведомления параллельно. Что, если я не ошибаюсь, означает, что лямбда в create()
будет выполняться параллельно для каждого потребителя.
readLine()
, который получает строку Thread-2 не получит, только следующую.
Все это имеет смысл, тем не менее, Должно быть, я что-то упускаю, потому что не могу понять, как это сделать:
- читать строки в одном потоке
- уведомлять всех абонентов одновременно - так каждый получает все линии
Я заглянул в Subject
s, попытался связать Observable
s, но все еще не мог понять этого.
Edit: я обновил пример до полного управляемого класса. Насколько я понимаю, проблема заключается в том, что горячие и холодные наблюдаемые. Как будто в документах сказано, что Observable.create(...)
должен создать холодный, тогда как мой код явно ведет себя как горячая штучка.
Следующий вопрос: если я добавлю параметр type, делающий его Observable<String>
, то вызов onSubscribe
нарушает код, и он не будет компилироваться, так как это вернет Observable<Object>
. Почему? Вызов onSubscribe
по промежуточному параметру странно работает:
Observable<String> source = Observable.create(emitter -> {...});
Observable<String> source2 = source.subscribeOn(Schedulers.newThread())
1 ответ:
Использовать
publish
:ConnectableObservable<String> o = Observable.create(emitter -> { try (BufferedReader reader = ...) { while (!emitter.isDisposed()) { String line = reader.readLine(); if (line == null || line.equals("end")) { emitter.onComplete(); return; } emitter.onNext(line); } } }).subscribeOn(Schedulers.io()) .publish(); o.subscribe(/* consumer 1 */); o.subscribe(/* consumer 2 */); o.connect();