Поток данных TPL: создание пользовательского splitblock


Просто нужна помощь в создании пользовательского splitblock с помощью библиотеки потоков данных, которая является частью TPL in. Net.

Все, чего я хочу добиться, - это простой пользовательский блок, который принимает входные данные и разбивает их на несколько блоков преобразования. Это необходимо для фильтрации данных, где я мог бы регистрировать отрицательные записи и продолжать с хорошим.

Для моих нужд должно быть достаточно разделить вход на два разных выхода. Заголовок класса должен выглядеть примерно так этот...

public abstract class SplitBlock<TInput, TOutputLeft, TOutputRight>
Моя проблема в том, что я действительно не знаю, как идти дальше. Все, что я знаю, это то, что мне нужны два блока преобразования:
var leftBlock  = new TransformBlock<TInput, TOutputLeft>(...)
var rightblock = new TransformBlock<TInput, TOutputRight>(...)

Во всех моих попытках у меня было несколько ITargetBlocks для хранения ввода левого и правого блока, но это не может быть правильным, не так ли?

Я ценю каждый намек, который вы можете дать.
1 2

1 ответ:

Я бы начал с размышлений о том, как должен выглядеть общий интерфейс этого класса. Я думаю, что самым простым решением было бы что-то вроде:

public class SplitBlock<TInput, TOutputLeft, TOutputRight>
{
    public ITargetBlock<TInput> Input { get; }
    public ISourceBlock<TOutputLeft> LeftOutput { get; }
    public ISourceBlock<TOutputRight> RightOutput { get; }
}

При этом реализация происходит естественным образом: один входной блок соединяется с двумя выходными блоками. Единственный вопрос заключается в том, должна ли фактическая обработка выполняться в выходных блоках (как вы предложили с вашими двумя TransformBlock) или во входном блоке.

Если вы хотите иметь обработку в выходных блоках, то входной сигнал блок может быть ActionBlock, который посылает вход на оба выхода, и выходы будут TransformBlock s, как вы и предлагали.

public class SplitBlock<TInput, TOutputLeft, TOutputRight>
{
    private ActionBlock<TInput> input;
    private TransformBlock<TInput, TOutputLeft> leftOutput;
    private TransformBlock<TInput, TOutputRight> rightOutput;

    public ITargetBlock<TInput> Input { get { return input; } }
    public ISourceBlock<TOutputLeft> LeftOutput { get { return leftOutput; } }
    public ISourceBlock<TOutputRight> RightOutput { get { return rightOutput; } }

    public SplitBlock(
        Func<TInput, TOutputLeft> leftTransform,
        Func<TInput, TOutputRight> rightTransform)
    {
        input = new ActionBlock<TInput>(
            x =>
            {
                leftOutput.Post(x);
                rightOutput.Post(x);
            });
        leftOutput = new TransformBlock<TInput, TOutputLeft>(leftTransform);
        rightOutput = new TransformBlock<TInput, TOutputRight>(rightTransform);

        // TODO handle fault in input correctly
        input.Completion.ContinueWith(
            _ =>
            {
                leftOutput.Complete();
                rightOutput.Complete();
            });
    }
}

(это предполагает, что левое и правое преобразования могут обрабатывать одни и те же входные данные одновременно.)

С другой стороны, если бы вы хотели выполнить обработку в блоке ввода (что имеет больше смысла для меня), то вы могли бы иметь ActionBlock в качестве входных данных и BufferBlockв качестве выходных данных, причем входной блок обрабатывает входные данные и затем отправляет результаты на выход. блоки:
public class SplitBlock<TInput, TOutputLeft, TOutputRight>
{
    private ActionBlock<TInput> input;
    private BufferBlock<TOutputLeft> leftOutput;
    private BufferBlock<TOutputRight> rightOutput;

    public ITargetBlock<TInput> Input { get { return input; } }
    public ISourceBlock<TOutputLeft> LeftOutput { get { return leftOutput; } }
    public ISourceBlock<TOutputRight> RightOutput { get { return rightOutput; } }

    public SplitBlock(
        Func<TInput, Tuple<TOutputLeft, TOutputRight>> combinedTransform)
    {
        input = new ActionBlock<TInput>(
            value =>
            {
                var result = combinedTransform(value);
                leftOutput.Post(result.Item1);
                rightOutput.Post(result.Item2);
            });
        leftOutput = new BufferBlock<TOutputLeft>();
        rightOutput = new BufferBlock<TOutputRight>();

        // TODO handle fault in input correctly
        input.Completion.ContinueWith(
            _ =>
            {
                leftOutput.Complete();
                rightOutput.Complete();
            });
    }

    public SplitBlock(
        Func<TInput, TOutputLeft> leftTransform,
        Func<TInput, TOutputRight> rightTransform)
        : this(x => Tuple.Create(leftTransform(x), rightTransform(x)))
    {}
}