При параллельном вызове ограничьте выполнение в секунду


Используя TPL / Parallel.Например, существует простой способ ограничить количество вызовов метода в единицу времени (то есть не более 50 вызовов в секунду). Это отличается от ограничения количества потоков. Может быть, есть какой-то простой хак, чтобы заставить это работать?

3 3

3 ответа:

Одним из решений является создание потокобезопасной версии следующего https://stackoverflow.com/a/7728872/356790

/// <summary>
/// This class limits the number of requests (method calls, events fired, etc.) that can occur in a given unit of time.
/// </summary>
class RequestLimiter
{

    #region Constructors

    /// <summary>
    /// Initializes an instance of the RequestLimiter class.
    /// </summary>
    /// <param name="maxRequests">The maximum number of requests that can be made in a given unit of time.</param>
    /// <param name="timeSpan">The unit of time that the maximum number of requests is limited to.</param>
    /// <exception cref="ArgumentException">maxRequests &lt;= 0</exception>
    /// <exception cref="ArgumentException">timeSpan.TotalMilliseconds &lt;= 0</exception>
    public RequestLimiter( int maxRequests , TimeSpan timeSpan )
    {
        // check parameters
        if ( maxRequests <= 0 )
        {
            throw new ArgumentException( "maxRequests <= 0" , "maxRequests" );
        }
        if ( timeSpan.TotalMilliseconds <= 0 )
        {
            throw new ArgumentException( "timeSpan.TotalMilliseconds <= 0" , "timeSpan" );
        }

        // initialize instance vars
        _maxRequests = maxRequests;
        _timeSpan = timeSpan;
        _requestTimes = new Queue<DateTime>( maxRequests );

        // sleep for 1/10th timeSpan
        _sleepTimeInMs = Convert.ToInt32( Math.Ceiling( timeSpan.TotalMilliseconds / 10 ) );
    }

    #endregion

    /// <summary>
    /// Waits until an request can be made
    /// </summary>
    public void WaitUntilRequestCanBeMade()
    {
        while ( !TryEnqueueRequest() )
        {
            Thread.Sleep( _sleepTimeInMs );
        }
    }

    #region Private Members

    private readonly Queue<DateTime> _requestTimes;
    private readonly object _requestTimesLock = new object();
    private readonly int _maxRequests;
    private readonly TimeSpan _timeSpan;
    private readonly int _sleepTimeInMs;

    /// <summary>
    /// Remove requests that are older than _timeSpan
    /// </summary>
    private void SynchronizeQueue()
    {
        while ( ( _requestTimes.Count > 0 ) && ( _requestTimes.Peek().Add( _timeSpan ) < DateTime.Now ) )
        {
            _requestTimes.Dequeue();
        }
    }

    /// <summary>
    /// Attempts to enqueue a request.
    /// </summary>
    /// <returns>
    /// Returns true if the request was successfully enqueued.  False if not.
    /// </returns>
    private bool TryEnqueueRequest()
    {
        lock ( _requestTimesLock )
        {
            SynchronizeQueue();
            if ( _requestTimes.Count < _maxRequests )
            {
                _requestTimes.Enqueue( DateTime.Now );
                return true;
            }
            return false;
        }
    }

    #endregion

}

Готовые примеры кода с использованием таймера:

Примеры кода с использованием реактивных расширений (Rx):

Это решение обеспечивает задержку между запуском каждого потока и может быть использовано для выполнения ваших требований.

    private SemaphoreSlim CooldownLock = new SemaphoreSlim(1, 1);
    private DateTime lastAction;

    private void WaitForCooldown(TimeSpan delay)
    {
        CooldownLock.Wait();

        var waitTime = delay - (DateTime.Now - lastAction);

        if (waitTime > TimeSpan.Zero)
        {
            Task.Delay(waitTime).Wait();
            lastAction = DateTime.Now;
        }

        lastAction = DateTime.Now;

        CooldownLock.Release();
    }

    public void Execute(Action[] actions, int concurrentThreadLimit, TimeSpan threadDelay)
    {
        if (actions.Any())
        {
            Parallel.ForEach(actions, 
                             new ParallelOptions() { MaxDegreeOfParallelism = concurrentThreadLimit}, 
                            (currentAction) =>
                            {
                                WaitForCooldown(threadDelay);
                                currentAction();
                            });
        }
    }