Scala: Liste [Zukunft] bis Zukunft [Liste] ohne Berücksichtigung fehlgeschlagener Futures

116

Ich suche nach einer Möglichkeit, eine Liste von Futures beliebiger Länge in eine Future of List umzuwandeln. Ich benutze Playframework, also möchte ich letztendlich wirklich ein Future[Result], aber um die Dinge einfacher zu machen, sagen wir einfach. Future[List[Int]]Der normale Weg, dies zu tun, wäre zu verwenden, Future.sequence(...)aber es gibt eine Wendung ... Die Liste, die ich normalerweise bekomme, hat Es gibt ungefähr 10 bis 20 Futures, und es ist nicht ungewöhnlich, dass eine dieser Futures fehlschlägt (sie stellen externe Webdienstanfragen). Anstatt alle erneut versuchen zu müssen, falls einer von ihnen ausfällt, möchte ich in der Lage sein, diejenigen zu finden, die erfolgreich waren, und diese zurückzugeben.

Das Folgende funktioniert beispielsweise nicht

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success
import scala.util.Failure

val listOfFutures = Future.successful(1) :: Future.failed(new Exception("Failure")) :: 
                    Future.successful(3) :: Nil

val futureOfList = Future.sequence(listOfFutures)

futureOfList onComplete {
  case Success(x) => println("Success!!! " + x)
  case Failure(ex) => println("Failed !!! " + ex)
}

scala> Failed !!! java.lang.Exception: Failure

Anstatt die einzige Ausnahme zu bekommen, möchte ich in der Lage sein, die 1 und 3 da rauszuziehen. Ich habe es versucht Future.fold, aber das ruft anscheinend nur Future.sequencehinter die Kulissen.

Vielen Dank im Voraus für die Hilfe!

Joe
quelle

Antworten:

146

Der Trick besteht darin, zunächst sicherzustellen, dass keine der Futures fehlgeschlagen ist. .recoverIst Ihr Freund hier, können Sie es kombinieren map, um alle Future[T]Ergebnisse in Future[Try[T]]]Instanzen umzuwandeln , von denen alle sicher sind, dass sie eine erfolgreiche Zukunft darstellen.

Hinweis: Sie können Optionoder Eitherauch hier verwenden, Tryist jedoch der sauberste Weg, wenn Sie speziell Ausnahmen abfangen möchten

def futureToFutureTry[T](f: Future[T]): Future[Try[T]] =
  f.map(Success(_)).recover { case x => Failure(x)}

val listOfFutures = ...
val listOfFutureTrys = listOfFutures.map(futureToFutureTry(_))

Dann verwenden Sie Future.sequencewie zuvor, um Ihnen eine zu gebenFuture[List[Try[T]]]

val futureListOfTrys = Future.sequence(listOfFutureTrys)

Dann filtern:

val futureListOfSuccesses = futureListOfTrys.map(_.filter(_.isSuccess))

Sie können sogar die spezifischen Fehler beheben, wenn Sie sie benötigen:

val futureListOfFailures = futureListOfTrys.map(_.filter(_.isFailure))
Kevin Wright
quelle
Vielen Dank! .recoverwar in der Tat das fehlende Stück für mich.
Joe
20
Sie könnten verwenden, _.collect{ case Success(x) => x}anstatt in Art von _.filter(_.isSuccess)loszuwerden . TryfutureListOfSuccesses
Senia
43
In scala 2010 .recover(x => Failure(x))ist nicht gültig, verwenden Sie .recover({case e => Failure(e)})stattdessen
FGRibreau
Ich denke, Sie vermissen den zukünftigen Wrapper: def futureToFutureOfTry [A] (f: Future [A]): ​​Future [Try [A]] = {val p = Versprechen [Try [A]] () f.map {a => p.success (scala.util.Success (a))} .recover {case x: Throwable => p.success (Fehler (x))} p.future}
Dario
Nicht so. Ich ordne eine Zukunft einer anderen Zukunft zu, ein dazwischenliegendes Versprechen wird nicht benötigt und wäre verschwenderisch
Kevin Wright
12

Scala 2.12 hat eine Verbesserung gegenüber Future.transformeiner Antwort mit weniger Codes.

val futures = Seq(Future{1},Future{throw new Exception})

// instead of `map` and `recover`, use `transform`
val seq = Future.sequence(futures.map(_.transform(Success(_)))) 

val successes = seq.map(_.collect{case Success(x)=>x})
successes
//res1: Future[Seq[Int]] = Future(Success(List(1)))

val failures = seq.map(_.collect{case Failure(x)=>x})
failures
//res2: Future[Seq[Throwable]] = Future(Success(List(java.lang.Exception)))
WeiChing 林 煒 清
quelle
11

Ich habe Kevins Antwort ausprobiert und bin auf einen Fehler in meiner Version von Scala (2.11.5) gestoßen ... Ich habe das korrigiert und ein paar zusätzliche Tests geschrieben, wenn jemand interessiert ist ... hier ist meine Version>

implicit class FutureCompanionOps(val f: Future.type) extends AnyVal {

    /** Given a list of futures `fs`, returns the future holding the list of Try's of the futures from `fs`.
      * The returned future is completed only once all of the futures in `fs` have been completed.
      */
    def allAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
      val listOfFutureTrys: List[Future[Try[T]]] = fItems.map(futureToFutureTry)
      Future.sequence(listOfFutureTrys)
    }

    def futureToFutureTry[T](f: Future[T]): Future[Try[T]] = {
      f.map(Success(_)) .recover({case x => Failure(x)})
    }

    def allFailedAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
      allAsTrys(fItems).map(_.filter(_.isFailure))
    }

    def allSucceededAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
      allAsTrys(fItems).map(_.filter(_.isSuccess))
    }
}


// Tests... 



  // allAsTrys tests
  //
  test("futureToFutureTry returns Success if no exception") {
    val future =  Future.futureToFutureTry(Future{"mouse"})
    Thread.sleep(0, 100)
    val futureValue = future.value
    assert(futureValue == Some(Success(Success("mouse"))))
  }
  test("futureToFutureTry returns Failure if exception thrown") {
    val future =  Future.futureToFutureTry(Future{throw new IllegalStateException("bad news")})
    Thread.sleep(5)            // need to sleep a LOT longer to get Exception from failure case... interesting.....
    val futureValue = future.value

    assertResult(true) {
      futureValue match {
        case Some(Success(Failure(error: IllegalStateException)))  => true
      }
    }
  }
  test("Future.allAsTrys returns Nil given Nil list as input") {
    val future =  Future.allAsTrys(Nil)
    assert ( Await.result(future, 100 nanosecond).isEmpty )
  }
  test("Future.allAsTrys returns successful item even if preceded by failing item") {
    val future1 =  Future{throw new IllegalStateException("bad news")}
    var future2 = Future{"dog"}

    val futureListOfTrys =  Future.allAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys, 10 milli)
    System.out.println("successItem:" + listOfTrys);

    assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
    assert(listOfTrys(1) == Success("dog"))
  }
  test("Future.allAsTrys returns successful item even if followed by failing item") {
    var future1 = Future{"dog"}
    val future2 =  Future{throw new IllegalStateException("bad news")}

    val futureListOfTrys =  Future.allAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
    System.out.println("successItem:" + listOfTrys);

    assert(listOfTrys(1).failed.get.getMessage.contains("bad news"))
    assert(listOfTrys(0) == Success("dog"))
  }
  test("Future.allFailedAsTrys returns the failed item and only that item") {
    var future1 = Future{"dog"}
    val future2 =  Future{throw new IllegalStateException("bad news")}

    val futureListOfTrys =  Future.allFailedAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
    assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
    assert(listOfTrys.size == 1)
  }
  test("Future.allSucceededAsTrys returns the succeeded item and only that item") {
    var future1 = Future{"dog"}
    val future2 =  Future{throw new IllegalStateException("bad news")}

    val futureListOfTrys =  Future.allSucceededAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
    assert(listOfTrys(0) == Success("dog"))
    assert(listOfTrys.size == 1)
  }
Chris Bedford
quelle
7

Ich bin gerade auf diese Frage gestoßen und habe eine andere Lösung zu bieten:

def allSuccessful[A, M[X] <: TraversableOnce[X]](in: M[Future[A]])
                                                (implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], 
                                                 executor: ExecutionContext): Future[M[A]] = {
    in.foldLeft(Future.successful(cbf(in))) {
      (fr, fa)(for (r ← fr; a ← fa) yield r += a) fallbackTo fr
    } map (_.result())
}

Die Idee hier ist, dass Sie innerhalb der Falte darauf warten, dass das nächste Element in der Liste abgeschlossen ist (unter Verwendung der For-Understanding-Syntax), und wenn das nächste fehlschlägt, greifen Sie einfach auf das zurück, was Sie bereits haben.

Idan Waisman
quelle
Ich mag den Namen nicht, aber ich mag die Art und Weise, wie er gemacht wird, direkt aus der Sequenz impl
crak
1

Sie können zukünftige Ergebnisse einfach mit Option umschließen und dann die Liste reduzieren:

def futureToFutureOption[T](f: Future[T]): Future[Option[T]] =
    f.map(Some(_)).recover {
      case e => None
    }
val listOfFutureOptions = listOfFutures.map(futureToFutureOption(_))

val futureListOfOptions = Future.sequence(listOfFutureOptions)

val futureListOfSuccesses = futureListOfOptions.flatten
Amir Hossein Javan
quelle
Nur für den Fall, dass jemand anderes in der ersten Funktion auf einen Fehler mit Some stößt, kann die erste Funktion wie folgt umgeschrieben werden, um Compilerfehler zu vermeiden: def futureToFutureOption [T] (f: Future [T]): Future [Option [T]] = f.map (Option (_)). Wiederherstellen {case e => None}
Zee
0

Sie können auch erfolgreiche und erfolglose Ergebnisse in verschiedenen Listen sammeln:

def safeSequence[A](futures: List[Future[A]]): Future[(List[Throwable], List[A])] = {
  futures.foldLeft(Future.successful((List.empty[Throwable], List.empty[A]))) { (flist, future) =>
    flist.flatMap { case (elist, alist) =>
      future
        .map { success => (elist, alist :+ success) }
        .recover { case error: Throwable => (elist :+ error, alist) }
    }
  }
}
Evgeniy Lyutikov
quelle