Блок потока данных TPL потребляет всю доступную память


У меня есть TransformManyBlock со следующей конструкцией:

  • ввод: путь к файлу
  • вывод: IEnumerable содержимого файла, по одной строке за раз
Я запускаю этот блок на огромном файле (61 ГБ), который слишком велик, чтобы поместиться в оперативной памяти. Чтобы избежать неограниченного роста памяти, я установил BoundedCapacity очень низкое значение (например, 1) для этого блока и всех последующих блоков. Тем не менее, блок, по-видимому, повторяет IEnumerable жадно, который потребляет все доступные память на компьютере, перемалывая каждый процесс до упора. Количество выходов блока продолжает расти без привязки, пока я не убью процесс.

Что я могу сделать, чтобы блок не потреблял IEnumerable таким образом?

EDIT: вот пример программы, иллюстрирующий эту проблему:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static IEnumerable<string> GetSequence(char c)
    {
        for (var i = 0; i < 1024 * 1024; ++i)
            yield return new string(c, 1024 * 1024);
    }

    static void Main(string[] args)
    {
        var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
        var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
        var secondBlock = new ActionBlock<string>(str =>
            {
                Console.WriteLine(str.Substring(0, 10));
                Thread.Sleep(1000);
            }, options);

        firstBlock.LinkTo(secondBlock);
        firstBlock.Completion.ContinueWith(task =>
            {
                if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception);
                else secondBlock.Complete();
            });

        firstBlock.Post('A');
        firstBlock.Complete();
        for (; ; )
        {
            Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount);
            Thread.Sleep(3000);
        }
    }
}

Если вы используете 64-разрядный блок, убедитесь, что в Visual Studio снят флажок "предпочесть 32-разрядный". У меня есть 16 ГБ оперативной памяти на моем компьютере, и эта программа немедленно потребляет каждый доступный байт.

1 5

1 ответ:

Похоже, вы не совсем понимаете, как работает поток данных TPL.

BoundedCapacity ограничивает количество элементов, которые можно разнести в блок. В вашем случае это означает один char в TransformManyBlock и один string в ActionBlock.

Таким образом, вы отправляете один элемент в TransformManyBlock, который затем возвращает строки 1024*1024 и пытается передать их в ActionBlock, который будет принимать только один за один раз. Остальные строки будут просто сидеть в очереди вывода TransformManyBlock.

Чего вы, вероятно, хотите для этого нужно создать один блок и разместить в нем элементы потоковым способом, ожидая (синхронно или иначе), когда его емкость будет достигнута:

private static void Main()
{
    MainAsync().Wait();
}

private static async Task MainAsync()
{
    var block = new ActionBlock<string>(async item =>
    {
        Console.WriteLine(item.Substring(0, 10));
        await Task.Delay(1000);
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

    foreach (var item in GetSequence('A'))
    {
        await block.SendAsync(item);
    }

    block.Complete();
    await block.Completion;
}