Wie kann man auf mehrere Futures warten?

86

Angenommen , ich habe mehrere Futures und müssen warten , bis entweder einer von ihnen ausfällt oder alle von ihnen erfolgreich zu sein.

Zum Beispiel: Let gibt es 3 - Futures: f1, f2, f3.

  • Wenn dies f1erfolgreich ist und f2fehlschlägt, warte ich nicht darauf f3(und gebe den Fehler an den Client zurück).

  • Wenn dies f2fehlschlägt f1und f3noch ausgeführt wird, warte ich nicht auf sie (und gebe einen Fehler zurück ).

  • Wenn es f1gelingt und dann f2gelingt, warte ich weiter f3.

Wie würden Sie es implementieren?

Michael
quelle
ein Scala-Problem zu dieser Frage. Issues.scala-lang.org/browse/SI-8994 Die API sollte eine Option für verschiedene Verhaltensweisen haben
WeiChing

Antworten:

82

Sie können stattdessen ein Verständnis wie folgt verwenden:

val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}

val aggFut = for{
  f1Result <- fut1
  f2Result <- fut2
  f3Result <- fut3
} yield (f1Result, f2Result, f3Result)

In diesem Beispiel werden die Futures 1, 2 und 3 parallel gestartet. Dann warten wir zum Verständnis, bis die Ergebnisse 1 und dann 2 und dann 3 verfügbar sind. Wenn entweder 1 oder 2 fehlschlägt, werden wir nicht mehr auf 3 warten. Wenn alle 3 erfolgreich sind, enthält der aggFutWert ein Tupel mit 3 Slots, das den Ergebnissen der 3 Futures entspricht.

Wenn Sie nun das Verhalten benötigen, bei dem Sie aufhören möchten zu warten, wenn sagen, dass fut2 zuerst fehlschlägt, werden die Dinge etwas schwieriger. Im obigen Beispiel müssten Sie warten, bis fut1 abgeschlossen ist, bevor Sie feststellen, dass fut2 fehlgeschlagen ist. Um das zu lösen, können Sie Folgendes versuchen:

  val fut1 = Future{Thread.sleep(3000);1}
  val fut2 = Promise.failed(new RuntimeException("boo")).future
  val fut3 = Future{Thread.sleep(1000);3}

  def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
    val fut = if (futures.size == 1) futures.head._2
    else Future.firstCompletedOf(futures.values)

    fut onComplete{
      case Success(value) if (futures.size == 1)=> 
        prom.success(value :: values)

      case Success(value) =>
        processFutures(futures - value, value :: values, prom)

      case Failure(ex) => prom.failure(ex)
    }
    prom.future
  }

  val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
  aggFut onComplete{
    case value => println(value)
  }

Dies funktioniert nun korrekt, aber das Problem besteht darin, zu wissen, welche Futurezu entfernen sind, Mapwenn eine erfolgreich abgeschlossen wurde. Solange Sie eine Möglichkeit haben, ein Ergebnis richtig mit der Zukunft zu korrelieren, die dieses Ergebnis hervorgebracht hat, funktioniert so etwas. Es entfernt nur rekursiv abgeschlossene Futures von der Karte und ruft dann Future.firstCompletedOfdie verbleibenden auf, Futuresbis keine mehr übrig sind, und sammelt die Ergebnisse auf dem Weg. Es ist nicht schön, aber wenn Sie wirklich das Verhalten brauchen, über das Sie sprechen, dann könnte dies oder etwas Ähnliches funktionieren.

cmbaxter
quelle
Danke dir. Was passiert, wenn fut2es vorher fehlschlägt fut1? Warten wir fut1in diesem Fall noch? Wenn wir wollen, ist es nicht genau das, was ich will.
Michael
Aber wenn 3 zuerst fehlschlägt, warten wir immer noch auf 1 und 2, wenn wir früh zurückkehren können. Gibt es eine Möglichkeit, dies zu tun, ohne die Futures sequenzieren zu müssen?
Der archetypische Paul
Sie können einen onFailureInstallationsprozedur für fut2schnell zum Scheitern verurteilt, und ein onSuccessauf aggFutbis Griff Erfolg. Ein Erfolg bei aggFutImplikationen fut2wurde erfolgreich abgeschlossen, sodass nur einer der Handler aufgerufen wird.
pagoda_5b
Ich habe meiner Antwort etwas mehr hinzugefügt, um eine mögliche Lösung für ein schnelles Versagen aufzuzeigen, wenn eine der Futures versagt.
cmbaxter
1
In Ihrem ersten Beispiel werden 1 2 und 3 nicht parallel, sondern seriell ausgeführt. Probieren Sie es mit Printlines aus und sehen Sie
bwawok
35

Sie können ein Versprechen verwenden und ihm entweder den ersten Fehler oder den endgültig abgeschlossenen aggregierten Erfolg senden:

def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
  val p = Promise[M[A]]()

  // the first Future to fail completes the promise
  in.foreach(_.onFailure{case i => p.tryFailure(i)})

  // if the whole sequence succeeds (i.e. no failures)
  // then the promise is completed with the aggregated success
  Future.sequence(in).foreach(p trySuccess _)

  p.future
}

Dann können Sie Awaitauf das Ergebnis, Futurewenn Sie blockieren möchten, oder nur mapin etwas anderes.

Der Unterschied zum Verständnis besteht darin, dass hier der Fehler des ersten fehlschlägt, während beim Verständnis der erste Fehler in der Durchlaufreihenfolge der Eingabesammlung angezeigt wird (selbst wenn ein anderer zuerst fehlgeschlagen ist). Beispielsweise:

val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }

Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)}
// this waits one second, then prints "java.lang.ArithmeticException: / by zero"
// the first to fail in traversal order

Und:

val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }

sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)}
// this immediately prints "java.util.NoSuchElementException: None.get"
// the 'actual' first to fail (usually...)
// and it returns early (it does not wait 1 sec)
Gourlaysama
quelle
7

Hier ist eine Lösung ohne Schauspieler.

import scala.util._
import scala.concurrent._
import java.util.concurrent.atomic.AtomicInteger

// Nondeterministic.
// If any failure, return it immediately, else return the final success.
def allSucceed[T](fs: Future[T]*): Future[T] = {
  val remaining = new AtomicInteger(fs.length)

  val p = promise[T]

  fs foreach {
    _ onComplete {
      case s @ Success(_) => {
        if (remaining.decrementAndGet() == 0) {
          // Arbitrarily return the final success
          p tryComplete s
        }
      }
      case f @ Failure(_) => {
        p tryComplete f
      }
    }
  }

  p.future
}
FranklinChen
quelle
5

Sie können dies nur mit Futures tun. Hier ist eine Implementierung. Beachten Sie, dass die Ausführung nicht vorzeitig beendet wird! In diesem Fall müssen Sie etwas Anspruchsvolleres tun (und die Unterbrechung wahrscheinlich selbst implementieren). Wenn Sie jedoch nicht weiter auf etwas warten möchten, das nicht funktioniert, müssen Sie darauf warten, dass das erste fertig ist, und aufhören, wenn entweder nichts mehr übrig ist oder Sie eine Ausnahme haben:

import scala.annotation.tailrec
import scala.util.{Try, Success, Failure}
import scala.concurrent._
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global

@tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): 
Either[Throwable, Seq[A]] = {
  val first = Future.firstCompletedOf(fs)
  Await.ready(first, Duration.Inf).value match {
    case None => awaitSuccess(fs, done)  // Shouldn't happen!
    case Some(Failure(e)) => Left(e)
    case Some(Success(_)) =>
      val (complete, running) = fs.partition(_.isCompleted)
      val answers = complete.flatMap(_.value)
      answers.find(_.isFailure) match {
        case Some(Failure(e)) => Left(e)
        case _ =>
          if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done)
          else Right( answers.map(_.get) ++: done )
      }
  }
}

Hier ist ein Beispiel dafür in Aktion, wenn alles in Ordnung ist:

scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
  Future{ Thread.sleep(1000); println("Fancy meeting you here!") },
  Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
Fancy meeting you here!
Bye!
res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))

Aber wenn etwas schief geht:

scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
  Future{ Thread.sleep(1000); throw new Exception("boo"); () }, 
  Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo)

scala> Bye!
Rex Kerr
quelle
1
Schöne Umsetzung. Aber beachten Sie, dass, wenn Sie eine leere Folge von Futures passieren, um auf Erfolg zu warten, es für immer wartet ...
Michael Rueegg
5

Zu diesem Zweck würde ich einen Akka-Schauspieler verwenden. Im Gegensatz zum For-Understanding schlägt es fehl, sobald eine der Futures versagt, und ist daher in diesem Sinne etwas effizienter.

class ResultCombiner(futs: Future[_]*) extends Actor {

  var origSender: ActorRef = null
  var futsRemaining: Set[Future[_]] = futs.toSet

  override def receive = {
    case () =>
      origSender = sender
      for(f <- futs)
        f.onComplete(result => self ! if(result.isSuccess) f else false)
    case false =>
      origSender ! SomethingFailed
    case f: Future[_] =>
      futsRemaining -= f
      if(futsRemaining.isEmpty) origSender ! EverythingSucceeded
  }

}

sealed trait Result
case object SomethingFailed extends Result
case object EverythingSucceeded extends Result

Erstellen Sie dann den Akteur, senden Sie ihm eine Nachricht (damit er weiß, wohin er seine Antwort senden soll) und warten Sie auf eine Antwort.

val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3)))
try {
  val f4: Future[Result] = actor ? ()
  implicit val timeout = new Timeout(30 seconds) // or whatever
  Await.result(f4, timeout.duration).asInstanceOf[Result] match {
    case SomethingFailed => println("Oh noes!")
    case EverythingSucceeded => println("It all worked!")
  }
} finally {
  // Avoid memory leaks: destroy the actor
  actor ! PoisonPill
}
Robin Green
quelle
Sieht für eine so einfache Aufgabe etwas zu komplex aus. Brauche ich wirklich einen Schauspieler, der nur auf die Zukunft wartet? Danke trotzdem.
Michael
1
Ich konnte keine geeignete Methode in der API finden, die genau das kann, was Sie wollen, aber vielleicht habe ich etwas verpasst.
Robin Green
5

Diese Frage wurde beantwortet, aber ich veröffentliche meine Wertklassenlösung (Wertklassen wurden in 2.10 hinzugefügt), da hier keine vorhanden ist. Bitte zögern Sie nicht zu kritisieren.

  implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal {
    def concurrently = ConcurrentFuture(self)
  }
  case class ConcurrentFuture[A](future: Future[A]) extends AnyVal {
    def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future))
    def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class
  }
  def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = {
    val p = Promise[B]()
    val inner = f(outer.future)
    inner.future onFailure { case t => p.tryFailure(t) }
    outer.future onFailure { case t => p.tryFailure(t) }
    inner.future onSuccess { case b => p.trySuccess(b) }
    ConcurrentFuture(p.future)
  }

ConcurrentFuture ist ein Future-Wrapper ohne Overhead, der die Standard-Future-Map / FlatMap von Do-this-then-that in Combine-All-and-Fail-If-Any-Fail ändert. Verwendung:

def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 }
def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" }
def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 }

val f : Future[(Int,String,Double)] = {
  for {
    f1 <- func1.concurrently
    f2 <- func2.concurrently
    f3 <- func3.concurrently
  } yield for {
   v1 <- f1
   v2 <- f2
   v3 <- f3
  } yield (v1,v2,v3)
}.future
f.onFailure { case t => println("future failed $t") }

Im obigen Beispiel werden f1, f2 und f3 gleichzeitig ausgeführt, und wenn ein Fehler in einer beliebigen Reihenfolge auftritt, schlägt die Zukunft des Tupels sofort fehl.

Lancegatlin
quelle
Genial! Gibt es eine Bibliothek, die diese Art von Dienstprogrammfunktion bietet?
Srirachapills
1
Ja, ich habe seitdem ein umfangreiches Future-Dienstprogramm lib erstellt: github.com/S-Mach/s_mach.concurrent Siehe async.par im Beispielcode.
Lancegatlin
2

Sie können dies verwenden:

val l = List(1, 6, 8)

val f = l.map{
  i => future {
    println("future " +i)
    Thread.sleep(i* 1000)
    if (i == 12)
      throw new Exception("6 is not legal.")
    i
  }
}

val f1 = Future.sequence(f)

f1 onSuccess{
  case l => {
    logInfo("onSuccess")
    l.foreach(i => {

      logInfo("h : " + i)

    })
  }
}

f1 onFailure{
  case l => {
    logInfo("onFailure")
  }
igreenfield
quelle