WedX - журнал о программировании и компьютерных науках

Экспоненциальный откат в RxJava

У меня есть API, который принимает Observable, запускающий событие.

Я хочу вернуть Observable, который выдает значение каждые defaultDelay секунды, если обнаружено подключение к Интернету, и задерживает numberOfFailedAttempts^2 раз, если подключение отсутствует.

Я пробовал кучу различных стилей, самая большая проблема, с которой я столкнулся, - это retryWhen's наблюдаемое оценивается только один раз:

Observable
    .interval(defaultDelay,TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .repeatWhen((observable) ->
         observable.concatMap(repeatObservable -> {
             if(internetConnectionDetector.isInternetConnected()){
                 consecutiveRetries = 0;
                 return observable;
             } else {
                 consecutiveRetries++;
                 int backoffDelay = (int)Math.pow(consecutiveRetries,2);
                 return observable.delay(backoffDelay, TimeUnit.SECONDS);
                }
         }).onBackpressureDrop())
    .onBackpressureDrop();

Есть ли способ сделать то, что я пытаюсь сделать? Я нашел связанный вопрос (сейчас не могу найти его в поиске), но выбранный подход, похоже, не работает с динамическим значением.


Ответы:


1

В вашем коде есть две ошибки:

  1. Чтобы повторить наблюдаемую последовательность, она должна быть конечной. Т.е. вместо interval вам лучше использовать что-то вроде just или fromCallable, как я сделал в примере ниже.
  2. Из внутренней функции repeatWhen вам нужно вернуть новый отсроченный наблюдаемый источник, поэтому вместо observable.delay() вы должны вернуть Observable.timer().

Рабочий код:

public void testRepeat() throws InterruptedException {
    logger.info("test start");

    int DEFAULT_DELAY = 100; // ms
    int ADDITIONAL_DELAY = 100; // ms
    AtomicInteger generator = new AtomicInteger(0);
    AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive

    Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
            .repeatWhen(counts -> {
                AtomicInteger retryCounter = new AtomicInteger(0);
                return counts.flatMap(c -> {
                    int retry = 0;
                    if (connectionAlive.get()) {
                        retryCounter.set(0); // reset counter
                    } else {
                        retry = retryCounter.incrementAndGet();
                    }
                    int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
                    logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
                    return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
                });
            })
            .subscribe(v -> logger.info("got {}", v));

    Thread.sleep(220);
    logger.info("connection dropped");
    connectionAlive.set(false);
    Thread.sleep(2000);
    logger.info("connection is back alive");
    connectionAlive.set(true);
    Thread.sleep(2000);
    subscription.dispose();
    logger.info("test complete");
}

См. Подробную статью о repeatWhen здесь.

17.01.2017
  • Образец в вопросе, вероятно, был из середины моих попыток, поскольку он, кажется, смешивает два подхода, которые я использовал (один таймер + на основе повторных попыток, один интервал + на основе отложенной подписки), проблема была на самом деле из той статьи, в которой говорится, что ввод observable to retry / repeat следует использовать снова. Не вызывает ли эта наблюдаемая причина проблем с утечкой подписок? 17.01.2017
  • @AssortedTrailmix Это было о вводе первого уровня, а не о внутреннем flatMap. См. Последний пример в этой статье для очень похожего шаблона. 17.01.2017
  • о, я вижу, извини, что я пропустил counts было то, что получило FlatMap'ed 17.01.2017

  • 2

    Я всегда считал retryWhen несколько низкоуровневым, поэтому для экспоненциального отката я использую конструктор (например, Abhijit), который протестирован и доступен для RxJava 1.x по адресу rxjava-extras. Я бы предложил использовать версию с ограничением, чтобы экспоненциальное увеличение задержки не выходило за пределы максимального значения, которое вы определяете.

    Вот как вы его используете:

    observable.retryWhen(
        RetryWhen.exponentialBackoff(
            delay, maxDelay, TimeUNIT.SECONDS)
        .build());
    

    Я не согласен с тем, что retryWhen содержит ошибки, но если вы обнаружите ошибку, сообщите об этом в RxJava. Ошибки исправляются быстро!

    Вам понадобится rxjava-extras 0.8.0.6 или более поздняя версия, которая находится на Maven Central:

    <dependency>
        <groupId>com.github.davidmoten</groupId>
        <artifactId>rxjava-extras</artifactId>
        <version>0.8.0.6</version>
    </dependency>
    

    Сообщите мне, нужна ли вам версия RxJava 2.x. Те же функции доступны в rxjava2-extras начиная с версии 0.1.4.

    17.01.2017
  • Я знал, что где-то это видел! Я не хочу изобретать велосипед здесь, поэтому я, вероятно, пойду с этим и посмотрю на реализацию, чтобы увидеть, как я должен был это сделать. 17.01.2017
  • Сегодня я заметил, что забыл реализовать максимальное отступление, но похоже, что эта сигнатура метода не существует для меня при использовании версии 0.8.0.6 20.01.2017
  • На днях я очень спешил, поэтому я сказал себе, что вернусь к фрагменту кода, для которого он мне нужен, похоже, это решение не имеет того поведения, которое я ожидал, и повторная попытка должна быть сброшена после успешного вызова (что имеет смысл, поскольку для этого требуется внеполосная связь). Я думаю, что подход, описанный ниже, - это то, что мне нужно в моей текущей ситуации, этот подход кажется оптимизированным для более распространенной ситуации повторных попыток, пока он не сработает, по сравнению с повторными попытками всегда, и откладывать дольше, если он не работает. 20.01.2017
  • Как насчет repeat ().retryWhen() для выполнения требования "всегда повторять попытки"? 25.01.2017
  • Это то, к чему я пришел, основываясь на ответе retryWhen observable + flatMap. Я думаю, что мне не хватало двух вещей: flatMap при retryWhen observable вместо того, чтобы просто возвращать новый, и not возвращать наблюдаемый источник изнутри flatMap (что вызвало небольшую утечку памяти и не т работать правильно) 25.01.2017

  • 3

    Вы можете использовать оператор retryWhen для настройки задержки при отсутствии соединения. Как периодически выдавать элементы - отдельная тема (поищите операторы interval или timer). Если не можете разобраться, задайте отдельный вопрос.

    В моем Github есть обширный пример, но здесь я расскажу вам суть.

    RetryWithDelay retryWithDelay = RetryWithDelay.builder()
        .retryDelayStrategy(RetryDelayStrategy.RETRY_COUNT)
        .build()
    
    Single.fromCallable(() -> {
        ...
    }).retryWhen(retryWithDelay)
    .subscribe(j -> {
        ...
    })
    

    RetryWithDelay определяется следующим образом. Я использовал RxJava 2.x, поэтому, если вы используете 1.x, подпись должна быть Func1<Observable<? extends Throwable>, Observable<Object>>.

    public class RetryWithDelay implements
            Function<Flowable<? extends Throwable>, Publisher<Object>> {
        ...
    }
    

    RetryWithDelay класс.

    RetryStrategy перечисление.

    Это позволяет мне настраивать различные виды тайм-аутов, постоянные, линейные, экспоненциальные, на основе RetryDelayStrategy. Для вашего варианта использования вы должны выбрать CONSTANT_DELAY_TIMES_RETRY_COUNT стратегию задержки и вызвать retryDelaySeconds(2) при создании RetryWithDelay.

    retryWhen - сложный, возможно, даже глючный оператор. В большинстве примеров, которые вы найдете в Интернете, используется оператор range, который завершится ошибкой, если не будет предпринято никаких повторных попыток. Подробнее см. Мой ответ здесь.

    17.01.2017
    Новые материалы

    Объяснение документов 02: BERT
    BERT представил двухступенчатую структуру обучения: предварительное обучение и тонкая настройка. Во время предварительного обучения модель обучается на неразмеченных данных с помощью..

    Как проанализировать работу вашего классификатора?
    Не всегда просто знать, какие показатели использовать С развитием глубокого обучения все больше и больше людей учатся обучать свой первый классификатор. Но как только вы закончите..

    Работа с цепями Маркова, часть 4 (Машинное обучение)
    Нелинейные цепи Маркова с агрегатором и их приложения (arXiv) Автор : Бар Лайт Аннотация: Изучаются свойства подкласса случайных процессов, называемых дискретными нелинейными цепями Маркова..

    Crazy Laravel Livewire упростил мне создание электронной коммерции (панель администратора и API) [Часть 3]
    Как вы сегодня, ребята? В этой части мы создадим CRUD для данных о продукте. Думаю, в этой части я не буду слишком много делиться теорией, но чаще буду делиться своим кодом. Потому что..

    Использование машинного обучения и Python для классификации 1000 сезонов новичков MLB Hitter
    Чему может научиться машина, глядя на сезоны новичков 1000 игроков MLB? Это то, что исследует это приложение. В этом процессе мы будем использовать неконтролируемое обучение, чтобы..

    Учебные заметки: создание моего первого пакета Node.js
    Это мои обучающие заметки, когда я научился создавать свой самый первый пакет Node.js, распространяемый через npm. Оглавление Глоссарий I. Новый пакет 1.1 советы по инициализации..

    Забудьте о Matplotlib: улучшите визуализацию данных с помощью умопомрачительных функций Seaborn!
    Примечание. Эта запись в блоге предполагает базовое знакомство с Python и концепциями анализа данных. Привет, энтузиасты данных! Добро пожаловать в мой блог, где я расскажу о невероятных..


    Для любых предложений по сайту: [email protected]