используя поток данных TPL, могу ли я отменить все записи, а затем добавить одну?


С библиотекой потоков данных TPL я хотел бы сделать что-то вроде этого:

myActionBlock.Post(newValue, cancelAllPreviousPosts: true);

Похоже, что маркер отмены на ActionBlock отменяет все это; мне придется создать новый блок действий, если я его установлю. Можно ли сделать частичную отмену с помощью ActionBlock?

Сообщения, которые еще не обработаны, не должны быть использованы. Было бы неплохо, если бы был какой-то маркер отмены, доступный для проверки в текущей выполняемой должности.

3 7

3 ответа:

Взгляните на BroadcastBlock<T>, который содержит только самый последний элемент, опубликованный в нем. Вы можете поместить широковещательный блок перед ActionBlock<T>.

При отправке нового элемента в широковещательный блок не отменяется элемент, обрабатываемый в данный момент блоком действий, а перезаписывается любой существующий элемент, уже находящийся в широковещательном блоке; фактически отбрасываются все старые сообщения, еще не обработанные блоком действий. Когда блок действий завершит свой текущий элемент, он займет больше всего времени. последний пункт, опубликованный в блоке широковещания.

В дополнение к ответу Монро Томаса важно понимать, что блоку действий, следующему за блоком широковещания, требуется его ограниченная пропускная способность, ограниченная 1, или он будет хранить и обрабатывать каждое сообщение широковещательного блока, даже когда он все еще выполняется.
Пример кода можно найти здесь:

ActionBlock<int> ExecuteBlock = new ActionBlock<int>(async ThisNumber =>
{
  await Task.Delay(100);
  Console.WriteLine($">{ThisNumber}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

BroadcastBlock<int> ThrottleBlock = new BroadcastBlock<int>(null);
ThrottleBlock.LinkTo(ExecuteBlock, new DataflowLinkOptions { PropagateCompletion = true });

for(int IX = 0; IX < 128; IX++)
{
  await ThrottleBlock.SendAsync(IX);
  await Task.Delay(10);
}

Это приводит к следующему:

>0
>6
>12
>20
>27
>34
>41
>48
>55
>62
>68
>75
>82
>88
>95
>101
>108
>115
>122
>127

Наслаждайтесь!
- Симон

Нет ничего подобного непосредственно в потоке данных TPL, но я вижу несколько способов, как вы могли бы реализовать его самостоятельно:

  1. Если вам не нужно обрабатывать модифицированный блок как обычный блок потока данных (например, нет поддержки LinkTo()), то простой способ-написать тип, который обертывает ActionBlock, но элементы которого также содержат флаг, который говорит, следует ли их обрабатывать. Когда вы задаете cancelAllPreviousPosts: true, Все эти флаги сбрасываются, поэтому эти элементы будут пропущенный.

    Код может выглядеть примерно так:

    class CancellableActionBlock<T>
    {
        private class Item
        {
            public T Data { get; private set; }
            public bool ShouldProcess { get; set; }
    
            public Item(T data)
            {
                Data = data;
                ShouldProcess = true;
            }
        }
    
        private readonly ActionBlock<Item> actionBlock;
        private readonly ConcurrentDictionary<Item, bool> itemSet;
    
        public CancellableActionBlock(Action<T> action)
        {
            itemSet = new ConcurrentDictionary<Item, bool>();
            actionBlock = new ActionBlock<Item>(item =>
            {
                bool ignored;
                itemSet.TryRemove(item, out ignored);
    
                if (item.ShouldProcess)
                {
                    action(item.Data);
                }
            });
        }
    
        public bool Post(T data, bool cancelAllPreviousPosts = false)
        {
            if (cancelAllPreviousPosts)
            {
                foreach (var item in itemSet.Keys)
                {
                    item.ShouldProcess = false;
                }
                itemSet.Clear();
            }
    
            var newItem = new Item(data);
            itemSet.TryAdd(newItem, true);
            return actionBlock.Post(newItem);
        }
    
        // probably other members that wrap actionBlock members,
        // like Complete() and Completion
    }
    
  2. Если вы хотите создать что-то более составное и многоразовое, вы можете создать специальный блок только для этой отмены. Вы можете реализовать это, используя thee BufferBlockS, связанные вместе, где третий будет иметь емкость 1, а второй-неограниченную емкость. Таким образом, почти все элементы в очереди будут находиться во втором блоке, поэтому вы можете выполнить отмену, просто поменяв этот блок на новый. Вся структура будет представлена Encapsulate() в первом и третьем блоке.

    Проблема с этим подходом заключается в том, что отмена имеет задержку в 1 пункт (тот, который находится в третьем блоке). Кроме того, я не нашел хорошего интерфейса для этого.