Ich möchte a verwenden, Stream
um die Verarbeitung eines heterogenen Satzes von remote gespeicherten JSON-Dateien mit unbekannter Anzahl zu parallelisieren (die Anzahl der Dateien ist im Voraus nicht bekannt). Die Dateien können sehr unterschiedlich groß sein, von 1 JSON-Datensatz pro Datei bis zu 100.000 Datensätzen in einigen anderen Dateien. Ein JSON-Datensatz in diesem Fall ein in sich geschlossenes JSON-Objekt, das als eine Zeile in der Datei dargestellt wird.
Ich möchte wirklich Streams dafür verwenden und habe dies implementiert Spliterator
:
public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {
abstract protected JsonStreamSupport<METADATA> openInputStream(String path);
abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);
private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
private static final int MAX_BUFFER = 100;
private final Iterator<String> paths;
private JsonStreamSupport<METADATA> reader = null;
public JsonStreamSpliterator(Iterator<String> paths) {
this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
super(est, additionalCharacteristics);
this.paths = paths;
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
this(est, additionalCharacteristics, paths);
open(nextPath);
}
@Override
public boolean tryAdvance(Consumer<? super RECORD> action) {
if(reader == null) {
String path = takeNextPath();
if(path != null) {
open(path);
}
else {
return false;
}
}
Map<String, Object> json = reader.readJsonLine();
if(json != null) {
RECORD item = parse(reader.getMetadata(), json);
action.accept(item);
return true;
}
else {
reader.close();
reader = null;
return tryAdvance(action);
}
}
private void open(String path) {
reader = openInputStream(path);
}
private String takeNextPath() {
synchronized(paths) {
if(paths.hasNext()) {
return paths.next();
}
}
return null;
}
@Override
public Spliterator<RECORD> trySplit() {
String nextPath = takeNextPath();
if(nextPath != null) {
return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
@Override
protected JsonStreamSupport<METADATA> openInputStream(String path) {
return JsonStreamSpliterator.this.openInputStream(path);
}
@Override
protected RECORD parse(METADATA metaData, Map<String,Object> json) {
return JsonStreamSpliterator.this.parse(metaData, json);
}
};
}
else {
List<RECORD> records = new ArrayList<RECORD>();
while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
// loop
}
if(records.size() != 0) {
return records.spliterator();
}
else {
return null;
}
}
}
}
Das Problem, das ich habe, ist, dass, während der Stream zunächst wunderbar parallelisiert, die größte Datei schließlich in einem einzigen Thread verarbeitet wird. Ich glaube, die proximale Ursache ist gut dokumentiert: Der Spliterator ist "unausgeglichen".
Genauer gesagt scheint die trySplit
Methode nach einem bestimmten Punkt im Stream.forEach
Lebenszyklus des Systems nicht mehr aufgerufen zu werden , daher die zusätzliche Logik, kleine Stapel am Ende von zu verteilentrySplit
selten ausgeführt wird.
Beachten Sie, dass alle von trySplit zurückgegebenen Spliteratoren denselben paths
Iterator verwenden. Ich dachte, dies sei eine wirklich clevere Methode, um die Arbeit über alle Spliteratoren hinweg auszugleichen, aber es hat nicht ausgereicht, um eine vollständige Parallelität zu erreichen.
Ich möchte, dass die parallele Verarbeitung zuerst über Dateien hinweg erfolgt. Wenn dann nur noch wenige große Dateien splittert, möchte ich über Teile der verbleibenden Dateien parallelisieren. Das war die Absicht des else
Blocks am Ende vontrySplit
.
Gibt es einen einfachen / einfachen / kanonischen Weg, um dieses Problem zu umgehen?
quelle
Long.MAX_VALUE
zu einer übermäßigen und unnötigen Aufteilung führen, während jede andere Schätzung alsLong.MAX_VALUE
die weitere Aufteilung zum Stillstand kommt und die Parallelität zunichte macht. Die Rückgabe einer Mischung aus genauen Schätzungen scheint nicht zu intelligenten Optimierungen zu führen.AbstractSpliterator
aber überschreiben,trySplit()
was eine schlechte Kombination für etwas anderes istLong.MAX_VALUE
, da Sie die Größenschätzung in nicht anpassentrySplit()
. DanachtrySplit()
sollte die Größenschätzung um die Anzahl der abgespaltenen Elemente reduziert werden.Antworten:
Sie
trySplit
sollten Splits gleicher Größe ausgeben, unabhängig von der Größe der zugrunde liegenden Dateien. Sie sollten alle Dateien als eine Einheit behandeln und denArrayList
Spliterator mit Unterstützung jedes Mal mit der gleichen Anzahl von JSON-Objekten füllen . Die Anzahl der Objekte sollte so sein, dass die Verarbeitung eines Splits zwischen 1 und 10 Millisekunden dauert: Unter 1 ms nähern Sie sich den Kosten für die Übergabe des Stapels an einen Arbeitsthread, die höher sind, und Sie riskieren aufgrund von eine ungleichmäßige CPU-Auslastung Aufgaben, die zu grobkörnig sind.Der Spliterator ist nicht verpflichtet, eine Größenschätzung zu melden, und Sie tun dies bereits korrekt: Ihre Schätzung
Long.MAX_VALUE
ist ein spezieller Wert, der "unbegrenzt" bedeutet. Wenn Sie jedoch viele Dateien mit einem einzelnen JSON-Objekt haben, was zu Stapeln der Größe 1 führt, kann dies Ihre Leistung auf zwei Arten beeinträchtigen: Der Aufwand für das Öffnen, Lesen und Schließen der Datei kann zu einem Engpass werden, und wenn Sie es schaffen, zu entkommen dass die Kosten für die Thread-Übergabe im Vergleich zu den Kosten für die Verarbeitung eines Elements erheblich sein können, was wiederum einen Engpass verursacht.Vor fünf Jahren habe ich ein ähnliches Problem gelöst. Sie können sich meine Lösung ansehen .
quelle
Long.MAX_VALUE
beschreiben eine unbekannte Größe korrekt. Dies hilft jedoch nicht, wenn die tatsächliche Stream-Implementierung dann eine schlechte Leistung erbringt. SelbstThreadLocalRandom.current().nextInt(100, 100_000)
wenn das Ergebnis als geschätzte Größe verwendet wird, werden bessere Ergebnisse erzielt.ArraySpliterator
die hat eine geschätzte Größe (sogar eine exakte Größe). Bei der Stream-Implementierung wird also die Array-Größe vsLong.MAX_VALUE
angezeigt. Betrachten Sie dies als unausgeglichen und teilen Sie den "größeren" Spliterator (wenn Sie dies ignorieren,Long.MAX_VALUE
bedeutet dies "unbekannt"), bis er nicht mehr weiter aufgeteilt werden kann. Wenn dann nicht genügend Blöcke vorhanden sind, werden die Array-basierten Spliteratoren unter Verwendung ihrer bekannten Größen aufgeteilt. Ja, das funktioniert sehr gut, widerspricht aber nicht meiner Aussage, dass Sie eine Größenschätzung benötigen, unabhängig davon, wie schlecht sie ist.Long.MAX_VALUE
würde ausreichen .Nach vielen Experimenten konnte ich immer noch keine zusätzliche Parallelität erzielen, indem ich mit den Größenschätzungen spielte. Grundsätzlich jeder andere Wert als
Long.MAX_VALUE
dazu, dass der Spliterator zu früh (und ohne Aufteilung) beendet wird, während andererseits eineLong.MAX_VALUE
Schätzung dazu führttrySplit
, dass sie unerbittlich aufgerufen wird, bis sie zurückkehrtnull
.Die Lösung, die ich gefunden habe, besteht darin, die Ressourcen intern unter den Spliteratoren zu teilen und sie untereinander neu ausbalancieren zu lassen.
Arbeitscode:
quelle