Benutzerdefinierter Thread-Pool im parallelen Java 8-Stream

398

Ist es möglich, einen benutzerdefinierten Thread-Pool für den parallelen Java 8- Stream anzugeben ? Ich kann es nirgendwo finden.

Stellen Sie sich vor, ich habe eine Serveranwendung und möchte parallele Streams verwenden. Aber die Anwendung ist groß und hat mehrere Threads, deshalb möchte ich sie unterteilen. Ich möchte keine langsam laufende Aufgabe in einem Modul der Applicationblock-Aufgaben von einem anderen Modul.

Wenn ich keine unterschiedlichen Thread-Pools für verschiedene Module verwenden kann, bedeutet dies, dass ich in den meisten Situationen der realen Welt parallele Streams nicht sicher verwenden kann.

Versuchen Sie das folgende Beispiel. Es gibt einige CPU-intensive Aufgaben, die in separaten Threads ausgeführt werden. Die Aufgaben nutzen parallele Streams. Die erste Aufgabe ist unterbrochen, daher dauert jeder Schritt 1 Sekunde (simuliert durch Thread-Schlaf). Das Problem ist, dass andere Threads hängen bleiben und warten, bis die fehlerhafte Aufgabe abgeschlossen ist. Dies ist ein erfundenes Beispiel, aber stellen Sie sich eine Servlet-App und jemanden vor, der eine lange laufende Aufgabe an den Shared Fork Join-Pool sendet.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}
Lukas
quelle
3
Was meinst du mit benutzerdefiniertem Thread-Pool? Es gibt einen einzigen gemeinsamen ForkJoinPool, aber Sie können jederzeit Ihren eigenen ForkJoinPool erstellen und Anforderungen an diesen senden.
Edharned
7
Hinweis: Java-Champion Heinz Kabutz untersucht das gleiche Problem, jedoch mit noch schlimmerer Auswirkung: Deadlocking-Threads des Common Fork Join Pools. Siehe javaspecialists.eu/archive/Issue223.html
Peti

Antworten:

395

Es gibt tatsächlich einen Trick, wie eine parallele Operation in einem bestimmten Fork-Join-Pool ausgeführt wird. Wenn Sie es als Task in einem Fork-Join-Pool ausführen, bleibt es dort und verwendet nicht das allgemeine.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

Der Trick basiert auf ForkJoinTask.fork, das Folgendes angibt: "Leitet die asynchrone Ausführung dieser Aufgabe in dem Pool ein, in dem die aktuelle Aufgabe ausgeführt wird, falls zutreffend, oder verwendet ForkJoinPool.commonPool (), wenn nicht inForkJoinPool ()"

Lukas
quelle
20
Details zur Lösung finden Sie hier blog.krecan.net/2014/03/18/…
Lukas
3
Aber ist auch angegeben, dass Streams das verwenden ForkJoinPooloder ist das ein Implementierungsdetail? Ein Link zur Dokumentation wäre schön.
Nicolai
6
@ Lukas Danke für den Ausschnitt. Ich werde hinzufügen, dass die ForkJoinPoolInstanz sein sollte, shutdown()wenn sie nicht mehr benötigt wird, um ein Thread-Leck zu vermeiden. (Beispiel)
jck
5
Beachten Sie, dass es in Java 8 einen Fehler gibt, dass Aufgaben, die auf einer benutzerdefinierten Poolinstanz ausgeführt werden, dennoch an den gemeinsam genutzten Pool gekoppelt sind: Die Größe der Berechnung bleibt proportional zum gemeinsamen Pool und nicht zum benutzerdefinierten Pool. Wurde in Java 10 behoben: JDK-8190974
Terran
3
@terran Dieses Problem wurde auch für Java 8 behoben. openjdk.java.net/browse/JDK-8224620
Cutberto Ocampo
192

Die parallelen Streams verwenden die Standardeinstellung, ForkJoinPool.commonPooldie standardmäßig einen Thread weniger enthält, da Sie Prozessoren haben , wie von zurückgegeben Runtime.getRuntime().availableProcessors()(Dies bedeutet, dass parallele Streams alle Ihre Prozessoren verwenden, da sie auch den Hauptthread verwenden):

Für Anwendungen, die separate oder benutzerdefinierte Pools erfordern, kann ein ForkJoinPool mit einer bestimmten Zielparallelitätsstufe erstellt werden. Standardmäßig entspricht dies der Anzahl der verfügbaren Prozessoren.

Dies bedeutet auch, wenn Sie verschachtelte parallele Streams oder mehrere parallele Streams gleichzeitig gestartet haben, teilen sich alle denselben Pool. Vorteil: Sie werden niemals mehr als die Standardeinstellung (Anzahl der verfügbaren Prozessoren) verwenden. Nachteil: Möglicherweise werden nicht jedem von Ihnen initiierten parallelen Stream "alle Prozessoren" zugewiesen (wenn Sie zufällig mehr als einen haben). (Anscheinend können Sie einen ManagedBlocker verwenden , um dies zu umgehen.)

Sie können die Art und Weise ändern, in der parallele Streams ausgeführt werden

  • Senden Sie die parallele Stream-Ausführung an Ihren eigenen ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();oder
  • Sie können die Größe des allgemeinen Pools mithilfe der Systemeigenschaften ändern: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")für eine Zielparallelität von 20 Threads. Dies funktioniert jedoch nach dem zurückportierten Patch https://bugs.openjdk.java.net/browse/JDK-8190974 nicht mehr .

Beispiel für Letzteres auf meinem Computer mit 8 Prozessoren. Wenn ich das folgende Programm ausführe:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

Die Ausgabe ist:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

Sie können also sehen, dass der parallele Stream 8 Elemente gleichzeitig verarbeitet, dh 8 Threads verwendet. Wenn ich jedoch die kommentierte Zeile auskommentiere, lautet die Ausgabe:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

Diesmal hat der parallele Stream 20 Threads verwendet und alle 20 Elemente im Stream wurden gleichzeitig verarbeitet.

Assylien
quelle
30
Das commonPoolhat tatsächlich eins weniger als availableProcessors, was zu einer totalen Parallelität führt, die gleich ist, availableProcessorsweil der aufrufende Thread als eins zählt.
Marko Topolnik
2
Rücksendung einreichen ForkJoinTask. Nachahmen parallel() get()ist erforderlich:stream.parallel().forEach(soSomething)).get();
Grigory Kislin
5
Ich bin nicht davon überzeugt, dass ForkJoinPool.submit(() -> stream.forEach(...))meine Stream-Aktionen mit den angegebenen ausgeführt werden ForkJoinPool. Ich würde erwarten, dass die gesamte Stream-Aktion im ForJoinPool als EINE Aktion ausgeführt wird, aber intern immer noch den standardmäßigen / gemeinsamen ForkJoinPool verwendet. Wo haben Sie gesehen, dass ForkJoinPool.submit () das tun würde, was Sie sagen?
Frederic Leitenberger
@FredericLeitenberger Du wolltest wahrscheinlich deinen Kommentar unter Lukas 'Antwort setzen.
Assylias
2
Ich sehe jetzt stackoverflow.com/a/34930831/1520422 zeigt gut, dass es tatsächlich wie angekündigt funktioniert. Trotzdem verstehe ich immer noch nicht, wie es funktioniert. Aber mir geht es gut mit "es funktioniert". Vielen Dank!
Frederic Leitenberger
39

Alternativ zum Trick, die parallele Berechnung in Ihrem eigenen forkJoinPool auszulösen, können Sie diesen Pool auch an die CompletableFuture.supplyAsync-Methode übergeben, wie in:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);
Mario Fusco
quelle
22

Die ursprüngliche Lösung (Festlegen der gemeinsamen Parallelitätseigenschaft ForkJoinPool) funktioniert nicht mehr. Wenn man sich die Links in der ursprünglichen Antwort ansieht, wurde ein Update, das dies unterbricht, wieder auf Java 8 portiert. Wie in den verknüpften Threads erwähnt, konnte nicht garantiert werden, dass diese Lösung für immer funktioniert. Basierend darauf ist die Lösung die in der akzeptierten Antwort beschriebene Lösung forkjoinpool.submit with .get. Ich denke, der Backport behebt auch die Unzuverlässigkeit dieser Lösung.

ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
        .forEach((int theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List<Integer> list = new ArrayList<>();
for (int theInt: array)
{
    list.add(theInt);
}
fjpool.submit(() -> list.parallelStream()
        .forEach((theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
Tod Casasent
quelle
Ich sehe keine Änderung der Parallelität ForkJoinPool.commonPool().getParallelism()im Debug-Modus.
D-Codierer
Vielen Dank. Ich habe einige Tests / Nachforschungen angestellt und die Antwort aktualisiert. Es sieht so aus, als hätte ein Update es geändert, da es in älteren Versionen funktioniert.
Tod Casasent
Warum bekomme ich das immer wieder: unreported exception InterruptedException; must be caught or declared to be throwntrotz aller catchAusnahmen in der Schleife.
Rocky Li
Rocky, ich sehe keine Fehler. Die Kenntnis der Java-Version und der genauen Zeile hilft. Die "InterruptedException" weist darauf hin, dass der Versuch / Fang um den Schlaf in Ihrer Version nicht richtig geschlossen ist.
Tod Casasent
13

Wir können die Standardparallelität mithilfe der folgenden Eigenschaft ändern:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

die eingerichtet werden kann, um mehr Parallelität zu verwenden.

KayV
quelle
Obwohl es sich um eine globale Einstellung handelt, wird der ParallelStream
Meadlai
Dies funktionierte für mich auf openjdk Version "1.8.0_222"
abbas
Dieselbe Person wie oben, dies funktioniert nicht für mich auf openjdk "11.0.6"
abbas
8

Um die tatsächliche Anzahl der verwendeten Threads zu messen, können Sie Folgendes überprüfen Thread.activeCount():

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

Dies kann auf einer 4-Kern-CPU eine Ausgabe wie folgt erzeugen:

5 // common pool
23 // custom pool

Ohne .parallel()es gibt:

3 // common pool
4 // custom pool
Charlie
quelle
6
Thread.activeCount () sagt Ihnen nicht, welche Threads Ihren Stream verarbeiten. Ordnen Sie stattdessen Thread.currentThread (). GetName () zu, gefolgt von einem eindeutigen (). Dann werden Sie feststellen, dass nicht jeder Thread im Pool verwendet wird ... Fügen Sie Ihrer Verarbeitung eine Verzögerung hinzu, und alle Threads im Pool werden verwendet.
Keyoxy
7

Bisher habe ich die in den Antworten auf diese Frage beschriebenen Lösungen verwendet. Dafür habe ich mir eine kleine Bibliothek namens Parallel Stream Support ausgedacht:

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

Wie @PabloMatiasGomez in den Kommentaren hervorhob, gibt es jedoch Nachteile hinsichtlich des Aufteilungsmechanismus paralleler Streams, der stark von der Größe des gemeinsamen Pools abhängt. Siehe Paralleler Stream von einem HashSet wird nicht parallel ausgeführt .

Ich verwende diese Lösung nur, um separate Pools für verschiedene Arten von Arbeiten zu haben, aber ich kann die Größe des gemeinsamen Pools nicht auf 1 setzen, selbst wenn ich ihn nicht verwende.

Stefan Ferstl
quelle
4

Hinweis: In JDK 10 scheint ein Fix implementiert zu sein, der sicherstellt, dass der benutzerdefinierte Thread-Pool die erwartete Anzahl von Threads verwendet.

Die parallele Stream-Ausführung in einem benutzerdefinierten ForkJoinPool sollte der Parallelität https://bugs.openjdk.java.net/browse/JDK-8190974 entsprechen

Scott Langley
quelle
1

Ich habe den benutzerdefinierten ForkJoinPool wie folgt ausprobiert, um die Poolgröße anzupassen:

private static Set<String> ThreadNameSet = new HashSet<>();
private static Callable<Long> getSum() {
    List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
    return () -> aList.parallelStream()
            .peek((i) -> {
                String threadName = Thread.currentThread().getName();
                ThreadNameSet.add(threadName);
            })
            .reduce(0L, Long::sum);
}

private static void testForkJoinPool() {
    final int parallelism = 10;

    ForkJoinPool forkJoinPool = null;
    Long result = 0L;
    try {
        forkJoinPool = new ForkJoinPool(parallelism);
        result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown(); //always remember to shutdown the pool
        }
    }
    out.println(result);
    out.println(ThreadNameSet);
}

Hier ist die Ausgabe, die besagt, dass der Pool mehr Threads als die Standardeinstellung 4 verwendet .

50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

Aber tatsächlich gibt es einen Verrückten , als ich versuchte, das gleiche Ergebnis ThreadPoolExecutorwie folgt zu erzielen :

BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));

aber ich habe versagt.

Es wird nur der Anfang parallelStream in einem neuen Thread und dann alles andere ist genau das gleiche, was wiederum beweist , dass die parallelStreamVerwendung wird die ForkJoinPool seine untergeordneten Threads zu starten.

Gehört
quelle
Was könnte der mögliche Grund dafür sein, dass andere Testamentsvollstrecker nicht zugelassen werden?
Omjego
@omjego Das ist eine gute Frage, vielleicht könnten Sie eine neue Frage beginnen und mehr Details bereitstellen, um Ihre Ideen auszuarbeiten;)
Hearen
1

Geh und hol AbacusUtil . Die Thread-Nummer kann für den parallelen Stream angegeben werden. Hier ist der Beispielcode:

LongStream.range(4, 1_000_000).parallel(threadNum)...

Offenlegung: Ich bin der Entwickler von AbacusUtil.

user_3380739
quelle
1

Wenn Sie sich nicht auf Implementierungshacks verlassen möchten, gibt es immer eine Möglichkeit, dasselbe zu erreichen, indem Sie benutzerdefinierte Kollektoren implementieren, die kombinieren mapund collectsemantisch sind ... und Sie wären nicht auf ForkJoinPool beschränkt:

list.stream()
  .collect(parallelToList(i -> fetchFromDb(i), executor))
  .join()

Zum Glück ist es bereits hier fertig und auf Maven Central verfügbar: http://github.com/pivovarit/parallel-collectors

Haftungsausschluss: Ich habe es geschrieben und übernehme die Verantwortung dafür.

Grzegorz Piwowarek
quelle
0

Wenn es Ihnen nichts ausmacht, eine Bibliothek eines Drittanbieters zu verwenden, können Sie mit cyclops-react sequentielle und parallele Streams innerhalb derselben Pipeline mischen und benutzerdefinierte ForkJoinPools bereitstellen. Zum Beispiel

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

Oder wenn wir die Verarbeitung innerhalb eines sequentiellen Streams fortsetzen möchten

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[Offenlegung Ich bin der Hauptentwickler von Cyclops-React]

John McClean
quelle
0

Wenn Sie keinen benutzerdefinierten ThreadPool benötigen, sondern die Anzahl der gleichzeitigen Aufgaben begrenzen möchten, können Sie Folgendes verwenden:

List<Path> paths = List.of("/path/file1.csv", "/path/file2.csv", "/path/file3.csv").stream().map(e -> Paths.get(e)).collect(toList());
List<List<Path>> partitions = Lists.partition(paths, 4); // Guava method

partitions.forEach(group -> group.parallelStream().forEach(csvFilePath -> {
       // do your processing   
}));

(Die doppelte Frage, die danach gestellt wird, ist gesperrt. Bitte tragen Sie mich hier.)

Martin Vseticka
quelle
-2

Sie können versuchen, diese ForkJoinWorkerThreadFactory zu implementieren und in die Fork-Join-Klasse einzufügen.

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

Sie können diesen Konstruktor des Fork-Join-Pools verwenden, um dies zu tun.

Anmerkungen: - 1. Wenn Sie dies verwenden, berücksichtigen Sie, dass basierend auf Ihrer Implementierung neuer Threads die Planung von JVM betroffen ist, wodurch Fork-Join-Threads im Allgemeinen für verschiedene Kerne geplant werden (die als Rechen-Thread behandelt werden). 2. Die Aufgabenplanung durch Fork-Join zu Threads wird nicht beeinflusst. 3. Sie haben nicht wirklich herausgefunden, wie paralleler Stream Threads aus Fork-Join auswählt (es wurde keine ordnungsgemäße Dokumentation gefunden). Verwenden Sie daher eine andere threadNaming-Factory, um sicherzustellen, dass Threads im parallelen Stream ausgewählt werden von customThreadFactory, die Sie bereitstellen. 4. commonThreadPool verwendet diese customThreadFactory nicht.

Nitish Kumar
quelle
Können Sie ein brauchbares Beispiel liefern, das zeigt, wie Sie das verwenden, was Sie angegeben haben?
J. Murray