AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
.map(record -> new Record(recordNumber.incrementAndGet(), record))
.parallel()
.filter(record -> doSomeOperation())
.findFirst()
Als ich dies schrieb, ging ich davon aus, dass die Threads nur beim Kartenaufruf erzeugt werden, da parallel nach der Karte platziert wird. Einige Zeilen in der Datei erhielten jedoch bei jeder Ausführung unterschiedliche Datensatznummern.
Ich habe die offizielle Java-Stream-Dokumentation und einige Websites gelesen , um zu verstehen, wie Streams unter der Haube funktionieren.
Ein paar Fragen:
Der parallele Java-Stream basiert auf SplitIterator , der von jeder Sammlung wie ArrayList, LinkedList usw. implementiert wird. Wenn wir aus diesen Sammlungen einen parallelen Stream erstellen, wird der entsprechende geteilte Iterator zum Teilen und Iterieren der Sammlung verwendet. Dies erklärt, warum Parallelität eher auf der Ebene der ursprünglichen Eingabequelle (Dateizeilen) als auf dem Ergebnis der Karte (dh Record Pojo) auftrat. Ist mein Verständnis richtig?
In meinem Fall ist die Eingabe ein Datei-E / A-Stream. Welcher Split-Iterator wird verwendet?
Es spielt keine Rolle, wo wir
parallel()
in der Pipeline platzieren. Die ursprüngliche Eingabequelle wird immer aufgeteilt und die verbleibenden Zwischenoperationen werden angewendet.In diesem Fall sollte Java Benutzern nicht erlauben, Paralleloperationen an einer beliebigen Stelle in der Pipeline zu platzieren, außer an der ursprünglichen Quelle. Weil es ein falsches Verständnis für diejenigen gibt, die nicht wissen, wie Java Stream intern funktioniert. Ich weiß, dass die
parallel()
Operation für den Stream-Objekttyp definiert worden wäre, und daher funktioniert sie auf diese Weise. Es ist jedoch besser, eine alternative Lösung bereitzustellen.Im obigen Code-Snippet versuche ich, jedem Datensatz in der Eingabedatei eine Zeilennummer hinzuzufügen, und daher sollte sie bestellt werden. Ich möchte mich jedoch
doSomeOperation()
parallel bewerben, da es sich um eine Schwergewichtslogik handelt. Der eine Weg, dies zu erreichen, besteht darin, meinen eigenen angepassten Split-Iterator zu schreiben. Gibt es einen anderen Weg?
quelle
parallel()
ist nichts weiter als eine allgemeine Modifikatoranforderung, die auf das zugrunde liegende Stream-Objekt angewendet wird. Denken Sie daran, dass es nur einen Quellstrom gibt, wenn Sie keine endgültigen Operationen auf die Pipe anwenden, dh solange nichts "ausgeführt" wird. Trotzdem hinterfragen Sie im Grunde nur die Auswahl des Java-Designs. Welches ist meinungsbasiert und wir können dabei nicht wirklich helfen.Stream
direkt in der Schnittstelle angeboten und aufgrund der schönen Kaskadierung gibt jede Operation wiederStream
zurück. Stellen Sie sich vor, jemand möchte Ihnen eine geben, hatStream
aber bereits einige Operationen wiemap
diese angewendet . Als Benutzer möchten Sie weiterhin entscheiden können, ob es parallel ausgeführt werden soll oder nicht. Es muss also möglich sein, dass Sie noch anrufenparallel()
, obwohl der Stream bereits vorhanden ist.flatMap
unsicheren Methoden oder ähnlichem oder ähnlichem dramatisch erhöhen .Path
im lokalen Dateisystem befindet und Sie ein aktuelles JDK verwenden, verfügt der Spliterator über eine bessere Parallelverarbeitungsfähigkeit als das Stapeln von Vielfachen von 1024. In einigenfindFirst
Szenarien kann eine ausgeglichene Aufteilung jedoch sogar kontraproduktiv sein …Antworten:
Der gesamte Stream ist entweder parallel oder sequentiell. Wir wählen keine Teilmenge von Operationen aus, die sequentiell oder parallel ausgeführt werden sollen.
Wie Sie bereits erwähnt haben, verwenden parallele Streams geteilte Iteratoren. Dies dient eindeutig dazu, die Daten zu partitionieren, bevor die Vorgänge ausgeführt werden.
Wenn ich mir die Quelle anschaue, sehe ich, dass sie verwendet wird
java.nio.file.FileChannelLinesSpliterator
Recht. Sie können sogar mehrmals anrufen
parallel()
und anrufensequential()
. Der zuletzt aufgerufene gewinnt. Wenn wir aufrufenparallel()
, legen wir dies für den zurückgegebenen Stream fest. und wie oben angegeben, laufen alle Operationen entweder sequentiell oder parallel.Dies wird zu einer Ansichtssache. Ich denke, Zabuza gibt einen guten Grund, die Wahl der JDK-Designer zu unterstützen.
Dies hängt von Ihrem Betrieb ab
findFirst()
es sich um Ihren eigentlichen Terminalbetrieb handelt, müssen Sie sich nicht einmal um die parallele Ausführung kümmern, dadoSomething()
ohnehin nicht viele Anrufe getätigt werden (findFirst()
Kurzschluss)..parallel()
Tatsächlich kann dies dazu führen, dass mehr als ein Element verarbeitet wird, während diesfindFirst()
in einem sequentiellen Stream verhindert wird.Wenn Ihre Terminaloperation nicht viele Daten erstellt, können Sie Ihre
Record
Objekte möglicherweise mithilfe eines sequentiellen Streams erstellen und das Ergebnis dann parallel verarbeiten:Wenn Ihre Pipeline viele Daten in den Speicher laden würde (was möglicherweise der Grund ist, warum Sie sie verwenden
Files.lines()
), benötigen Sie möglicherweise einen benutzerdefinierten Split-Iterator. Bevor ich dorthin gehe, würde ich mir jedoch andere Optionen ansehen (z. B. das Speichern von Zeilen mit einer ID-Spalte - das ist nur meine Meinung).Ich würde auch versuchen, Datensätze in kleineren Stapeln wie folgt zu verarbeiten:
Dies wird
doSomeOperation()
parallel ausgeführt, ohne dass alle Daten in den Speicher geladen werden. Beachten Sie jedoch, dass darüberbatchSize
nachgedacht werden muss.quelle
Spliterator
Implementierung wäre nicht komplizierter und ermöglicht gleichzeitig eine effizientere Parallelverarbeitung…parallelStream
Operationen hat einen festen Overhead, um die Operation zu starten und auf das Endergebnis zu warten, während sie auf eine Parallelität von beschränkt istbatchSize
. Zunächst benötigen Sie ein Vielfaches der derzeit verfügbaren Anzahl von CPU-Kernen, um Leerlauf-Threads zu vermeiden. Dann sollte die Zahl hoch genug sein, um den festen Overhead zu kompensieren. Je höher die Zahl, desto höher die Pause, die durch die sequentielle Leseoperation verursacht wird, bevor die Parallelverarbeitung überhaupt beginnt.Stream.generate
einer starken Störung des inneren Streams führen, abgesehen von dem Punkt, an dem ein ungeordneter Stream erzeugt wird, der mit den vom OP beabsichtigten Anwendungsfällen wie zfindFirst()
. Im Gegensatz dazu funktioniert ein einzelner paralleler Stream mit einem Spliterator, der Chunks zurückgibt,trySplit
unkompliziert und ermöglicht es Arbeitsthreads , den nächsten Chunk zu verarbeiten, ohne auf den Abschluss des vorherigen zu warten.findFirst()
Operation nur eine kleine Anzahl von Elementen verarbeitet. Die erste Übereinstimmung kann immer noch auftreten, nachdem 90% aller Elemente verarbeitet wurden. Wenn Sie zehn Millionen Zeilen haben, müssen Sie auch nach 10% noch eine Million Zeilen verarbeiten, um eine Übereinstimmung zu finden.Das ursprüngliche Stream-Design enthielt die Idee, nachfolgende Pipeline-Phasen mit unterschiedlichen Einstellungen für die parallele Ausführung zu unterstützen. Diese Idee wurde jedoch aufgegeben. Die API kann aus dieser Zeit stammen, aber andererseits wäre ein API-Design, das den Aufrufer zwingt, eine einzige eindeutige Entscheidung für die parallele oder sequentielle Ausführung zu treffen, viel komplizierter.
Das tatsächliche
Spliterator
Verwendung durchFiles.lines(…)
ist implementierungsabhängig. In Java 8 (Oracle oder OpenJDK) erhalten Sie immer das gleiche wie beiBufferedReader.lines()
. Wenn in neueren JDKs dasPath
zum Standarddateisystem gehört und der Zeichensatz einer der für diese Funktion unterstützten ist, erhalten Sie einen Stream mit einer dediziertenSpliterator
Implementierung, demjava.nio.file.FileChannelLinesSpliterator
. Wenn die Voraussetzungen nicht erfüllt sind, erhalten Sie das gleiche wie beiBufferedReader.lines()
, das immer noch auf einemIterator
implementiertenBufferedReader
und umschlossenen basiertSpliterators.spliteratorUnknownSize
.Ihre spezifische Aufgabe wird am besten mit einem benutzerdefinierten erledigt
Spliterator
die die Zeilennummerierung direkt an der Quelle vor der Parallelverarbeitung durchführen kann, um eine nachfolgende Parallelverarbeitung ohne Einschränkungen zu ermöglichen.quelle
Das Folgende ist eine einfache Demonstration, wann die Anwendung von Parallel angewendet wird. Die Ausgabe von Peek zeigt deutlich den Unterschied zwischen den beiden Beispielen. Hinweis: Der
map
Aufruf wird nur eingeworfen, um zuvor eine weitere Methode hinzuzufügenparallel
.quelle