Могут Ли Ограниченные Коллекции BlockingCollections Терять Данные Во Время Добавления


У меня есть BlockingCollection (ConcurrentBag, 50000), где я пытаюсь использовать очень маленькую ограниченную емкость в 50 000 для потоков-производителей, чтобы максимизировать количество записей, которые я могу обработать в ConcurrentDictionary потока-потребителя. Производитель работает намного быстрее, чем потребитель, и в противном случае будет потреблять большую часть памяти.

К сожалению, я сразу заметил, что общее количество записей в моем ConcurrentDictionary теперь существенно ниже, чем это должно быть после добавления ограниченной емкости 50,000, когда мои тестовые данные выполняются. Я читал, что блокирующая коллекция .метод add должен блокироваться бесконечно, пока в коллекции не останется места для выполнения add. Однако, по-видимому, это не так.

Вопросы:

  1. Будет блокирующая коллекция .метод add в конечном итоге тайм-аут или молчаливый сбой, если слишком много add вызываются до освобождения емкости в BlockingCollection вверх?

  2. Если ответ на вопрос № 1-да, то сколько добавлений можно выполнить после превышения ограничительной емкости без потери данных?

  3. Если много блокирующих коллекций .вызываются методы add (), которые ожидают / блокируют емкость, и вызывается метод CompleteAdding (), будут ли эти ожидающие / блокирующие добавления продолжать ждать и затем в конечном итоге добавлять или они молча завершатся неудачей?

1 5

1 ответ:

Убедитесь, что, если вы используете BlockingCollection вместе с ConcurrentDictionary, у вас нет BlockingCollection.Метод TryAdd (myobject) спрятан где-то в вашем коде и ошибочно принимается за ConcurrentDictionary.Метод TryAdd (). Коллекции Blockingcollection.TryAdd (myobject) возвратит false и отбросит запрос add, производящий "тихий сбой", если ограничивающая способность BlockingCollection была превышена.

  1. блокирующая коллекция .Метод add не кажется, что "silenty терпит неудачу" или теряет add после превышения ограничивающей способности на большую сумму. Метод add() в конечном итоге приведет к тому, что процесс будет работать без памяти, если их слишком много .add ()'s ждут, чтобы быть добавленными к коллекции BlockingCollection, которая является избыточной емкостью. (Это должно быть очень экстремальным случаем проблем управления потоком)
  2. см. № 1.
  3. мои собственные тесты, похоже, указывают на то, что после вызова метода CompleteAdding() все последующие добавления завершаются неудачей, как описано в MSDN доктора.

Заключительное Замечание Относительно Производительности

Оказывается, что (в моем собственном случае, во всяком случае), используя ограничивающую способность И.Add () на BlockingCollection является очень медленным по сравнению с использованием без ограничения емкости и .TryAdd () в том же самом процессе.

Я добился гораздо лучших результатов в работе, реализовав свою собственную стратегию ограничения пропускной способности. Есть много способов сделать это. Три варианта включают в себя поток.Sleep (), Поток.Spinwait (), или монитор.Wait() используется вместе с монитором.PulseAll (). Когда используется одна из этих стратегий, можно также использовать BlockingCollection.TryAdd () вместо BlockingCollection.Добавьте () и не имейте никакой ограничивающей емкости без потери каких-либо данных или исчерпания памяти. Этот метод также, по-видимому, дает лучшую производительность.

Вы можете выбрать один из трех примеров, основываясь на том, какой сценарий лучше всего подходит для различий в скорости в потоках производителя и потребителя.

Нить.Подождите() Пример:

//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{   //If the bounded capacity has been exceeded
    //place the thread in wait mode 
    Thread.Sleep(SleepTime);
}

Нить.Пример SpinWait ():

//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{   //If the capacity has been exceeded
    //place the thread in wait mode 
    Thread.SpinWait(SpinCount);
}  

Монитор.Wait () Пример

Этот пример требует наличия крючка как со стороны производителя, так и со стороны потребителя.

Код Производителя

//Check to see BlockingCollection capacity has been exceeded.
if (Tokens.Count > 50000)
{ 
    lock (syncLock)
    {   //Double check before waiting
        if (Tokens.Count > 50000)
        {
            Monitor.Wait(syncLock, 1000);
        }
    }
}

Потребительский Код

//Check to see BlockingCollection capacity is back a normal range.
if (Tokens.Count <= 40000)
{ 
    lock (syncLock)
    {   //Double check before waiting
        if (Tokens.Count < 40000)
        {
            Monitor.PulseAll(syncLock);
        }
    }
}