Java 8 Stream mit Stapelverarbeitung

93

Ich habe eine große Datei, die eine Liste von Elementen enthält.

Ich möchte einen Stapel von Elementen erstellen und mit diesem Stapel eine HTTP-Anfrage stellen (alle Elemente werden als Parameter in der HTTP-Anfrage benötigt). Ich kann es sehr einfach mit einer forSchleife machen, aber als Java 8-Liebhaber möchte ich versuchen, dies mit dem Stream-Framework von Java 8 zu schreiben (und die Vorteile der verzögerten Verarbeitung zu nutzen).

Beispiel:

List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) process(batch);
}

if (batch.size() > 0) process(batch);

Ich möchte etwas langes tun lazyFileStream.group(500).map(processBatch).collect(toList())

Was wäre der beste Weg, dies zu tun?

Andy Dang
quelle
Ich kann leider nicht genau herausfinden, wie die Gruppierung durchgeführt werden soll, aber die Zeilen von Files # lesen den Inhalt der Datei träge.
Toby
1
Sie benötigen also grundsätzlich eine Umkehrung von flatMap(+ eine zusätzliche flatMap, um die Streams wieder zu reduzieren)? Ich denke nicht, dass so etwas als bequeme Methode in der Standardbibliothek existiert. Entweder müssen Sie eine Drittanbieter-Bibliothek finden oder eine eigene basierend auf Spliteratoren und / oder einem Sammler schreiben, der einen Stream von Streams
ausgibt
3
Vielleicht können Sie Stream.generatemit reader::readLineund kombinieren limit, aber das Problem ist, dass Streams nicht gut zu Ausnahmen passen. Auch dies ist wahrscheinlich nicht gut parallelisierbar. Ich denke, die forSchleife ist immer noch die beste Option.
tobias_k
Ich habe gerade einen Beispielcode hinzugefügt. Ich denke nicht, dass flatMap der richtige Weg ist. Ich vermute, dass ich einen benutzerdefinierten Spliterator schreiben muss
Andy Dang
1
Ich präge den Begriff "Stream Missbrauch" für Fragen wie diese.
Kervin

Antworten:

13

Hinweis! Diese Lösung liest die gesamte Datei, bevor forEach ausgeführt wird.

Sie können dies mit jOOλ tun , einer Bibliothek, die Java 8-Streams für Anwendungsfälle mit sequentiellen Streams mit einem Thread erweitert:

Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
   .forEach((index, batch) -> {
       process(batch);
   });

Hinter den Kulissen zipWithIndex()ist nur:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }

    return seq(new ZipWithIndex());
}

... wohingegen groupBy()API-Komfort für:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}

(Haftungsausschluss: Ich arbeite für die Firma hinter jOOλ)

Lukas Eder
quelle
Beeindruckend. Genau das suche ich. Unser System verarbeitet normalerweise Datenströme nacheinander, sodass dies gut für die Umstellung auf Java 8 geeignet ist.
Andy Dang
16
Beachten Sie, dass diese Lösung unnötigerweise den gesamten Eingabestream auf dem Zwischenprodukt speichert Map(im Gegensatz zum Beispiel zur Lösung von Ben Manes)
Tagir Valeev,
123

Der Vollständigkeit halber finden Sie hier eine Guava- Lösung.

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);

In der Frage ist die Sammlung verfügbar, sodass kein Stream benötigt wird und wie folgt geschrieben werden kann:

Iterables.partition(data, batchSize).forEach(this::process);
Ben Manes
quelle
2
Das sieht für mich am einfachsten und am besten lesbar aus. Danke für das Teilen!
grinch
11
Lists.partitionist eine andere Variante, die ich hätte erwähnen sollen.
Ben Manes
2
das ist faul, oder? es wird nicht das gesamte Streamin den Speicher aufrufen , bevor der relevante Stapel verarbeitet wird
orirab
1
@orirab ja. Es ist zwischen Stapeln faul, da darin batchSizeElemente pro Iteration verbraucht werden.
Ben Manes
Könnten Sie bitte einen Blick auf stackoverflow.com/questions/58666190/…
gstackoverflow
57

Eine reine Java-8-Implementierung ist ebenfalls möglich:

int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));

Beachten Sie, dass es im Gegensatz zu JOOl gut parallel funktionieren kann (vorausgesetzt, es datahandelt sich um eine Direktzugriffsliste).

Tagir Valeev
quelle
1
Was ist, wenn Ihre Daten tatsächlich ein Stream sind? (Sagen wir Zeilen in einer Datei oder sogar aus dem Netzwerk).
Omry Yadan
6
@OmryYadan, war die Frage nach der Eingabe von der aufweist List(siehe data.size(), data.get()in der Frage). Ich beantworte die gestellte Frage. Wenn Sie eine andere Frage haben, stellen Sie sie stattdessen (obwohl ich denke, dass die Stream-Frage auch bereits gestellt wurde).
Tagir Valeev
1
Wie werden die Chargen parallel verarbeitet?
suppe_boy
Sehr innovativ
Sylvester
35

Reine Java 8-Lösung :

Wir können einen benutzerdefinierten Sammler erstellen, um dies elegant zu tun, der ein batch sizeund ein benötigt Consumer, um jede Charge zu verarbeiten:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;

import static java.util.Objects.requireNonNull;


/**
 * Collects elements in the stream and calls the supplied batch processor
 * after the configured batch size is reached.
 *
 * In case of a parallel stream, the batch processor may be called with
 * elements less than the batch size.
 *
 * The elements are not kept in memory, and the final result will be an
 * empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {

    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;


    /**
     * Constructs the batch collector
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        batchProcessor = requireNonNull(batchProcessor);

        this.batchSize = batchSize;
        this.batchProcessor = batchProcessor;
    }

    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                batchProcessor.accept(ts);
                ts.clear();
            }
        };
    }

    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            // can be modified if a strict batching mode is required
            batchProcessor.accept(ts);
            batchProcessor.accept(ots);
            return Collections.emptyList();
        };
    }

    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            batchProcessor.accept(ts);
            return Collections.emptyList();
        };
    }

    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

Optional können Sie dann eine Hilfsdienstklasse erstellen:

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class StreamUtils {

    /**
     * Creates a new batch collector
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @param <T> the type of elements being processed
     * @return a batch collector instance
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        return new BatchCollector<T>(batchSize, batchProcessor);
    }
}

Anwendungsbeispiel:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();

int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);

input.stream()
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));

Ich habe meinen Code auch auf GitHub gepostet, wenn jemand einen Blick darauf werfen möchte:

Link zu Github

Rohitvats
quelle
1
Dies ist eine gute Lösung, es sei denn, Sie können nicht alle Elemente aus Ihrem Stream in den Speicher einpassen. Es funktioniert auch nicht mit endlosen Streams - die Erfassungsmethode ist terminal, was bedeutet, dass anstelle des Erzeugens eines Stapelstroms gewartet wird, bis der Stream abgeschlossen ist, und dann das Ergebnis in Stapeln verarbeitet wird.
Alex Ackerman
2
@AlexAckerman Ein unendlicher Stream bedeutet, dass der Finisher nie aufgerufen wird, der Akkumulator jedoch weiterhin aufgerufen wird, sodass die Elemente weiterhin verarbeitet werden. Außerdem muss sich nur die Stapelgröße der Elemente gleichzeitig im Speicher befinden.
Solubris
@ Solubris, du hast recht! Mein schlechtes Dankeschön für den Hinweis - ich werde den Kommentar für die Referenz nicht löschen, wenn jemand die gleiche Vorstellung davon hat, wie die Erfassungsmethode funktioniert.
Alex Ackerman
Die an den Verbraucher gesendete Liste sollte kopiert werden, um die Änderung sicher zu machen, z. B.: BatchProcessor.accept (copyOf (ts))
Solubris
19

Ich habe einen benutzerdefinierten Spliterator für solche Szenarien geschrieben. Es werden Listen einer bestimmten Größe aus dem Eingabestream gefüllt. Der Vorteil dieses Ansatzes besteht darin, dass eine verzögerte Verarbeitung durchgeführt wird und mit anderen Stream-Funktionen gearbeitet wird.

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
    return batchSize <= 0
        ? Stream.of(stream.collect(Collectors.toList()))
        : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}

private static class BatchSpliterator<E> implements Spliterator<List<E>> {

    private final Spliterator<E> base;
    private final int batchSize;

    public BatchSpliterator(Spliterator<E> base, int batchSize) {
        this.base = base;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final List<E> batch = new ArrayList<>(batchSize);
        for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
            ;
        if (batch.isEmpty())
            return false;
        action.accept(batch);
        return true;
    }

    @Override
    public Spliterator<List<E>> trySplit() {
        if (base.estimateSize() <= batchSize)
            return null;
        final Spliterator<E> splitBase = this.base.trySplit();
        return splitBase == null ? null
                : new BatchSpliterator<>(splitBase, batchSize);
    }

    @Override
    public long estimateSize() {
        final double baseSize = base.estimateSize();
        return baseSize == 0 ? 0
                : (long) Math.ceil(baseSize / (double) batchSize);
    }

    @Override
    public int characteristics() {
        return base.characteristics();
    }

}
Bruce Hamilton
quelle
wirklich hilfreich. Wenn jemand nach benutzerdefinierten Kriterien stapeln möchte (z. B. Größe der Sammlung in Byte), können Sie Ihr benutzerdefiniertes Prädikat delegieren und es als Bedingung in der for-Schleife verwenden (imho while-Schleife ist dann besser lesbar)
pls
Ich bin mir nicht sicher, ob die Implementierung korrekt ist. Wenn es sich bei dem Basisstrom beispielsweise um SUBSIZEDdie von zurückgegebenen Teilungen handelt, trySplitkönnen mehr Elemente als vor der Teilung vorhanden sein (wenn die Teilung in der Mitte des Stapels erfolgt).
Malt
@Malt Wenn mein Verständnis Spliteratorskorrekt ist, trySplitsollten die Daten dann immer in zwei ungefähr gleiche Teile aufgeteilt werden, damit das Ergebnis niemals größer als das Original ist?
Bruce Hamilton
@BruceHamilton Leider können die Teile laut Dokumentation nicht ungefähr gleich sein. Sie müssen gleich sein:if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting.
Malt
Ja, das stimmt mit meinem Verständnis der Spliterator-Aufteilung überein. Es fällt mir jedoch schwer zu verstehen, wie "die von trySplit zurückgegebenen Teilungen mehr Elemente enthalten können als vor der Teilung". Können Sie näher erläutern, was Sie dort meinen?
Bruce Hamilton
13

Wir hatten ein ähnliches Problem zu lösen. Wir wollten einen Stream nehmen, der größer als der Systemspeicher ist (alle Objekte in einer Datenbank durchlaufen) und die Reihenfolge so gut wie möglich zufällig sortieren - wir dachten, es wäre in Ordnung, 10.000 Elemente zu puffern und zufällig zu sortieren.

Das Ziel war eine Funktion, die einen Strom aufnahm.

Von den hier vorgeschlagenen Lösungen scheint es eine Reihe von Optionen zu geben:

  • Verwenden Sie verschiedene zusätzliche Bibliotheken, die nicht von Java 8 stammen
  • Beginnen Sie mit etwas, das kein Stream ist - z. B. einer Liste mit wahlfreiem Zugriff
  • Haben Sie einen Strom, der leicht in einem Spliterator geteilt werden kann

Unser Instinkt war ursprünglich, einen benutzerdefinierten Sammler zu verwenden, aber dies bedeutete, das Streaming zu beenden. Die oben beschriebene benutzerdefinierte Kollektorlösung ist sehr gut und wir haben sie fast verwendet.

Hier ist eine Lösung, die betrügt, indem sie die Tatsache nutzt, dass Streams Ihnen eine geben kann, Iteratordie Sie als Notluke verwenden können , damit Sie etwas extra tun können, das Streams nicht unterstützen. Das Iteratorwird mit einem anderen Stück Java 8- StreamSupportZauberei wieder in einen Stream konvertiert .

/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> {
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    }

    private static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator,ORDERED),
            false);
    }

    private int batchSize;
    private List<T> currentBatch;
    private Iterator<T> sourceIterator;

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    }

    @Override
    public boolean hasNext() {
        prepareNextBatch();
        return currentBatch!=null && !currentBatch.isEmpty();
    }

    @Override
    public List<T> next() {
        return currentBatch;
    }

    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
        }
    }
}

Ein einfaches Beispiel für die Verwendung würde folgendermaßen aussehen:

@Test
public void getsBatches() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        .forEach(System.out::println);
}

Die obigen Drucke

[A, B, C]
[D, E, F]

Für unseren Anwendungsfall wollten wir die Stapel mischen und dann als Stream behalten - es sah so aus:

@Test
public void howScramblingCouldBeDone() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
        .map(list -> {
            Collections.shuffle(list); return list; })
        .flatMap(List::stream)
        .forEach(System.out::println);
}

Dies gibt so etwas wie aus (es ist zufällig, also jedes Mal anders)

A
C
B
E
D
F

Die geheime Sauce hier ist, dass es immer einen Stream gibt, sodass Sie entweder einen Stream von Chargen bearbeiten oder mit jeder Charge etwas tun und dann flatMapzurück zu einem Stream. Noch besser ist , alle der oben genannten nur läuft , wenn die letzte forEachoder collectoder andere Abschluss Ausdrücke PULL die Daten über den Strom.

Es stellt sich heraus, dass dies iteratoreine spezielle Art der Beendigung eines Streams ist und nicht dazu führt, dass der gesamte Stream ausgeführt wird und in den Speicher gelangt! Vielen Dank an die Java 8 Jungs für ein brillantes Design!

Ashley Frieze
quelle
Und es ist sehr gut, dass Sie jede Charge vollständig durchlaufen, wenn sie gesammelt wird, und auf einer bleiben List- Sie können die Iteration der Elemente innerhalb der Charge nicht verschieben, da der Verbraucher möglicherweise eine ganze Charge überspringen möchte und wenn Sie die nicht verbraucht haben Elemente dann würden sie nicht sehr weit überspringen. (Ich habe eine davon in C # implementiert, obwohl es wesentlich einfacher war.)
ErikE
9

Sie können auch RxJava verwenden :

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));

oder

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();

oder

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
frhack
quelle
8

Sie können sich auch Cyclops-React ansehen , ich bin der Autor dieser Bibliothek. Es implementiert die jOOλ-Schnittstelle (und damit auch JDK 8-Streams), konzentriert sich jedoch im Gegensatz zu parallelen JDK 8-Streams auf asynchrone Vorgänge (z. B. das potenzielle Blockieren von Async-E / A-Aufrufen). JDK Parallel Streams hingegen konzentrieren sich auf Datenparallelität für CPU-gebundene Operationen. Es verwaltet Aggregate zukünftiger Aufgaben unter der Haube, bietet Endbenutzern jedoch eine standardmäßige erweiterte Stream-API.

Dieser Beispielcode kann Ihnen den Einstieg erleichtern

LazyFutureStream.parallelCommonBuilder()
                .react(data)
                .grouped(BATCH_SIZE)                  
                .map(this::process)
                .run();

Hier finden Sie ein Tutorial zum Stapeln

Und ein allgemeineres Tutorial hier

Um Ihren eigenen Thread-Pool zu verwenden (der wahrscheinlich besser zum Blockieren von E / A geeignet ist), können Sie mit der Verarbeitung beginnen

     LazyReact reactor = new LazyReact(40);

     reactor.react(data)
            .grouped(BATCH_SIZE)                  
            .map(this::process)
            .run();
John McClean
quelle
3

Reines Java 8-Beispiel, das auch mit parallelen Streams funktioniert.

Wie benutzt man:

Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));

Die Methodendeklaration und -implementierung:

public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
    List<ElementType> newBatch = new ArrayList<>(batchSize);

    stream.forEach(element -> {
        List<ElementType> fullBatch;

        synchronized (newBatch)
        {
            if (newBatch.size() < batchSize)
            {
                newBatch.add(element);
                return;
            }
            else
            {
                fullBatch = new ArrayList<>(newBatch);
                newBatch.clear();
                newBatch.add(element);
            }
        }

        batchProcessor.accept(fullBatch);
    });

    if (newBatch.size() > 0)
        batchProcessor.accept(new ArrayList<>(newBatch));
}
Nicolas Lacombe
quelle
2

Schauen Sie sich fairerweise die elegante Vavr- Lösung an:

Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);
Nolequen
quelle
1

Einfaches Beispiel mit Spliterator

    // read file into stream, try-with-resources
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
        //skip header
        Spliterator<String> split = stream.skip(1).spliterator();
        Chunker<String> chunker = new Chunker<String>();
        while(true) {              
            boolean more = split.tryAdvance(chunker::doSomething);
            if (!more) {
                break;
            }
        }           
    } catch (IOException e) {
        e.printStackTrace();
    }
}

static class Chunker<T> {
    int ct = 0;
    public void doSomething(T line) {
        System.out.println(ct++ + " " + line.toString());
        if (ct % 100 == 0) {
            System.out.println("====================chunk=====================");               
        }           
    }       
}

Die Antwort von Bruce ist umfassender, aber ich suchte nach etwas Schnellem und Schmutzigem, um eine Reihe von Dateien zu verarbeiten.

Strass
quelle
1

Dies ist eine reine Java-Lösung, die träge bewertet wird.

public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
    List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable 
    currentBatch.add(new ArrayList<T>(batchSize));
    return Stream.concat(stream
      .sequential()                   
      .map(new Function<T, List<T>>(){
          public List<T> apply(T t){
              currentBatch.get(0).add(t);
              return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
            }
      }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
                .limit(1)
    ).filter(Objects::nonNull);
}
Hei
quelle
1

Sie können apache.commons verwenden:

ListUtils.partition(ListOfLines, 500).stream()
                .map(partition -> processBatch(partition)
                .collect(Collectors.toList());

Der Partitionierungsteil erfolgt nicht träge, aber nachdem die Liste partitioniert wurde, erhalten Sie die Vorteile der Arbeit mit Streams (z. B. parallele Streams verwenden, Filter hinzufügen usw.). Andere Antworten schlugen ausgefeiltere Lösungen vor, aber manchmal sind Lesbarkeit und Wartbarkeit wichtiger (und manchmal nicht :-))

Tal Joffe
quelle
Ich bin mir nicht sicher, wer herabgestimmt hat, wäre aber nett zu verstehen, warum. Ich gab eine Antwort, die die anderen Antworten für Leute ergänzte, die Guava
Tal Joffe
Sie verarbeiten hier eine Liste, keinen Stream.
Drakemor
@Drakemor Ich verarbeite einen Strom von Unterlisten. Beachten Sie den Funktionsaufruf stream ()
Tal Joffe
Aber zuerst verwandeln Sie es in eine Liste von Unterlisten, die für echte gestreamte Daten nicht richtig funktionieren . Hier ist der Verweis auf Partition: commons.apache.org/proper/commons-collections/apidocs/org/…
Drakemor
1
TBH Ich verstehe Ihr Argument nicht vollständig, aber ich denke, wir können uns darauf einigen, nicht zuzustimmen. Ich habe meine Antwort bearbeitet, um unser Gespräch hier wiederzugeben. Vielen Dank für die Diskussion
Tal Joffe
1

Es könnte leicht mit Reaktor gemacht werden :

Flux.fromStream(fileReader.lines().onClose(() -> safeClose(fileReader)))
            .map(line -> someProcessingOfSingleLine(line))
            .buffer(BUFFER_SIZE)
            .subscribe(apiService::makeHttpRequest);
Alex
quelle
0

Mit Java 8und com.google.common.collect.Listskönnen Sie so etwas tun wie:

public class BatchProcessingUtil {
    public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) {
        List<List<T>> batches = Lists.partition(data, batchSize);
        return batches.stream()
                .map(processFunction) // Send each batch to the process function
                .flatMap(Collection::stream) // flat results to gather them in 1 stream
                .collect(Collectors.toList());
    }
}

Hier Tist der Typ der Elemente in der Eingabeliste und Uder Typ der Elemente in der Ausgabeliste

Und Sie können es so verwenden:

List<String> userKeys = [... list of user keys]
List<Users> users = BatchProcessingUtil.process(
    userKeys,
    10, // Batch Size
    partialKeys -> service.getUsers(partialKeys)
);
Josebui
quelle