C# производитель / потребитель / наблюдатель?
У меня есть очередь производителя / потребителя, за исключением того, что есть определенные типы объектов. Таким образом, не только любой потребитель может потреблять добавленный объект. Я не хочу делать определенную очередь для каждого типа, так как их слишком много. (Это как бы растягивает определение производителя / потребителя, но я не уверен, что это правильный термин.)
Существует ли такая вещь, как EventWaitHandle, которая позволяет импульсы с параметром? например,myHandle.Set(AddedType = "foo")
. Прямо сейчас я использую Monitor.Wait
, а затем каждый потребитель проверяет, если пульс на самом деле предназначался для них, но это кажется бессмысленным.
Версия pseduocode того, что у меня есть сейчас:
class MyWorker {
public string MyType {get; set;}
public static Dictionary<string, MyInfo> data;
public static void DoWork(){
while(true){
if(Monitor.Wait(data, timeout)){
if (data.ContainsKey(MyType)){
// OK, do work
}
}
}
}
}
Как вы можете видеть, я могу получить импульсы, когда к дикту добавляется другой материал. Меня волнует только то, когда MyType добавляется к дикту. Есть ли способ сделать это? Это не так уж много, но, например, теперь мне приходится вручную обрабатывать таймауты, потому что каждое получение блокировки может быть успешным в течение таймаута, но MyType
никогда не добавляется к dict в timeout
.2 ответа:
Это интересный вопрос. Похоже, что ключом к решению является блокирующий вариант приоритетной очереди . Java имеет
PriorityBlockingQueue
, но, к сожалению, эквивалент для .NET BCL не существует. Однако, как только у вас есть один, реализация проста.class MyWorker { public string MyType {get; set;} public static PriorityBlockingQueue<string, MyInfo> data; public static void DoWork() { while(true) { MyInfo value; if (data.TryTake(MyType, timeout, out value)) { // OK, do work } } } }
Реализовать
PriorityBlockingQueue
не так уж и сложно. Следуя той же схеме, что иBlockingCollection
, используя методы стилейAdd
иTake
, я придумал следующий код.public class PriorityBlockingQueue<TKey, TValue> { private SortedDictionary<TKey, TValue> m_Dictionary = new SortedDictionary<TKey,TValue>(); public void Add(TKey key, TValue value) { lock (m_Dictionary) { m_Dictionary.Add(key, value); Monitor.Pulse(m_Dictionary); } } public TValue Take(TKey key) { TValue value; TryTake(key, TimeSpan.FromTicks(long.MaxValue), out value); return value; } public bool TryTake(TKey key, TimeSpan timeout, out TValue value) { value = default(TValue); DateTime initial = DateTime.UtcNow; lock (m_Dictionary) { while (!m_Dictionary.TryGetValue(key, out value)) { if (m_Dictionary.Count > 0) Monitor.Pulse(m_Dictionary); // Important! TimeSpan span = timeout - (DateTime.UtcNow - initial); if (!Monitor.Wait(m_Dictionary, span)) { return false; } } m_Dictionary.Remove(key); return true; } } }
Это было быстро. реализация и это имеет пару проблем. Во-первых, я его вообще не испытывал. Во-вторых, он использует красно-черное дерево (via
Я должен указать здесь, что это на самом деле не решает бессмысленное поведениеSortedDictionary
) в качестве базовой структуры данных. Это означает, что методTryTake
будет иметь сложность O(log(n)). Очереди приоритетов обычно имеют сложность удаления O (1). Типичная структура данных для очередей приоритетов-это куча , но я нахожу, чтосписки пропусков на самом деле лучше на практике по нескольким причинам. Ни один из них не существует в .NET BCL, который вот почему я использовалSortedDictionary
вместо этого, несмотря на его низкую производительность в этом сценарии.Wait/Pulse
. Он просто инкапсулируется в классPriorityBlockingQueue
. Но, по крайней мере, это, безусловно, очистит основную часть вашего кода.Не похоже, чтобы ваш код обрабатывал несколько объектов на ключ, но это было бы легко добавить с помощью
Queue<MyInfo>
вместо простого старогоMyInfo
При добавлении к ключу. словарь.
Похоже, что вы хотите объединить очередь производителя/потребителя с шаблоном наблюдателя-общий поток потребителя или потоки считывают из очереди, а затем передают событие в требуемый код. В этом случае вы фактически не будете сигнализировать наблюдателю, а просто вызовете его, когда поток-потребитель определит, кто заинтересован в данном рабочем элементе.
Шаблон наблюдателя в .Net обычно реализуется с помощью событий C#. Вам просто нужно будет вызвать обработчик событий для объекта и один или несколько наблюдателей будут вызваны через него. Целевой код сначала должен был бы зарегистрировать себя с наблюдаемым объектом, добавив себя к событию для уведомления о прибытии работы.