Ist es möglich, einen benutzerdefinierten Thread-Pool für den parallelen Java 8- Stream anzugeben ? Ich kann es nirgendwo finden.
Stellen Sie sich vor, ich habe eine Serveranwendung und möchte parallele Streams verwenden. Aber die Anwendung ist groß und hat mehrere Threads, deshalb möchte ich sie unterteilen. Ich möchte keine langsam laufende Aufgabe in einem Modul der Applicationblock-Aufgaben von einem anderen Modul.
Wenn ich keine unterschiedlichen Thread-Pools für verschiedene Module verwenden kann, bedeutet dies, dass ich in den meisten Situationen der realen Welt parallele Streams nicht sicher verwenden kann.
Versuchen Sie das folgende Beispiel. Es gibt einige CPU-intensive Aufgaben, die in separaten Threads ausgeführt werden. Die Aufgaben nutzen parallele Streams. Die erste Aufgabe ist unterbrochen, daher dauert jeder Schritt 1 Sekunde (simuliert durch Thread-Schlaf). Das Problem ist, dass andere Threads hängen bleiben und warten, bis die fehlerhafte Aufgabe abgeschlossen ist. Dies ist ein erfundenes Beispiel, aber stellen Sie sich eine Servlet-App und jemanden vor, der eine lange laufende Aufgabe an den Shared Fork Join-Pool sendet.
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
Antworten:
Es gibt tatsächlich einen Trick, wie eine parallele Operation in einem bestimmten Fork-Join-Pool ausgeführt wird. Wenn Sie es als Task in einem Fork-Join-Pool ausführen, bleibt es dort und verwendet nicht das allgemeine.
Der Trick basiert auf ForkJoinTask.fork, das Folgendes angibt: "Leitet die asynchrone Ausführung dieser Aufgabe in dem Pool ein, in dem die aktuelle Aufgabe ausgeführt wird, falls zutreffend, oder verwendet ForkJoinPool.commonPool (), wenn nicht inForkJoinPool ()"
quelle
ForkJoinPool
oder ist das ein Implementierungsdetail? Ein Link zur Dokumentation wäre schön.ForkJoinPool
Instanz sein sollte,shutdown()
wenn sie nicht mehr benötigt wird, um ein Thread-Leck zu vermeiden. (Beispiel)Die parallelen Streams verwenden die Standardeinstellung,
ForkJoinPool.commonPool
die standardmäßig einen Thread weniger enthält, da Sie Prozessoren haben , wie von zurückgegebenRuntime.getRuntime().availableProcessors()
(Dies bedeutet, dass parallele Streams alle Ihre Prozessoren verwenden, da sie auch den Hauptthread verwenden):Dies bedeutet auch, wenn Sie verschachtelte parallele Streams oder mehrere parallele Streams gleichzeitig gestartet haben, teilen sich alle denselben Pool. Vorteil: Sie werden niemals mehr als die Standardeinstellung (Anzahl der verfügbaren Prozessoren) verwenden. Nachteil: Möglicherweise werden nicht jedem von Ihnen initiierten parallelen Stream "alle Prozessoren" zugewiesen (wenn Sie zufällig mehr als einen haben). (Anscheinend können Sie einen ManagedBlocker verwenden , um dies zu umgehen.)
Sie können die Art und Weise ändern, in der parallele Streams ausgeführt werden
yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();
oderSystem.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")
für eine Zielparallelität von 20 Threads. Dies funktioniert jedoch nach dem zurückportierten Patch https://bugs.openjdk.java.net/browse/JDK-8190974 nicht mehr .Beispiel für Letzteres auf meinem Computer mit 8 Prozessoren. Wenn ich das folgende Programm ausführe:
Die Ausgabe ist:
Sie können also sehen, dass der parallele Stream 8 Elemente gleichzeitig verarbeitet, dh 8 Threads verwendet. Wenn ich jedoch die kommentierte Zeile auskommentiere, lautet die Ausgabe:
Diesmal hat der parallele Stream 20 Threads verwendet und alle 20 Elemente im Stream wurden gleichzeitig verarbeitet.
quelle
commonPool
hat tatsächlich eins weniger alsavailableProcessors
, was zu einer totalen Parallelität führt, die gleich ist,availableProcessors
weil der aufrufende Thread als eins zählt.ForkJoinTask
. Nachahmenparallel()
get()
ist erforderlich:stream.parallel().forEach(soSomething)).get();
ForkJoinPool.submit(() -> stream.forEach(...))
meine Stream-Aktionen mit den angegebenen ausgeführt werdenForkJoinPool
. Ich würde erwarten, dass die gesamte Stream-Aktion im ForJoinPool als EINE Aktion ausgeführt wird, aber intern immer noch den standardmäßigen / gemeinsamen ForkJoinPool verwendet. Wo haben Sie gesehen, dass ForkJoinPool.submit () das tun würde, was Sie sagen?Alternativ zum Trick, die parallele Berechnung in Ihrem eigenen forkJoinPool auszulösen, können Sie diesen Pool auch an die CompletableFuture.supplyAsync-Methode übergeben, wie in:
quelle
Die ursprüngliche Lösung (Festlegen der gemeinsamen Parallelitätseigenschaft ForkJoinPool) funktioniert nicht mehr. Wenn man sich die Links in der ursprünglichen Antwort ansieht, wurde ein Update, das dies unterbricht, wieder auf Java 8 portiert. Wie in den verknüpften Threads erwähnt, konnte nicht garantiert werden, dass diese Lösung für immer funktioniert. Basierend darauf ist die Lösung die in der akzeptierten Antwort beschriebene Lösung forkjoinpool.submit with .get. Ich denke, der Backport behebt auch die Unzuverlässigkeit dieser Lösung.
quelle
ForkJoinPool.commonPool().getParallelism()
im Debug-Modus.unreported exception InterruptedException; must be caught or declared to be thrown
trotz allercatch
Ausnahmen in der Schleife.Wir können die Standardparallelität mithilfe der folgenden Eigenschaft ändern:
die eingerichtet werden kann, um mehr Parallelität zu verwenden.
quelle
Um die tatsächliche Anzahl der verwendeten Threads zu messen, können Sie Folgendes überprüfen
Thread.activeCount()
:Dies kann auf einer 4-Kern-CPU eine Ausgabe wie folgt erzeugen:
Ohne
.parallel()
es gibt:quelle
Bisher habe ich die in den Antworten auf diese Frage beschriebenen Lösungen verwendet. Dafür habe ich mir eine kleine Bibliothek namens Parallel Stream Support ausgedacht:
Wie @PabloMatiasGomez in den Kommentaren hervorhob, gibt es jedoch Nachteile hinsichtlich des Aufteilungsmechanismus paralleler Streams, der stark von der Größe des gemeinsamen Pools abhängt. Siehe Paralleler Stream von einem HashSet wird nicht parallel ausgeführt .
Ich verwende diese Lösung nur, um separate Pools für verschiedene Arten von Arbeiten zu haben, aber ich kann die Größe des gemeinsamen Pools nicht auf 1 setzen, selbst wenn ich ihn nicht verwende.
quelle
Hinweis: In JDK 10 scheint ein Fix implementiert zu sein, der sicherstellt, dass der benutzerdefinierte Thread-Pool die erwartete Anzahl von Threads verwendet.
Die parallele Stream-Ausführung in einem benutzerdefinierten ForkJoinPool sollte der Parallelität https://bugs.openjdk.java.net/browse/JDK-8190974 entsprechen
quelle
Ich habe den benutzerdefinierten ForkJoinPool wie folgt ausprobiert, um die Poolgröße anzupassen:
Hier ist die Ausgabe, die besagt, dass der Pool mehr Threads als die Standardeinstellung 4 verwendet .
Aber tatsächlich gibt es einen Verrückten , als ich versuchte, das gleiche Ergebnis
ThreadPoolExecutor
wie folgt zu erzielen :aber ich habe versagt.
Es wird nur der Anfang parallelStream in einem neuen Thread und dann alles andere ist genau das gleiche, was wiederum beweist , dass die
parallelStream
Verwendung wird die ForkJoinPool seine untergeordneten Threads zu starten.quelle
Geh und hol AbacusUtil . Die Thread-Nummer kann für den parallelen Stream angegeben werden. Hier ist der Beispielcode:
Offenlegung: Ich bin der Entwickler von AbacusUtil.
quelle
Wenn Sie sich nicht auf Implementierungshacks verlassen möchten, gibt es immer eine Möglichkeit, dasselbe zu erreichen, indem Sie benutzerdefinierte Kollektoren implementieren, die kombinieren
map
undcollect
semantisch sind ... und Sie wären nicht auf ForkJoinPool beschränkt:Zum Glück ist es bereits hier fertig und auf Maven Central verfügbar: http://github.com/pivovarit/parallel-collectors
Haftungsausschluss: Ich habe es geschrieben und übernehme die Verantwortung dafür.
quelle
Wenn es Ihnen nichts ausmacht, eine Bibliothek eines Drittanbieters zu verwenden, können Sie mit cyclops-react sequentielle und parallele Streams innerhalb derselben Pipeline mischen und benutzerdefinierte ForkJoinPools bereitstellen. Zum Beispiel
Oder wenn wir die Verarbeitung innerhalb eines sequentiellen Streams fortsetzen möchten
[Offenlegung Ich bin der Hauptentwickler von Cyclops-React]
quelle
Wenn Sie keinen benutzerdefinierten ThreadPool benötigen, sondern die Anzahl der gleichzeitigen Aufgaben begrenzen möchten, können Sie Folgendes verwenden:
(Die doppelte Frage, die danach gestellt wird, ist gesperrt. Bitte tragen Sie mich hier.)
quelle
Sie können versuchen, diese ForkJoinWorkerThreadFactory zu implementieren und in die Fork-Join-Klasse einzufügen.
Sie können diesen Konstruktor des Fork-Join-Pools verwenden, um dies zu tun.
Anmerkungen: - 1. Wenn Sie dies verwenden, berücksichtigen Sie, dass basierend auf Ihrer Implementierung neuer Threads die Planung von JVM betroffen ist, wodurch Fork-Join-Threads im Allgemeinen für verschiedene Kerne geplant werden (die als Rechen-Thread behandelt werden). 2. Die Aufgabenplanung durch Fork-Join zu Threads wird nicht beeinflusst. 3. Sie haben nicht wirklich herausgefunden, wie paralleler Stream Threads aus Fork-Join auswählt (es wurde keine ordnungsgemäße Dokumentation gefunden). Verwenden Sie daher eine andere threadNaming-Factory, um sicherzustellen, dass Threads im parallelen Stream ausgewählt werden von customThreadFactory, die Sie bereitstellen. 4. commonThreadPool verwendet diese customThreadFactory nicht.
quelle