Использование Rxjs для вызова одновременных блоков запросов параллельно с задержкой?
У меня есть очередь задач (длиной 20), где каждая задача-это ajax-запрос, который нужно вызвать .
Я хочу:
1) создать блоки из 5 (20/5 =4 блока)
2) выполнить каждый фрагмент, где каждый элемент в фрагменте будет выполнен с задержкой 1000 мс.
3) когда каждый элемент кусков завершен , подождите 3 секунды .
Итак:
1..1 сек (↦зеленый) ..2..1 сек (↦зеленый) ..3..1 сек (↦зеленый) ..4..1 сек( ↦ зеленый) ..5.....................3сек ..........
6..1 сек (↦зеленый) ..7..1 сек (↦зеленый) ..8..1 сек (↦зеленый) ..9..1 сек (↦зеленый) ..10..................... 3сек ..........
...
11..1 сек (↦зеленый) ..12..1 сек (↦зеленый) ..13..1 сек (↦зеленый) ..14..1 сек (↦зеленый) ..15..................... 3сек ..........
16..1 сек (↦зеленый) ..17..1 сек( ↦ зеленый) ..18..1 сек (↦зеленый) ..19..1 сек (↦зеленый) ..20
Мне удалось сделать что-то близкое:
С :
from(this.httpCodes)
.pipe(bufferCount(5),
concatMap((i, y) => from(i).pipe(mergeMap(f => {
this.httpCodes[f.index].wasExecuted = true;
return this.ajaxAlike(f.data).pipe(
catchError(() => { return of(-1) }),
map((r) => ({ res: r, data: f }))
)
})
,delay(3000) )),
)
Но он не выполняется так, как я намеревался. Я не вижу задержек между каждым элементом в chunk
Вопрос:
Почему я вижу так много запросов , и как я могу изменить свой код так, чтобы каждый элемент в блоке выполнялся с задержкой 1 сек (зеленый должен появляться после каждого второй), и - после каждого куска, ждать 3 секунды?
Онлайн Демо
2 ответа:
Операторdelay задерживает излучаемый элемент. Похоже, вы ожидаете, что он испустит элемент, а затем "спит" в течение 3 секунд, прежде чем испустить следующий. Чтобы достичь этого, вы можете объединить пустую задержанную наблюдаемую.
Вы можете создать следующие трубопроводы оператор sleep :
const sleep = ms => concat(Rx.Observable.empty().pipe(delay(ms)))
И использовать его следующим образом:
const {concatMap, concat, delay, bufferCount} = Rx.operators; const sleep = ms => concat(Rx.Observable.empty().pipe(delay(ms))); const ajaxAlike = call => Rx.Observable.of(call).pipe(delay(500)); Rx.Observable.range(0, 20).pipe( bufferCount(5), concatMap(calls => Rx.Observable.from(calls).pipe( concatMap(call => ajaxAlike(call).pipe(sleep(1000))), sleep(3000) ) ) ) .subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.7/Rx.js"></script>
EDIT : Хорошо, теперь я понимаю, что вы имеете в виду. Я обновил скрипку, так что все части куска выполняются параллельно, но куски ждут друг друга для завершения. Так что это должно сделать трюк:
const chunkSize = 5; const requestsWithDelay = httpRequests.map(obs => obs.delay(1000)); let chunks = []; for (let i = 0; i < requestsWithDelay.length; i += chunkSize) { chunks.push(requestsWithDelay.slice(i, i + chunkSize)); } const chunkedRequests = chunks.map(chunk => Rx.Observable.forkJoin(...chunk).delay(3000)); const parallelChunkRequest = Rx.Observable.concat(...chunkedRequests); parallelChunkRequest.subscribe();
Оригинальный Ответ:
Что-то вроде этого даст вам желаемые задержки (учитывая httpRequests как массив наблюдаемых объектов):
Это должно сработать, но не будет "реальных" кусков наблюдаемых объектов. Запросы в каждом куске не будут выполняется параллельно (как и при использовании mergeMap), но последовательно.const requestsWithDelay = httpRequests.map((obs, idx) => { let msDelay = 1000; if ((idx + 1) % 5 === 0 && idx < httpRequests.length - 1) { msDelay = 3000; } return obs.delay(msDelay); }); const request = Rx.Observable.concat(...requestsWithDelay);
Чтобы получить abservable из httpRequests вы могли бы что-то вроде этого (но без задержки в канале):
const httpRequests = this.httpCodes.map(data => this.ajaxAlike(data));
Но если вы хотите, чтобы куски выполнялись параллельно, вы можете сделать что-то вроде этого:
let chunks = []; for (let i = 0; i < requestsWithDelay.length; i += 5) { chunks.push(requestsWithDelay.slice(i, i + 5)); } const chunkedRequests = chunks.map(chunk => Rx.Observable.concat(...chunk)); const parallelChunkRequest = Rx.Observable.merge(...chunkedRequests);
Я создал демо-скрипку
Но зачем вам нужна задержка в 3 секунды после каждого фрагмента, если они выполняются параллельно и не ждут друг друга?