производитель / гибридный потребитель в C#, использующий классы платформы 4.0 и блокирующую коллекцию


У меня есть ситуация, в которой у меня есть сценарий производителя/потребителя. Производитель никогда не останавливается, что означает, что даже если есть время, когда нет элементов в BC, другие элементы могут быть добавлены позже.

Переходя от .NET Framework 3.5 к 4.0, я решил использовать BlockingCollection в качестве параллельной очереди между потребителем и производителем. Я даже добавил некоторые параллельные расширения, чтобы я мог использовать BC с Parallel.ForEach.

Проблема в том, что в потоке потребителей мне нужно иметь вид гибридной модели:

  1. я всегда проверяю BC, чтобы обработать любой элемент, который прибыл с a Parallel.ForEach(bc.GetConsumingEnumerable(), item => etc
  2. внутри этого foreach я выполняю все задачи, которые не зависят друг от друга.
  3. вот в чем проблема. После паралелизации предыдущих заданий мне нужно управлять их результатами в том же порядке FIFO, в котором они были в БК. Обработка этих результатов должна производиться в потоке синхронизации.

Небольшой пример в псевдокоде образом:

Продюсер:

//This event is triggered each time a page is scanned. Any batch of new pages can be added at any time at the scanner
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{          
     //The object to add has a property with the sequence number
    _concurrentCollection.TryAdd(scannedPage);
}

Потребитель:

private void Init()
{
    _cancelTasks = false;
    _checkTask = Task.Factory.StartNew(() =>
            {
                while (!_cancelTasks)
                {
                    //BlockingCollections with Paralell ForEach
                    var bc = _concurrentCollection;
                    Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
                    {
                        ScannedPage currentPage = item;
                        // process a batch of images from the bc and check if an image has a valid barcode. T
                    });
                    //Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.

                }
            });
}
Очевидно, что это не может работать так, как есть, потому что .GetConsumingEnumerable() блокирует, пока не появится другой элемент в БК. Я предполагаю, что я мог бы сделать это с задачами и просто запустить 4 или 5 задач в одном пакете, но:
  1. Как я мог бы сделать это с задачами и все еще иметь точку ожидания перед началом задач, которая блокирует до тех пор, пока не появится элемент, который будет потребляться в BC (я не хочу начинать обработку, если ничего нет. Когда-то есть что-то в БК я бы просто запустил пакет из 4 задач и использовал TryTake внутри каждой из них, так что если нет ничего, чтобы взять, они не блокируют, потому что я не знаю, всегда ли я могу достичь количества элементов из БК как пакет задач, например, только один элемент остался в БК и пакет из 4 задач) ?
  2. Как я мог бы сделать это и воспользоваться эффективностью этой параллели.Для предложений?
  3. Как я мог бы сохранить результаты задач в том же порядке FIFO, в котором предметы были извлечены из БК?
  4. Существует ли какой-либо другой класс параллелизма, более подходящий для такого рода гибридной обработки элементов в потребителе?
  5. Кроме того, это мой первый вопрос, когда-либо сделанный в StackOverflow, поэтому, если вам нужны дополнительные данные или вы просто думаете, что мой вопрос не правильный, просто дайте мне знать.
1 4

1 ответ:

Я думаю, что понимаю, о чем вы спрашиваете, почему бы не создать ConcurrentBag и не добавить к нему во время обработки вот так:

while (!_cancelTasks)
{
   //BlockingCollections with Paralell ForEach
   var bc = _concurrentCollection;
   var q = new ConcurrentBag<ScannedPage>();
   Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
   {
      ScannedPage currentPage = item;
      q.Add(item);
      // process a batch of images from the bc and check if an image has a valid barcode. T
   });
 //Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.


  //process items in your list here by sorting using some sequence key
  var items = q.OrderBy( o=> o.SeqNbr).ToList();
  foreach( var item in items){
     ...
  }
}

Очевидно, что это не ставит их в очередь в точном порядке, в котором они были добавлены к BC, но вы можете добавить некоторую последовательность nbr к объекту ScannedPage, как предложил Алекс, а затем отсортировать результаты после.

Вот как я буду обрабатывать последовательность:

Добавьте это в класс ScannedPage:

public static int _counter;  //public because this is just an example but it would work.

Получаем последовательность nbr и присваиваем здесь:

private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{          
    lock( this){   //to single thread this process.. not necessary if it's already single threaded of course.
    System.Threading.Interlocked.Increment( ref ScannedPage._counter);
    scannedPage.SeqNbr = ScannedPage._counter;
    ...
    }
}