Können Sie einen unsymmetrischen Spliterator unbekannter Größe neu ausbalancieren?

12

Ich möchte a verwenden, Streamum die Verarbeitung eines heterogenen Satzes von remote gespeicherten JSON-Dateien mit unbekannter Anzahl zu parallelisieren (die Anzahl der Dateien ist im Voraus nicht bekannt). Die Dateien können sehr unterschiedlich groß sein, von 1 JSON-Datensatz pro Datei bis zu 100.000 Datensätzen in einigen anderen Dateien. Ein JSON-Datensatz in diesem Fall ein in sich geschlossenes JSON-Objekt, das als eine Zeile in der Datei dargestellt wird.

Ich möchte wirklich Streams dafür verwenden und habe dies implementiert Spliterator:

public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

    abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

    abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

    private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
    private static final int MAX_BUFFER = 100;
    private final Iterator<String> paths;
    private JsonStreamSupport<METADATA> reader = null;

    public JsonStreamSpliterator(Iterator<String> paths) {
        this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
        super(est, additionalCharacteristics);
        this.paths = paths;
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
        this(est, additionalCharacteristics, paths);
        open(nextPath);
    }

    @Override
    public boolean tryAdvance(Consumer<? super RECORD> action) {
        if(reader == null) {
            String path = takeNextPath();
            if(path != null) {
                open(path);
            }
            else {
                return false;
            }
        }
        Map<String, Object> json = reader.readJsonLine();
        if(json != null) {
            RECORD item = parse(reader.getMetadata(), json);
            action.accept(item);
            return true;
        }
        else {
            reader.close();
            reader = null;
            return tryAdvance(action);
        }
    }

    private void open(String path) {
        reader = openInputStream(path);
    }

    private String takeNextPath() {
        synchronized(paths) {
            if(paths.hasNext()) {
                return paths.next();
            }
        }
        return null;
    }

    @Override
    public Spliterator<RECORD> trySplit() {
        String nextPath = takeNextPath();
        if(nextPath != null) {
            return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
                @Override
                protected JsonStreamSupport<METADATA> openInputStream(String path) {
                    return JsonStreamSpliterator.this.openInputStream(path);
                }
                @Override
                protected RECORD parse(METADATA metaData, Map<String,Object> json) {
                    return JsonStreamSpliterator.this.parse(metaData, json);
                }
            };              
        }
        else {
            List<RECORD> records = new ArrayList<RECORD>();
            while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
                // loop
            }
            if(records.size() != 0) {
                return records.spliterator();
            }
            else {
                return null;
            }
        }
    }
}

Das Problem, das ich habe, ist, dass, während der Stream zunächst wunderbar parallelisiert, die größte Datei schließlich in einem einzigen Thread verarbeitet wird. Ich glaube, die proximale Ursache ist gut dokumentiert: Der Spliterator ist "unausgeglichen".

Genauer gesagt scheint die trySplitMethode nach einem bestimmten Punkt im Stream.forEachLebenszyklus des Systems nicht mehr aufgerufen zu werden , daher die zusätzliche Logik, kleine Stapel am Ende von zu verteilentrySplit selten ausgeführt wird.

Beachten Sie, dass alle von trySplit zurückgegebenen Spliteratoren denselben pathsIterator verwenden. Ich dachte, dies sei eine wirklich clevere Methode, um die Arbeit über alle Spliteratoren hinweg auszugleichen, aber es hat nicht ausgereicht, um eine vollständige Parallelität zu erreichen.

Ich möchte, dass die parallele Verarbeitung zuerst über Dateien hinweg erfolgt. Wenn dann nur noch wenige große Dateien splittert, möchte ich über Teile der verbleibenden Dateien parallelisieren. Das war die Absicht des elseBlocks am Ende vontrySplit .

Gibt es einen einfachen / einfachen / kanonischen Weg, um dieses Problem zu umgehen?

Alex R.
quelle
2
Sie benötigen eine Größenschätzung. Es kann völlig falsch sein, solange es ungefähr das Verhältnis Ihrer unausgeglichenen Aufteilung widerspiegelt. Andernfalls weiß der Stream nicht, dass die Teilungen nicht ausgeglichen sind, und stoppt, sobald eine bestimmte Anzahl von Blöcken erstellt wurde.
Holger
@Holger können Sie näher darauf eingehen, "wird aufhören, sobald eine bestimmte Anzahl von Chunks erstellt wurde" oder mich dafür auf die JDK-Quelle verweisen? Wie viele Brocken hört es auf?
Alex R
Der Code ist irrelevant, da er zu viele irrelevante Implementierungsdetails enthält, die sich jederzeit ändern können. Der relevante Punkt ist, dass die Implementierung häufig genug versucht, Split aufzurufen, sodass jeder Worker-Thread (angepasst an die Anzahl der CPU-Kerne) etwas zu tun hat. Um unvorhersehbare Unterschiede in der Rechenzeit auszugleichen, werden wahrscheinlich noch mehr Blöcke als Arbeitsthreads erzeugt, um das Stehlen von Arbeit zu ermöglichen und die geschätzten Größen als Heuristik zu verwenden (z. B. um zu entscheiden, welcher Sub-Spliterator weiter aufgeteilt werden soll). Siehe auch stackoverflow.com/a/48174508/2711488
Holger
Ich habe einige Experimente durchgeführt, um Ihren Kommentar zu verstehen. Die Heuristiken scheinen ziemlich primitiv zu sein. Es sieht so aus, als würde die Rückkehr Long.MAX_VALUEzu einer übermäßigen und unnötigen Aufteilung führen, während jede andere Schätzung als Long.MAX_VALUEdie weitere Aufteilung zum Stillstand kommt und die Parallelität zunichte macht. Die Rückgabe einer Mischung aus genauen Schätzungen scheint nicht zu intelligenten Optimierungen zu führen.
Alex R
Ich behaupte nicht, dass die Strategie der Implementierung sehr klug war, aber zumindest funktioniert sie für einige Szenarien mit geschätzten Größen (ansonsten gab es weitaus mehr Fehlerberichte darüber). Es scheint also, dass während der Experimente einige Fehler auf Ihrer Seite waren. Zum Beispiel erweitern Sie im Code Ihrer Frage, AbstractSpliteratoraber überschreiben, trySplit()was eine schlechte Kombination für etwas anderes ist Long.MAX_VALUE, da Sie die Größenschätzung in nicht anpassen trySplit(). Danach trySplit()sollte die Größenschätzung um die Anzahl der abgespaltenen Elemente reduziert werden.
Holger

Antworten:

0

Sie trySplitsollten Splits gleicher Größe ausgeben, unabhängig von der Größe der zugrunde liegenden Dateien. Sie sollten alle Dateien als eine Einheit behandeln und den ArrayListSpliterator mit Unterstützung jedes Mal mit der gleichen Anzahl von JSON-Objekten füllen . Die Anzahl der Objekte sollte so sein, dass die Verarbeitung eines Splits zwischen 1 und 10 Millisekunden dauert: Unter 1 ms nähern Sie sich den Kosten für die Übergabe des Stapels an einen Arbeitsthread, die höher sind, und Sie riskieren aufgrund von eine ungleichmäßige CPU-Auslastung Aufgaben, die zu grobkörnig sind.

Der Spliterator ist nicht verpflichtet, eine Größenschätzung zu melden, und Sie tun dies bereits korrekt: Ihre Schätzung Long.MAX_VALUEist ein spezieller Wert, der "unbegrenzt" bedeutet. Wenn Sie jedoch viele Dateien mit einem einzelnen JSON-Objekt haben, was zu Stapeln der Größe 1 führt, kann dies Ihre Leistung auf zwei Arten beeinträchtigen: Der Aufwand für das Öffnen, Lesen und Schließen der Datei kann zu einem Engpass werden, und wenn Sie es schaffen, zu entkommen dass die Kosten für die Thread-Übergabe im Vergleich zu den Kosten für die Verarbeitung eines Elements erheblich sein können, was wiederum einen Engpass verursacht.

Vor fünf Jahren habe ich ein ähnliches Problem gelöst. Sie können sich meine Lösung ansehen .

Marko Topolnik
quelle
Ja, Sie sind "nicht verpflichtet, eine Größenschätzung zu melden" und Long.MAX_VALUEbeschreiben eine unbekannte Größe korrekt. Dies hilft jedoch nicht, wenn die tatsächliche Stream-Implementierung dann eine schlechte Leistung erbringt. Selbst ThreadLocalRandom.current().nextInt(100, 100_000)wenn das Ergebnis als geschätzte Größe verwendet wird, werden bessere Ergebnisse erzielt.
Holger
Es hat sich für meine Anwendungsfälle bewährt, bei denen die Berechnungskosten für jeden Artikel erheblich waren. Ich konnte problemlos eine Gesamtauslastung von 98% der CPU erreichen und den Durchsatz nahezu linear mit der Parallelität skalieren. Grundsätzlich ist es wichtig, die Stapelgröße richtig einzustellen, damit die Verarbeitung zwischen 1 und 10 Millisekunden dauert. Dies liegt weit über den Kosten für die Thread-Übergabe und ist nicht zu lang, um Probleme mit der Granularität von Aufgaben zu verursachen. Ich habe gegen Ende dieses Beitrags Benchmark-Ergebnisse veröffentlicht .
Marko Topolnik
Ihre Lösung spaltet ein Off , ArraySpliteratordie hat eine geschätzte Größe (sogar eine exakte Größe). Bei der Stream-Implementierung wird also die Array-Größe vs Long.MAX_VALUEangezeigt. Betrachten Sie dies als unausgeglichen und teilen Sie den "größeren" Spliterator (wenn Sie dies ignorieren, Long.MAX_VALUEbedeutet dies "unbekannt"), bis er nicht mehr weiter aufgeteilt werden kann. Wenn dann nicht genügend Blöcke vorhanden sind, werden die Array-basierten Spliteratoren unter Verwendung ihrer bekannten Größen aufgeteilt. Ja, das funktioniert sehr gut, widerspricht aber nicht meiner Aussage, dass Sie eine Größenschätzung benötigen, unabhängig davon, wie schlecht sie ist.
Holger
OK, es scheint also ein Missverständnis zu sein - weil Sie keine Größenschätzung für die Eingabe benötigen. Nur auf die einzelnen Splits, und das können Sie immer haben.
Marko Topolnik
Nun, mein erster Kommentar war: " Sie benötigen eine Größenschätzung. Es kann völlig falsch sein, solange es ungefähr das Verhältnis Ihrer unausgeglichenen Aufteilung widerspiegelt. " Der entscheidende Punkt hierbei war, dass der OP-Code einen weiteren Spliterator erstellt, der jedoch ein einzelnes Element enthält meldet immer noch eine unbekannte Größe. Dies macht die Stream-Implementierung hilflos. Jede geschätzte Anzahl für den neuen Spliterator, die erheblich kleiner ist, Long.MAX_VALUEwürde ausreichen .
Holger
0

Nach vielen Experimenten konnte ich immer noch keine zusätzliche Parallelität erzielen, indem ich mit den Größenschätzungen spielte. Grundsätzlich jeder andere Wert alsLong.MAX_VALUE dazu, dass der Spliterator zu früh (und ohne Aufteilung) beendet wird, während andererseits eine Long.MAX_VALUESchätzung dazu führt trySplit, dass sie unerbittlich aufgerufen wird, bis sie zurückkehrt null.

Die Lösung, die ich gefunden habe, besteht darin, die Ressourcen intern unter den Spliteratoren zu teilen und sie untereinander neu ausbalancieren zu lassen.

Arbeitscode:

public class AwsS3LineSpliterator<LINE> extends AbstractSpliterator<AwsS3LineInput<LINE>> {

    public final static class AwsS3LineInput<LINE> {
        final public S3ObjectSummary s3ObjectSummary;
        final public LINE lineItem;
        public AwsS3LineInput(S3ObjectSummary s3ObjectSummary, LINE lineItem) {
            this.s3ObjectSummary = s3ObjectSummary;
            this.lineItem = lineItem;
        }
    }

    private final class InputStreamHandler {
        final S3ObjectSummary file;
        final InputStream inputStream;
        InputStreamHandler(S3ObjectSummary file, InputStream is) {
            this.file = file;
            this.inputStream = is;
        }
    }

    private final Iterator<S3ObjectSummary> incomingFiles;

    private final Function<S3ObjectSummary, InputStream> fileOpener;

    private final Function<InputStream, LINE> lineReader;

    private final Deque<S3ObjectSummary> unopenedFiles;

    private final Deque<InputStreamHandler> openedFiles;

    private final Deque<AwsS3LineInput<LINE>> sharedBuffer;

    private final int maxBuffer;

    private AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener,
            Function<InputStream, LINE> lineReader,
            Deque<S3ObjectSummary> unopenedFiles, Deque<InputStreamHandler> openedFiles, Deque<AwsS3LineInput<LINE>> sharedBuffer,
            int maxBuffer) {
        super(Long.MAX_VALUE, 0);
        this.incomingFiles = incomingFiles;
        this.fileOpener = fileOpener;
        this.lineReader = lineReader;
        this.unopenedFiles = unopenedFiles;
        this.openedFiles = openedFiles;
        this.sharedBuffer = sharedBuffer;
        this.maxBuffer = maxBuffer;
    }

    public AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener, Function<InputStream, LINE> lineReader, int maxBuffer) {
        this(incomingFiles, fileOpener, lineReader, new ConcurrentLinkedDeque<>(), new ConcurrentLinkedDeque<>(), new ArrayDeque<>(maxBuffer), maxBuffer);
    }

    @Override
    public boolean tryAdvance(Consumer<? super AwsS3LineInput<LINE>> action) {
        AwsS3LineInput<LINE> lineInput;
        synchronized(sharedBuffer) {
            lineInput=sharedBuffer.poll();
        }
        if(lineInput != null) {
            action.accept(lineInput);
            return true;
        }
        InputStreamHandler handle = openedFiles.poll();
        if(handle == null) {
            S3ObjectSummary unopenedFile = unopenedFiles.poll();
            if(unopenedFile == null) {
                return false;
            }
            handle = new InputStreamHandler(unopenedFile, fileOpener.apply(unopenedFile));
        }
        for(int i=0; i < maxBuffer; ++i) {
            LINE line = lineReader.apply(handle.inputStream);
            if(line != null) {
                synchronized(sharedBuffer) {
                    sharedBuffer.add(new AwsS3LineInput<LINE>(handle.file, line));
                }
            }
            else {
                return tryAdvance(action);
            }
        }
        openedFiles.addFirst(handle);
        return tryAdvance(action);
    }

    @Override
    public Spliterator<AwsS3LineInput<LINE>> trySplit() {
        synchronized(incomingFiles) {
            if (incomingFiles.hasNext()) {
                unopenedFiles.add(incomingFiles.next());
                return new AwsS3LineSpliterator<LINE>(incomingFiles, fileOpener, lineReader, unopenedFiles, openedFiles, sharedBuffer, maxBuffer);
            } else {
                return null;
            }
        }
    }
}
Alex R.
quelle