выполнение DAG подобных операций в scala Future


Я работаю над прецедентом использования, в котором я должен выполнить взаимозависимые операции (определенные как направленный ациклический граф), используя scala Future. В основном каждая операция (скажем, узел DAG) будет выполнена в будущем, и последующие зависимые узлы будут запущены (они тоже должны быть в будущем) после завершения текущего узла Future. Это будет продолжаться до тех пор, пока каждый узел не завершит обработку или один из них не выйдет из строя. Пока у меня есть (минимальный код):

def run(node: Node, result: Result): Unit = {
  val f: Future[(Node, Result)] = Future {
    // process current Node 
    ...
  }

  f onComplete {
    case Success(x) =>
      val n = x._1 // Current Node
      val r = x._2 // Result of current Node
      if (!n.isLeaf()) {
        n.children.foreach { z =>
          run(z, r)
        }
      } 
    case Failure(e) => throw e
  }
}

Является ли это правильным способом решения проблемы эта проблема (вызов другого будущего в обратном вызове)? Опять же, у меня нет надлежащего способа остановить другое запущенное будущее, как только один из узлов не справляется с обработкой.

Можно ли это решить с помощью будущей композиции? Если да, то как я могу этого достичь?

Спасибо,
Правин

1 2

1 ответ:

Вот более функциональный подход: вместо использования Unit в результате оценки run/Future мы можем иметь общий тип. Обычно вы хотели бы работать с результатами Future функционально, а не с его побочными эффектами.

Я добавил аннотации типов и описательные имена переменных, чтобы было легче понять. Я также добавил несколько случаев, чтобы показать, как это не сработает. Вы также можете выбрать, чтобы восстановить, а не провалить все, когда происходит сбой. Однако, для этой проблемы, если дочернее вычисление полагается на родительское значение, вероятно, более разумно потерпеть неудачу.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

case class Node[T](value: T, children: List[Node[T]])

object DagFuture extends App {

  def run[A, B](node: Node[A], result: B)(nodeEval: (Node[A], B) => B)(aggregator: List[B] => B): Future[B] = {
    val nodeResult: Future[B] = Future(nodeEval(node, result))
    val allResults: Future[List[B]] = nodeResult.flatMap(r => Future.sequence(nodeResult :: node.children.map(x => run(x, r)(nodeEval)(aggregator))))
    val finalResult: Future[B] = allResults.map(cl => aggregator(cl))
    finalResult
  }

  val debugSum = (l: List[Int]) => {
    println(s"aggregating: $l")
    l.sum
  }

  def debugNodeEval(f: (Node[Int], Int) => Int)(n: Node[Int], r: Int) = {
    val eval = Try { f(n, r) }
    println(s"node: $n, result: $r, eval: $eval")
    eval.get
  }

  val debugNodeEvalDefault = debugNodeEval((n, r) => n.value + r) _

  val singleNodeDag = Node(1, Nil)
  val multiNodeDag = Node(1, List(Node(20, Nil), Node(300, Nil)))

  println("\nSINGLE NODE DAG EXAMPLE:")
  val singleNodeFuture = run(singleNodeDag, 0)(debugNodeEvalDefault)(debugSum)
  val singleNodeResult = Await.result(singleNodeFuture, 5 seconds)
  println(s"Single node result: $singleNodeResult")

  println("\nDAG PATH LENGTH EXAMPLE:")
  val pathLengthFuture = run(multiNodeDag, 0)(debugNodeEvalDefault)(debugSum)
  val pathLengthResult = Await.result(pathLengthFuture, 5 seconds)
  println(s"Path length: $pathLengthResult")

  println("\nFAILED DAG ROOT NODE EXAMPLE:")
  val failedRootNodeFuture = run(multiNodeDag, 0)(debugNodeEval((n, r) => throw new Exception))(debugSum)
  val failedRootNodePromise = Await.ready(failedRootNodeFuture, 5 seconds)
  println(s"Failed root node: ${failedRootNodePromise.value}")

  println("\nFAILED DAG CHILD NODE EXAMPLE:")
  val failedChildNodeFuture = run(multiNodeDag, 0)(debugNodeEval((n, r) => if (n.value == 300) throw new Exception else n.value + r))(debugSum)
  val failedChildNodePromise = Await.ready(failedChildNodeFuture, 5 seconds)
  println(s"Failed child node: ${failedChildNodePromise.value}")
}

Выводит следующее:

SINGLE NODE DAG EXAMPLE:
node: Node(1,List()), result: 0, eval: Success(1)
aggregating: List(1)
Single node result: 1

DAG PATH LENGTH EXAMPLE:
node: Node(1,List(Node(20,List()), Node(300,List()))), result: 0, eval: Success(1)
node: Node(20,List()), result: 1, eval: Success(21)
node: Node(300,List()), result: 1, eval: Success(301)
aggregating: List(301)
aggregating: List(21)
aggregating: List(1, 21, 301)
Path length: 323

FAILED DAG ROOT NODE EXAMPLE:
node: Node(1,List(Node(20,List()), Node(300,List()))), result: 0, eval: Failure(java.lang.Exception)
Failed root node: Some(Failure(java.lang.Exception))

FAILED DAG CHILD NODE EXAMPLE:
node: Node(1,List(Node(20,List()), Node(300,List()))), result: 0, eval: Success(1)
node: Node(20,List()), result: 1, eval: Success(21)
aggregating: List(21)
node: Node(300,List()), result: 1, eval: Failure(java.lang.Exception)
Failed child node: Some(Failure(java.lang.Exception))

TL; DR

def run[A, B](node: Node[A], result: B)(nodeEval: (Node[A], B) => B)(aggregator: Traversable[B] => B): Future[B] = {
    val nodeResult = Future(nodeEval(node, result))
    val allResults = nodeResult flatMap { r => Future.sequence(nodeResult :: node.children.map { x => run(x, r)(nodeEval)(aggregator) }) }
    allResults map aggregator
  }

Грубо говоря, это Просто Future.flatMap(result => Future.sequence(children ...)). Когда родитель Future завершает вычисление, его результат передается в flatMap дочерним вычислениям. Если родитель Future терпит неудачу, то все вычисления также терпят неудачу. sequence объединяет результат из списка Futures в один Future. Ребенок Future является родителем для своих детей, и так далее рекурсивно. Таким образом, применяется тот же режим отказа.