Обрабатывать первый элемент Акка трансляция специально
Существует ли идиоматический способ обработки первого элемента потока Akka Source
особым образом? То, что у меня сейчас есть:
var firstHandled = false
source.map { elem =>
if(!firstHandled) {
//handle specially
firstHandled = true
} else {
//handle normally
}
}
Спасибо
2 ответа:
Хотя я обычно соглашусь с ответом Рамона, вы также можете использовать
prefixAndTail
, с префиксом 1, вместе сflatMapConcat
чтобы достичь чего-то подобного:Это, конечно, работает не только для первого элемента, но и для первыхN элементов, с условием, что эти элементы будут взяты как строгая коллекция.val src = Source(List(1, 2, 3, 4, 5)) val fst = Flow[Int].map(i => s"First: $i") val rst = Flow[Int].map(i => s"Rest: $i") val together = src.prefixAndTail(1).flatMapConcat { case (head, tail) => // `head` is a Seq of the prefix elements, which in our case is // just the first one. We can convert it to a source of just // the first element, processed via our fst flow, and then // concatenate `tail`, which is the remainder... Source(head).via(fst).concat(tail.via(rst)) } Await.result(together.runForeach(println), 10.seconds) // First: 1 // Rest: 2 // Rest: 3 // Rest: 4 // Rest: 5
Использование zipWith
Вы можете сжать оригинал
Source
с источником булевых символов, который возвращает толькоtrue
в первый раз. Затем этот архивированный источник может быть обработан.Сначала нам понадобится источник, который выдает логические значения:
Затем мы можем определить функцию, которая обрабатывает два различных случая://true, false, false, false, ... def firstTrueIterator() : Iterator[Boolean] = (Iterator single true) ++ (Iterator continually false) def firstTrueSource : Source[Boolean, _] = Source fromIterator firstTrueIterator
type Data = ??? type OutputData = ??? def processData(data : Data, firstRun : Boolean) : OutputData = if(firstRun) { ... } else { ... }
Эта функция затем может быть использована в
zipWith
из вашего первоисточника:val originalSource : Source[Data,_] = ??? val contingentSource : Source[OutputData,_] = originalSource.zipWith(firstTrueSource)(processData)
Использование Stateful Поток
Вы можете создать
Flow
, который содержит состояние, подобное примеру в вопросе, но с более функциональным подходом:def firstRunner(firstCall : (Data) => OutputData, otherCalls : (Data) => OutputData) : (Data) => OutputData = { var firstRun = true (data : Data) => { if(firstRun) { firstRun = false firstCall(data) } else otherCalls(data) } }//end def firstRunner def firstRunFlow(firstCall : (Data) => OutputData, otherCalls : (Data) => OutputData) : Flow[Data, OutputData, _] = Flow[Data] map firstRunner(firstCall, otherCalls)
Затем этот поток можно применить к исходному источнику:
def firstElementFunc(data : Data) : OutputData = ??? def remainingElsFunc(data : Data) : OutputData = ??? val firstSource : Source[OutputData, _] = originalSource via firstRunFlow(firstElementFunc,remainingElseFunc)
"Идиоматический Способ"
Ответ на ваш вопрос напрямую требует диктовки "идиоматического способа". Я отвечаю на эту часть последней, потому что она наименее проверяема компилятором и поэтому ближе к мнению. Я бы никогда не стал утверждать, что я ... допустимый классификатор идиоматического кода. Мой личный опыт работы с Akka-streams показал, что лучше всего переключить свою перспективу на представление реального потока (я думаю о поезде с товарными вагонами) из элементовData
. Нужно ли разбивать его на несколько поездов фиксированного размера? Проходят ли через него только некоторые товарные вагоны? Могу ли я прикрепить другой поезд бок о бок, который содержит вагоныBoolean
, которые могут сигнализировать о фронте? Я бы предпочел метод zipWith из-за моего отношения к потокам (поездам). Мой первоначальный подход всегда заключается в использовании других частей потока, соединенных вместе.Кроме того, я считаю, что лучше всего встраивать как можно меньше кода в компонент потока akka.
firstTrueIterator
иprocessData
вообще не зависят от акка. В то же время определенияfirstTrueSource
иcontingentSource
практически не имеют логики. Это позволяет проверить логику, независимую от неуклюжей актерской системы, и кишки могут быть использованы в фьючерсах или актерах.