Java Parallel Stream - Reihenfolge des Aufrufs der parallel () -Methode [geschlossen]

11
AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
     .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
     .parallel()           
     .filter(record -> doSomeOperation())
     .findFirst()

Als ich dies schrieb, ging ich davon aus, dass die Threads nur beim Kartenaufruf erzeugt werden, da parallel nach der Karte platziert wird. Einige Zeilen in der Datei erhielten jedoch bei jeder Ausführung unterschiedliche Datensatznummern.

Ich habe die offizielle Java-Stream-Dokumentation und einige Websites gelesen , um zu verstehen, wie Streams unter der Haube funktionieren.

Ein paar Fragen:

  • Der parallele Java-Stream basiert auf SplitIterator , der von jeder Sammlung wie ArrayList, LinkedList usw. implementiert wird. Wenn wir aus diesen Sammlungen einen parallelen Stream erstellen, wird der entsprechende geteilte Iterator zum Teilen und Iterieren der Sammlung verwendet. Dies erklärt, warum Parallelität eher auf der Ebene der ursprünglichen Eingabequelle (Dateizeilen) als auf dem Ergebnis der Karte (dh Record Pojo) auftrat. Ist mein Verständnis richtig?

  • In meinem Fall ist die Eingabe ein Datei-E / A-Stream. Welcher Split-Iterator wird verwendet?

  • Es spielt keine Rolle, wo wir parallel()in der Pipeline platzieren. Die ursprüngliche Eingabequelle wird immer aufgeteilt und die verbleibenden Zwischenoperationen werden angewendet.

    In diesem Fall sollte Java Benutzern nicht erlauben, Paralleloperationen an einer beliebigen Stelle in der Pipeline zu platzieren, außer an der ursprünglichen Quelle. Weil es ein falsches Verständnis für diejenigen gibt, die nicht wissen, wie Java Stream intern funktioniert. Ich weiß, dass die parallel()Operation für den Stream-Objekttyp definiert worden wäre, und daher funktioniert sie auf diese Weise. Es ist jedoch besser, eine alternative Lösung bereitzustellen.

  • Im obigen Code-Snippet versuche ich, jedem Datensatz in der Eingabedatei eine Zeilennummer hinzuzufügen, und daher sollte sie bestellt werden. Ich möchte mich jedoch doSomeOperation()parallel bewerben, da es sich um eine Schwergewichtslogik handelt. Der eine Weg, dies zu erreichen, besteht darin, meinen eigenen angepassten Split-Iterator zu schreiben. Gibt es einen anderen Weg?

Forscher
quelle
2
Es hat mehr damit zu tun, wie Java-Entwickler beschlossen haben, die Benutzeroberfläche zu entwerfen. Sie stellen Ihre Anforderungen an die Pipeline und alles, was keine endgültige Operation ist, wird zuerst gesammelt. parallel()ist nichts weiter als eine allgemeine Modifikatoranforderung, die auf das zugrunde liegende Stream-Objekt angewendet wird. Denken Sie daran, dass es nur einen Quellstrom gibt, wenn Sie keine endgültigen Operationen auf die Pipe anwenden, dh solange nichts "ausgeführt" wird. Trotzdem hinterfragen Sie im Grunde nur die Auswahl des Java-Designs. Welches ist meinungsbasiert und wir können dabei nicht wirklich helfen.
Zabuzard
1
Ich verstehe Ihren Standpunkt und Ihre Verwirrung vollkommen, aber ich glaube nicht, dass es viel bessere Lösungen gibt. Die Methode wird Streamdirekt in der Schnittstelle angeboten und aufgrund der schönen Kaskadierung gibt jede Operation wieder Streamzurück. Stellen Sie sich vor, jemand möchte Ihnen eine geben, hat Streamaber bereits einige Operationen wie mapdiese angewendet . Als Benutzer möchten Sie weiterhin entscheiden können, ob es parallel ausgeführt werden soll oder nicht. Es muss also möglich sein, dass Sie noch anrufen parallel(), obwohl der Stream bereits vorhanden ist.
Zabuzard
1
Außerdem möchte ich lieber fragen, warum Sie einen Teil eines Streams nacheinander ausführen und später auf parallel umschalten möchten. Wenn der Stream bereits groß genug ist, um sich für die parallele Ausführung zu qualifizieren, gilt dies wahrscheinlich auch für alles, was zuvor in der Pipeline war. Warum also nicht auch für diesen Teil die parallele Ausführung verwenden? Ich verstehe, dass es Randfälle gibt, wie wenn Sie die Größe mit drastisch flatMapunsicheren Methoden oder ähnlichem oder ähnlichem dramatisch erhöhen .
Zabuzard
1
@Zabuza Ich stelle die Wahl des Java-Designs nicht in Frage, aber ich mache mir nur Sorgen. Jeder grundlegende Java-Stream-Benutzer kann die gleiche Verwirrung stiften, wenn er nicht die Funktionsweise von Stream versteht. Ich stimme Ihrem zweiten Kommentar jedoch voll und ganz zu. Ich habe gerade eine mögliche Lösung hervorgehoben, die, wie Sie bereits erwähnt haben, ihre eigenen Nachteile haben könnte. Aber wir können sehen, ob es auf andere Weise gelöst werden kann. In Bezug auf Ihren dritten Kommentar habe ich meinen Anwendungsfall bereits im letzten Punkt meiner Beschreibung erwähnt
Explorer
1
@Eugene Wenn sich das Pathim lokalen Dateisystem befindet und Sie ein aktuelles JDK verwenden, verfügt der Spliterator über eine bessere Parallelverarbeitungsfähigkeit als das Stapeln von Vielfachen von 1024. In einigen findFirstSzenarien kann eine ausgeglichene Aufteilung jedoch sogar kontraproduktiv sein …
Holger,

Antworten:

8

Dies erklärt, warum Parallelität eher auf der Ebene der ursprünglichen Eingabequelle (Dateizeilen) als auf dem Ergebnis der Karte (dh Record Pojo) auftrat.

Der gesamte Stream ist entweder parallel oder sequentiell. Wir wählen keine Teilmenge von Operationen aus, die sequentiell oder parallel ausgeführt werden sollen.

Wenn die Terminaloperation gestartet wird, wird die Stream-Pipeline abhängig von der Ausrichtung des Streams, auf den sie aufgerufen wird, sequentiell oder parallel ausgeführt. [...] Wenn die Terminaloperation gestartet wird, wird die Stream-Pipeline je nach Modus des Streams, in dem sie aufgerufen wird, sequentiell oder parallel ausgeführt. gleiche Quelle

Wie Sie bereits erwähnt haben, verwenden parallele Streams geteilte Iteratoren. Dies dient eindeutig dazu, die Daten zu partitionieren, bevor die Vorgänge ausgeführt werden.


In meinem Fall ist die Eingabe ein Datei-E / A-Stream. Welcher Split-Iterator wird verwendet?

Wenn ich mir die Quelle anschaue, sehe ich, dass sie verwendet wird java.nio.file.FileChannelLinesSpliterator


Es spielt keine Rolle, wo wir parallel () in der Pipeline platzieren. Die ursprüngliche Eingabequelle wird immer aufgeteilt und die verbleibenden Zwischenoperationen werden angewendet.

Recht. Sie können sogar mehrmals anrufen parallel()und anrufen sequential(). Der zuletzt aufgerufene gewinnt. Wenn wir aufrufen parallel(), legen wir dies für den zurückgegebenen Stream fest. und wie oben angegeben, laufen alle Operationen entweder sequentiell oder parallel.


In diesem Fall sollte Java Benutzern nicht erlauben, Paralleloperationen irgendwo in der Pipeline zu platzieren, außer an der ursprünglichen Quelle ...

Dies wird zu einer Ansichtssache. Ich denke, Zabuza gibt einen guten Grund, die Wahl der JDK-Designer zu unterstützen.


Der eine Weg, dies zu erreichen, besteht darin, meinen eigenen angepassten Split-Iterator zu schreiben. Gibt es einen anderen Weg?

Dies hängt von Ihrem Betrieb ab

  • Wenn findFirst()es sich um Ihren eigentlichen Terminalbetrieb handelt, müssen Sie sich nicht einmal um die parallele Ausführung kümmern, da doSomething()ohnehin nicht viele Anrufe getätigt werden ( findFirst()Kurzschluss). .parallel()Tatsächlich kann dies dazu führen, dass mehr als ein Element verarbeitet wird, während dies findFirst()in einem sequentiellen Stream verhindert wird.
  • Wenn Ihre Terminaloperation nicht viele Daten erstellt, können Sie Ihre RecordObjekte möglicherweise mithilfe eines sequentiellen Streams erstellen und das Ergebnis dann parallel verarbeiten:

    List<Record> smallData = Files.lines(inputFile.toPath(), 
                                         StandardCharsets.UTF_8)
      .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
      .collect(Collectors.toList())
      .parallelStream()     
      .filter(record -> doSomeOperation())
      .collect(Collectors.toList());
  • Wenn Ihre Pipeline viele Daten in den Speicher laden würde (was möglicherweise der Grund ist, warum Sie sie verwenden Files.lines()), benötigen Sie möglicherweise einen benutzerdefinierten Split-Iterator. Bevor ich dorthin gehe, würde ich mir jedoch andere Optionen ansehen (z. B. das Speichern von Zeilen mit einer ID-Spalte - das ist nur meine Meinung).
    Ich würde auch versuchen, Datensätze in kleineren Stapeln wie folgt zu verarbeiten:

    AtomicInteger recordNumber = new AtomicInteger();
    final int batchSize = 10;
    
    try(BufferedReader reader = Files.newBufferedReader(inputFile.toPath(), 
            StandardCharsets.UTF_8);) {
        Supplier<List<Record>> batchSupplier = () -> {
            List<Record> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                String nextLine;
                try {
                    nextLine = reader.readLine();
                } catch (IOException e) {
                    //hanlde exception
                    throw new RuntimeException(e);
                }
    
                if(null == nextLine) 
                    return batch;
                batch.add(new Record(recordNumber.getAndIncrement(), nextLine));
            }
            System.out.println("next batch");
    
            return batch;
        };
    
        Stream.generate(batchSupplier)
            .takeWhile(list -> list.size() >= batchSize)
            .map(list -> list.parallelStream()
                             .filter(record -> doSomeOperation())
                             .collect(Collectors.toList()))
            .flatMap(List::stream)
            .forEach(System.out::println);
    }

    Dies wird doSomeOperation()parallel ausgeführt, ohne dass alle Daten in den Speicher geladen werden. Beachten Sie jedoch, dass darüber batchSizenachgedacht werden muss.

ernest_k
quelle
1
Danke für die Klarstellung. Es ist gut zu wissen, welche dritte Lösung Sie hervorgehoben haben. Ich werde einen Blick darauf werfen, da ich takeWhile und Supplier nicht verwendet habe.
Explorer
2
Eine benutzerdefinierte SpliteratorImplementierung wäre nicht komplizierter und ermöglicht gleichzeitig eine effizientere Parallelverarbeitung…
Holger,
1
Jede Ihrer inneren parallelStreamOperationen hat einen festen Overhead, um die Operation zu starten und auf das Endergebnis zu warten, während sie auf eine Parallelität von beschränkt ist batchSize. Zunächst benötigen Sie ein Vielfaches der derzeit verfügbaren Anzahl von CPU-Kernen, um Leerlauf-Threads zu vermeiden. Dann sollte die Zahl hoch genug sein, um den festen Overhead zu kompensieren. Je höher die Zahl, desto höher die Pause, die durch die sequentielle Leseoperation verursacht wird, bevor die Parallelverarbeitung überhaupt beginnt.
Holger
1
Das Parallelschalten des äußeren Streams würde in der aktuellen Implementierung zu Stream.generateeiner starken Störung des inneren Streams führen, abgesehen von dem Punkt, an dem ein ungeordneter Stream erzeugt wird, der mit den vom OP beabsichtigten Anwendungsfällen wie z findFirst(). Im Gegensatz dazu funktioniert ein einzelner paralleler Stream mit einem Spliterator, der Chunks zurückgibt, trySplitunkompliziert und ermöglicht es Arbeitsthreads , den nächsten Chunk zu verarbeiten, ohne auf den Abschluss des vorherigen zu warten.
Holger
2
Es gibt keinen Grund anzunehmen, dass eine findFirst()Operation nur eine kleine Anzahl von Elementen verarbeitet. Die erste Übereinstimmung kann immer noch auftreten, nachdem 90% aller Elemente verarbeitet wurden. Wenn Sie zehn Millionen Zeilen haben, müssen Sie auch nach 10% noch eine Million Zeilen verarbeiten, um eine Übereinstimmung zu finden.
Holger
7

Das ursprüngliche Stream-Design enthielt die Idee, nachfolgende Pipeline-Phasen mit unterschiedlichen Einstellungen für die parallele Ausführung zu unterstützen. Diese Idee wurde jedoch aufgegeben. Die API kann aus dieser Zeit stammen, aber andererseits wäre ein API-Design, das den Aufrufer zwingt, eine einzige eindeutige Entscheidung für die parallele oder sequentielle Ausführung zu treffen, viel komplizierter.

Das tatsächliche Spliterator Verwendung durch Files.lines(…)ist implementierungsabhängig. In Java 8 (Oracle oder OpenJDK) erhalten Sie immer das gleiche wie bei BufferedReader.lines(). Wenn in neueren JDKs das Pathzum Standarddateisystem gehört und der Zeichensatz einer der für diese Funktion unterstützten ist, erhalten Sie einen Stream mit einer dedizierten SpliteratorImplementierung, dem java.nio.file.FileChannelLinesSpliterator. Wenn die Voraussetzungen nicht erfüllt sind, erhalten Sie das gleiche wie bei BufferedReader.lines(), das immer noch auf einem Iteratorimplementierten BufferedReaderund umschlossenen basiert Spliterators.spliteratorUnknownSize.

Ihre spezifische Aufgabe wird am besten mit einem benutzerdefinierten erledigt Spliterator die die Zeilennummerierung direkt an der Quelle vor der Parallelverarbeitung durchführen kann, um eine nachfolgende Parallelverarbeitung ohne Einschränkungen zu ermöglichen.

public static Stream<Record> records(Path p) throws IOException {
    LineNoSpliterator sp = new LineNoSpliterator(p);
    return StreamSupport.stream(sp, false).onClose(sp);
}

private static class LineNoSpliterator implements Spliterator<Record>, Runnable {
    int chunkSize = 100;
    SeekableByteChannel channel;
    LineNumberReader reader;

    LineNoSpliterator(Path path) throws IOException {
        channel = Files.newByteChannel(path, StandardOpenOption.READ);
        reader=new LineNumberReader(Channels.newReader(channel,StandardCharsets.UTF_8));
    }

    @Override
    public void run() {
        try(Closeable c1 = reader; Closeable c2 = channel) {}
        catch(IOException ex) { throw new UncheckedIOException(ex); }
        finally { reader = null; channel = null; }
    }

    @Override
    public boolean tryAdvance(Consumer<? super Record> action) {
        try {
            String line = reader.readLine();
            if(line == null) return false;
            action.accept(new Record(reader.getLineNumber(), line));
            return true;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    @Override
    public Spliterator<Record> trySplit() {
        Record[] chunks = new Record[chunkSize];
        int read;
        for(read = 0; read < chunks.length; read++) {
            int pos = read;
            if(!tryAdvance(r -> chunks[pos] = r)) break;
        }
        return Spliterators.spliterator(chunks, 0, read, characteristics());
    }

    @Override
    public long estimateSize() {
        try {
            return (channel.size() - channel.position()) / 60;
        } catch (IOException ex) {
            return 0;
        }
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL | DISTINCT;
    }
}
Holger
quelle
0

Das Folgende ist eine einfache Demonstration, wann die Anwendung von Parallel angewendet wird. Die Ausgabe von Peek zeigt deutlich den Unterschied zwischen den beiden Beispielen. Hinweis: Der mapAufruf wird nur eingeworfen, um zuvor eine weitere Methode hinzuzufügen parallel.

IntStream.rangeClosed (1,20).peek(a->System.out.print(a+" "))
        .map(a->a + 200).sum();
System.out.println();
IntStream.rangeClosed(1,20).peek(a->System.out.print(a+" "))
        .map(a->a + 200).parallel().sum();
WJS
quelle