Как бы вы определили пул горутин должен быть казнен в Golang?
TL;TR: пожалуйста, просто перейдите к последней части и скажите мне, как вы решите эту проблему.
я начал использовать Golang сегодня утром, исходя из Python. Я хочу вызвать исполняемый файл с закрытым исходным кодом из Go несколько раз, с немного параллелизма, с различными аргументами командной строки. Мой код просто хорошо работает, но я хотел бы получить ваш вклад, чтобы улучшить его. Поскольку я нахожусь на ранней стадии обучения, я также объясню свой рабочий процесс.
для простоты предположим здесь, что эта "внешняя программа с закрытым исходным кодом" является zenity
, средство командной строки Linux, которое может отображать графические окна сообщений из командной строки.
вызов исполняемого файла из Go
Итак, в Go, я бы пошел так:
package main
import "os/exec"
func main() {
cmd := exec.Command("zenity", "--info", "--text='Hello World'")
cmd.Run()
}
это должно работать правильно. Обратите внимание, что .Run()
является функциональным эквивалентом .Start()
следовал по .Wait()
. Это здорово, но если бы я хотел выполните эту программу только один раз, весь программный материал не будет стоить того. Так что давайте сделаем это несколько раз.
вызов исполняемого файла несколько раз
теперь, когда у меня это работает, я хотел бы вызвать свою программу несколько раз, с пользовательскими аргументами командной строки (здесь просто i
для простоты).
package main
import (
"os/exec"
"strconv"
)
func main() {
NumEl := 8 // Number of times the external program is called
for i:=0; i<NumEl; i++ {
cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
cmd.Run()
}
}
Ок, мы сделали это! Но я все еще не вижу преимущества перехода на Python ... Этот кусок кода фактически выполняется в a серийная мода. У меня есть многоядерный процессор, и я хотел бы воспользоваться этим. Итак, давайте добавим некоторый параллелизм с goroutines.
Goroutines, или способ сделать мою программу параллельной
a) первая попытка: просто добавьте"go" s везде
давайте перепишем наш код, чтобы упростить вызов и повторное использование и добавить знаменитый go
ключевые слова:
package main
import (
"os/exec"
"strconv"
)
func main() {
NumEl := 8
for i:=0; i<NumEl; i++ {
go callProg(i) // <--- There!
}
}
func callProg(i int) {
cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
cmd.Run()
}
ничего! В чем проблема? Все горутины выполняются одновременно. Я не знаю почему zenity не выполняется, но AFAIK, программа Go вышла до того, как внешняя программа zenity могла быть даже инициализирована. Это было подтверждено использованием time.Sleep
: ожидания в течение нескольких секунд было достаточно, чтобы позволить 8 экземпляр zenity запустить себя. Я не знаю, если это можно считать ошибкой, хотя.
чтобы сделать это хуже, реальная программа, которую я действительно хотел бы вызвать, занимает некоторое время, чтобы выполнить себя. Если я выполняю 8 экземпляров этой программы параллельно на моем 4-ядерном процессоре, это будет тратить некоторое время на много переключений контекста ... я не знаю, как ведут себя простые гороутины, но exec.Command
будет запустите zenity 8 раз в 8 различных потоках. Чтобы сделать это еще хуже, я хочу выполнить эту программу более 100 000 раз. Делать все это сразу в goroutines не будет эффективным вообще. Тем не менее, я хотел бы использовать свой 4-ядерный процессор!
B) вторая попытка: используйте пулы goroutines
интернет-ресурсы, как правило, рекомендуют использование sync.WaitGroup
для такого рода работы. Проблема с этим подходом заключается в том, что вы в основном работаете с пакетами goroutines: если я создаю группу ожидания из 4 членов, программа Go будет ждать все 4 внешних программы для завершения перед вызовом нового пакета из 4 программ. Это не эффективно: процессор тратится впустую, еще раз.
некоторые другие ресурсы рекомендовали использовать буферизованный канал для выполнения работы:
package main
import (
"os/exec"
"strconv"
)
func main() {
NumEl := 8 // Number of times the external program is called
NumCore := 4 // Number of available cores
c := make(chan bool, NumCore - 1)
for i:=0; i<NumEl; i++ {
go callProg(i, c)
c <- true // At the NumCoreth iteration, c is blocking
}
}
func callProg(i int, c chan bool) {
defer func () {<- c}()
cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
cmd.Run()
}
это кажется уродливым. Каналы не были предназначены для этой цели: я использую побочный эффект. Я люблю концепцию defer
но я ненавижу объявлять функцию (даже лямбда), чтобы вытащить значение из фиктивного канала, который я создал. О, и конечно, использование фиктивного канала само по себе уродливо.
в) Третья попытка: умереть, когда все дети мертвы
теперь мы почти закончили. Я просто должен принять во внимание еще один побочный эффект: программа Go закрывается до того, как все Зенит всплывающие окна закрыты. Это связано с тем, что при завершении цикла (на 8-й итерации) ничто не мешает программе завершить работу. На этот раз, sync.WaitGroup
будет полезно.
package main
import (
"os/exec"
"strconv"
"sync"
)
func main() {
NumEl := 8 // Number of times the external program is called
NumCore := 4 // Number of available cores
c := make(chan bool, NumCore - 1)
wg := new(sync.WaitGroup)
wg.Add(NumEl) // Set the number of goroutines to (0 + NumEl)
for i:=0; i<NumEl; i++ {
go callProg(i, c, wg)
c <- true // At the NumCoreth iteration, c is blocking
}
wg.Wait() // Wait for all the children to die
close(c)
}
func callProg(i int, c chan bool, wg *sync.WaitGroup) {
defer func () {
<- c
wg.Done() // Decrease the number of alive goroutines
}()
cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
cmd.Run()
}
сделано.
мои вопросы
- знаете ли вы какой-либо другой правильный способ ограничить количество goroutines, выполненных сразу?
я не имею в виду потоки; как Go управляет goroutines внутренне не имеет значения. Я действительно имею в виду ограничивая количество горутин сразу начал: exec.Command
создает новый поток каждый раз, когда он вызывается, поэтому я должен контролировать количество раз, когда он вызывается.
- этот код выглядит нормально для вас?
- знаете ли вы, как избежать использования фиктивного канала в этом случае?
я не могу убедить себя, что такие фиктивные каналы-это путь.
3 ответа:
Я бы породил 4 рабочих goroutines, которые читают задачи из общего канала. Goroutines, которые быстрее других (потому что они запланированы по-другому или получают простые задачи), получат больше задач от этого канала, чем другие. В дополнение к этому, я бы использовал синхронизации.WaitGroup ждать, пока все рабочие закончат. Оставшаяся часть - это просто создание задач. Вы можете увидеть пример реализации этого подхода здесь:
package main import ( "os/exec" "strconv" "sync" ) func main() { tasks := make(chan *exec.Cmd, 64) // spawn four worker goroutines var wg sync.WaitGroup for i := 0; i < 4; i++ { wg.Add(1) go func() { for cmd := range tasks { cmd.Run() } wg.Done() }() } // generate some tasks for i := 0; i < 10; i++ { tasks <- exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'") } close(tasks) // wait for the workers to finish wg.Wait() }
вероятно, есть и другие возможные подходы, но я думаю, что это очень чистое решение, которое легко понять.
простой подход к дросселированию (execute
f()
N раз, но не болееmaxConcurrency
одновременно), просто схема:package main import ( "sync" ) const maxConcurrency = 4 // for example var throttle = make(chan int, maxConcurrency) func main() { const N = 100 // for example var wg sync.WaitGroup for i := 0; i < N; i++ { throttle <- 1 // whatever number wg.Add(1) go f(i, &wg, throttle) } wg.Wait() } func f(i int, wg *sync.WaitGroup, throttle chan int) { defer wg.Done() // whatever processing println(i) <-throttle }
Я бы, наверное, назвать
throttle
канал "пустышки". ИМХО это элегантный способ (это не мое изобретение, конечно), как ограничить параллелизм.Кстати: обратите внимание, что вы игнорируете ошибки от
cmd.Run()
.
попробуйте это: https://github.com/korovkin/limiter
limiter := NewConcurrencyLimiter(10) limiter.Execute(func() { zenity(...) }) limiter.Wait()