Akka-Http Websockets: как отправлять потребителям один и тот же поток данных
У меня есть WebSocket, к которому клиенты могут подключиться, у меня также есть поток данных с помощью akka-streams. Как я могу сделать так, чтобы все клиенты получали одни и те же данные? В данный момент они, похоже, гонятся за данными.
Спасибо
1 ответ:
Один из способов, который вы могли бы сделать, это иметь актера, который расширяет ActorPublisher и подписывает его к какому-то посланию.
class MyPublisher extends ActorPublisher[MyData]{ override def preStart = { context.system.eventStream.subscribe(self, classOf[MyData]) } override def receive: Receive = { case msg: MyData ⇒ if (isActive && totalDemand > 0) { // Pushes the message onto the stream onNext(msg) } } } object MyPublisher { def props(implicit ctx: ExecutionContext): Props = Props(new MyPublisher()) } case class MyData(data:String)
Затем вы можете использовать этого актора в качестве источника для потока:
val dataSource = Source.actorPublisher[MyData](MyPublisher.props(someExcutionContext))
Затем вы можете создать поток из этого источника данных и применить преобразование для преобразования данных в сообщение websocket
val myFlow = Flow.fromSinkAndSource(Sink.ignore, dataSource map {d => TextMessage.Strict(d.data)})
Затем вы можете использовать этот поток в обработке маршрута.
path("readings") { handleWebsocketMessages(myFlow) }
После обработки исходного потока вы можете опубликовать данные в поток событий и любой экземпляр этого актора поднимет его и поместит в поток, из которого обслуживается их websocket.
Каждый сокет будет иметь свой собственный экземпляр актора, чтобы обеспечить его данными, поступающими из одного источника.val actorSystem = ActorSystem("foo") val otherSource = Source.fromIterator(() => List(MyData("a"), MyData("b")).iterator) otherSource.runForeach { msg ⇒ actorSystem.eventStream.publish(MyData("data"))}