Связывание нескольких наблюдаемых объектов с различными типами излучения


Я пытаюсь понять, как связать воедино наблюдаемые объекты. У меня есть существующий метод: public static Observable<Data> getData(). В моем другом классе у меня есть такой существующий код:

doSomeBackgroundWork()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })

Теперь я хотел бы связать вызов getData() с этим вызовом. Как бы я это сделал? Я попробовал Это изначально:

doSomeBackgroundWork()
.flatMap(s -> call() {
   mApi.getData()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })

Но это не работает, потому что код getData () фактически выполняется в главном потоке.

Даже это не работает:

doSomeBackgroundWork()
.concatMap(s -> call() {
   mApi.getData()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })

Кроме того, когда я пытаюсь это сделать, проблема заключается в том, что zipWith означает два наблюдаемые объекты идут параллельно, и мне очень хочется, чтобы они шли один за другим.

doSomeBackgroundWork()
.zipWith(mApi.getData()),
    new Func2<BgWork, DataResponse,DataResponse>() {
    @Override
    public DataResponse call(BgWork bgWork, DatResponse data) {
       return data;
    }})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<..>() { ... })
1 2

1 ответ:

flatMap оператор-это путь, по которому нужно идти здесь, вам просто нужно обрабатывать параллелизм. Если вы хотите запустить весь метод getData() на планировщике io, то вы можете просто применить оператор observeOn Перед flatMap, а затем снова после него следующим образом:

doSomeBackgroundWork()
  .observeOn(Schedulers.io())
  .flatMap(s -> call() {
       mApi.getData()
  }
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Subscriber<..>() { ... })

Видите ли, оператор subscribeOn заставляет производителя "вычислять" и выдавать данные по предоставленному планировщику, поэтому не имеет значения, где вы используете его в композиции потока, и он также не имеет эффекта при многократном использовании. Но это не тот случай с оператором observeOn. Он скорее говорит следующему потоку выполнять работу на другом планировщике. Это означает,что когда вы используете его снова позже, вы можете снова перенаправить вычисления на другой планировщик.

Однако, если вам нужно выполнить только ту работу, которая производится наблюдаемым, возвращенным из метода getData() на другом планировщике, вы можете использовать subscribeOn на этом наблюдаемом, а не на основном потоке.

doSomeBackgroundWork()
  .flatMap(s -> call() {
       mApi.getData().subscribeOn(Schedulers.io())
  }
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Subscriber<..>() { ... })