Метод дросселирования вызывает M запросов за N секунд


Мне нужен компонент / класс, который регулирует выполнение некоторого метода до максимума M вызовов за N секунд (или ms или nanos, не имеет значения).

другими словами Мне нужно убедиться, что мой метод выполняется не более M раз в скользящем окне N секунд.

Если вы не знаете существующий класс, не стесняйтесь публиковать свои решения / идеи, как бы вы это реализовали.

15 109

15 ответов:

Я бы использовать кольцевой буфер временных меток с фиксированным размером M. каждый раз, когда вызывается метод, вы проверяете самую старую запись, и если она меньше N секунд в прошлом, вы выполняете и добавляете другую запись, иначе вы спите для разницы во времени.

то, что сработало из коробки для меня было Google Guava RateLimiter.

// Allow one request per second
private RateLimiter throttle = RateLimiter.create(1.0);

private void someMethod() {
    throttle.acquire();
    // Do something
}

в конкретные сроки, вы должны быть в состоянии реализовать это с помощью DelayQueue. Инициализируйте очередь с помощью MDelayed экземпляры с их задержкой изначально равны нулю. По мере поступления запросов к методу,take маркер, который вызывает блокировку метода до тех пор, пока не будет выполнено требование регулирования. Когда жетон был взят,add новый токен в очередь с задержкой N.

Это зависит от приложения.

представьте себе случай, в котором несколько потоков нужен маркер, чтобы сделать некоторые глобальный тариф-ограниченное действие С взрыв запрещен (т. е. вы хотите ограничить 10 действий за 10 секунд, но вы не хотите, чтобы 10 действий произошли в первую секунду, а затем оставались 9 секунд остановленными).

DelayedQueue имеет недостаток: порядок, в котором потоки запрашивают токены, может не быть порядком в что они получают их запрос выполнен. Если несколько потоков заблокированы в ожидании маркера, неясно, какой из них будет принимать следующий доступный маркер. С моей точки зрения, у вас даже могут быть нити, ждущие вечно.

одно решение-иметь минимальный интервал времени между двумя последовательными действиями, и принять меры, в том же порядке, как они были запрошены.

вот реализация:

public class LeakyBucket {
    protected float maxRate;
    protected long minTime;
    //holds time of last action (past or future!)
    protected long lastSchedAction = System.currentTimeMillis();

    public LeakyBucket(float maxRate) throws Exception {
        if(maxRate <= 0.0f) {
            throw new Exception("Invalid rate");
        }
        this.maxRate = maxRate;
        this.minTime = (long)(1000.0f / maxRate);
    }

    public void consume() throws InterruptedException {
        long curTime = System.currentTimeMillis();
        long timeLeft;

        //calculate when can we do the action
        synchronized(this) {
            timeLeft = lastSchedAction + minTime - curTime;
            if(timeLeft > 0) {
                lastSchedAction += minTime;
            }
            else {
                lastSchedAction = curTime;
            }
        }

        //If needed, wait for our time
        if(timeLeft <= 0) {
            return;
        }
        else {
            Thread.sleep(timeLeft);
        }
    }
}

Если вам нужен ограничитель скорости скользящего окна на основе Java, который будет работать в распределенной системе, вы можете взглянуть на https://github.com/mokies/ratelimitj проект.

конфигурация с поддержкой Redis, ограничивающая запросы по IP до 50 в минуту, будет выглядеть следующим образом:

import com.lambdaworks.redis.RedisClient;
import es.moki.ratelimitj.core.LimitRule;

RedisClient client = RedisClient.create("redis://localhost");
Set<LimitRule> rules = Collections.singleton(LimitRule.of(1, TimeUnit.MINUTES, 50)); // 50 request per minute, per key
RedisRateLimit requestRateLimiter = new RedisRateLimit(client, rules);

boolean overLimit = requestRateLimiter.overLimit("ip:127.0.0.2");

посмотреть https://github.com/mokies/ratelimitj/tree/master/ratelimitj-redis план дополнительную информацию о конфигурации Redis для.

хотя это не то, что вы просили, ThreadPoolExecutor, который предназначен для ограничения до M одновременных запросов вместо M запросов за N секунд, также может быть полезен.

оригинальный вопрос звучит очень похоже на проблему, решенную в этом блоге: Java Многоканальный Асинхронный Дроссель.

для скорости M вызовов в N секундах дроссель, обсуждаемый в этом блоге, гарантирует, что любой интервал длины N на временной шкале не будет содержать более M вызовов.

я реализовал простой алгоритм дросселирования.Попробуйте эту ссылку, http://krishnaprasadas.blogspot.in/2012/05/throttling-algorithm.html

краткое описание алгоритма,

этот алгоритм использует возможности Java Отложенная Очередь. Создайте задержана объект с ожидаемой задержкой (здесь 1000/м за миллисекунду TimeUnit). Поместите тот же объект в задержанную очередь, которая будет интерн предоставляет движущееся окно для нас. Затем перед каждым вызовом метода взять объект формирует очередь, take-это блокирующий вызов, который вернется только после указанной задержки, а после вызова метода не забудьте поставить объект в очередь с обновленным временем(здесь текущие миллисекунды).

здесь мы также можем иметь несколько задержанных объектов с разной задержкой. Такой подход также обеспечит высокую пропускную способность.

Мне нужно убедиться, что мой метод выполняется не более M раз в раздвижное окно N секунд.

недавно я написал сообщение в блоге о том, как это сделать в .NET. вы можете создать что-то подобное в Java.

лучшее ограничение скорости в .NET

попробуйте использовать этот простой подход:

public class SimpleThrottler {

private static final int T = 1; // min
private static final int N = 345;

private Lock lock = new ReentrantLock();
private Condition newFrame = lock.newCondition();
private volatile boolean currentFrame = true;

public SimpleThrottler() {
    handleForGate();
}

/**
 * Payload
 */
private void job() {
    try {
        Thread.sleep(Math.abs(ThreadLocalRandom.current().nextLong(12, 98)));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.err.print(" J. ");
}

public void doJob() throws InterruptedException {
    lock.lock();
    try {

        while (true) {

            int count = 0;

            while (count < N && currentFrame) {
                job();
                count++;
            }

            newFrame.await();
            currentFrame = true;
        }

    } finally {
        lock.unlock();
    }
}

public void handleForGate() {
    Thread handler = new Thread(() -> {
        while (true) {
            try {
                Thread.sleep(1 * 900);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                currentFrame = false;

                lock.lock();
                try {
                    newFrame.signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    });
    handler.start();
}

}

Apache Camel также поддерживает поставляется с удава механизм следующим образом:

from("seda:a").throttle(100).asyncDelayed().to("seda:b");

вы можете использовать redis для этого, когда блокировка необходима в распределенной системе. Второй алгоритм в https://redis.io/commands/incr

Это обновление, чтобы приведенный выше код LeakyBucket. Это работает для более чем 1000 запросов в секунду.

import lombok.SneakyThrows;
import java.util.concurrent.TimeUnit;

class LeakyBucket {
  private long minTimeNano; // sec / billion
  private long sched = System.nanoTime();

  /**
   * Create a rate limiter using the leakybucket alg.
   * @param perSec the number of requests per second
   */
  public LeakyBucket(double perSec) {
    if (perSec <= 0.0) {
      throw new RuntimeException("Invalid rate " + perSec);
    }
    this.minTimeNano = (long) (1_000_000_000.0 / perSec);
  }

  @SneakyThrows public void consume() {
    long curr = System.nanoTime();
    long timeLeft;

    synchronized (this) {
      timeLeft = sched - curr + minTimeNano;
      sched += minTimeNano;
    }
    if (timeLeft <= minTimeNano) {
      return;
    }
    TimeUnit.NANOSECONDS.sleep(timeLeft);
  }
}

и unittest для выше:

import com.google.common.base.Stopwatch;
import org.junit.Ignore;
import org.junit.Test;

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class LeakyBucketTest {
  @Test @Ignore public void t() {
    double numberPerSec = 10000;
    LeakyBucket b = new LeakyBucket(numberPerSec);
    Stopwatch w = Stopwatch.createStarted();
    IntStream.range(0, (int) (numberPerSec * 5)).parallel().forEach(
        x -> b.consume());
    System.out.printf("%,d ms%n", w.elapsed(TimeUnit.MILLISECONDS));
  }
}

Проверьте [TimerTask1 класса. Или ScheduledExecutor.