Использование Rxjs для вызова одновременных блоков запросов параллельно с задержкой?


У меня есть очередь задач (длиной 20), где каждая задача-это ajax-запрос, который нужно вызвать .

Я хочу:

1) создать блоки из 5 (20/5 =4 блока)
2) выполнить каждый фрагмент, где каждый элемент в фрагменте будет выполнен с задержкой 1000 мс.
3) когда каждый элемент кусков завершен , подождите 3 секунды .

Итак:

1..1 сек (↦зеленый) ..2..1 сек (↦зеленый) ..3..1 сек (↦зеленый) ..4..1 сек( ↦ зеленый) ..5.....................3сек ..........
6..1 сек (↦зеленый) ..7..1 сек (↦зеленый) ..8..1 сек (↦зеленый) ..9..1 сек (↦зеленый) ..10..................... 3сек .......... ... 11..1 сек (↦зеленый) ..12..1 сек (↦зеленый) ..13..1 сек (↦зеленый) ..14..1 сек (↦зеленый) ..15..................... 3сек ..........
16..1 сек (↦зеленый) ..17..1 сек( ↦ зеленый) ..18..1 сек (↦зеленый) ..19..1 сек (↦зеленый) ..20

Мне удалось сделать что-то близкое:

Введите описание изображения здесь

С :

from(this.httpCodes)
      .pipe(bufferCount(5),
       concatMap((i, y) => from(i).pipe(mergeMap(f => {
                                    this.httpCodes[f.index].wasExecuted = true;
                                     return this.ajaxAlike(f.data).pipe(
                                                               catchError(() => { return of(-1) }),
                                                               map((r) => ({ res: r, data: f }))
                                                                      )
                                                      }) 
                                        ,delay(3000) )),

      )

Но он не выполняется так, как я намеревался. Я не вижу задержек между каждым элементом в chunk

Вопрос:

Почему я вижу так много запросов , и как я могу изменить свой код так, чтобы каждый элемент в блоке выполнялся с задержкой 1 сек (зеленый должен появляться после каждого второй), и - после каждого куска, ждать 3 секунды?

Онлайн Демо

2 3

2 ответа:

Операторdelay задерживает излучаемый элемент. Похоже, вы ожидаете, что он испустит элемент, а затем "спит" в течение 3 секунд, прежде чем испустить следующий. Чтобы достичь этого, вы можете объединить пустую задержанную наблюдаемую.

Вы можете создать следующие трубопроводы оператор sleep :

const sleep = ms => concat(Rx.Observable.empty().pipe(delay(ms)))

И использовать его следующим образом:

const {concatMap, concat, delay, bufferCount} = Rx.operators;

const sleep = ms => concat(Rx.Observable.empty().pipe(delay(ms)));

const ajaxAlike = call => Rx.Observable.of(call).pipe(delay(500));

Rx.Observable.range(0, 20).pipe(
  bufferCount(5),
  concatMap(calls => 
    Rx.Observable.from(calls).pipe(
      concatMap(call => ajaxAlike(call).pipe(sleep(1000))),
      sleep(3000)
    )
  )
)
.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.7/Rx.js"></script>

EDIT : Хорошо, теперь я понимаю, что вы имеете в виду. Я обновил скрипку, так что все части куска выполняются параллельно, но куски ждут друг друга для завершения. Так что это должно сделать трюк:

const chunkSize = 5;
const requestsWithDelay = httpRequests.map(obs => obs.delay(1000));

let chunks = [];
for (let i = 0; i < requestsWithDelay.length; i += chunkSize) {
    chunks.push(requestsWithDelay.slice(i, i + chunkSize));
}

const chunkedRequests = chunks.map(chunk => Rx.Observable.forkJoin(...chunk).delay(3000));
const parallelChunkRequest = Rx.Observable.concat(...chunkedRequests);
parallelChunkRequest.subscribe();

Оригинальный Ответ:

Что-то вроде этого даст вам желаемые задержки (учитывая httpRequests как массив наблюдаемых объектов):

const requestsWithDelay = httpRequests.map((obs, idx) => { 
  let msDelay = 1000;
  if ((idx + 1) % 5 === 0 && idx < httpRequests.length - 1) {
    msDelay = 3000;
  }

  return obs.delay(msDelay);
});

const request = Rx.Observable.concat(...requestsWithDelay);
Это должно сработать, но не будет "реальных" кусков наблюдаемых объектов. Запросы в каждом куске не будут выполняется параллельно (как и при использовании mergeMap), но последовательно.

Чтобы получить abservable из httpRequests вы могли бы что-то вроде этого (но без задержки в канале):

const httpRequests = this.httpCodes.map(data => this.ajaxAlike(data));

Но если вы хотите, чтобы куски выполнялись параллельно, вы можете сделать что-то вроде этого:

let chunks = [];
for (let i = 0; i < requestsWithDelay.length; i += 5) {
    chunks.push(requestsWithDelay.slice(i, i + 5));
}

const chunkedRequests = chunks.map(chunk => Rx.Observable.concat(...chunk));
const parallelChunkRequest = Rx.Observable.merge(...chunkedRequests);

Я создал демо-скрипку

Но зачем вам нужна задержка в 3 секунды после каждого фрагмента, если они выполняются параллельно и не ждут друг друга?