Могут Ли Ограниченные Коллекции BlockingCollections Терять Данные Во Время Добавления
У меня есть BlockingCollection (ConcurrentBag, 50000), где я пытаюсь использовать очень маленькую ограниченную емкость в 50 000 для потоков-производителей, чтобы максимизировать количество записей, которые я могу обработать в ConcurrentDictionary потока-потребителя. Производитель работает намного быстрее, чем потребитель, и в противном случае будет потреблять большую часть памяти.
К сожалению, я сразу заметил, что общее количество записей в моем ConcurrentDictionary теперь существенно ниже, чем это должно быть после добавления ограниченной емкости 50,000, когда мои тестовые данные выполняются. Я читал, что блокирующая коллекция .метод add должен блокироваться бесконечно, пока в коллекции не останется места для выполнения add. Однако, по-видимому, это не так.
Вопросы:
-
Будет блокирующая коллекция .метод add в конечном итоге тайм-аут или молчаливый сбой, если слишком много add вызываются до освобождения емкости в BlockingCollection вверх?
-
Если ответ на вопрос № 1-да, то сколько добавлений можно выполнить после превышения ограничительной емкости без потери данных?
-
Если много блокирующих коллекций .вызываются методы add (), которые ожидают / блокируют емкость, и вызывается метод CompleteAdding (), будут ли эти ожидающие / блокирующие добавления продолжать ждать и затем в конечном итоге добавлять или они молча завершатся неудачей?
1 ответ:
Убедитесь, что, если вы используете BlockingCollection вместе с ConcurrentDictionary, у вас нет BlockingCollection.Метод TryAdd (myobject) спрятан где-то в вашем коде и ошибочно принимается за ConcurrentDictionary.Метод TryAdd (). Коллекции Blockingcollection.TryAdd (myobject) возвратит false и отбросит запрос add, производящий "тихий сбой", если ограничивающая способность BlockingCollection была превышена.
- блокирующая коллекция .Метод add не кажется, что "silenty терпит неудачу" или теряет add после превышения ограничивающей способности на большую сумму. Метод add() в конечном итоге приведет к тому, что процесс будет работать без памяти, если их слишком много .add ()'s ждут, чтобы быть добавленными к коллекции BlockingCollection, которая является избыточной емкостью. (Это должно быть очень экстремальным случаем проблем управления потоком)
- см. № 1.
- мои собственные тесты, похоже, указывают на то, что после вызова метода CompleteAdding() все последующие добавления завершаются неудачей, как описано в MSDN доктора.
Заключительное Замечание Относительно Производительности
Оказывается, что (в моем собственном случае, во всяком случае), используя ограничивающую способность И.Add () на BlockingCollection является очень медленным по сравнению с использованием без ограничения емкости и .TryAdd () в том же самом процессе.
Я добился гораздо лучших результатов в работе, реализовав свою собственную стратегию ограничения пропускной способности. Есть много способов сделать это. Три варианта включают в себя поток.Sleep (), Поток.Spinwait (), или монитор.Wait() используется вместе с монитором.PulseAll (). Когда используется одна из этих стратегий, можно также использовать BlockingCollection.TryAdd () вместо BlockingCollection.Добавьте () и не имейте никакой ограничивающей емкости без потери каких-либо данных или исчерпания памяти. Этот метод также, по-видимому, дает лучшую производительность.
Вы можете выбрать один из трех примеров, основываясь на том, какой сценарий лучше всего подходит для различий в скорости в потоках производителя и потребителя.
Нить.Подождите() Пример:
//Check to see if the BlockingCollection's bounded capacity has been exceeded. while (Tokens.Count > 50000) { //If the bounded capacity has been exceeded //place the thread in wait mode Thread.Sleep(SleepTime); }
Нить.Пример SpinWait ():
//Check to see if the BlockingCollection's bounded capacity has been exceeded. while (Tokens.Count > 50000) { //If the capacity has been exceeded //place the thread in wait mode Thread.SpinWait(SpinCount); }
Монитор.Wait () Пример
Этот пример требует наличия крючка как со стороны производителя, так и со стороны потребителя.
Код Производителя
//Check to see BlockingCollection capacity has been exceeded. if (Tokens.Count > 50000) { lock (syncLock) { //Double check before waiting if (Tokens.Count > 50000) { Monitor.Wait(syncLock, 1000); } } }
Потребительский Код
//Check to see BlockingCollection capacity is back a normal range. if (Tokens.Count <= 40000) { lock (syncLock) { //Double check before waiting if (Tokens.Count < 40000) { Monitor.PulseAll(syncLock); } } }