rxjava: могу ли я использовать retry (), но с задержкой?


Я использую rxjava в моем Android-приложении для асинхронной обработки сетевых запросов. Теперь я хотел бы повторить неудачный сетевой запрос только после того, как прошло определенное время.

есть ли способ использовать retry() на наблюдаемом, но повторить попытку только после определенной задержки?

есть ли способ сообщить наблюдаемому, что в настоящее время повторяется (в отличие от попытки в первый раз)?

Я посмотрел на debounce()/throttleWithTimeout () но они кажется, он делает что-то другое.

Edit:

Я думаю, что нашел один способ сделать это, но мне было бы интересно либо подтвердить, что это правильный способ сделать это, либо для других, лучших способов.

то, что я делаю, это: в методе call() моего Observable.OnSubscribe, прежде чем я вызову метод Subscribers onError (), я просто позволяю потоку спать в течение желаемого времени. Итак, чтобы повторить каждые 1000 миллисекунд, я делаю что-то вот так:

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e.printStackTrace();
        }
        subscriber.onError(e);
    }
}

Так как этот метод работает на потоке ввода-вывода в любом случае он не блокирует пользовательский интерфейс. Единственная проблема, которую я вижу, заключается в том, что даже первая ошибка с задержка так задержка есть даже если нет повтора(). Мне бы хотелось, чтобы задержка не применялась после ошибка, но вместо этого до повторите попытку (но не до первой попытки, очевидно).

12 81

12 ответов:

можно использовать retryWhen() оператор для добавления логики повтора к любому наблюдаемому.

следующий класс содержит логику:

RxJava 2.x

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

RxJava 1.x

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

использование:

// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
    .retryWhen(new RetryWithDelay(3, 2000));

вдохновленный Павла и если вы не связаны с retryWhen проблемы, заявил Абхиджит Саркар, самый простой способ отложить повторную подписку с RxJava2 безусловно:

source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))

вы можете увидеть больше образцов и объяснений на retryWhen и repeatWhen.

это решение основано на фрагментах Бена Кристенсена, которые я видел,RetryWhen Пример и RetryWhenTestsConditional (Я должен был изменить n.getThrowable() to n для его работы). Я использовал evant / gradle-retrolambda чтобы лямбда-нотация работала на Android, но вам не нужно использовать лямбды (хотя это настоятельно рекомендуется). Для задержки я реализовал экспоненциальное отступление, но вы можете подключить любую логику отступления, которую вы хотите там. Для полноты Я добавил subscribeOn и observeOn операторы. Я использую ReactiveX / RxAndroid на AndroidSchedulers.mainThread().

int ATTEMPT_COUNT = 10;

public class Tuple<X, Y> {
    public final X x;
    public final Y y;

    public Tuple(X x, Y y) {
        this.x = x;
        this.y = y;
    }
}


observable
    .subscribeOn(Schedulers.io())
    .retryWhen(
            attempts -> {
                return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
                .flatMap(
                        ni -> {
                            if (ni.y > ATTEMPT_COUNT)
                                return Observable.error(ni.x);
                            return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
                        });
            })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

вместо использования MyRequestObservable.повторите попытку я использую функцию-оболочку retryObservable(MyRequestObservable, retrycount, seconds), которая возвращает новую наблюдаемую, которая обрабатывает косвенность для задержки, поэтому я могу сделать

retryObservable(restApi.getObservableStuff(), 3, 30)
    .subscribe(new Action1<BonusIndividualList>(){
        @Override
        public void call(BonusIndividualList arg0) 
        {
            //success!
        }
    }, 
    new Action1<Throwable>(){
        @Override
        public void call(Throwable arg0) { 
           // failed after the 3 retries !
        }}); 


// wrapper code
private static <T> Observable<T> retryObservable(
        final Observable<T> requestObservable, final int nbRetry,
        final long seconds) {

    return Observable.create(new Observable.OnSubscribe<T>() {

        @Override
        public void call(final Subscriber<? super T> subscriber) {
            requestObservable.subscribe(new Action1<T>() {

                @Override
                public void call(T arg0) {
                    subscriber.onNext(arg0);
                    subscriber.onCompleted();
                }
            },

            new Action1<Throwable>() {
                @Override
                public void call(Throwable error) {

                    if (nbRetry > 0) {
                        Observable.just(requestObservable)
                                .delay(seconds, TimeUnit.SECONDS)
                                .observeOn(mainThread())
                                .subscribe(new Action1<Observable<T>>(){
                                    @Override
                                    public void call(Observable<T> observable){
                                        retryObservable(observable,
                                                nbRetry - 1, seconds)
                                                .subscribe(subscriber);
                                    }
                                });
                    } else {
                        // still fail after retries
                        subscriber.onError(error);
                    }

                }
            });

        }

    });

}

теперь с RxJava версии 1.0+ вы можете использовать zipWith для достижения повтора с задержкой.

добавление изменений в kjones ответ.

изменен

public class RetryWithDelay implements 
                            Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int MAX_RETRIES;
    private final int DELAY_DURATION;
    private final int START_RETRY;

    /**
     * Provide number of retries and seconds to be delayed between retry.
     *
     * @param maxRetries             Number of retries.
     * @param delayDurationInSeconds Seconds to be delays in each retry.
     */
    public RetryWithDelay(int maxRetries, int delayDurationInSeconds) {
        MAX_RETRIES = maxRetries;
        DELAY_DURATION = delayDurationInSeconds;
        START_RETRY = 1;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        return observable
                .delay(DELAY_DURATION, TimeUnit.SECONDS)
                .zipWith(Observable.range(START_RETRY, MAX_RETRIES), 
                         new Func2<Throwable, Integer, Integer>() {
                             @Override
                             public Integer call(Throwable throwable, Integer attempt) {
                                  return attempt;
                             }
                         });
    }
}

вы можете добавить задержку в наблюдаемый возвращается в Retrywhen оператор

          /**
 * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated
 */
@Test
public void observableOnErrorResumeNext() {
    Subscription subscription = Observable.just(null)
                                          .map(Object::toString)
                                          .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                                          .retryWhen(errors -> errors.doOnNext(o -> count++)
                                                                     .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
                                                     Schedulers.newThread())
                                          .onErrorResumeNext(t -> {
                                              System.out.println("Error after all retries:" + t.getCause());
                                              return Observable.just("I save the world for extinction!");
                                          })
                                          .subscribe(s -> System.out.println(s));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}

вы можете увидеть здесь примеры. https://github.com/politrons/reactive

retryWhen - Это сложный, возможно, даже багги, оператор. Официальный документ и по крайней мере один ответ здесь используют range оператор, который завершится неудачей, если не будут сделаны повторные попытки. Смотрите мой обсуждение С членом ReactiveX Дэвидом Карноком.

я улучшил ответ kjones, изменив flatMap до concatMap и добавив RetryDelayStrategy класса. flatMap не сохраняет порядок эмиссии, в то время как concatMap делает, что важно для задержек с отступлением. Элемент RetryDelayStrategy, как следует из названия, давайте пользователю выбирать из различных режимов генерации задержек повтора, включая back-off. Код доступен на моем GitHub выполнить следующие тесты:

  1. успешно с 1-й попытки (без повторных попыток)
  2. сбой после 1 попытки
  3. пытается повторить 3 раза, но успешно на 2-й, следовательно, не повторяет 3-й раз
  4. успешно при 3-й попытке

посмотреть setRandomJokes метод.

тот же ответ, что и от kjones но обновлен до последней версии Ибо RxJava 2.x версия: ('io.реактивекс.rxjava2:rxjava:2.1.3')

public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> {
private final int maxRetries;
private final long retryDelayMillis;
private int retryCount;

public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
    this.maxRetries = maxRetries;
    this.retryDelayMillis = retryDelayMillis;
    this.retryCount = 0;
}

@Override
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
    return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
        @Override
        public Publisher<?> apply(Throwable throwable) throws Exception {
            if (++retryCount < maxRetries) {
                // When this Observable calls onNext, the original
                // Observable will be retried (i.e. re-subscribed).
                return Flowable.timer(retryDelayMillis,
                        TimeUnit.MILLISECONDS);
            }

            // Max retries hit. Just pass the error along.
            return Flowable.error(throwable);
        }
    });
}

}

использование:

/ / добавить логику повтора к существующим наблюдаемым. // Повтора максимум 3 раза с задержкой в 2 секунды.

observable
    .retryWhen(new RetryWithDelay(3, 2000));

для версии Kotlin & RxJava1

class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long)
    : Function1<Observable<out Throwable>, Observable<*>> {

    private val START_RETRY: Int = 1

    override fun invoke(observable: Observable<out Throwable>): Observable<*> {
        return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS)
            .zipWith(Observable.range(START_RETRY, MAX_RETRIES),
                object : Function2<Throwable, Int, Int> {
                    override fun invoke(throwable: Throwable, attempt: Int): Int {
                        return attempt
                    }
                })
    }
}

(Kotlin) я немного улучшил код с экспоненциальным отступом и прикладной защитой, излучающей наблюдаемое.диапазон():

    fun testOnRetryWithDelayExponentialBackoff() {
    val interval = 1
    val maxCount = 3
    val ai = AtomicInteger(1);
    val source = Observable.create<Unit> { emitter ->
        val attempt = ai.getAndIncrement()
        println("Subscribe ${attempt}")
        if (attempt >= maxCount) {
            emitter.onNext(Unit)
            emitter.onComplete()
        }
        emitter.onError(RuntimeException("Test $attempt"))
    }

    // Below implementation of "retryWhen" function, remove all "println()" for real code.
    val sourceWithRetry: Observable<Unit> = source.retryWhen { throwableRx ->
        throwableRx.doOnNext({ println("Error: $it") })
                .zipWith(Observable.range(1, maxCount)
                        .concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) },
                        BiFunction { t1: Throwable, t2: Int -> t1 to t2 }
                )
                .flatMap { pair ->
                    if (pair.second >= maxCount) {
                        Observable.error(pair.first)
                    } else {
                        val delay = interval * 2F.pow(pair.second)
                        println("retry delay: $delay")
                        Observable.timer(delay.toLong(), TimeUnit.SECONDS)
                    }
                }
    }

    //Code to print the result in terminal.
    sourceWithRetry
            .doOnComplete { println("Complete") }
            .doOnError({ println("Final Error: $it") })
            .blockingForEach { println("$it") }
}

этот пример работает с jxjava 2.2.2:

повторите попытку без задержки:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retry(5)
   .doOnSuccess(status -> log.info("Yay! {}", status);

повторите попытку с задержкой:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS))
   .doOnSuccess(status -> log.info("Yay! {}", status)
   .doOnError((Throwable error) 
                -> log.error("I tried five times with a 300ms break" 
                             + " delay in between. But it was in vain."));

наш исходный сингл терпит неудачу, если someConnection.отправить () не удается. Когда это происходит, наблюдаемые сбои внутри retryWhen выдает ошибку. Мы задерживаем это излучение на 300 мс и отправляем его обратно, чтобы сигнализировать о повторной попытке. возьмите (5) гарантии того, что наша наблюдаемая сигнализация завершится после того, как мы получим пять ошибок. retryWhen видит прекращение и не делает повторите попытку после пятой неудачи.

просто сделайте это так:

                  Observable.just("")
                            .delay(2, TimeUnit.SECONDS) //delay
                            .flatMap(new Func1<String, Observable<File>>() {
                                @Override
                                public Observable<File> call(String s) {
                                    L.from(TAG).d("postAvatar=");

                                    File file = PhotoPickUtil.getTempFile();
                                    if (file.length() <= 0) {
                                        throw new NullPointerException();
                                    }
                                    return Observable.just(file);
                                }
                            })
                            .retry(6)
                            .subscribe(new Action1<File>() {
                                @Override
                                public void call(File file) {
                                    postAvatar(file);
                                }
                            }, new Action1<Throwable>() {
                                @Override
                                public void call(Throwable throwable) {

                                }
                            });