Осуществление правильного выполнения повторяемая блок
тизер: ребята, это вопрос не о том, как реализовать политику повтора. Речь идет о правильном завершении блока потока данных TPL.
этот вопрос является продолжением моего предыдущего вопроса повторить политику в ITargetBlock. Ответом на этот вопрос было умное решение @svick, которое использует TransformBlock
(источник) и TransformManyBlock
(цель). Единственная оставшаяся проблема-завершить этот блок в правильно: подождите, пока все сначала повторите попытку, а затем завершите целевой блок. Вот что я закончил (это просто фрагмент, не обращайте слишком много внимания на не-threadsafe retries
set):
var retries = new HashSet<RetryingMessage<TInput>>();
TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
async message =>
{
try
{
var result = new[] { await transform(message.Data) };
retries.Remove(message);
return result;
}
catch (Exception ex)
{
message.Exceptions.Add(ex);
if (message.RetriesRemaining == 0)
{
if (failureHandler != null)
failureHandler(message.Exceptions);
retries.Remove(message);
}
else
{
retries.Add(message);
message.RetriesRemaining--;
Task.Delay(retryDelay)
.ContinueWith(_ => target.Post(message));
}
return null;
}
}, dataflowBlockOptions);
source.LinkTo(target);
source.Completion.ContinueWith(async _ =>
{
while (target.InputCount > 0 || retries.Any())
await Task.Delay(100);
target.Complete();
});
идея состоит в том, чтобы выполнить какой-то опрос и проверить, есть ли еще сообщения, которые ждут обработки, и нет сообщений, требующих повторной попытки. Но в этом решении мне не нравится идея опроса.
Да, я могу инкапсулировать логику добавление / удаление повторов в отдельный класс и даже, например, выполнение некоторых действий, когда набор повторов становится пустым, но как справиться с target.InputCount > 0
состоянии? Нет обратного вызова, который вызывается, когда нет ожидающих сообщений для блока, так и кажется, что проверка target.ItemCount
в цикле с небольшой задержкой-это единственный вариант.
кто-нибудь знает более умный способ достичь этого?
2 ответа:
сочетание ответа hwcverwe и комментария JamieSee может быть идеальным решением.
во-первых, вам нужно создать более одного события:
var signal = new ManualResetEvent(false); var completedEvent = new ManualResetEvent(false);
затем, вы должны создать наблюдателя, и подписаться на
TransformManyBlock
, так что вы будете уведомлены, когда соответствующее событие происходит:var observer = new RetryingBlockObserver<TOutput>(completedEvent); var observable = target.AsObservable(); observable.Subscribe(observer);
наблюдаемое может быть довольно легко:
private class RetryingBlockObserver<T> : IObserver<T> { private ManualResetEvent completedEvent; public RetryingBlockObserver(ManualResetEvent completedEvent) { this.completedEvent = completedEvent; } public void OnCompleted() { completedEvent.Set(); } public void OnError(Exception error) { //TODO } public void OnNext(T value) { //TODO } }
и вы можете ждать либо сигнала, либо завершения (исчерпание всех исходных элементов), либо оба
source.Completion.ContinueWith(async _ => { WaitHandle.WaitAll(completedEvent, signal); // Or WaitHandle.WaitAny, depending on your needs! target.Complete(); });
вы можете проверить значение результата WaitAll, чтобы понять, какое событие было установлено, и реагировать соответствующим образом. Вы также можете добавить другие события в код, передавая их наблюдателю, чтобы он мог установить их при необходимости. Вы можете различать свое поведение и реагировать по-разному, когда возникает ошибка, например
может быть, a ManualResetEvent может сделать трюк для вас.
добавить публичное свойство в
TransformManyBlock
private ManualResetEvent _signal = new ManualResetEvent(false); public ManualResetEvent Signal { get { return _signal; } }
и вот вы идете:
var retries = new HashSet<RetryingMessage<TInput>>(); TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null; target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>( async message => { try { var result = new[] { await transform(message.Data) }; retries.Remove(message); // Sets the state of the event to signaled, allowing one or more waiting threads to proceed if(!retries.Any()) Signal.Set(); return result; } catch (Exception ex) { message.Exceptions.Add(ex); if (message.RetriesRemaining == 0) { if (failureHandler != null) failureHandler(message.Exceptions); retries.Remove(message); // Sets the state of the event to signaled, allowing one or more waiting threads to proceed if(!retries.Any()) Signal.Set(); } else { retries.Add(message); message.RetriesRemaining--; Task.Delay(retryDelay) .ContinueWith(_ => target.Post(message)); } return null; } }, dataflowBlockOptions); source.LinkTo(target); source.Completion.ContinueWith(async _ => { //Blocks the current thread until the current WaitHandle receives a signal. target.Signal.WaitOne(); target.Complete(); });
Я не уверен, где ваш
target.InputCount
установлен. Так что на месте вы меняетесьtarget.InputCount
вы можете добавить следующий код:if(InputCount == 0) Signal.Set();