Поток данных TPL: как дросселировать весь трубопровод?


Я хочу ограничить количество элементов, размещенных в конвейере потока данных. Количество элементов зависит от производственной среды. Эти объекты потребляют большое количество памяти (изображений), поэтому я хотел бы опубликовать их, когда последний блок конвейера выполнит свою работу.

Я попытался использовать SemaphoreSlim, чтобы задушить производителя и выпустить его в последнем блоке конвейера. Это работает, но если во время процесса возникает исключение, программа ждет вечно, и исключение не возникает. перехваченный.

Вот пример, который выглядит как наш код. Как я могу это сделать ?

static void Main(string[] args)
{
    SemaphoreSlim semaphore = new SemaphoreSlim(1, 2);

    var downloadString = new TransformBlock<string, string>(uri =>
    {
        Console.WriteLine("Downloading '{0}'...", uri);
        return new WebClient().DownloadString(uri);
    });

    var createWordList = new TransformBlock<string, string[]>(text =>
    {
        Console.WriteLine("Creating word list...");

        char[] tokens = text.ToArray();
        for (int i = 0; i < tokens.Length; i++)
        {
            if (!char.IsLetter(tokens[i]))
                tokens[i] = ' ';
        }
        text = new string(tokens);

        return text.Split(new char[] { ' ' },
           StringSplitOptions.RemoveEmptyEntries);
    });

    var filterWordList = new TransformBlock<string[], string[]>(words =>
    {
        Console.WriteLine("Filtering word list...");
        throw new InvalidOperationException("ouch !"); // explicit for test
        return words.Where(word => word.Length > 3).OrderBy(word => word)
           .Distinct().ToArray();
    });

    var findPalindromes = new TransformBlock<string[], string[]>(words =>
    {
        Console.WriteLine("Finding palindromes...");

        var palindromes = new ConcurrentQueue<string>();

        Parallel.ForEach(words, word =>
        {
            string reverse = new string(word.Reverse().ToArray());

            if (Array.BinarySearch<string>(words, reverse) >= 0 &&
                word != reverse)
            {
                palindromes.Enqueue(word);
            }
        });

        return palindromes.ToArray();
    });

    var printPalindrome = new ActionBlock<string[]>(palindromes =>
    {
        try
        {
            foreach (string palindrome in palindromes)
            {
                Console.WriteLine("Found palindrome {0}/{1}",
                   palindrome, new string(palindrome.Reverse().ToArray()));
            }
        }
        finally
        {
            semaphore.Release();
        }
    });

    downloadString.LinkTo(createWordList);
    createWordList.LinkTo(filterWordList);
    filterWordList.LinkTo(findPalindromes);
    findPalindromes.LinkTo(printPalindrome);


    downloadString.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) 
            ((IDataflowBlock)createWordList).Fault(t.Exception);
        else createWordList.Complete();
    });
    createWordList.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) 
            ((IDataflowBlock)filterWordList).Fault(t.Exception);
        else filterWordList.Complete();
    });
    filterWordList.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) 
            ((IDataflowBlock)findPalindromes).Fault(t.Exception); // enter here when an exception throws
        else findPalindromes.Complete();
    });
    findPalindromes.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted)
            ((IDataflowBlock)printPalindrome).Fault(t.Exception); // the fault is propagated here but not catched
        else printPalindrome.Complete();
    });

    try
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine(i);

            downloadString.Post("http://www.google.com");
            semaphore.Wait(); // waits here when an exception throws
        }

        downloadString.Complete();

        printPalindrome.Completion.Wait();
    }
    catch (AggregateException agg)
    {
        Console.WriteLine("An error has occured : " + agg);
    }
    Console.WriteLine("Done");
    Console.ReadKey();
}
2 5

2 ответа:

Вы должны просто ждать и семафор, и задачу завершения вместе. Таким образом, если блок заканчивается преждевременно (либо по исключению, либо по отмене), то исключение будет перепрофилировано, а если нет, то вы будете ждать на своем семафоре, пока не останется места для размещения дополнительных сообщений.

Это можно сделать с помощью Task.WhenAny и SemaphoreSlim.WaitAsync:

for (int i = 0; i < 10; i++)
{
    Console.WriteLine(i);
    downloadString.Post("http://www.google.com");

    if (printPalindrome.Completion.IsCompleted)
    {
        break;
    }

    Task.WhenAny(semaphore.WaitAsync(), printPalindrome.Completion).Wait();
}

Примечание: использование Task.Wait уместно только в этом случае, поскольку это Main. Обычно это должен быть метод async, и вы должны await возвращать задачу из Task.WhenAny.

Именно так я управлял дросселированием или допуском только 10 элементов в исходном блоке одновременно. Вы можете изменить это, чтобы иметь 1. Убедитесь, что вы также дросселируете любые другие блоки в конвейере, иначе вы можете получить исходный блок с 1 и следующий блок с гораздо большим количеством.

var sourceBlock = new BufferBlock<string>(
    new ExecutionDataflowBlockOptions() { 
        SingleProducerConstrained = true, 
        BoundedCapacity = 10 });

Затем производитель делает следующее:

sourceBlock.SendAsync("value", shutdownToken).Wait(shutdownToken);

Если вы используете async / await, просто дождитесь вызова SendAsync.