Связывание нескольких наблюдаемых объектов с различными типами излучения
Я пытаюсь понять, как связать воедино наблюдаемые объекты. У меня есть существующий метод: public static Observable<Data> getData()
. В моем другом классе у меня есть такой существующий код:
doSomeBackgroundWork()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
Теперь я хотел бы связать вызов getData()
с этим вызовом. Как бы я это сделал? Я попробовал Это изначально:
doSomeBackgroundWork()
.flatMap(s -> call() {
mApi.getData()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
Но это не работает, потому что код getData () фактически выполняется в главном потоке.
Даже это не работает:
doSomeBackgroundWork()
.concatMap(s -> call() {
mApi.getData()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
Кроме того, когда я пытаюсь это сделать, проблема заключается в том, что zipWith
означает два наблюдаемые объекты идут параллельно, и мне очень хочется, чтобы они шли один за другим.
doSomeBackgroundWork()
.zipWith(mApi.getData()),
new Func2<BgWork, DataResponse,DataResponse>() {
@Override
public DataResponse call(BgWork bgWork, DatResponse data) {
return data;
}})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
1 ответ:
flatMap
оператор-это путь, по которому нужно идти здесь, вам просто нужно обрабатывать параллелизм. Если вы хотите запустить весь методgetData()
на планировщикеio
, то вы можете просто применить операторobserveOn
ПередflatMap
, а затем снова после него следующим образом:doSomeBackgroundWork() .observeOn(Schedulers.io()) .flatMap(s -> call() { mApi.getData() } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<..>() { ... })
Видите ли, оператор
subscribeOn
заставляет производителя "вычислять" и выдавать данные по предоставленному планировщику, поэтому не имеет значения, где вы используете его в композиции потока, и он также не имеет эффекта при многократном использовании. Но это не тот случай с операторомobserveOn
. Он скорее говорит следующему потоку выполнять работу на другом планировщике. Это означает,что когда вы используете его снова позже, вы можете снова перенаправить вычисления на другой планировщик.Однако, если вам нужно выполнить только ту работу, которая производится наблюдаемым, возвращенным из метода
getData()
на другом планировщике, вы можете использоватьsubscribeOn
на этом наблюдаемом, а не на основном потоке.doSomeBackgroundWork() .flatMap(s -> call() { mApi.getData().subscribeOn(Schedulers.io()) } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<..>() { ... })