F# асинхронный рабочий процесс / задачи в сочетании со свободной монадой


Я пытаюсь построить конвейер для обработки сообщений, используя свободный шаблон монады, мой код выглядит так:

module PipeMonad =
type PipeInstruction<'msgIn, 'msgOut, 'a> =
    | HandleAsync of 'msgIn * (Async<'msgOut> -> 'a)
    | SendOutAsync of 'msgOut * (Async -> 'a)

let private mapInstruction f = function
    | HandleAsync (x, next) -> HandleAsync (x, next >> f)
    | SendOutAsync (x, next) -> SendOutAsync (x, next >> f)

type PipeProgram<'msgIn, 'msgOut, 'a> =
    | Act of PipeInstruction<'msgIn, 'msgOut, PipeProgram<'msgIn, 'msgOut, 'a>>
    | Stop of 'a

let rec bind f = function
    | Act x -> x |> mapInstruction (bind f) |> Act
    | Stop x -> f x

type PipeBuilder() =
    member __.Bind (x, f) = bind f x
    member __.Return x = Stop x
    member __.Zero () = Stop ()
    member __.ReturnFrom x = x

let pipe = PipeBuilder()
let handleAsync msgIn = Act (HandleAsync (msgIn, Stop))
let sendOutAsync msgOut = Act (SendOutAsync (msgOut, Stop))

Который я написал в соответствии с этой статьей

Однако для меня важно, чтобы эти методы были асинхронными (Task предпочтительно, но Async приемлемо), но когда я создал конструктор для моего pipeline, я не могу понять, как его использовать - как я могу ждать Task<'msgOut> или Async<'msgOut>, чтобы я мог отправить его и дождаться этой задачи "отправить"?

Теперь у меня есть этот кусок код:

let pipeline log msgIn =
    pipe {
        let! msgOut = handleAsync msgIn
        let result = async {
            let! msgOut = msgOut
            log msgOut
            return sendOutAsync msgOut
        }
        return result
    }

Который возвращает PipeProgram<'b, 'a, Async<PipeProgram<'c, 'a, Async>>>

2 3

2 ответа:

В моем понимании, весь смысл свободной монады заключается в том, что вы не выставляете такие эффекты, как асинхронность, поэтому я не думаю, что они должны использоваться в типе PipeInstruction. Интерпретатор-это место, где добавляются эффекты.

Кроме того, свободная Монада действительно имеет смысл только в Haskell, где все, что вам нужно сделать, это определить функтор, а затем вы получите остальную реализацию автоматически. В F# вы также должны написать остальную часть кода, так что нет большой пользы в использовании Free по более традиционному шаблону интерпретатора. Тот код TurtleProgram, с которым вы связались, был просто экспериментом-я бы вообще не рекомендовал использовать Free для реального кода. Наконец, если вы уже знаете эффекты, которые собираетесь использовать, и у вас не будет более одной интерпретации, то использование этого подхода не имеет смысла. Это имеет смысл только тогда, когда преимущества перевешивают сложность.

В любом случае, если вы хотите написать версию интерпретатора (а не бесплатно) это вот как бы я это сделал:

Во-первых, определите инструкции без каких-либо эффектов.
/// The abstract instruction set
module PipeProgram =

    type PipeInstruction<'msgIn, 'msgOut,'state> =
        | Handle of 'msgIn * ('msgOut -> PipeInstruction<'msgIn, 'msgOut,'state>)
        | SendOut of 'msgOut * (unit -> PipeInstruction<'msgIn, 'msgOut,'state>)
        | Stop of 'state

Тогда вы можете написать для него вычислительное выражение:

/// A computation expression for a PipeProgram
module PipeProgramCE =
    open PipeProgram

    let rec bind f instruction =
        match instruction with
        | Handle (x,next) ->  Handle (x, (next >> bind f))
        | SendOut (x, next) -> SendOut (x, (next >> bind f))
        | Stop x -> f x

    type PipeBuilder() =
        member __.Bind (x, f) = bind f x
        member __.Return x = Stop x
        member __.Zero () = Stop ()
        member __.ReturnFrom x = x

let pipe = PipeProgramCE.PipeBuilder()

А затем вы можете начать писать свои вычислительные выражения. Это поможет очистить дизайн перед началом работы над интерпретатором.

// helper functions for CE
let stop x = PipeProgram.Stop x
let handle x = PipeProgram.Handle (x,stop)
let sendOut x  = PipeProgram.SendOut (x, stop)

let exampleProgram : PipeProgram.PipeInstruction<string,string,string> = pipe {
    let! msgOut1 = handle "In1"
    do! sendOut msgOut1
    let! msgOut2 = handle "In2"
    do! sendOut msgOut2
    return msgOut2
    }
После того, как вы описали инструкции, вы можете написать интерпретаторы. И, как я уже сказал, Если вы не пишете несколько интерпретаторов, то, возможно, вы не нужно этого делать вообще.

Вот интерпретатор для несинхронной версии ("Id monad", так сказать):

module PipeInterpreterSync =
    open PipeProgram

    let handle msgIn =
        printfn "In: %A"  msgIn
        let msgOut = System.Console.ReadLine()
        msgOut

    let sendOut msgOut =
        printfn "Out: %A"  msgOut
        ()

    let rec interpret instruction =
        match instruction with
        | Handle (x, next) ->
            let result = handle x
            result |> next |> interpret
        | SendOut (x, next) ->
            let result = sendOut x
            result |> next |> interpret
        | Stop x ->
            x

А вот асинхронная версия:

module PipeInterpreterAsync =
    open PipeProgram

    /// Implementation of "handle" uses async/IO
    let handleAsync msgIn = async {
        printfn "In: %A"  msgIn
        let msgOut = System.Console.ReadLine()
        return msgOut
        }

    /// Implementation of "sendOut" uses async/IO
    let sendOutAsync msgOut = async {
        printfn "Out: %A"  msgOut
        return ()
        }

    let rec interpret instruction =
        match instruction with
        | Handle (x, next) -> async {
            let! result = handleAsync x
            return! result |> next |> interpret
            }
        | SendOut (x, next) -> async {
            do! sendOutAsync x
            return! () |> next |> interpret
            }
        | Stop x -> x

Прежде всего, я думаю, что использование свободных монад в F# очень близко к тому, чтобы быть анти-паттерном. Это очень абстрактная конструкция, которая не очень хорошо сочетается с идиоматическим стилем F#, но это вопрос предпочтения, и если вы (и ваша команда) найдете этот способ написания кода читаемым и легким для понимания, то вы, безусловно, можете пойти в этом направлении.

Из любопытства я потратил немного времени, играя с вашим примером - хотя я еще не совсем понял, как исправить вашу ошибку. пример полностью, я надеюсь, что следующее может помочь направить вас в правильном направлении. Резюмируя, я думаю, что вам нужно будет интегрировать Async в ваш PipeProgram так, чтобы программа канала была изначально асинхронной:

type PipeInstruction<'msgIn, 'msgOut, 'a> =
    | HandleAsync of 'msgIn * (Async<'msgOut> -> 'a)
    | SendOutAsync of 'msgOut * (Async<unit> -> 'a)
    | Continue of 'a 

type PipeProgram<'msgIn, 'msgOut, 'a> =
    | Act of Async<PipeInstruction<'msgIn, 'msgOut, PipeProgram<'msgIn, 'msgOut, 'a>>>
    | Stop of Async<'a>

Обратите внимание, что мне пришлось добавить Continue, чтобы сделать мои функции type-check, но я думаю, что это, вероятно, неправильный Хак, и вам, возможно, потребуется удалить его. С помощью этих определений вы можете сделать следующее:

let private mapInstruction f = function
    | HandleAsync (x, next) -> HandleAsync (x, next >> f)
    | SendOutAsync (x, next) -> SendOutAsync (x, next >> f)
    | Continue v -> Continue v

let rec bind (f:'a -> PipeProgram<_, _, _>) = function
    | Act x -> 
        let w = async { 
          let! x = x 
          return mapInstruction (bind f) x }
        Act w
    | Stop x -> 
        let w = async {
          let! x = x
          let pg = f x
          return Continue pg
        }
        Act w

type PipeBuilder() =
    member __.Bind (x, f) = bind f x
    member __.Return x = Stop x
    member __.Zero () = Stop (async.Return())
    member __.ReturnFrom x = x

let pipe = PipeBuilder()
let handleAsync msgIn = Act (async.Return(HandleAsync (msgIn, Stop)))
let sendOutAsync msgOut = Act (async.Return(SendOutAsync (msgOut, Stop)))

let pipeline log msgIn =
    pipe {
        let! msgOut = handleAsync msgIn
        log msgOut
        return! sendOutAsync msgOut
    }

pipeline ignore 0 

Теперь это дает вам просто PipeProgram<int, unit, unit>, которые вы должны быть в состоянии вычислите, имея рекурсивные асинхронные функции, которые действуют на команды.