Как бы вы определили пул горутин должен быть казнен в Golang?


TL;TR: пожалуйста, просто перейдите к последней части и скажите мне, как вы решите эту проблему.

я начал использовать Golang сегодня утром, исходя из Python. Я хочу вызвать исполняемый файл с закрытым исходным кодом из Go несколько раз, с немного параллелизма, с различными аргументами командной строки. Мой код просто хорошо работает, но я хотел бы получить ваш вклад, чтобы улучшить его. Поскольку я нахожусь на ранней стадии обучения, я также объясню свой рабочий процесс.

для простоты предположим здесь, что эта "внешняя программа с закрытым исходным кодом" является zenity, средство командной строки Linux, которое может отображать графические окна сообщений из командной строки.

вызов исполняемого файла из Go

Итак, в Go, я бы пошел так:

package main
import "os/exec"
func main() {
    cmd := exec.Command("zenity", "--info", "--text='Hello World'")
    cmd.Run()
}

это должно работать правильно. Обратите внимание, что .Run() является функциональным эквивалентом .Start() следовал по .Wait(). Это здорово, но если бы я хотел выполните эту программу только один раз, весь программный материал не будет стоить того. Так что давайте сделаем это несколько раз.

вызов исполняемого файла несколько раз

теперь, когда у меня это работает, я хотел бы вызвать свою программу несколько раз, с пользовательскими аргументами командной строки (здесь просто i для простоты).

package main    
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 // Number of times the external program is called
    for i:=0; i<NumEl; i++ {
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
        cmd.Run()
    }
}

Ок, мы сделали это! Но я все еще не вижу преимущества перехода на Python ... Этот кусок кода фактически выполняется в a серийная мода. У меня есть многоядерный процессор, и я хотел бы воспользоваться этим. Итак, давайте добавим некоторый параллелизм с goroutines.

Goroutines, или способ сделать мою программу параллельной

a) первая попытка: просто добавьте"go" s везде

давайте перепишем наш код, чтобы упростить вызов и повторное использование и добавить знаменитый go ключевые слова:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 
    for i:=0; i<NumEl; i++ {
        go callProg(i)  // <--- There!
    }
}

func callProg(i int) {
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

ничего! В чем проблема? Все горутины выполняются одновременно. Я не знаю почему zenity не выполняется, но AFAIK, программа Go вышла до того, как внешняя программа zenity могла быть даже инициализирована. Это было подтверждено использованием time.Sleep: ожидания в течение нескольких секунд было достаточно, чтобы позволить 8 экземпляр zenity запустить себя. Я не знаю, если это можно считать ошибкой, хотя.

чтобы сделать это хуже, реальная программа, которую я действительно хотел бы вызвать, занимает некоторое время, чтобы выполнить себя. Если я выполняю 8 экземпляров этой программы параллельно на моем 4-ядерном процессоре, это будет тратить некоторое время на много переключений контекста ... я не знаю, как ведут себя простые гороутины, но exec.Commandбудет запустите zenity 8 раз в 8 различных потоках. Чтобы сделать это еще хуже, я хочу выполнить эту программу более 100 000 раз. Делать все это сразу в goroutines не будет эффективным вообще. Тем не менее, я хотел бы использовать свой 4-ядерный процессор!

B) вторая попытка: используйте пулы goroutines

интернет-ресурсы, как правило, рекомендуют использование sync.WaitGroup для такого рода работы. Проблема с этим подходом заключается в том, что вы в основном работаете с пакетами goroutines: если я создаю группу ожидания из 4 членов, программа Go будет ждать все 4 внешних программы для завершения перед вызовом нового пакета из 4 программ. Это не эффективно: процессор тратится впустую, еще раз.

некоторые другие ресурсы рекомендовали использовать буферизованный канал для выполнения работы:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    for i:=0; i<NumEl; i++ {
        go callProg(i, c)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
}

func callProg(i int, c chan bool) {
    defer func () {<- c}()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

это кажется уродливым. Каналы не были предназначены для этой цели: я использую побочный эффект. Я люблю концепцию defer но я ненавижу объявлять функцию (даже лямбда), чтобы вытащить значение из фиктивного канала, который я создал. О, и конечно, использование фиктивного канала само по себе уродливо.

в) Третья попытка: умереть, когда все дети мертвы

теперь мы почти закончили. Я просто должен принять во внимание еще один побочный эффект: программа Go закрывается до того, как все Зенит всплывающие окна закрыты. Это связано с тем, что при завершении цикла (на 8-й итерации) ничто не мешает программе завершить работу. На этот раз, sync.WaitGroup будет полезно.

package main
import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    wg := new(sync.WaitGroup)
    wg.Add(NumEl)            // Set the number of goroutines to (0 + NumEl)
    for i:=0; i<NumEl; i++ {
        go callProg(i, c, wg)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
    wg.Wait() // Wait for all the children to die
    close(c)
}

func callProg(i int, c chan bool, wg *sync.WaitGroup) {
    defer func () {
        <- c
        wg.Done() // Decrease the number of alive goroutines
    }()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

сделано.

мои вопросы

  • знаете ли вы какой-либо другой правильный способ ограничить количество goroutines, выполненных сразу?

я не имею в виду потоки; как Go управляет goroutines внутренне не имеет значения. Я действительно имею в виду ограничивая количество горутин сразу начал: exec.Command создает новый поток каждый раз, когда он вызывается, поэтому я должен контролировать количество раз, когда он вызывается.

  • этот код выглядит нормально для вас?
  • знаете ли вы, как избежать использования фиктивного канала в этом случае?

я не могу убедить себя, что такие фиктивные каналы-это путь.

3 61

3 ответа:

Я бы породил 4 рабочих goroutines, которые читают задачи из общего канала. Goroutines, которые быстрее других (потому что они запланированы по-другому или получают простые задачи), получат больше задач от этого канала, чем другие. В дополнение к этому, я бы использовал синхронизации.WaitGroup ждать, пока все рабочие закончат. Оставшаяся часть - это просто создание задач. Вы можете увидеть пример реализации этого подхода здесь:

package main

import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    tasks := make(chan *exec.Cmd, 64)

    // spawn four worker goroutines
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            for cmd := range tasks {
                cmd.Run()
            }
            wg.Done()
        }()
    }

    // generate some tasks
    for i := 0; i < 10; i++ {
        tasks <- exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'")
    }
    close(tasks)

    // wait for the workers to finish
    wg.Wait()
}

вероятно, есть и другие возможные подходы, но я думаю, что это очень чистое решение, которое легко понять.

простой подход к дросселированию (execute f() N раз, но не более maxConcurrency одновременно), просто схема:

package main

import (
        "sync"
)

const maxConcurrency = 4 // for example

var throttle = make(chan int, maxConcurrency)

func main() {
        const N = 100 // for example
        var wg sync.WaitGroup
        for i := 0; i < N; i++ {
                throttle <- 1 // whatever number
                wg.Add(1)
                go f(i, &wg, throttle)
        }
        wg.Wait()
}

func f(i int, wg *sync.WaitGroup, throttle chan int) {
        defer wg.Done()
        // whatever processing
        println(i)
        <-throttle
}

детская площадка

Я бы, наверное, назвать throttle канал "пустышки". ИМХО это элегантный способ (это не мое изобретение, конечно), как ограничить параллелизм.

Кстати: обратите внимание, что вы игнорируете ошибки от cmd.Run().

попробуйте это: https://github.com/korovkin/limiter

 limiter := NewConcurrencyLimiter(10)
 limiter.Execute(func() {
        zenity(...) 
 })
 limiter.Wait()