Это Акка задать блокируя текущий поток


У меня есть сценарий, в котором я должен получить сведения о пользователе по его id. Это HTTP-запрос, который приходит в мой уровень обработчика HTTP, я использую идентификатор, который я получаю из запроса, посылаю сообщение актору, который затем обращается к службе базы данных, чтобы получить пользователя.

Теперь, поскольку это HTTP-запрос, мне нужно удовлетворить запрос, отправив ответ обратно. Поэтому я подумал об использовании шаблона Akka ask, но у меня есть следующие вопросы в УМ:

  1. Это будет блокировать мой текущий поток?

  2. Использует задать шаблон для выборки пользователей в моем случае масштабируемое решение? Я имею в виду, что у меня может быть от нескольких сотен до миллиона пользователей, вызывающих эту конечную точку в любой момент времени. Это хорошая идея, чтобы использовать шаблон ask, чтобы получить пользователя?

В коде это выглядит так в моем http контроллере

val result: Future[Any] = userActor ? FetchUser(id)

В моем актере я бы сделал следующее:

case fetchUser: FetchUser => sender ! myService.getUser(fetchUser.id)
1 2

1 ответ:

Отвечая на ваши вопросы в том же порядке, в каком вы их задали:

  1. Нет, использование ? не блокирует текущий поток. Он немедленно возвращает a Future. Однако результат в будущем может быть доступен не сразу.
  2. Если вам нужно, чтобы решение было "масштабируемым", и ваша служба способна выполнять несколько параллельных запросов, то вам может потребоваться использовать пул акторов, чтобы вы могли обслуживать несколько ? одновременно, или смотрите ниже только для фьючерсов, масштабируемых, решение.

Фьючерсы Исключительно

Если ваши актеры не кэшируют какие-либо промежуточные значения, то вы можете просто использовать фьючерсы напрямую и избегать ригмарола актеров (например, реквизит, actorOf, receive,?, ...):

import java.util.concurrent.Executors

import scala.concurrent.{ExecutionContext,Future}    

object ServicePool {      

  private val myService = ???

  val maxQueries = 11 //should come from a configuration file instead

  private val queryExecutionPool = 
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(maxQueries))

  type ID = ???

  /**Will only hit the DB with maxQueries at once.*/
  def queryService(id : ID) = 
    Future { myService getUser id }(queryExecutionPool)

}//end object ServiceQuery

Теперь вы можете звонить ServicePool.queryService так часто, как вы хотите, но служба не будет поражена более чем maxQueries за один раз, и никаких актеров:

val alotOfIDs : Seq[ID] = (1 to 1000000) map { i => ID(i)}

val results = alotOfIDs map ServicePool.queryService