производитель / гибридный потребитель в C#, использующий классы платформы 4.0 и блокирующую коллекцию
У меня есть ситуация, в которой у меня есть сценарий производителя/потребителя. Производитель никогда не останавливается, что означает, что даже если есть время, когда нет элементов в BC, другие элементы могут быть добавлены позже.
Переходя от .NET Framework 3.5 к 4.0, я решил использоватьBlockingCollection
в качестве параллельной очереди между потребителем и производителем. Я даже добавил некоторые параллельные расширения, чтобы я мог использовать BC с Parallel.ForEach
.
Проблема в том, что в потоке потребителей мне нужно иметь вид гибридной модели:
- я всегда проверяю BC, чтобы обработать любой элемент, который прибыл с a
Parallel.ForEach(bc.GetConsumingEnumerable(), item => etc
- внутри этого
foreach
я выполняю все задачи, которые не зависят друг от друга. - вот в чем проблема. После паралелизации предыдущих заданий мне нужно управлять их результатами в том же порядке FIFO, в котором они были в БК. Обработка этих результатов должна производиться в потоке синхронизации.
Небольшой пример в псевдокоде образом:
Продюсер:
//This event is triggered each time a page is scanned. Any batch of new pages can be added at any time at the scanner
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{
//The object to add has a property with the sequence number
_concurrentCollection.TryAdd(scannedPage);
}
Потребитель:
private void Init()
{
_cancelTasks = false;
_checkTask = Task.Factory.StartNew(() =>
{
while (!_cancelTasks)
{
//BlockingCollections with Paralell ForEach
var bc = _concurrentCollection;
Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
{
ScannedPage currentPage = item;
// process a batch of images from the bc and check if an image has a valid barcode. T
});
//Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.
}
});
}
Очевидно, что это не может работать так, как есть, потому что .GetConsumingEnumerable()
блокирует, пока не появится другой элемент в БК. Я предполагаю, что я мог бы сделать это с задачами и просто запустить 4 или 5 задач в одном пакете, но:
- Как я мог бы сделать это с задачами и все еще иметь точку ожидания перед началом задач, которая блокирует до тех пор, пока не появится элемент, который будет потребляться в BC (я не хочу начинать обработку, если ничего нет. Когда-то есть что-то в БК я бы просто запустил пакет из 4 задач и использовал
TryTake
внутри каждой из них, так что если нет ничего, чтобы взять, они не блокируют, потому что я не знаю, всегда ли я могу достичь количества элементов из БК как пакет задач, например, только один элемент остался в БК и пакет из 4 задач) ? - Как я мог бы сделать это и воспользоваться эффективностью этой параллели.Для предложений?
- Как я мог бы сохранить результаты задач в том же порядке FIFO, в котором предметы были извлечены из БК? Существует ли какой-либо другой класс параллелизма, более подходящий для такого рода гибридной обработки элементов в потребителе?
- Кроме того, это мой первый вопрос, когда-либо сделанный в StackOverflow, поэтому, если вам нужны дополнительные данные или вы просто думаете, что мой вопрос не правильный, просто дайте мне знать.
1 ответ:
Я думаю, что понимаю, о чем вы спрашиваете, почему бы не создать ConcurrentBag и не добавить к нему во время обработки вот так:
while (!_cancelTasks) { //BlockingCollections with Paralell ForEach var bc = _concurrentCollection; var q = new ConcurrentBag<ScannedPage>(); Parallel.ForEach(bc.GetConsumingEnumerable(), item => { ScannedPage currentPage = item; q.Add(item); // process a batch of images from the bc and check if an image has a valid barcode. T }); //Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread. //process items in your list here by sorting using some sequence key var items = q.OrderBy( o=> o.SeqNbr).ToList(); foreach( var item in items){ ... } }
Очевидно, что это не ставит их в очередь в точном порядке, в котором они были добавлены к BC, но вы можете добавить некоторую последовательность nbr к объекту ScannedPage, как предложил Алекс, а затем отсортировать результаты после.
Вот как я буду обрабатывать последовательность:
Добавьте это в класс
ScannedPage
:public static int _counter; //public because this is just an example but it would work.
Получаем последовательность nbr и присваиваем здесь:
private void Current_OnPageScanned(object sender, ScannedPage scannedPage) { lock( this){ //to single thread this process.. not necessary if it's already single threaded of course. System.Threading.Interlocked.Increment( ref ScannedPage._counter); scannedPage.SeqNbr = ScannedPage._counter; ... } }