Обрабатывать первый элемент Акка трансляция специально


Существует ли идиоматический способ обработки первого элемента потока Akka Source особым образом? То, что у меня сейчас есть:

    var firstHandled = false
    source.map { elem =>
      if(!firstHandled) {
        //handle specially
        firstHandled = true
      } else {
        //handle normally
      }
    }

Спасибо

2 6

2 ответа:

Хотя я обычно соглашусь с ответом Рамона, вы также можете использовать prefixAndTail, с префиксом 1, вместе с flatMapConcat чтобы достичь чего-то подобного:

val src = Source(List(1, 2, 3, 4, 5))
val fst = Flow[Int].map(i => s"First: $i")
val rst = Flow[Int].map(i => s"Rest:  $i")

val together = src.prefixAndTail(1).flatMapConcat { case (head, tail) =>
  // `head` is a Seq of the prefix elements, which in our case is
  // just the first one. We can convert it to a source of just
  // the first element, processed via our fst flow, and then
  // concatenate `tail`, which is the remainder...
  Source(head).via(fst).concat(tail.via(rst))
}

Await.result(together.runForeach(println), 10.seconds)
// First: 1
// Rest:  2
// Rest:  3
// Rest:  4
// Rest:  5
Это, конечно, работает не только для первого элемента, но и для первыхN элементов, с условием, что эти элементы будут взяты как строгая коллекция.

Использование zipWith

Вы можете сжать оригинал Source с источником булевых символов, который возвращает только true в первый раз. Затем этот архивированный источник может быть обработан.

Сначала нам понадобится источник, который выдает логические значения:

//true, false, false, false, ...
def firstTrueIterator() : Iterator[Boolean] = 
  (Iterator single true) ++ (Iterator continually false)

def firstTrueSource : Source[Boolean, _] = 
  Source fromIterator firstTrueIterator
Затем мы можем определить функцию, которая обрабатывает два различных случая:
type Data = ???
type OutputData = ???

def processData(data : Data, firstRun : Boolean) : OutputData = 
  if(firstRun) { ... }
  else { ... }

Эта функция затем может быть использована в zipWith из вашего первоисточника:

val originalSource : Source[Data,_] = ???    

val contingentSource : Source[OutputData,_] =
  originalSource.zipWith(firstTrueSource)(processData)

Использование Stateful Поток

Вы можете создать Flow, который содержит состояние, подобное примеру в вопросе, но с более функциональным подходом:

def firstRunner(firstCall : (Data) => OutputData,
                otherCalls : (Data) => OutputData) : (Data) => OutputData = {
  var firstRun = true
  (data : Data) => {
    if(firstRun) {
      firstRun = false
      firstCall(data)
    }
    else
      otherCalls(data)
  }
}//end def firstRunner

def firstRunFlow(firstCall :  (Data) => OutputData, 
                 otherCalls : (Data) => OutputData) : Flow[Data, OutputData, _] = 
  Flow[Data] map firstRunner(firstCall, otherCalls)

Затем этот поток можно применить к исходному источнику:

def firstElementFunc(data : Data) : OutputData = ???
def remainingElsFunc(data : Data) : OutputData = ???

val firstSource : Source[OutputData, _] = 
  originalSource via firstRunFlow(firstElementFunc,remainingElseFunc)

"Идиоматический Способ"

Ответ на ваш вопрос напрямую требует диктовки "идиоматического способа". Я отвечаю на эту часть последней, потому что она наименее проверяема компилятором и поэтому ближе к мнению. Я бы никогда не стал утверждать, что я ... допустимый классификатор идиоматического кода. Мой личный опыт работы с Akka-streams показал, что лучше всего переключить свою перспективу на представление реального потока (я думаю о поезде с товарными вагонами) из элементов Data. Нужно ли разбивать его на несколько поездов фиксированного размера? Проходят ли через него только некоторые товарные вагоны? Могу ли я прикрепить другой поезд бок о бок, который содержит вагоны Boolean, которые могут сигнализировать о фронте? Я бы предпочел метод zipWith из-за моего отношения к потокам (поездам). Мой первоначальный подход всегда заключается в использовании других частей потока, соединенных вместе.

Кроме того, я считаю, что лучше всего встраивать как можно меньше кода в компонент потока akka. firstTrueIterator и processData вообще не зависят от акка. В то же время определения firstTrueSource и contingentSource практически не имеют логики. Это позволяет проверить логику, независимую от неуклюжей актерской системы, и кишки могут быть использованы в фьючерсах или актерах.