Читатель RXJava2 наблюдаемый с одновременными подписчиками


Я пытаюсь реализовать RXJava2 Observable<String> от BufferedReader. Пока все хорошо:

public class Launcher {
    public static void main(String[] args) throws Exception {

        Process process = Runtime.getRuntime().exec("ls /tmp");
        BufferedReader reader = new BufferedReader (new InputStreamReader(process.getInputStream()));

        Observable source = Observable.create(emitter -> {
            String line = "";
            while (line != null) {
                line = reader.readLine();
                if (line == null)
                    break;
                System.out.println("Engine " + Thread.currentThread().getName() + " - " + line); //process line
                emitter.onNext(line);
            }
            emitter.onComplete();
        }).subscribeOn(Schedulers.newThread());

        source.subscribe(line -> System.out.println("UI 1   " + Thread.currentThread().getName() + " - " + line));
        source.subscribe(line -> System.out.println("UI 2   " + Thread.currentThread().getName() + " - " + line));

        TimeUnit.SECONDS.sleep(10);

    }
}

onSubscribe заставляет подписчиков получать уведомления параллельно. Что, если я не ошибаюсь, означает, что лямбда в create() будет выполняться параллельно для каждого потребителя.

Как следствие, если у меня есть два подписчика, каждый из них получает половину строк читателя. Thread-1 вызывает readLine(), который получает строку Thread-2 не получит, только следующую.

Все это имеет смысл, тем не менее, Должно быть, я что-то упускаю, потому что не могу понять, как это сделать:

  1. читать строки в одном потоке
  2. уведомлять всех абонентов одновременно - так каждый получает все линии

Я заглянул в Subject s, попытался связать Observable s, но все еще не мог понять этого.

Edit: я обновил пример до полного управляемого класса. Насколько я понимаю, проблема заключается в том, что горячие и холодные наблюдаемые. Как будто в документах сказано, что Observable.create(...) должен создать холодный, тогда как мой код явно ведет себя как горячая штучка.

Следующий вопрос: если я добавлю параметр type, делающий его Observable<String>, то вызов onSubscribe нарушает код, и он не будет компилироваться, так как это вернет Observable<Object>. Почему? Вызов onSubscribe по промежуточному параметру странно работает:

Observable<String> source = Observable.create(emitter -> {...});
Observable<String> source2 = source.subscribeOn(Schedulers.newThread())
1 2

1 ответ:

Использовать publish:

ConnectableObservable<String> o = Observable.create(emitter -> {
    try (BufferedReader reader = ...) {
        while (!emitter.isDisposed()) {
            String line = reader.readLine();
            if (line == null || line.equals("end")) {
                emitter.onComplete();
                return;          
            }
            emitter.onNext(line);
        }
    }
}).subscribeOn(Schedulers.io())
  .publish();

o.subscribe(/* consumer 1 */);
o.subscribe(/* consumer 2 */);

o.connect();