Бассейн рабочих с ручьями Акка


Как описано в документации Akka streams я попытался создать пул рабочих (потоков):

def balancer[In, Out](worker: Flow[In, Out, NotUsed], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._

    Flow.fromGraph(GraphDSL.create() { implicit b =>
      val balancer = b.add(Balance[In](workerCount))
      val merge = b.add(Merge[Out](workerCount))

      for (_ <- 1 to workerCount) {
        balancer ~> worker ~> merge
      }
      FlowShape(balancer.in, merge.out)
    })
  }

Затем я использовал эту функцию для параллельного запуска потока:

def main(args: Array[String]) {
    val system = ActorSystem()
    implicit val mat = ActorMaterializer.create(system)

    val flow = Flow[Int].map(e => {
      println(e)
      Thread.sleep(1000) // 1 second
      e
    })

    Source(Range.apply(1, 10).toList)
      .via(balancer(flow, 3))
      .runForeach(e => {})
  }

Я получаю ожидаемый результат 1, 2, 3, 4, 5, 6, 7, 8, 9, но числа появляются со скоростью 1 в секунду (без параллелизма). Что я делаю не так?

2 5

2 ответа:

Документы в этом разделе устарели, что будет исправлено в следующем выпуске. В принципе, все, что вам нужно, - это вызвать .async на самом потоке. Делая это, вы как бы рисуете "коробку" вокруг потока (которую вы можете представить как коробку с одним входным и выходным портом), которая предотвратит слияние через эту коробку. Делая это, по существу, все рабочие будут на преданных актеров. Остальная часть графика (этапы трансляции и слияния) будет совместно использоваться другим актором (они не будут выполняться на отдельных акторах, асинхронная коробка только защищает поток, вещи снаружи все равно будут слиты).

Как указывал Эндре Варга, сам поток должен быть помечен символом .async.

Но даже в этом случае поведение не является детерминированным, поскольку асинхронные этапы имеют размер буфера по умолчанию 16, и балансировщик может отправлять все сообщения одному и тому же работнику.

В результате, balancer ~> worker.async.addAttributes(Attributes.inputBuffer(1, 1)) ~> merge приведет к желаемому поведению.

Ответ, данный участником проекта, см.: https://github.com/akka/akka/issues/20146#issuecomment-201381356