Shutdown "worker" go подпрограмма после того, как буфер пуст


Я хочу, чтобы мой GO routine worker (ProcessToDo() в приведенном ниже коде) ждал, пока все "поставленные в очередь" работы будут обработаны, прежде чем завершать работу.

Рабочая подпрограмма имеет канал "делать" (буферизованный), через который в нее посылается работа. И у него есть канал "сделано", чтобы сказать ему начать выключение. В документации говорится, что выбор на каналах будет выбирать "псевдослучайное значение", если выполняется более одного выбора... это означает, что завершение работы (возврат) запускается до того, как все буферизованные работа завершена.

В приведенном ниже примере кода я хочу, чтобы все 20 сообщений были напечатаны...

package main

import (
    "time"
    "fmt"
)


func ProcessToDo(done chan struct{}, todo chan string) {
    for {
        select {
        case work, ok := <-todo:
            if !ok {
                fmt.Printf("Shutting down ProcessToDo - todo channel closed!n")
                return
            }
            fmt.Printf("todo: %qn", work)
            time.Sleep(100 * time.Millisecond)
        case _, ok := <-done:
            if ok {
                fmt.Printf("Shutting down ProcessToDo - done message received!n")
            } else {
                fmt.Printf("Shutting down ProcessToDo - done channel closed!n")
            }
            close(todo)
            return
        }
    }
}

func main() {

    done := make(chan struct{})
    todo := make(chan string, 100)

    go ProcessToDo(done, todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    time.Sleep(1 * time.Second)
    close(done)
    time.Sleep(4 * time.Second)
}
2 2

2 ответа:

done канал в вашем случае совершенно не нужен, так как вы можете подать сигнал об отключении, закрыв сам канал todo.

И используйте for range на канале, который будет повторяться до тех пор, пока канал не закроется и его буфер не опустеет.

У вас должен быть Канал done, но только для того, чтобы сама горутина могла сигнализировать, что она закончила работу, и чтобы основная горутина могла продолжить или выйти.

Этот вариант эквивалентен вашему, гораздо проще и не требует time.Sleep() призывает ждать другие горотины (что было бы слишком ошибочным и неопределенным в любом случае). Попробуйте это на Go Playground :

func ProcessToDo(done chan struct{}, todo chan string) {
    for work := range todo {
        fmt.Printf("todo: %q\n", work)
        time.Sleep(100 * time.Millisecond)
    }
    fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
    done <- struct{}{} // Signal that we processed all jobs
}

func main() {
    done := make(chan struct{})
    todo := make(chan string, 100)

    go ProcessToDo(done, todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    close(todo)
    <-done // Wait until the other goroutine finishes all jobs
}

Также обратите внимание, что рабочие goroutines должны сигнализировать о завершении с помощью defer, чтобы основная goroutine не застряла в ожидании рабочего, если он вернется каким-то неожиданным образом или запаникует. Поэтому лучше начать так:

defer func() {
    done <- struct{}{} // Signal that we processed all jobs
}()

Вы также можете использовать sync.WaitGroup синхронизировать основную горутину с рабочим (дождаться его). На самом деле, если вы планируете использовать несколько рабочих горотин, что чище, чем считывать несколько значений из канала done. Кроме того, проще сигнализировать о завершении с помощью WaitGroup, поскольку он имеет Done() метод (который является вызовом функции), поэтому вам не нужна анонимная функция:

defer wg.Done()

Смотритеanwser Джимба для полного примера с WaitGroup.

Использование for range также идиоматично, если вы хотите использовать несколько рабочих линий: каналы синхронизированы, поэтому вы не нужен любой дополнительный код, который бы синхронизировал доступ к каналу todo или задания, полученные от него. И если вы закроете канал todo в main(), Это будет правильно сигнализировать всем рабочим горутинам. Но, конечно, все поставленные в очередь задания будут получены и обработаны ровно один раз.

Теперь возьмем вариант, который использует WaitGroup, чтобы заставить основную горутину ждать работника( ответ Джимба): что делать, если вы хотите, чтобы более 1 рабочего горутина; обрабатывать ваши задания одновременно (и, скорее всего, параллель)?

Единственное, что вам нужно добавить / изменить в своем коде, это: чтобы действительно запустить несколько из них:

for i := 0; i < 10; i++ {
    wg.Add(1)
    go ProcessToDo(todo)
}

Не меняя больше ничего, теперь у вас есть правильное параллельное приложение, которое получает и обрабатывает ваши задания, используя 10 параллельных goroutines. И мы не использовали никаких "уродливых" time.Sleep() (мы использовали один, но только для имитации медленной обработки, а не для ожидания других горотин), и вам не нужна дополнительная синхронизация.

Обычно плохая идея, чтобы потребитель канала закрыл его, так как отправка по закрытому каналу-это паника.

В этом случае, если вы никогда не хотите прерывать потребителя до того, как все сообщения были отправлены, просто используйте цикл for...range и закройте канал, когда вы закончите. Вам также понадобится сигнал, подобный WaitGroup, чтобы дождаться окончания goroutine (вместо использования времени.Сон)

Http://play.golang.org/p/r97vRPsxEb

var wg sync.WaitGroup

func ProcessToDo(todo chan string) {
    defer wg.Done()
    for work := range todo {
        fmt.Printf("todo: %q\n", work)
        time.Sleep(100 * time.Millisecond)

    }
    fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")

}

func main() {
    todo := make(chan string, 100)
    wg.Add(1)
    go ProcessToDo(todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    close(todo)
    wg.Wait()
}