Осуществление правильного выполнения повторяемая блок


тизер: ребята, это вопрос не о том, как реализовать политику повтора. Речь идет о правильном завершении блока потока данных TPL.

этот вопрос является продолжением моего предыдущего вопроса повторить политику в ITargetBlock. Ответом на этот вопрос было умное решение @svick, которое использует TransformBlock (источник) и TransformManyBlock (цель). Единственная оставшаяся проблема-завершить этот блок в правильно: подождите, пока все сначала повторите попытку, а затем завершите целевой блок. Вот что я закончил (это просто фрагмент, не обращайте слишком много внимания на не-threadsafe retries set):

var retries = new HashSet<RetryingMessage<TInput>>();

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);

                retries.Remove(message);
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;

                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);

source.LinkTo(target);

source.Completion.ContinueWith(async _ =>
{
    while (target.InputCount > 0 || retries.Any())
        await Task.Delay(100);

    target.Complete();
});

идея состоит в том, чтобы выполнить какой-то опрос и проверить, есть ли еще сообщения, которые ждут обработки, и нет сообщений, требующих повторной попытки. Но в этом решении мне не нравится идея опроса.

Да, я могу инкапсулировать логику добавление / удаление повторов в отдельный класс и даже, например, выполнение некоторых действий, когда набор повторов становится пустым, но как справиться с target.InputCount > 0 состоянии? Нет обратного вызова, который вызывается, когда нет ожидающих сообщений для блока, так и кажется, что проверка target.ItemCount в цикле с небольшой задержкой-это единственный вариант.

кто-нибудь знает более умный способ достичь этого?

2 53

2 ответа:

сочетание ответа hwcverwe и комментария JamieSee может быть идеальным решением.

во-первых, вам нужно создать более одного события:

var signal  = new ManualResetEvent(false);
var completedEvent = new ManualResetEvent(false);

затем, вы должны создать наблюдателя, и подписаться на TransformManyBlock, так что вы будете уведомлены, когда соответствующее событие происходит:

var observer = new RetryingBlockObserver<TOutput>(completedEvent);
var observable = target.AsObservable();
observable.Subscribe(observer);

наблюдаемое может быть довольно легко:

private class RetryingBlockObserver<T> : IObserver<T> {
        private ManualResetEvent completedEvent;

        public RetryingBlockObserver(ManualResetEvent completedEvent) {                
            this.completedEvent = completedEvent;
        }

        public void OnCompleted() {
            completedEvent.Set();
        }

        public void OnError(Exception error) {
            //TODO
        }

        public void OnNext(T value) {
            //TODO
        }
    }

и вы можете ждать либо сигнала, либо завершения (исчерпание всех исходных элементов), либо оба

 source.Completion.ContinueWith(async _ => {

            WaitHandle.WaitAll(completedEvent, signal);
            // Or WaitHandle.WaitAny, depending on your needs!

            target.Complete();
        });

вы можете проверить значение результата WaitAll, чтобы понять, какое событие было установлено, и реагировать соответствующим образом. Вы также можете добавить другие события в код, передавая их наблюдателю, чтобы он мог установить их при необходимости. Вы можете различать свое поведение и реагировать по-разному, когда возникает ошибка, например

может быть, a ManualResetEvent может сделать трюк для вас.

добавить публичное свойство в TransformManyBlock

private ManualResetEvent _signal  = new ManualResetEvent(false);
public ManualResetEvent Signal { get { return _signal; } }

и вот вы идете:

var retries = new HashSet<RetryingMessage<TInput>>();

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);

            // Sets the state of the event to signaled, allowing one or more waiting threads to proceed
            if(!retries.Any()) Signal.Set(); 
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);

                retries.Remove(message);

                // Sets the state of the event to signaled, allowing one or more waiting threads to proceed
                if(!retries.Any()) Signal.Set(); 
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;

                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);

source.LinkTo(target);

source.Completion.ContinueWith(async _ =>
{
    //Blocks the current thread until the current WaitHandle receives a signal.
    target.Signal.WaitOne();

    target.Complete();
});

Я не уверен, где ваш target.InputCount установлен. Так что на месте вы меняетесь target.InputCount вы можете добавить следующий код:

if(InputCount == 0)  Signal.Set();