Поведение дросселя RxJS; получите первое значение немедленно


Пример Plunkr: https://plnkr.co/edit/NZwb3ol8CbZFtSc6Q9zm?p=preview

Я знаю, что существуют эти 3 метода дроссельной заслонки для rxjs (5.0 beta.4):

auditTime(), throttleTime() и debounceTime()

Поведение, которое я ищу, является тем, которое лодаш делает по умолчанию на дроссельной заслонке:

    1) немедленно дайте мне первое значение!
  • 2) на последовательных значениях удерживайте значения для заданной задержки, затем выделите последнее возникшее значение
  • 3) когда истекла задержка дроссельной заслонки, вернитесь в состояние (1)

Теоретически это должно выглядеть так:

inputObservable
  .do(() => cancelPreviousRequest())
  .throttleTime(500)
  .subscribe((value) => doNextRequest(value))

Но

  • throttleTime никогда не дает мне последнее значение, если испускается в таймаут дроссельной заслонки
  • debounceTime срабатывает не сразу
  • auditTime срабатывает не сразу

Могу ли я объединить любой из методов rxjs для достижения описанного поведения?

2 6

2 ответа:

Для старых RxJs я написал оператор concatLatest, который делает большую часть того, что вы хотите. С его помощью вы можете получить свое поведение регулирования с помощью этого кода:

const delay = Rx.Observable.empty().delay(500);
inputObservable
    .map(value => Rx.Observable.of(value).concat(delay))
    .concatLatest()
    .subscribe(...);

Вот оператор. Я попытался обновить его для работы с RxJS5:

Rx.Observable.prototype.concatLatest = function () {
    /// <summary>
    /// Concatenates an observable sequence of observable sequences, skipping sequences that arrive while the current sequence is being observed.
    /// If N new observables arrive while the current observable is being observed, the first N-1 new observables will be thrown
    /// away and only the Nth will be observed.
    /// </summary>
    /// <returns type="Rx.Observable"></returns>
    var source = this;

    return Rx.Observable.create(function (observer) {
        var latest,
            isStopped,
            isBusy,
            outerSubscription,
            innerSubscription,
            subscriptions = new Rx.Subscription(function () {
              if (outerSubscription) {
                outerSubscription.unsubscribe();
              }
              if (innerSubscription) {
                innerSubscription.unsubscribe();
              }
            }),
            onError = observer.error.bind(observer),
            onNext = observer.next.bind(observer),
            innerOnComplete = function () {
                var inner = latest;
                if (inner) {
                    latest = undefined;
                    if (innerSubscription) {
                      innerSubscription.unsubscribe();
                    }
                    innerSubscription = inner.subscribe(onNext, onError, innerOnComplete);
                }
                else {
                    isBusy = false;
                    if (isStopped) {
                        observer.complete();
                    }
                }
            };

        outerSubscription = source.subscribe(function (newInner) {
            if (isBusy) {
                latest = newInner;
            }
            else {
                isBusy = true;
                if (innerSubscription) {
                  innerSubscription.unsubscribe();
                }
                innerSubscription = newInner.subscribe(onNext, onError, innerOnComplete);
            }
        }, onError, function () {
            isStopped = true;
            if (!isBusy) {
                observer.complete();
            }
        });

        return subscriptions;
    });
};

И вот обновленный plunkr: https://plnkr.co/edit/DSVmSPRijJwj9msefjRi?p=preview

Примечание я обновил вашу версию lodash до последней версии. В лодашь 4.7, я переделал дроссель операторов/дребезга исправить какой-нибудь крайний случай жуки. Вы использовали 4.6.1, в котором все еще были некоторые из этих ошибок, хотя я не думаю, что они повлияли на ваш тест.

Я взял оператора auditTime и изменил 2 строки, чтобы добиться желаемого поведения.

Новый плунжер: https://plnkr.co/edit/4NkXsOeJOSrLUP9WEtp0?p=preview

Оригинал:

Изменения:

Из (auditTime):

protected _next(value: T): void {
  this.value = value;
  this.hasValue = true;
  if (!this.throttled) {
    this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
  }
}

clearThrottle(): void {
  const { value, hasValue, throttled } = this;
  if (throttled) {
    this.remove(throttled);
    this.throttled = null;
    throttled.unsubscribe();
  }
  if (hasValue) {
    this.value = null;
    this.hasValue = false;
    this.destination.next(value);
  }
}

To (auditTimeImmediate):

protected _next(value: T): void {
    this.value = value;
    this.hasValue = true;
    if (!this.throttled) {
        // change 1:
        this.clearThrottle();
    }
}

clearThrottle(): void {
    const { value, hasValue, throttled } = this;
    if (throttled) {
        this.remove(throttled);
        this.throttled = null;
        throttled.unsubscribe();
    }
    if (hasValue) {
        this.value = null;
        this.hasValue = false;
        this.destination.next(value);
        // change 2:
        this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
    }
}

Поэтому я начинаю тайм-аут после того, как значение было nextизд.

Использование:

inputObservable
  .do(() => cancelPreviousRequest())
  .auditTimeImmediate(500)
  .subscribe((value) => doNextRequest(value))