Выполнение параллельных задач в C#
У меня есть сервис, который должен читать сообщения от Amazon SQS как можно быстрее. Мы ожидаем интенсивный трафик, и я хотел бы иметь возможность читать более 10 тысяч сообщений в секунду. К сожалению, в настоящее время я нахожусь на уровне около 10 сообщений в секунду. Ясно, что у меня есть работа.
Вот что я использую (преобразовано в консольное приложение, чтобы упростить тестирование):
private static int _concurrentRequests;
private static int _maxConcurrentRequests;
public static void Main(string[] args) {
_concurrentRequests = 0;
_maxConcurrentRequests = 100;
var timer = new Timer();
timer.Elapsed += new ElapsedEventHandler(OnTimedEvent);
timer.Interval = 10;
timer.Enabled = true;
Console.ReadLine();
timer.Dispose();
}
public static void OnTimedEvent(object s, ElapsedEventArgs e) {
if (_concurrentRequests < _maxConcurrentRequests) {
_concurrentRequests++;
ProcessMessages();
}
}
public static async Task ProcessMessages() {
var manager = new MessageManager();
manager.ProcessMessages(); // this is an async method that reads in the messages from SQS
_concurrentRequests--;
}
Я не получаю даже близко 100 одновременных запросов, и кажется, что он не запускает OnTimedEvent
каждые 10 миллисекунды.
Я не уверен, что Timer
здесь правильный подход. У меня нет большого опыта в такого рода кодировании. В данный момент я готов попробовать все, что угодно.
Обновить
Благодаря калеббойду я немного приблизился к достижению своей цели. Вот действительно плохой код:private static SemaphoreSlim _locker;
public static void Main(string[] args) {
_manager = new MessageManager();
RunBatchProcessingForeverAsync();
}
private static async Task RunBatchProcessingForeverAsync() {
_locker = new SemaphoreSlim(10, 10);
while (true) {
Thread thread = new Thread(new ParameterizedThreadStart(Process));
thread.Start();
}
}
private static async void Process(object args) {
_locker.WaitAsync();
try {
await _manager.ProcessMessages();
}
finally {
_locker.Release();
}
}
Я могу приблизиться к чтению приличного количества сообщений в секунду с этим, но проблема в том, что мой ProcessMessages
вызов никогда не заканчивается (или, возможно, это будет после очень долгого времени время). Я думаю, что мне, вероятно, нужно ограничить количество потоков, которые я запускаю в любой момент времени.
Какие-нибудь предложения о том, как я могу улучшить этот код, чтобы ProcessMessages
имел шанс закончить?
3 ответа:
Как и предлагал @calebboyd, сначала необходимо сделать поток асинхронным. А теперь, если ты пойдешь сюда ... - где использовать параллелизм при вызове API, вы увидите, что одного асинхронного потока достаточно для быстрого объединения сетевого ресурса. Если вы можете получить от amazon несколько сообщений в одном запросе, то ваш поток - производитель (тот, который делает асинхронные вызовы amazon) будет просто прекрасен-он может отправлять сотни запросов в секунду. Это не будет ваше бутылочное горлышко. Однако, задачи продолжения, в которых обрабатываются полученные данные, передаются в пул потоков. Здесь у вас есть шанс на бутылочное горлышко - предположим, что 100 ответов приходят каждую секунду, каждый ответ содержит 100 сообщений (чтобы достичь вашего приближения 10K msgs/sec). Каждую секунду у вас появляется 100 новых задач, каждая из которых потребует от вашего потока обработки 100 сообщений. Теперь есть два варианта: (1) обработка этих сообщений не связана с ЦП - вы просто отправляете их в свою БД, или (2), вы выполняете ЦП потребляя вычисления, например научные вычисления, сериализации или некоторые тяжелые бизнес-логики. Если (1) - это ваш случай, то узкое место отодвигается назад к вам DB. Если (2), то у вас нет выбора, кроме как увеличить / уменьшить масштаб или оптимизировать вычисления. Но ваше узкое место, вероятно, не является производящим потоком - если он реализован правильно (см. приведенную выше ссылку для примеров).
Поскольку ваш метод
ProcessMessages
на вашем объекте MessageManager не ожидается, Я буду считать, что он привязан к тому же потоку, в котором он выполняется. Простая маркировка функции какasync
не передает работу новому потоку. При таком предположении этот код фактически не выполняется с несколькими потоками. Вы можете использовать следующий код для выполнения кода в большем количестве пула потоков.Вполне вероятно, что объект manager не может обрабатывать одновременное использование. Поэтому я создаю его в задаче.Бежать лямбда. Это также может быть дорого и, следовательно, непрактично.
async Task RunBatchProcessingForeverAsync () { var lock = new SemaphoreSlim(initialCount: 10); while (true) { await lock.WaitAsync(); Task.Run(() => { try { var manager = new MessageManager(); manager.ProcessMessages(); } finally { lock.Release(); } }); } }
Я давно не писал C#, но это должно запускать ваш метод 10 раз одновременно, многократно, навсегда.