C# производитель / потребитель / наблюдатель?


У меня есть очередь производителя / потребителя, за исключением того, что есть определенные типы объектов. Таким образом, не только любой потребитель может потреблять добавленный объект. Я не хочу делать определенную очередь для каждого типа, так как их слишком много. (Это как бы растягивает определение производителя / потребителя, но я не уверен, что это правильный термин.)

Существует ли такая вещь, как EventWaitHandle, которая позволяет импульсы с параметром? например, myHandle.Set(AddedType = "foo"). Прямо сейчас я использую Monitor.Wait , а затем каждый потребитель проверяет, если пульс на самом деле предназначался для них, но это кажется бессмысленным.

Версия pseduocode того, что у меня есть сейчас:

class MyWorker {
    public string MyType {get; set;}
    public static Dictionary<string, MyInfo> data;

    public static void DoWork(){
        while(true){
             if(Monitor.Wait(data, timeout)){
                   if (data.ContainsKey(MyType)){
                        // OK, do work
                   }
             }
        }
    }
}
Как вы можете видеть, я могу получить импульсы, когда к дикту добавляется другой материал. Меня волнует только то, когда MyType добавляется к дикту. Есть ли способ сделать это? Это не так уж много, но, например, теперь мне приходится вручную обрабатывать таймауты, потому что каждое получение блокировки может быть успешным в течение таймаута, но MyType никогда не добавляется к dict в timeout.
2 6

2 ответа:

Это интересный вопрос. Похоже, что ключом к решению является блокирующий вариант приоритетной очереди . Java имеет PriorityBlockingQueue, но, к сожалению, эквивалент для .NET BCL не существует. Однако, как только у вас есть один, реализация проста.

class MyWorker 
{
    public string MyType {get; set;}
    public static PriorityBlockingQueue<string, MyInfo> data; 

    public static void DoWork()
    {
        while(true)
        {
            MyInfo value;
            if (data.TryTake(MyType, timeout, out value))
            {
                // OK, do work
            }
        }
    }
}

Реализовать PriorityBlockingQueue не так уж и сложно. Следуя той же схеме, что и BlockingCollection, используя методы стилей Add и Take, я придумал следующий код.

public class PriorityBlockingQueue<TKey, TValue>
{
    private SortedDictionary<TKey, TValue> m_Dictionary = new SortedDictionary<TKey,TValue>();

    public void Add(TKey key, TValue value)
    {
        lock (m_Dictionary)
        {
            m_Dictionary.Add(key, value);
            Monitor.Pulse(m_Dictionary);
        }
    }

    public TValue Take(TKey key)
    {
        TValue value;
        TryTake(key, TimeSpan.FromTicks(long.MaxValue), out value);
        return value;
    }

    public bool TryTake(TKey key, TimeSpan timeout, out TValue value)
    {
        value = default(TValue);
        DateTime initial = DateTime.UtcNow;
        lock (m_Dictionary)
        {
            while (!m_Dictionary.TryGetValue(key, out value))
            {
                if (m_Dictionary.Count > 0) Monitor.Pulse(m_Dictionary); // Important!
                TimeSpan span = timeout - (DateTime.UtcNow - initial);
                if (!Monitor.Wait(m_Dictionary, span))
                {
                    return false;
                }
            }
            m_Dictionary.Remove(key);
            return true;
        }
    }
}

Это было быстро. реализация и это имеет пару проблем. Во-первых, я его вообще не испытывал. Во-вторых, он использует красно-черное дерево (via SortedDictionary) в качестве базовой структуры данных. Это означает, что метод TryTake будет иметь сложность O(log(n)). Очереди приоритетов обычно имеют сложность удаления O (1). Типичная структура данных для очередей приоритетов-это куча , но я нахожу, чтосписки пропусков на самом деле лучше на практике по нескольким причинам. Ни один из них не существует в .NET BCL, который вот почему я использовал SortedDictionary вместо этого, несмотря на его низкую производительность в этом сценарии.

Я должен указать здесь, что это на самом деле не решает бессмысленное поведение Wait/Pulse. Он просто инкапсулируется в класс PriorityBlockingQueue. Но, по крайней мере, это, безусловно, очистит основную часть вашего кода.

Не похоже, чтобы ваш код обрабатывал несколько объектов на ключ, но это было бы легко добавить с помощью Queue<MyInfo> вместо простого старого MyInfo При добавлении к ключу. словарь.

Похоже, что вы хотите объединить очередь производителя/потребителя с шаблоном наблюдателя-общий поток потребителя или потоки считывают из очереди, а затем передают событие в требуемый код. В этом случае вы фактически не будете сигнализировать наблюдателю, а просто вызовете его, когда поток-потребитель определит, кто заинтересован в данном рабочем элементе.

Шаблон наблюдателя в .Net обычно реализуется с помощью событий C#. Вам просто нужно будет вызвать обработчик событий для объекта и один или несколько наблюдателей будут вызваны через него. Целевой код сначала должен был бы зарегистрировать себя с наблюдаемым объектом, добавив себя к событию для уведомления о прибытии работы.