RxJava: наблюдаемый поток и поток по умолчанию
У меня есть следующий код:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
s.onNext("1");
s.onComplete();
}
});
thread.setName("background-thread-1");
thread.start();
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
String threadName = Thread.currentThread().getName();
logger.logDebug("map: thread=" + threadName);
return "map-" + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String s) {
String threadName = Thread.currentThread().getName();
logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
String threadName = Thread.currentThread().getName();
logger.logDebug("onComplete: thread=" + threadName);
}
});
И вот результат:
map: thread=background-thread-1
onNext: thread=background-thread-1, value=map-1
onComplete: thread=background-thread-1
Важная деталь: я вызываю метод subscribe
из другого потока (main
поток в Android).
Так выглядит Observable
класс синхронный и по умолчанию, и он выполняет все (операторы, такие как map
+ уведомление подписчиков) в том же потоке, который выдает события (s.onNext
), верно? Я удивляюсь... это намеренное поведение или я просто что-то неправильно понял? На самом деле я ожидал этого в по меньшей мере onNext
и onComplete
обратные вызовы будут вызываться в потоке вызывающего объекта, а не в потоке, излучающем события. Правильно ли я понимаю, что в данном конкретном случае фактический поток вызывающего не имеет значения? По крайней мере, когда события генерируются асинхронно.
Другая проблема-что делать, если я получаю некоторый наблюдаемый параметр из какого-то внешнего источника (т. е. я не генерирую его самостоятельно)... для меня, как его пользователя, нет способа проверить, является ли он синхронным или асинхронным, и я просто нужно явно указать, где я хочу получать обратные вызовы с помощью методов subscribeOn
и observeOn
, верно?
Спасибо!
1 ответ:
RxJava не имеет представления о параллелизме. Он будет производить значения в потоке подписки, если вы не используете какой-либо другой механизм, такой как observeOn/ subscribeOn. Пожалуйста, не используйте низкоуровневые конструкции, такие как Thread в операторах, вы можете разорвать контракт.
Из-за использования потока onNext будет вызван из вызывающего потока ('background-thread-1'). Подписка происходит в вызывающем потоке (UI-Thread). Каждый оператор вниз по цепочке будет вызван из 'background-thread-1' - вызывающий поток. Подписка onNext также будет вызвана из 'background-thread-1'.
Если вы хотите создавать значения не в вызывающем потоке, используйте: subscribeOn. Если вы хотите переключить поток обратно в главное использование observeOn где-то в цепи. Скорее всего, перед тем, как подписаться на него.
Пример:
Observable.just(1,2,3) // creation of observable happens on Computational-Threads .subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Last will overwrite .map(integer -> integer) // map happens on Computational-Threads .observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread .subscribe(integer -> { // called from mainThread });
Вот хорошее объяснение. http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html