Асинхронный IO в Scala с фьючерсами


допустим я получаю (потенциально большой) список изображений для загрузки с некоторых URL-адресов. Я использую Scala, так что я бы сделал это:

import scala.actors.Futures._

// Retrieve URLs from somewhere
val urls: List[String] = ...

// Download image (blocking operation)
val fimages: List[Future[...]] = urls.map (url => future { download url })

// Do something (display) when complete
fimages.foreach (_.foreach (display _))

Я немного новичок в Scala, так что это все еще выглядит немного как магия для меня:

  • это правильный способ сделать это? Любые альтернативы, если это не так?
  • если у меня есть 100 изображений для загрузки, это создаст 100 потоков сразу или будет использовать пул потоков?
  • будет последняя инструкция (display _) выполняется в основном потоке, а если нет, то как я могу убедиться, что это?

Спасибо за ваш совет!

3 66

3 ответа:

используйте фьючерсы в Scala 2.10. Они были совместной работой между командой Scala, командой Akka и Twitter для достижения более стандартизированного будущего API и реализации для использования в разных фреймворках. Мы только что опубликовали руководство по адресу:http://docs.scala-lang.org/overviews/core/futures.html

помимо полной неблокирующей (по умолчанию, хотя мы предоставляем возможность выполнять управляемые блокирующие операции) и композиционной, фьючерсы Scala 2.10 поставляются с неявным потоком бассейн для выполнения ваших задач, а также некоторые утилиты для управления тайм-ауты.

import scala.concurrent.{future, blocking, Future, Await, ExecutionContext.Implicits.global}
import scala.concurrent.duration._

// Retrieve URLs from somewhere
val urls: List[String] = ...

// Download image (blocking operation)
val imagesFuts: List[Future[...]] = urls.map {
  url => future { blocking { download url } }
}

// Do something (display) when complete
val futImages: Future[List[...]] = Future.sequence(imagesFuts)
Await.result(futImages, 10 seconds).foreach(display)

выше, мы сначала импортируем ряд вещей:

  • future: API для создания будущего.
  • blocking: API для управляемой блокировки.
  • Future: будущий сопутствующий объект, который содержит ряд полезных методов для коллекции фьючерсы.
  • Await: одноэлементный объект, используемый для блокировки на будущее (перенос его результата в текущий поток).
  • ExecutionContext.Implicits.global: глобальный пул потоков по умолчанию, пул ForkJoin.
  • duration._: утилиты для управления длительностью тайм-аутов.

imagesFuts остается во многом таким же, как и то, что вы изначально сделали - единственное отличие здесь заключается в том, что мы используем управляемую блокировку- blocking. Он уведомляет пул потоков, что блок кода, который вы передаете ему, содержит длительные или блокирующие операции. Это позволяет бассейну временно порождайте новых рабочих, чтобы убедиться, что никогда не произойдет, что все рабочие заблокированы. Это делается для предотвращения голодания (блокировки пула потоков) при блокировке приложений. Обратите внимание, что пул потоков также знает, когда код в управляемом блокирующем блоке завершен, поэтому он удалит запасной рабочий поток в этот момент, что означает, что пул будет сокращаться до ожидаемого размера.

(если вы хотите абсолютно предотвратить дополнительные потоки от когда-либо будучи созданным, вы должны использовать библиотеку AsyncIO, такую как библиотека Nio Java.)

затем мы используем методы сбора будущего сопутствующего объекта для преобразования imagesFuts С List[Future[...]] до Future[List[...]].

The

val all = Future.traverse(urls){ url =>
  val f = future(download url) /*(downloadContext)*/
  f.onComplete(display)(displayContext)
  f
}
Await.result(all, ...)
  1. используйте scala.параллельный.Будущее в 2.10, которое теперь RC.
  2. который использует неявный ExecutionContext
  3. новый будущий документ является явным, что onComplete (и foreach) может оценить немедленно, если значение доступно. Старые актеры будущего делают то же самое. В зависимости от того, что требуется для отображения, вы можете предоставить подходящий ExecutionContext (например, один исполнитель потока). Если вы просто хотите, чтобы основной поток ждал загрузка для завершения, траверс дает вам будущее, чтобы ждать дальше.
  1. Да, мне кажется, все в порядке, но вы можете исследовать более мощный twitter-util или Акка будущие API (Scala 2.10 будет иметь новую будущую библиотеку в этом стиле).

  2. Он использует пул потоков.

  3. нет, это не будет. Вам нужно использовать стандартный механизм вашего GUI toolkit для этого (SwingUtilities.invokeLater для качания или Display.asyncExec для SWT). Е. Г.

    fimages.foreach (_.foreach(im => SwingUtilities.invokeLater(new Runnable { display im })))