Выполнение параллельных задач в C#


У меня есть сервис, который должен читать сообщения от Amazon SQS как можно быстрее. Мы ожидаем интенсивный трафик, и я хотел бы иметь возможность читать более 10 тысяч сообщений в секунду. К сожалению, в настоящее время я нахожусь на уровне около 10 сообщений в секунду. Ясно, что у меня есть работа.

Вот что я использую (преобразовано в консольное приложение, чтобы упростить тестирование):

private static int _concurrentRequests;
private static int _maxConcurrentRequests;

public static void Main(string[] args) {
    _concurrentRequests = 0;
    _maxConcurrentRequests = 100;

    var timer = new Timer();
    timer.Elapsed += new ElapsedEventHandler(OnTimedEvent);
    timer.Interval = 10;
    timer.Enabled = true;

    Console.ReadLine();
    timer.Dispose();
}

public static void OnTimedEvent(object s, ElapsedEventArgs e) {
    if (_concurrentRequests < _maxConcurrentRequests) {
        _concurrentRequests++;
        ProcessMessages();
    }
}

public static async Task ProcessMessages() {
    var manager = new MessageManager();
    manager.ProcessMessages();  // this is an async method that reads in the messages from SQS

    _concurrentRequests--;
}

Я не получаю даже близко 100 одновременных запросов, и кажется, что он не запускает OnTimedEvent каждые 10 миллисекунды.

Я не уверен, что Timer здесь правильный подход. У меня нет большого опыта в такого рода кодировании. В данный момент я готов попробовать все, что угодно.

Обновить

Благодаря калеббойду я немного приблизился к достижению своей цели. Вот действительно плохой код:
private static SemaphoreSlim _locker;

public static void Main(string[] args) {
    _manager = new MessageManager();

    RunBatchProcessingForeverAsync();
}
private static async Task RunBatchProcessingForeverAsync() {
    _locker = new SemaphoreSlim(10, 10);
    while (true) {
        Thread thread = new Thread(new ParameterizedThreadStart(Process));
        thread.Start();
    }
}

private static async void Process(object args) {
    _locker.WaitAsync();
    try {
        await _manager.ProcessMessages();
    }
    finally {
        _locker.Release();
    }

}

Я могу приблизиться к чтению приличного количества сообщений в секунду с этим, но проблема в том, что мой ProcessMessages вызов никогда не заканчивается (или, возможно, это будет после очень долгого времени время). Я думаю, что мне, вероятно, нужно ограничить количество потоков, которые я запускаю в любой момент времени.

Какие-нибудь предложения о том, как я могу улучшить этот код, чтобы ProcessMessages имел шанс закончить?

3 2

3 ответа:

Как и предлагал @calebboyd, сначала необходимо сделать поток асинхронным. А теперь, если ты пойдешь сюда ... - где использовать параллелизм при вызове API, вы увидите, что одного асинхронного потока достаточно для быстрого объединения сетевого ресурса. Если вы можете получить от amazon несколько сообщений в одном запросе, то ваш поток - производитель (тот, который делает асинхронные вызовы amazon) будет просто прекрасен-он может отправлять сотни запросов в секунду. Это не будет ваше бутылочное горлышко. Однако, задачи продолжения, в которых обрабатываются полученные данные, передаются в пул потоков. Здесь у вас есть шанс на бутылочное горлышко - предположим, что 100 ответов приходят каждую секунду, каждый ответ содержит 100 сообщений (чтобы достичь вашего приближения 10K msgs/sec). Каждую секунду у вас появляется 100 новых задач, каждая из которых потребует от вашего потока обработки 100 сообщений. Теперь есть два варианта: (1) обработка этих сообщений не связана с ЦП - вы просто отправляете их в свою БД, или (2), вы выполняете ЦП потребляя вычисления, например научные вычисления, сериализации или некоторые тяжелые бизнес-логики. Если (1) - это ваш случай, то узкое место отодвигается назад к вам DB. Если (2), то у вас нет выбора, кроме как увеличить / уменьшить масштаб или оптимизировать вычисления. Но ваше узкое место, вероятно, не является производящим потоком - если он реализован правильно (см. приведенную выше ссылку для примеров).

Поскольку ваш метод ProcessMessages на вашем объекте MessageManager не ожидается, Я буду считать, что он привязан к тому же потоку, в котором он выполняется. Простая маркировка функции как async не передает работу новому потоку. При таком предположении этот код фактически не выполняется с несколькими потоками. Вы можете использовать следующий код для выполнения кода в большем количестве пула потоков.

Вполне вероятно, что объект manager не может обрабатывать одновременное использование. Поэтому я создаю его в задаче.Бежать лямбда. Это также может быть дорого и, следовательно, непрактично.

async Task RunBatchProcessingForeverAsync () {
    var lock = new SemaphoreSlim(initialCount: 10);
    while (true) {
        await lock.WaitAsync();
        Task.Run(() => {
            try {
                var manager = new MessageManager();
                manager.ProcessMessages();
            } finally {
                lock.Release();
            }
        });
    }
}

Я давно не писал C#, но это должно запускать ваш метод 10 раз одновременно, многократно, навсегда.

Я бы предположил, что асинхронные методы помещены в очередь в пуле потоков, который имеет только столько потоков, сколько у вас есть доступных процессоров. Вы можете генерировать 100 запросов, но они все равно выполняются 8 потоками. Попробуйте создать массив из N потоков и использовать их.