Shutdown "worker" go подпрограмма после того, как буфер пуст
Я хочу, чтобы мой GO routine worker (ProcessToDo()
в приведенном ниже коде) ждал, пока все "поставленные в очередь" работы будут обработаны, прежде чем завершать работу.
В приведенном ниже примере кода я хочу, чтобы все 20 сообщений были напечатаны...
package main
import (
"time"
"fmt"
)
func ProcessToDo(done chan struct{}, todo chan string) {
for {
select {
case work, ok := <-todo:
if !ok {
fmt.Printf("Shutting down ProcessToDo - todo channel closed!n")
return
}
fmt.Printf("todo: %qn", work)
time.Sleep(100 * time.Millisecond)
case _, ok := <-done:
if ok {
fmt.Printf("Shutting down ProcessToDo - done message received!n")
} else {
fmt.Printf("Shutting down ProcessToDo - done channel closed!n")
}
close(todo)
return
}
}
}
func main() {
done := make(chan struct{})
todo := make(chan string, 100)
go ProcessToDo(done, todo)
for i := 0; i < 20; i++ {
todo <- fmt.Sprintf("Message %02d", i)
}
fmt.Println("*** all messages queued ***")
time.Sleep(1 * time.Second)
close(done)
time.Sleep(4 * time.Second)
}
2 ответа:
done
канал в вашем случае совершенно не нужен, так как вы можете подать сигнал об отключении, закрыв сам каналtodo
.И используйте
for range
на канале, который будет повторяться до тех пор, пока канал не закроется и его буфер не опустеет.У вас должен быть Канал
done
, но только для того, чтобы сама горутина могла сигнализировать, что она закончила работу, и чтобы основная горутина могла продолжить или выйти.Этот вариант эквивалентен вашему, гораздо проще и не требует
time.Sleep()
призывает ждать другие горотины (что было бы слишком ошибочным и неопределенным в любом случае). Попробуйте это на Go Playground :func ProcessToDo(done chan struct{}, todo chan string) { for work := range todo { fmt.Printf("todo: %q\n", work) time.Sleep(100 * time.Millisecond) } fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n") done <- struct{}{} // Signal that we processed all jobs } func main() { done := make(chan struct{}) todo := make(chan string, 100) go ProcessToDo(done, todo) for i := 0; i < 20; i++ { todo <- fmt.Sprintf("Message %02d", i) } fmt.Println("*** all messages queued ***") close(todo) <-done // Wait until the other goroutine finishes all jobs }
Также обратите внимание, что рабочие goroutines должны сигнализировать о завершении с помощью
defer
, чтобы основная goroutine не застряла в ожидании рабочего, если он вернется каким-то неожиданным образом или запаникует. Поэтому лучше начать так:defer func() { done <- struct{}{} // Signal that we processed all jobs }()
Вы также можете использовать
sync.WaitGroup
синхронизировать основную горутину с рабочим (дождаться его). На самом деле, если вы планируете использовать несколько рабочих горотин, что чище, чем считывать несколько значений из каналаdone
. Кроме того, проще сигнализировать о завершении с помощьюWaitGroup
, поскольку он имеетDone()
метод (который является вызовом функции), поэтому вам не нужна анонимная функция:defer wg.Done()
Смотритеanwser Джимба для полного примера с
WaitGroup
.Использование
for range
также идиоматично, если вы хотите использовать несколько рабочих линий: каналы синхронизированы, поэтому вы не нужен любой дополнительный код, который бы синхронизировал доступ к каналуtodo
или задания, полученные от него. И если вы закроете каналtodo
вmain()
, Это будет правильно сигнализировать всем рабочим горутинам. Но, конечно, все поставленные в очередь задания будут получены и обработаны ровно один раз.Теперь возьмем вариант, который использует
WaitGroup
, чтобы заставить основную горутину ждать работника( ответ Джимба): что делать, если вы хотите, чтобы более 1 рабочего горутина; обрабатывать ваши задания одновременно (и, скорее всего, параллель)?Единственное, что вам нужно добавить / изменить в своем коде, это: чтобы действительно запустить несколько из них:
for i := 0; i < 10; i++ { wg.Add(1) go ProcessToDo(todo) }
Не меняя больше ничего, теперь у вас есть правильное параллельное приложение, которое получает и обрабатывает ваши задания, используя 10 параллельных goroutines. И мы не использовали никаких "уродливых"
time.Sleep()
(мы использовали один, но только для имитации медленной обработки, а не для ожидания других горотин), и вам не нужна дополнительная синхронизация.
Обычно плохая идея, чтобы потребитель канала закрыл его, так как отправка по закрытому каналу-это паника.
В этом случае, если вы никогда не хотите прерывать потребителя до того, как все сообщения были отправлены, просто используйте цикл
for...range
и закройте канал, когда вы закончите. Вам также понадобится сигнал, подобныйWaitGroup
, чтобы дождаться окончания goroutine (вместо использования времени.Сон)Http://play.golang.org/p/r97vRPsxEb
var wg sync.WaitGroup func ProcessToDo(todo chan string) { defer wg.Done() for work := range todo { fmt.Printf("todo: %q\n", work) time.Sleep(100 * time.Millisecond) } fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n") } func main() { todo := make(chan string, 100) wg.Add(1) go ProcessToDo(todo) for i := 0; i < 20; i++ { todo <- fmt.Sprintf("Message %02d", i) } fmt.Println("*** all messages queued ***") close(todo) wg.Wait() }