Блок потока данных TPL потребляет всю доступную память
У меня есть TransformManyBlock
со следующей конструкцией:
- ввод: путь к файлу
- вывод: IEnumerable содержимого файла, по одной строке за раз
BoundedCapacity
очень низкое значение (например, 1) для этого блока и всех последующих блоков. Тем не менее, блок, по-видимому, повторяет IEnumerable жадно, который потребляет все доступные память на компьютере, перемалывая каждый процесс до упора. Количество выходов блока продолжает расти без привязки, пока я не убью процесс.
Что я могу сделать, чтобы блок не потреблял IEnumerable
таким образом?
EDIT: вот пример программы, иллюстрирующий эту проблему:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
static IEnumerable<string> GetSequence(char c)
{
for (var i = 0; i < 1024 * 1024; ++i)
yield return new string(c, 1024 * 1024);
}
static void Main(string[] args)
{
var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
var secondBlock = new ActionBlock<string>(str =>
{
Console.WriteLine(str.Substring(0, 10));
Thread.Sleep(1000);
}, options);
firstBlock.LinkTo(secondBlock);
firstBlock.Completion.ContinueWith(task =>
{
if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception);
else secondBlock.Complete();
});
firstBlock.Post('A');
firstBlock.Complete();
for (; ; )
{
Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount);
Thread.Sleep(3000);
}
}
}
Если вы используете 64-разрядный блок, убедитесь, что в Visual Studio снят флажок "предпочесть 32-разрядный". У меня есть 16 ГБ оперативной памяти на моем компьютере, и эта программа немедленно потребляет каждый доступный байт.
1 ответ:
Похоже, вы не совсем понимаете, как работает поток данных TPL.
BoundedCapacity
ограничивает количество элементов, которые можно разнести в блок. В вашем случае это означает одинchar
вTransformManyBlock
и одинstring
вActionBlock
.Таким образом, вы отправляете один элемент в
TransformManyBlock
, который затем возвращает строки1024*1024
и пытается передать их вActionBlock
, который будет принимать только один за один раз. Остальные строки будут просто сидеть в очереди выводаTransformManyBlock
.Чего вы, вероятно, хотите для этого нужно создать один блок и разместить в нем элементы потоковым способом, ожидая (синхронно или иначе), когда его емкость будет достигнута:
private static void Main() { MainAsync().Wait(); } private static async Task MainAsync() { var block = new ActionBlock<string>(async item => { Console.WriteLine(item.Substring(0, 10)); await Task.Delay(1000); }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); foreach (var item in GetSequence('A')) { await block.SendAsync(item); } block.Complete(); await block.Completion; }