Ich versuche zu konvertieren List<CompletableFuture<X>>
zu CompletableFuture<List<T>>
. Dies ist sehr nützlich, wenn Sie viele asynchrone Aufgaben haben und Ergebnisse von allen erhalten möchten.
Wenn einer von ihnen fehlschlägt, schlägt die endgültige Zukunft fehl. So habe ich implementiert:
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}
Um es auszuführen:
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
Wenn einer von ihnen fehlschlägt, schlägt er fehl. Es liefert die erwartete Leistung, selbst wenn es eine Million Futures gibt. Das Problem, das ich habe, ist: Sagen wir, wenn es mehr als 5000 Futures gibt und wenn einer von ihnen fehlschlägt, bekomme ich eine StackOverflowError
:
Ausnahme im Thread "pool-1-thread-2611" java.lang.StackOverflowError unter java.util.concurrent.CompletableFuture.internalComplete (CompletableFuture.java:210) unter java.util.concurrent.CompletableFuture $ ThenCompose.run (CompletableFuture.j : 1487) unter java.util.concurrent.CompletableFuture.postComplete (CompletableFuture.java:193) unter java.util.concurrent.CompletableFuture.internalComplete (CompletableFuture.java:210) unter java.util.concurrent.CompletableFuture CompletableFuture.java:1487)
Was mache ich falsch?
Hinweis: Die oben zurückgegebene Zukunft schlägt sofort fehl, wenn eine der Zukunft fehlschlägt. Die akzeptierte Antwort sollte auch diesen Punkt berücksichtigen.
Collector
stattdessen ein ...Antworten:
Verwendung
CompletableFuture.allOf(...)
:static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) { return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0])) .thenApply(v -> com.stream() .map(CompletableFuture::join) .collect(Collectors.toList()) ); }
Einige Kommentare zu Ihrer Implementierung:
Ihre Nutzung
.thenComposeAsync
,.thenApplyAsync
und.thenCombineAsync
wird wahrscheinlich nicht tun , was Sie erwarten. Diese...Async
Methoden führen die ihnen bereitgestellte Funktion in einem separaten Thread aus. In Ihrem Fall wird das Hinzufügen des neuen Elements zur Liste im bereitgestellten Executor ausgeführt. Es ist nicht erforderlich, leichte Operationen in einen zwischengespeicherten Thread-Executor zu stopfen. Verwenden Sie keinethenXXXXAsync
Methoden ohne guten Grund.Darüber hinaus
reduce
sollte nicht verwendet werden, um sich in veränderlichen Behältern anzusammeln. Auch wenn es möglicherweise ordnungsgemäß funktioniert, wenn der Stream sequentiell ist, schlägt es fehl, wenn der Stream parallel geschaltet wird. Verwenden Sie.collect
stattdessen, um eine veränderbare Reduktion durchzuführen .Wenn Sie die gesamte Berechnung ausnahmsweise unmittelbar nach dem ersten Fehler ausführen möchten, gehen Sie in Ihrer
sequence
Methode wie folgt vor :CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0])) .thenApply(v -> com.stream() .map(CompletableFuture::join) .collect(Collectors.toList()) ); com.forEach(f -> f.whenComplete((t, ex) -> { if (ex != null) { result.completeExceptionally(ex); } })); return result;
Wenn Sie die verbleibenden Vorgänge beim ersten Fehler zusätzlich abbrechen möchten, fügen Sie sie
exec.shutdownNow();
direkt danach hinzuresult.completeExceptionally(ex);
. Dies setzt natürlich voraus, dassexec
nur für diese eine Berechnung existiert. Wenn dies nicht der Fall ist, müssen Sie eine Schleife durchführen und jeden verbleibendenFuture
einzeln abbrechen .quelle
allof
Rückgabetyp istCompletableFuture<Void>
und wirCompletableFuture<List<T>>
ohne Compiler-Warnung zurückkehren. Ich war mir dieser Natur der Leere nicht bewusstreduce
, solange der Stream in dersequence2
Methode sequentiell gehalten wird. Es ist jedoch sehr unerwünscht, Stream-Konstrukte zu schreiben, die brechen, wenn der Stream parallel geschaltet wird. Wenn Sie sich darauf verlassen, dass der Stream sequentiell ist, sollte das dritte Argumentreduce
lauten(a, b) -> {throw new IllegalStateException("Parallel not allowed");}
thenCombine
würde sich Ihre ursprüngliche Lösung (mit ) verhalten. Wenn Sie die Berechnung kurzschließen und sofort eine außergewöhnliche Fertigstellung auslösen möchten, ist dies einfach. Siehe aktualisierte Antwort.join
. Der Vorteil der VerwendungallOf
besteht darin, dass beimallOf
Auslösen alle Aufgaben abgeschlossen wurden undjoin
nur die Ergebnisse angezeigt werden.Wie Mischa betont hat , überbeanspruchen Sie
…Async
Operationen. Außerdem erstellen Sie eine komplexe Operationskette, die eine Abhängigkeit modelliert, die Ihre Programmlogik nicht widerspiegelt:Dann kann das Abbrechen (explizit oder aufgrund einer Ausnahme) dieses rekursiv zusammengestellten Jobs rekursiv ausgeführt werden und mit a fehlschlagen
StackOverflowError
. Das ist implementierungsabhängig.Wie bereits von Misha gezeigt , gibt es eine Methode, mit
allOf
der Sie Ihre ursprüngliche Absicht modellieren können, um einen Job zu definieren, der von allen Jobs Ihrer Liste abhängt.Es ist jedoch erwähnenswert, dass auch das nicht notwendig ist. Da Sie einen unbegrenzten Thread-Pool-Executor verwenden, können Sie einfach einen asynchronen Job veröffentlichen, der die Ergebnisse in einer Liste sammelt, und fertig. Das Warten auf den Abschluss wird impliziert, indem ohnehin nach dem Ergebnis jedes Auftrags gefragt wird.
ExecutorService executorService = Executors.newCachedThreadPool(); List<CompletableFuture<Integer>> que = IntStream.range(0, 100000) .mapToObj(x -> CompletableFuture.supplyAsync(() -> { LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10))); return x; }, executorService)).collect(Collectors.toList()); CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync( () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()), executorService);
Die Verwendung von Methoden zum Erstellen abhängiger Operationen ist wichtig, wenn die Anzahl der Threads begrenzt ist und die Jobs möglicherweise zusätzliche asynchrone Jobs erzeugen, um zu vermeiden, dass wartende Jobs Threads von Jobs stehlen, die zuerst abgeschlossen werden müssen, aber dies ist hier auch nicht der Fall.
In diesem speziellen Fall kann ein Job, der einfach über diese große Anzahl von vorausgesetzten Jobs iteriert und bei Bedarf wartet, effizienter sein, als diese große Anzahl von Abhängigkeiten zu modellieren und jeden Job den abhängigen Job über den Abschluss zu benachrichtigen.
quelle
supplyAsync
anstelle vonallOf
einen Thread aus dem Pool verbraucht, um auf den Abschluss aller Aufgaben zu warten. Wenn ich mich nicht irre,allOf
wird innerhalb der Threads arbeiten, die den jeweiligen Aufgaben zugewiesen sind. Für die meisten Anwendungsfälle keine große Sache, aber erwähnenswert.Sie können die Spotify-
CompletableFutures
Bibliothek abrufen und dieallAsList
Methode verwenden. Ich denke, es ist von GuavasFutures.allAsList
Methode inspiriert .public static <T> CompletableFuture<List<T>> allAsList( List<? extends CompletionStage<? extends T>> stages) {
Und hier ist eine einfache Implementierung, wenn Sie keine Bibliothek verwenden möchten:
public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) { return CompletableFuture.allOf( futures.toArray(new CompletableFuture[futures.size()]) ).thenApply(ignored -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()) ); }
quelle
Um die akzeptierte Antwort von @Misha zu ergänzen, kann sie als Sammler weiter ausgebaut werden:
public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() { return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com)); }
Jetzt kannst du:
Stream<CompletableFuture<Integer>> stream = Stream.of( CompletableFuture.completedFuture(1), CompletableFuture.completedFuture(2), CompletableFuture.completedFuture(3) ); CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());
quelle
Eine beispielhafte Sequenzoperation mit thenCombine on CompletableFuture
public<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com){ CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>()); BiFunction<CompletableFuture<List<T>>,CompletableFuture<T>,CompletableFuture<List<T>>> combineToList = (acc,next) -> acc.thenCombine(next,(a,b) -> { a.add(b); return a;}); BinaryOperator<CompletableFuture<List<T>>> combineLists = (a,b)-> a.thenCombine(b,(l1,l2)-> { l1.addAll(l2); return l1;}) ; return com.stream() .reduce(identity, combineToList, combineLists); } }
Wenn es Ihnen nichts ausmacht, Bibliotheken von Drittanbietern zu verwenden, bietet cyclops-react (ich bin der Autor) eine Reihe von Dienstprogrammmethoden für CompletableFutures (und Optionals, Streams usw.).
quelle
Haftungsausschluss: Dies wird die ursprüngliche Frage nicht vollständig beantworten. Es wird der Teil "Alles scheitern, wenn man versagt" fehlen. Ich kann die eigentliche, allgemeinere Frage jedoch nicht beantworten, da sie als Duplikat dieser Frage geschlossen wurde: Java 8 CompletableFuture.allOf (...) mit Sammlung oder Liste . Also werde ich hier antworten:
Zusammenfassung: Verwenden Sie Folgendes:
private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) { CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>()); BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) -> futureValue.thenCombine(futureList, (value, list) -> { List<V> newList = new ArrayList<>(list.size() + 1); newList.addAll(list); newList.add(value); return newList; }); BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> { List<V> newList = new ArrayList<>(list1.size() + list2.size()); newList.addAll(list1); newList.addAll(list2); return newList; }); return listOfFutures.stream().reduce(identity, accumulator, combiner); }
Anwendungsbeispiel:
List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads) .mapToObj(i -> loadData(i, executor)).collect(toList()); CompletableFuture<List<String>> futureList = sequence(listOfFutures);
Vollständiges Beispiel:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.function.BiFunction; import java.util.function.BinaryOperator; import java.util.stream.IntStream; import static java.util.stream.Collectors.toList; public class ListOfFuturesToFutureOfList { public static void main(String[] args) { ListOfFuturesToFutureOfList test = new ListOfFuturesToFutureOfList(); test.load(10); } public void load(int numThreads) { final ExecutorService executor = Executors.newFixedThreadPool(numThreads); List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads) .mapToObj(i -> loadData(i, executor)).collect(toList()); CompletableFuture<List<String>> futureList = sequence(listOfFutures); System.out.println("Future complete before blocking? " + futureList.isDone()); // this will block until all futures are completed List<String> data = futureList.join(); System.out.println("Loaded data: " + data); System.out.println("Future complete after blocking? " + futureList.isDone()); executor.shutdown(); } public CompletableFuture<String> loadData(int dataPoint, Executor executor) { return CompletableFuture.supplyAsync(() -> { ThreadLocalRandom rnd = ThreadLocalRandom.current(); System.out.println("Starting to load test data " + dataPoint); try { Thread.sleep(500 + rnd.nextInt(1500)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Successfully loaded test data " + dataPoint); return "data " + dataPoint; }, executor); } private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) { CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>()); BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) -> futureValue.thenCombine(futureList, (value, list) -> { List<V> newList = new ArrayList<>(list.size() + 1); newList.addAll(list); newList.add(value); return newList; }); BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> { List<V> newList = new ArrayList<>(list1.size() + list2.size()); newList.addAll(list1); newList.addAll(list2); return newList; }); return listOfFutures.stream().reduce(identity, accumulator, combiner); } }
quelle
thenCombine()
stattthenApply()
im Akkumulator verwenden, um denjoin()
Anruf zu vermeiden . Andernfalls führt der aufrufende Thread dies tatsächlich aus, sodass die Sammlung erst zurückgegeben wird, nachdem alles abgeschlossen ist. Sie können dies überprüfen, indem Sie einen Ausdruck vor demfutureList.join()
: hinzufügen, der erst gedruckt wird, nachdem alle Futures " Erfolgreich geladene Testdaten " gedruckt haben .thenApply()
zuthenCombine()
dann wechsle , wird der letztejoin()
Aufruf vonCompletableFuture<List<V>>
nicht mehr blockiert, sondern kehrt sofort mit einem leeren Ergebnis zurück. Die Zukunft der Liste wird also nicht warten, bis alle einzelnen Futures abgeschlossen sind. Aber das war die ursprüngliche Idee des Ganzen.Collector
auf Mutation beruht. Das Problem mit Ihrem Code ist, dass er äquivalent zu istCompletableFuture.completedFuture(listOfFutures.stream().map(CompletableFuture::join).collect(toList()));
. Die Sammlung gibt tatsächlich eine Zukunft zurück, die bereits abgeschlossen ist. Es macht also keinen Sinn mehr, eine Zukunft zurückzugeben.toFutureList()
Kollektors. Was nicht gleichwertig ist, istlistOfFutures.stream().map(CompletableFuture::join).collect(toList())
undlistOfFutures.stream().collect(toFutureList())
. Ersteres gibt Ihnen ein vollständiges Ergebnis mit allen abgeschlossenen Futures, während letzteres Ihnen eine Zukunft mit einer Liste von Werten gibt, die Sie weitergeben oder anderen Werten zuordnen können, ohne sie zu blockieren.join()
alle Futures im aufrufenden Thread auf und verpackt das Ergebnis in ein bereits abgeschlossenesCompletableFuture
. Es blockiert. Wie ich bereits sagte, fügen Sie einfach direkt nach der Stream-Sammlung einen Druck hinzu, und Sie werden sehen, dass dieser Druck erst nach Abschluss aller Futures erfolgt.Zusätzlich zur Spotify Futures-Bibliothek können Sie versuchen, meinen Code hier zu finden: https://github.com/vsilaev/java-async-await/blob/master/net.tascalate.async.examples/src/main/java/net/ tascalate / concurrent / CompletionStages.java (hat Abhängigkeiten zu anderen Klassen im selben Paket)
Es implementiert eine Logik, um "mindestens N aus M" CompletionStage-s mit einer Richtlinie zurückzugeben, wie viele Fehler toleriert werden dürfen. Es gibt praktische Methoden für alle Fälle sowie Stornierungsbedingungen für die verbleibenden Futures. Der Code behandelt CompletionStage-s (Schnittstelle) und nicht CompletableFuture (konkrete Klasse).
quelle
Javaslang hat eine sehr praktische
Future
API . Es ermöglicht auch, aus einer Sammlung von Futures eine Zukunft der Sammlung zu machen.Siehe http://static.javadoc.io/io.javaslang/javaslang/2.0.5/javaslang/concurrent/Future.html#sequence-java.lang.Iterable-
quelle
javaslang.concurrent.Future
:(Ihre Aufgabe könnte leicht wie folgt erledigt werden:
final List<CompletableFuture<Module> futures =... CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).join();
quelle