Ich habe eine große Datei, die eine Liste von Elementen enthält.
Ich möchte einen Stapel von Elementen erstellen und mit diesem Stapel eine HTTP-Anfrage stellen (alle Elemente werden als Parameter in der HTTP-Anfrage benötigt). Ich kann es sehr einfach mit einer for
Schleife machen, aber als Java 8-Liebhaber möchte ich versuchen, dies mit dem Stream-Framework von Java 8 zu schreiben (und die Vorteile der verzögerten Verarbeitung zu nutzen).
Beispiel:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
batch.add(data.get(i));
if (batch.size() == BATCH_SIZE) process(batch);
}
if (batch.size() > 0) process(batch);
Ich möchte etwas langes tun
lazyFileStream.group(500).map(processBatch).collect(toList())
Was wäre der beste Weg, dies zu tun?
java
java-8
batch-processing
java-stream
Andy Dang
quelle
quelle
flatMap
(+ eine zusätzliche flatMap, um die Streams wieder zu reduzieren)? Ich denke nicht, dass so etwas als bequeme Methode in der Standardbibliothek existiert. Entweder müssen Sie eine Drittanbieter-Bibliothek finden oder eine eigene basierend auf Spliteratoren und / oder einem Sammler schreiben, der einen Stream von StreamsStream.generate
mitreader::readLine
und kombinierenlimit
, aber das Problem ist, dass Streams nicht gut zu Ausnahmen passen. Auch dies ist wahrscheinlich nicht gut parallelisierbar. Ich denke, diefor
Schleife ist immer noch die beste Option.Antworten:
Hinweis! Diese Lösung liest die gesamte Datei, bevor forEach ausgeführt wird.
Sie können dies mit jOOλ tun , einer Bibliothek, die Java 8-Streams für Anwendungsfälle mit sequentiellen Streams mit einem Thread erweitert:
Hinter den Kulissen
zipWithIndex()
ist nur:... wohingegen
groupBy()
API-Komfort für:(Haftungsausschluss: Ich arbeite für die Firma hinter jOOλ)
quelle
Map
(im Gegensatz zum Beispiel zur Lösung von Ben Manes)Der Vollständigkeit halber finden Sie hier eine Guava- Lösung.
In der Frage ist die Sammlung verfügbar, sodass kein Stream benötigt wird und wie folgt geschrieben werden kann:
quelle
Lists.partition
ist eine andere Variante, die ich hätte erwähnen sollen.Stream
in den Speicher aufrufen , bevor der relevante Stapel verarbeitet wirdbatchSize
Elemente pro Iteration verbraucht werden.Eine reine Java-8-Implementierung ist ebenfalls möglich:
Beachten Sie, dass es im Gegensatz zu JOOl gut parallel funktionieren kann (vorausgesetzt, es
data
handelt sich um eine Direktzugriffsliste).quelle
List
(siehedata.size()
,data.get()
in der Frage). Ich beantworte die gestellte Frage. Wenn Sie eine andere Frage haben, stellen Sie sie stattdessen (obwohl ich denke, dass die Stream-Frage auch bereits gestellt wurde).Reine Java 8-Lösung :
Wir können einen benutzerdefinierten Sammler erstellen, um dies elegant zu tun, der ein
batch size
und ein benötigtConsumer
, um jede Charge zu verarbeiten:Optional können Sie dann eine Hilfsdienstklasse erstellen:
Anwendungsbeispiel:
Ich habe meinen Code auch auf GitHub gepostet, wenn jemand einen Blick darauf werfen möchte:
Link zu Github
quelle
Ich habe einen benutzerdefinierten Spliterator für solche Szenarien geschrieben. Es werden Listen einer bestimmten Größe aus dem Eingabestream gefüllt. Der Vorteil dieses Ansatzes besteht darin, dass eine verzögerte Verarbeitung durchgeführt wird und mit anderen Stream-Funktionen gearbeitet wird.
quelle
SUBSIZED
die von zurückgegebenen Teilungen handelt,trySplit
können mehr Elemente als vor der Teilung vorhanden sein (wenn die Teilung in der Mitte des Stapels erfolgt).Spliterators
korrekt ist,trySplit
sollten die Daten dann immer in zwei ungefähr gleiche Teile aufgeteilt werden, damit das Ergebnis niemals größer als das Original ist?if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting.
Wir hatten ein ähnliches Problem zu lösen. Wir wollten einen Stream nehmen, der größer als der Systemspeicher ist (alle Objekte in einer Datenbank durchlaufen) und die Reihenfolge so gut wie möglich zufällig sortieren - wir dachten, es wäre in Ordnung, 10.000 Elemente zu puffern und zufällig zu sortieren.
Das Ziel war eine Funktion, die einen Strom aufnahm.
Von den hier vorgeschlagenen Lösungen scheint es eine Reihe von Optionen zu geben:
Unser Instinkt war ursprünglich, einen benutzerdefinierten Sammler zu verwenden, aber dies bedeutete, das Streaming zu beenden. Die oben beschriebene benutzerdefinierte Kollektorlösung ist sehr gut und wir haben sie fast verwendet.
Hier ist eine Lösung, die betrügt, indem sie die Tatsache nutzt, dass
Stream
s Ihnen eine geben kann,Iterator
die Sie als Notluke verwenden können , damit Sie etwas extra tun können, das Streams nicht unterstützen. DasIterator
wird mit einem anderen Stück Java 8-StreamSupport
Zauberei wieder in einen Stream konvertiert .Ein einfaches Beispiel für die Verwendung würde folgendermaßen aussehen:
Die obigen Drucke
Für unseren Anwendungsfall wollten wir die Stapel mischen und dann als Stream behalten - es sah so aus:
Dies gibt so etwas wie aus (es ist zufällig, also jedes Mal anders)
Die geheime Sauce hier ist, dass es immer einen Stream gibt, sodass Sie entweder einen Stream von Chargen bearbeiten oder mit jeder Charge etwas tun und dann
flatMap
zurück zu einem Stream. Noch besser ist , alle der oben genannten nur läuft , wenn die letzteforEach
odercollect
oder andere Abschluss Ausdrücke PULL die Daten über den Strom.Es stellt sich heraus, dass dies
iterator
eine spezielle Art der Beendigung eines Streams ist und nicht dazu führt, dass der gesamte Stream ausgeführt wird und in den Speicher gelangt! Vielen Dank an die Java 8 Jungs für ein brillantes Design!quelle
List
- Sie können die Iteration der Elemente innerhalb der Charge nicht verschieben, da der Verbraucher möglicherweise eine ganze Charge überspringen möchte und wenn Sie die nicht verbraucht haben Elemente dann würden sie nicht sehr weit überspringen. (Ich habe eine davon in C # implementiert, obwohl es wesentlich einfacher war.)Sie können auch RxJava verwenden :
oder
oder
quelle
Sie können sich auch Cyclops-React ansehen , ich bin der Autor dieser Bibliothek. Es implementiert die jOOλ-Schnittstelle (und damit auch JDK 8-Streams), konzentriert sich jedoch im Gegensatz zu parallelen JDK 8-Streams auf asynchrone Vorgänge (z. B. das potenzielle Blockieren von Async-E / A-Aufrufen). JDK Parallel Streams hingegen konzentrieren sich auf Datenparallelität für CPU-gebundene Operationen. Es verwaltet Aggregate zukünftiger Aufgaben unter der Haube, bietet Endbenutzern jedoch eine standardmäßige erweiterte Stream-API.
Dieser Beispielcode kann Ihnen den Einstieg erleichtern
Hier finden Sie ein Tutorial zum Stapeln
Und ein allgemeineres Tutorial hier
Um Ihren eigenen Thread-Pool zu verwenden (der wahrscheinlich besser zum Blockieren von E / A geeignet ist), können Sie mit der Verarbeitung beginnen
quelle
Reines Java 8-Beispiel, das auch mit parallelen Streams funktioniert.
Wie benutzt man:
Die Methodendeklaration und -implementierung:
quelle
Schauen Sie sich fairerweise die elegante Vavr- Lösung an:
quelle
Einfaches Beispiel mit Spliterator
Die Antwort von Bruce ist umfassender, aber ich suchte nach etwas Schnellem und Schmutzigem, um eine Reihe von Dateien zu verarbeiten.
quelle
Dies ist eine reine Java-Lösung, die träge bewertet wird.
quelle
Sie können apache.commons verwenden:
Der Partitionierungsteil erfolgt nicht träge, aber nachdem die Liste partitioniert wurde, erhalten Sie die Vorteile der Arbeit mit Streams (z. B. parallele Streams verwenden, Filter hinzufügen usw.). Andere Antworten schlugen ausgefeiltere Lösungen vor, aber manchmal sind Lesbarkeit und Wartbarkeit wichtiger (und manchmal nicht :-))
quelle
Es könnte leicht mit Reaktor gemacht werden :
quelle
Mit
Java 8
undcom.google.common.collect.Lists
können Sie so etwas tun wie:Hier
T
ist der Typ der Elemente in der Eingabeliste undU
der Typ der Elemente in der AusgabelisteUnd Sie können es so verwenden:
quelle