Это Акка задать блокируя текущий поток
У меня есть сценарий, в котором я должен получить сведения о пользователе по его id. Это HTTP-запрос, который приходит в мой уровень обработчика HTTP, я использую идентификатор, который я получаю из запроса, посылаю сообщение актору, который затем обращается к службе базы данных, чтобы получить пользователя.
Теперь, поскольку это HTTP-запрос, мне нужно удовлетворить запрос, отправив ответ обратно. Поэтому я подумал об использовании шаблона Akka ask, но у меня есть следующие вопросы в УМ:
-
Это будет блокировать мой текущий поток?
-
Использует задать шаблон для выборки пользователей в моем случае масштабируемое решение? Я имею в виду, что у меня может быть от нескольких сотен до миллиона пользователей, вызывающих эту конечную точку в любой момент времени. Это хорошая идея, чтобы использовать шаблон ask, чтобы получить пользователя?
В коде это выглядит так в моем http контроллере
val result: Future[Any] = userActor ? FetchUser(id)
В моем актере я бы сделал следующее:
case fetchUser: FetchUser => sender ! myService.getUser(fetchUser.id)
1 ответ:
Отвечая на ваши вопросы в том же порядке, в каком вы их задали:
- Нет, использование
?
не блокирует текущий поток. Он немедленно возвращает aFuture
. Однако результат в будущем может быть доступен не сразу.- Если вам нужно, чтобы решение было "масштабируемым", и ваша служба способна выполнять несколько параллельных запросов, то вам может потребоваться использовать пул акторов, чтобы вы могли обслуживать несколько
?
одновременно, или смотрите ниже только для фьючерсов, масштабируемых, решение.Фьючерсы Исключительно
Если ваши актеры не кэшируют какие-либо промежуточные значения, то вы можете просто использовать фьючерсы напрямую и избегать ригмарола актеров (например, реквизит, actorOf, receive,?, ...):
import java.util.concurrent.Executors import scala.concurrent.{ExecutionContext,Future} object ServicePool { private val myService = ??? val maxQueries = 11 //should come from a configuration file instead private val queryExecutionPool = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(maxQueries)) type ID = ??? /**Will only hit the DB with maxQueries at once.*/ def queryService(id : ID) = Future { myService getUser id }(queryExecutionPool) }//end object ServiceQuery
Теперь вы можете звонить
ServicePool.queryService
так часто, как вы хотите, но служба не будет поражена более чемmaxQueries
за один раз, и никаких актеров:val alotOfIDs : Seq[ID] = (1 to 1000000) map { i => ID(i)} val results = alotOfIDs map ServicePool.queryService