используя поток данных TPL, могу ли я отменить все записи, а затем добавить одну?
С библиотекой потоков данных TPL я хотел бы сделать что-то вроде этого:
myActionBlock.Post(newValue, cancelAllPreviousPosts: true);
Похоже, что маркер отмены на ActionBlock отменяет все это; мне придется создать новый блок действий, если я его установлю. Можно ли сделать частичную отмену с помощью ActionBlock?
Сообщения, которые еще не обработаны, не должны быть использованы. Было бы неплохо, если бы был какой-то маркер отмены, доступный для проверки в текущей выполняемой должности.
3 ответа:
Взгляните на
BroadcastBlock<T>
, который содержит только самый последний элемент, опубликованный в нем. Вы можете поместить широковещательный блок передActionBlock<T>
.При отправке нового элемента в широковещательный блок не отменяется элемент, обрабатываемый в данный момент блоком действий, а перезаписывается любой существующий элемент, уже находящийся в широковещательном блоке; фактически отбрасываются все старые сообщения, еще не обработанные блоком действий. Когда блок действий завершит свой текущий элемент, он займет больше всего времени. последний пункт, опубликованный в блоке широковещания.
В дополнение к ответу Монро Томаса важно понимать, что блоку действий, следующему за блоком широковещания, требуется его ограниченная пропускная способность, ограниченная 1, или он будет хранить и обрабатывать каждое сообщение широковещательного блока, даже когда он все еще выполняется.
Пример кода можно найти здесь:ActionBlock<int> ExecuteBlock = new ActionBlock<int>(async ThisNumber => { await Task.Delay(100); Console.WriteLine($">{ThisNumber}"); }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); BroadcastBlock<int> ThrottleBlock = new BroadcastBlock<int>(null); ThrottleBlock.LinkTo(ExecuteBlock, new DataflowLinkOptions { PropagateCompletion = true }); for(int IX = 0; IX < 128; IX++) { await ThrottleBlock.SendAsync(IX); await Task.Delay(10); }
Это приводит к следующему:
>0 >6 >12 >20 >27 >34 >41 >48 >55 >62 >68 >75 >82 >88 >95 >101 >108 >115 >122 >127
Наслаждайтесь!
- Симон
Нет ничего подобного непосредственно в потоке данных TPL, но я вижу несколько способов, как вы могли бы реализовать его самостоятельно:
Если вам не нужно обрабатывать модифицированный блок как обычный блок потока данных (например, нет поддержки
LinkTo()
), то простой способ-написать тип, который обертываетActionBlock
, но элементы которого также содержат флаг, который говорит, следует ли их обрабатывать. Когда вы задаетеcancelAllPreviousPosts: true
, Все эти флаги сбрасываются, поэтому эти элементы будут пропущенный.Код может выглядеть примерно так:
class CancellableActionBlock<T> { private class Item { public T Data { get; private set; } public bool ShouldProcess { get; set; } public Item(T data) { Data = data; ShouldProcess = true; } } private readonly ActionBlock<Item> actionBlock; private readonly ConcurrentDictionary<Item, bool> itemSet; public CancellableActionBlock(Action<T> action) { itemSet = new ConcurrentDictionary<Item, bool>(); actionBlock = new ActionBlock<Item>(item => { bool ignored; itemSet.TryRemove(item, out ignored); if (item.ShouldProcess) { action(item.Data); } }); } public bool Post(T data, bool cancelAllPreviousPosts = false) { if (cancelAllPreviousPosts) { foreach (var item in itemSet.Keys) { item.ShouldProcess = false; } itemSet.Clear(); } var newItem = new Item(data); itemSet.TryAdd(newItem, true); return actionBlock.Post(newItem); } // probably other members that wrap actionBlock members, // like Complete() and Completion }
Если вы хотите создать что-то более составное и многоразовое, вы можете создать специальный блок только для этой отмены. Вы можете реализовать это, используя thee
BufferBlock
S, связанные вместе, где третий будет иметь емкость 1, а второй-неограниченную емкость. Таким образом, почти все элементы в очереди будут находиться во втором блоке, поэтому вы можете выполнить отмену, просто поменяв этот блок на новый. Вся структура будет представленаEncapsulate()
в первом и третьем блоке.Проблема с этим подходом заключается в том, что отмена имеет задержку в 1 пункт (тот, который находится в третьем блоке). Кроме того, я не нашел хорошего интерфейса для этого.