Поведение дросселя RxJS; получите первое значение немедленно
Пример Plunkr: https://plnkr.co/edit/NZwb3ol8CbZFtSc6Q9zm?p=preview
Я знаю, что существуют эти 3 метода дроссельной заслонки для rxjs (5.0 beta.4):
auditTime()
, throttleTime()
и debounceTime()
Поведение, которое я ищу, является тем, которое лодаш делает по умолчанию на дроссельной заслонке:
- 1) немедленно дайте мне первое значение!
- 2) на последовательных значениях удерживайте значения для заданной задержки, затем выделите последнее возникшее значение
- 3) когда истекла задержка дроссельной заслонки, вернитесь в состояние (1)
Теоретически это должно выглядеть так:
inputObservable
.do(() => cancelPreviousRequest())
.throttleTime(500)
.subscribe((value) => doNextRequest(value))
Но
-
throttleTime
никогда не дает мне последнее значение, если испускается в таймаут дроссельной заслонки -
debounceTime
срабатывает не сразу -
auditTime
срабатывает не сразу
Могу ли я объединить любой из методов rxjs для достижения описанного поведения?
2 ответа:
Для старых RxJs я написал оператор
concatLatest
, который делает большую часть того, что вы хотите. С его помощью вы можете получить свое поведение регулирования с помощью этого кода:const delay = Rx.Observable.empty().delay(500); inputObservable .map(value => Rx.Observable.of(value).concat(delay)) .concatLatest() .subscribe(...);
Вот оператор. Я попытался обновить его для работы с RxJS5:
Rx.Observable.prototype.concatLatest = function () { /// <summary> /// Concatenates an observable sequence of observable sequences, skipping sequences that arrive while the current sequence is being observed. /// If N new observables arrive while the current observable is being observed, the first N-1 new observables will be thrown /// away and only the Nth will be observed. /// </summary> /// <returns type="Rx.Observable"></returns> var source = this; return Rx.Observable.create(function (observer) { var latest, isStopped, isBusy, outerSubscription, innerSubscription, subscriptions = new Rx.Subscription(function () { if (outerSubscription) { outerSubscription.unsubscribe(); } if (innerSubscription) { innerSubscription.unsubscribe(); } }), onError = observer.error.bind(observer), onNext = observer.next.bind(observer), innerOnComplete = function () { var inner = latest; if (inner) { latest = undefined; if (innerSubscription) { innerSubscription.unsubscribe(); } innerSubscription = inner.subscribe(onNext, onError, innerOnComplete); } else { isBusy = false; if (isStopped) { observer.complete(); } } }; outerSubscription = source.subscribe(function (newInner) { if (isBusy) { latest = newInner; } else { isBusy = true; if (innerSubscription) { innerSubscription.unsubscribe(); } innerSubscription = newInner.subscribe(onNext, onError, innerOnComplete); } }, onError, function () { isStopped = true; if (!isBusy) { observer.complete(); } }); return subscriptions; }); };
И вот обновленный plunkr: https://plnkr.co/edit/DSVmSPRijJwj9msefjRi?p=preview
Примечание я обновил вашу версию lodash до последней версии. В лодашь 4.7, я переделал дроссель операторов/дребезга исправить какой-нибудь крайний случай жуки. Вы использовали 4.6.1, в котором все еще были некоторые из этих ошибок, хотя я не думаю, что они повлияли на ваш тест.
Я взял оператора auditTime и изменил 2 строки, чтобы добиться желаемого поведения.
Новый плунжер: https://plnkr.co/edit/4NkXsOeJOSrLUP9WEtp0?p=preview
Оригинал:
Изменения:
Из (auditTime):
protected _next(value: T): void { this.value = value; this.hasValue = true; if (!this.throttled) { this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this)); } } clearThrottle(): void { const { value, hasValue, throttled } = this; if (throttled) { this.remove(throttled); this.throttled = null; throttled.unsubscribe(); } if (hasValue) { this.value = null; this.hasValue = false; this.destination.next(value); } }
To (auditTimeImmediate):
protected _next(value: T): void { this.value = value; this.hasValue = true; if (!this.throttled) { // change 1: this.clearThrottle(); } } clearThrottle(): void { const { value, hasValue, throttled } = this; if (throttled) { this.remove(throttled); this.throttled = null; throttled.unsubscribe(); } if (hasValue) { this.value = null; this.hasValue = false; this.destination.next(value); // change 2: this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this)); } }
Поэтому я начинаю тайм-аут после того, как значение было
next
изд.Использование:
inputObservable .do(() => cancelPreviousRequest()) .auditTimeImmediate(500) .subscribe((value) => doNextRequest(value))