Akka-Http Websockets: как отправлять потребителям один и тот же поток данных


У меня есть WebSocket, к которому клиенты могут подключиться, у меня также есть поток данных с помощью akka-streams. Как я могу сделать так, чтобы все клиенты получали одни и те же данные? В данный момент они, похоже, гонятся за данными.

Спасибо

1 3

1 ответ:

Один из способов, который вы могли бы сделать, это иметь актера, который расширяет ActorPublisher и подписывает его к какому-то посланию.

class MyPublisher extends ActorPublisher[MyData]{

  override def preStart = {
    context.system.eventStream.subscribe(self, classOf[MyData])
  }

  override def receive: Receive = {

    case msg: MyData ⇒
      if (isActive && totalDemand > 0) {
        // Pushes the message onto the stream
        onNext(msg)
      }
  }
}

object MyPublisher {
  def props(implicit ctx: ExecutionContext): Props = Props(new MyPublisher())
}

case class MyData(data:String)

Затем вы можете использовать этого актора в качестве источника для потока:

val dataSource = Source.actorPublisher[MyData](MyPublisher.props(someExcutionContext))

Затем вы можете создать поток из этого источника данных и применить преобразование для преобразования данных в сообщение websocket

val myFlow = Flow.fromSinkAndSource(Sink.ignore, dataSource map {d => TextMessage.Strict(d.data)})

Затем вы можете использовать этот поток в обработке маршрута.

path("readings") {
  handleWebsocketMessages(myFlow)
} 

После обработки исходного потока вы можете опубликовать данные в поток событий и любой экземпляр этого актора поднимет его и поместит в поток, из которого обслуживается их websocket.

  val actorSystem = ActorSystem("foo")

  val otherSource = Source.fromIterator(()  => List(MyData("a"), MyData("b")).iterator)

  otherSource.runForeach { msg ⇒ actorSystem.eventStream.publish(MyData("data"))}
Каждый сокет будет иметь свой собственный экземпляр актора, чтобы обеспечить его данными, поступающими из одного источника.