Java 8 Streams - Sammeln gegen Reduzieren

143

Wann würden Sie collect()vs verwenden reduce()? Hat jemand gute, konkrete Beispiele dafür, wann es definitiv besser ist, in die eine oder andere Richtung zu gehen?

Javadoc erwähnt, dass collect () eine veränderliche Reduktion ist .

Da es sich um eine veränderbare Reduzierung handelt, gehe ich davon aus, dass eine Synchronisierung (intern) erforderlich ist, was sich wiederum nachteilig auf die Leistung auswirken kann. Vermutlich reduce()ist es leichter parallelisierbar, wenn nach jedem Schritt der Reduzierung eine neue Datenstruktur für die Rückgabe erstellt werden muss.

Die obigen Aussagen sind jedoch Vermutungen und ich würde gerne einen Experten hier einschalten.

jimhooker2002
quelle
1
Der Rest der Seite, auf die Sie verlinkt haben, erklärt dies: Wie bei redu () besteht ein Vorteil des Ausdrucks von collect auf diese abstrakte Weise darin, dass es direkt für die Parallelisierung zugänglich ist: Wir können Teilergebnisse parallel akkumulieren und sie dann kombinieren, solange Die Akkumulations- und Kombinationsfunktionen erfüllen die entsprechenden Anforderungen.
JB Nizet
1
Siehe auch "Streams in Java 8: Reduzieren vs. Sammeln" von Angelika Langer - youtube.com/watch?v=oWlWEKNM5Aw
MasterJoe2

Antworten:

115

reduceist eine " Fold " -Operation und wendet einen binären Operator auf jedes Element im Stream an, wobei das erste Argument für den Operator der Rückgabewert der vorherigen Anwendung und das zweite Argument das aktuelle Stream-Element ist.

collectist eine Aggregationsoperation, bei der eine "Sammlung" erstellt und jedes Element zu dieser Sammlung "hinzugefügt" wird. Sammlungen in verschiedenen Teilen des Streams werden dann zusammenaddiert.

Das von Ihnen verknüpfte Dokument gibt den Grund für zwei unterschiedliche Ansätze an:

Wenn wir einen Strom von Zeichenfolgen nehmen und sie zu einer einzigen langen Zeichenfolge verketten möchten, können wir dies mit einer normalen Reduzierung erreichen:

 String concatenated = strings.reduce("", String::concat)  

Wir würden das gewünschte Ergebnis erzielen und es würde sogar parallel funktionieren. Wir könnten uns jedoch nicht über die Leistung freuen! Eine solche Implementierung würde viel Zeichenfolgen kopieren, und die Laufzeit wäre O (n ^ 2) in der Anzahl der Zeichen. Ein performanterer Ansatz wäre, die Ergebnisse in einem StringBuilder zu akkumulieren, einem veränderlichen Container zum Akkumulieren von Strings. Wir können dieselbe Technik verwenden, um die veränderliche Reduktion zu parallelisieren, wie wir es bei der normalen Reduktion tun.

Der Punkt ist also, dass die Parallelisierung in beiden Fällen gleich ist, aber in dem reduceFall wenden wir die Funktion auf die Stream-Elemente selbst an. In diesem collectFall wenden wir die Funktion auf einen veränderlichen Container an.

Boris die Spinne
quelle
1
Wenn dies beim Sammeln der Fall ist: "Ein performanterer Ansatz wäre es, die Ergebnisse in einem StringBuilder zu akkumulieren", warum sollten wir dann jemals reduzieren?
Jimhooker2002
2
@ Jimhooker2002 nochmal lesen. Wenn Sie beispielsweise das Produkt berechnen, kann die Reduktionsfunktion einfach parallel auf die aufgeteilten Ströme angewendet und am Ende miteinander kombiniert werden. Der Prozess des Reduzierens führt immer zu dem Typ als Stream. Das Sammeln wird verwendet, wenn Sie die Ergebnisse in einem veränderlichen Container sammeln möchten, dh wenn das Ergebnis einen anderen Typ als der Stream hat. Dies hat den Vorteil, dass eine einzelne Instanz des Containers für jeden geteilten Stream verwendet werden kann, aber den Nachteil, dass die Container am Ende kombiniert werden müssen.
Boris die Spinne
1
@ jimhooker2002 im Produktbeispiel intist unveränderlich, sodass Sie einen Erfassungsvorgang nicht ohne weiteres verwenden können. Sie könnten einen schmutzigen Hack wie einen AtomicIntegeroder einen Brauch machen, IntWrapperaber warum sollten Sie? Eine Falzoperation unterscheidet sich einfach von einer Sammeloperation.
Boris die Spinne
17
Es gibt auch eine andere reduceMethode, mit der Sie Objekte vom Typ zurückgeben können, die sich von Elementen des Streams unterscheiden.
Damluar
1
Ein weiterer Fall, in dem Sie "Sammeln" anstelle von "Reduzieren" verwenden würden, besteht darin, dass bei einer Reduktionsoperation Elemente zu einer Sammlung hinzugefügt werden. Jedes Mal, wenn Ihre Akkumulatorfunktion ein Element verarbeitet, wird eine neue Sammlung erstellt, die das Element enthält, was ineffizient ist.
Raghu
40

Der Grund ist einfach:

  • collect() kann nur funktionieren , mit wandelbaren Ergebnisobjekten.
  • reduce()wurde entwickelt, um mit unveränderlichen Ergebnisobjekten zu arbeiten .

" reduce()mit unveränderlichem" Beispiel

public class Employee {
  private Integer salary;
  public Employee(String aSalary){
    this.salary = new Integer(aSalary);
  }
  public Integer getSalary(){
    return this.salary;
  }
}

@Test
public void testReduceWithImmutable(){
  List<Employee> list = new LinkedList<>();
  list.add(new Employee("1"));
  list.add(new Employee("2"));
  list.add(new Employee("3"));

  Integer sum = list
  .stream()
  .map(Employee::getSalary)
  .reduce(0, (Integer a, Integer b) -> Integer.sum(a, b));

  assertEquals(Integer.valueOf(6), sum);
}

Beispiel " collect()mit veränderlichem"

Beispiel : Wenn Sie möchten , dass manuell eine Summe berechnen unter Verwendung von collect()mit er kann nicht arbeiten , BigDecimalsondern nur mit MutableIntvon org.apache.commons.lang.mutablezum Beispiel. Sehen:

public class Employee {
  private MutableInt salary;
  public Employee(String aSalary){
    this.salary = new MutableInt(aSalary);
  }
  public MutableInt getSalary(){
    return this.salary;
  }
}

@Test
public void testCollectWithMutable(){
  List<Employee> list = new LinkedList<>();
  list.add(new Employee("1"));
  list.add(new Employee("2"));

  MutableInt sum = list.stream().collect(
    MutableInt::new, 
    (MutableInt container, Employee employee) -> 
      container.add(employee.getSalary().intValue())
    , 
    MutableInt::add);
  assertEquals(new MutableInt(3), sum);
}

Dies funktioniert, weil der Akkumulator container.add(employee.getSalary().intValue()); kein neues Objekt mit dem Ergebnis zurückgeben soll, sondern den Status des veränderlichen containerTyps ändern soll MutableInt.

Wenn Sie BigDecimalstattdessen für die containerverwenden möchten, können Sie die collect()Methode nicht verwenden , da container.add(employee.getSalary());dies die nicht ändern würde, containerda BigDecimalsie unveränderlich ist. (Abgesehen davon BigDecimal::newwürde nicht funktionieren, da BigDecimalkein leerer Konstruktor vorhanden ist)

Sandro
quelle
2
Beachten Sie, dass Sie einen IntegerKonstruktor ( new Integer(6)) verwenden, der in späteren Java-Versionen veraltet ist.
MC Emperor
1
Guter Fang @MCEmperor! Ich habe es geändert inInteger.valueOf(6)
Sandro
@ Sandro - ich bin verwirrt. Warum funktioniert collect () nur mit veränderlichen Objekten? Ich habe es verwendet, um Zeichenfolgen zu verketten. Zeichenfolge allNames = employee.stream () .map (Employee :: getNameString) .collect (Collectors.joining (",")) .toString ();
MasterJoe2
1
@ MasterJoe2 Es ist einfach. Kurz gesagt - die Implementierung verwendet immer noch das, StringBuilderwas veränderlich ist. Siehe: hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/share/…
Sandro
30

Die normale Reduzierung soll zwei unveränderliche Werte wie int, double usw. kombinieren und einen neuen erzeugen. Es ist eine unveränderliche Reduzierung. Im Gegensatz dazu dient die Sammelmethode dazu, einen Container zu mutieren , um das Ergebnis zu akkumulieren, das er erzeugen soll.

Nehmen wir an, Sie möchten zur Veranschaulichung des Problems Collectors.toList()eine einfache Reduzierung wie erreichen

List<Integer> numbers = stream.reduce(
        new ArrayList<Integer>(),
        (List<Integer> l, Integer e) -> {
            l.add(e);
            return l;
        },
        (List<Integer> l1, List<Integer> l2) -> {
            l1.addAll(l2);
            return l1;
        });

Dies entspricht Collectors.toList(). In diesem Fall mutieren Sie jedoch die List<Integer>. Wie wir wissen, ArrayListist das weder threadsicher noch sicher, während der Iteration Werte hinzuzufügen / daraus zu entfernen, sodass Sie entweder eine gleichzeitige Ausnahme ArrayIndexOutOfBoundsExceptionoder eine andere Ausnahme (insbesondere bei paralleler Ausführung) erhalten, wenn Sie die Liste oder den Kombinierer aktualisieren versucht, die Listen zusammenzuführen, da Sie die Liste mutieren, indem Sie die Ganzzahlen akkumulieren (hinzufügen). Wenn Sie diesen Thread sicher machen möchten, müssen Sie jedes Mal eine neue Liste übergeben, die die Leistung beeinträchtigen würde.

Im Gegensatz dazu Collectors.toList()funktioniert das in ähnlicher Weise. Es garantiert jedoch die Thread-Sicherheit, wenn Sie die Werte in der Liste akkumulieren. Aus der Dokumentation zur collectMethode :

Führt mit einem Collector eine veränderbare Reduktionsoperation für die Elemente dieses Streams durch. Wenn der Stream parallel ist und der Collector gleichzeitig ist und entweder der Stream ungeordnet oder der Collector ungeordnet ist, wird eine gleichzeitige Reduzierung durchgeführt. Bei paralleler Ausführung können mehrere Zwischenergebnisse instanziiert, aufgefüllt und zusammengeführt werden, um die Isolation veränderlicher Datenstrukturen aufrechtzuerhalten. Daher ist selbst bei paralleler Ausführung mit nicht threadsicheren Datenstrukturen (wie ArrayList) keine zusätzliche Synchronisation für eine parallele Reduzierung erforderlich.

Um Ihre Frage zu beantworten:

Wann würden Sie collect()vs verwenden reduce()?

Wenn Sie unveränderliche Werte wie ints, haben doubles, Stringsfunktioniert die normale Reduzierung einwandfrei. Wenn Sie jedoch reduceIhre Werte in eine List(veränderbare Datenstruktur) umwandeln müssen, müssen Sie mit der collectMethode eine veränderbare Reduktion verwenden.

George
quelle
Im Code-Snippet besteht meines Erachtens das Problem darin, dass die Identität (in diesem Fall eine einzelne Instanz einer ArrayList) angenommen wird und "unveränderlich" ist, sodass sie xThreads starten können , die jeweils "zur Identität hinzufügen" und dann miteinander kombiniert werden. Gutes Beispiel.
Rogerdpack
Warum wir eine Ausnahme für gleichzeitige Änderungen erhalten, wird beim Aufrufen von Streams nur der serielle Stream erneut ausgeführt, und was bedeutet, dass er von einem einzelnen Thread verarbeitet wird und die Combiner-Funktion überhaupt nicht aufgerufen wird?
Amarnath Harish
public static void main(String[] args) { List<Integer> l = new ArrayList<>(); l.add(1); l.add(10); l.add(3); l.add(-3); l.add(-4); List<Integer> numbers = l.stream().reduce( new ArrayList<Integer>(), (List<Integer> l2, Integer e) -> { l2.add(e); return l2; }, (List<Integer> l1, List<Integer> l2) -> { l1.addAll(l2); return l1; });for(Integer i:numbers)System.out.println(i); } }Ich habe versucht und keine CCm-Ausnahme bekommen
Amarnath Harish
@amarnathharish das Problem tritt auf, wenn Sie versuchen, es parallel auszuführen und mehrere Threads versuchen, auf dieselbe Liste zuzugreifen
George
11

Der Strom sei a <- b <- c <- d

In Reduktion,

Sie haben ((a # b) # c) # d

wo # ist diese interessante Operation, die Sie gerne machen würden.

In der Sammlung,

Ihr Sammler wird eine Art Sammelstruktur K haben.

K verbraucht a. K verbraucht dann b. K verbraucht dann c. K verbraucht dann d.

Am Ende fragen Sie K, was das Endergebnis ist.

K gibt es dir dann.

Yan Ng
quelle
2

Sie unterscheiden sich stark im potenziellen Speicherbedarf zur Laufzeit. Während Sie alle Daten collect()sammeln und in die Sammlung aufnehmen, werden Sie ausdrücklich aufgefordert, anzugeben, wie die Daten reduziert werden sollen, die den Stream durchlaufen haben.reduce()

Wenn Sie beispielsweise einige Daten aus einer Datei lesen, verarbeiten und in eine Datenbank einfügen möchten, erhalten Sie möglicherweise einen ähnlichen Java-Stream-Code:

streamDataFromFile(file)
            .map(data -> processData(data))
            .map(result -> database.save(result))
            .collect(Collectors.toList());

In diesem Fall wird collect()Java verwendet, um Daten zu streamen und das Ergebnis in der Datenbank zu speichern. Ohne collect()die Daten wird nie gelesen und nie gespeichert.

Dieser Code generiert gerne einen java.lang.OutOfMemoryError: Java heap spaceLaufzeitfehler, wenn die Dateigröße groß genug oder die Heap-Größe niedrig genug ist. Der offensichtliche Grund ist, dass versucht wird, alle Daten, die es durch den Stream geschafft haben (und tatsächlich bereits in der Datenbank gespeichert wurden), in die resultierende Sammlung zu stapeln, was den Heap in die Luft sprengt.

Wenn Sie jedoch ersetzen collect()mit reduce()- es wird kein Problem mehr sein , da letzteres reduziert und all verwirft die Daten , die sie durch gemacht.

Im vorgestellten Beispiel ersetzen Sie einfach collect()etwas durch reduce:

.reduce(0L, (aLong, result) -> aLong, (aLong1, aLong2) -> aLong1);

Sie müssen sich nicht einmal darum kümmern, dass die Berechnung von der abhängig ist, resultda Java keine reine FP-Sprache (Functional Programming) ist und die Daten, die am Ende des Streams nicht verwendet werden, aufgrund möglicher Nebenwirkungen nicht optimieren kann .

Averasko
quelle
3
Wenn Sie sich nicht um die Ergebnisse Ihrer Datenbankspeicherung kümmern, sollten Sie forEach verwenden ... Sie müssen reduct nicht verwenden. Es sei denn, dies diente nur zur Veranschaulichung.
DaveEdelstein
1

Hier ist das Codebeispiel

List<Integer> list = Arrays.asList(1,2,3,4,5,6,7);
int sum = list.stream().reduce((x,y) -> {
        System.out.println(String.format("x=%d,y=%d",x,y));
        return (x + y);
    }).get();

System.out.println (Summe);

Hier ist das Ausführungsergebnis:

x=1,y=2
x=3,y=3
x=6,y=4
x=10,y=5
x=15,y=6
x=21,y=7
28

Die Reduktionsfunktion behandelt zwei Parameter, der erste Parameter ist der vorherige Rückgabewert im Stream, der zweite Parameter ist der aktuelle Berechnungswert im Stream, er summiert den ersten Wert und den aktuellen Wert als ersten Wert in der nächsten Berechnung.

JetQin
quelle
0

Laut den Dokumenten

Die reduzierenden () Kollektoren sind am nützlichsten, wenn sie in einer mehrstufigen Reduktion nach groupingBy oder partitioningBy verwendet werden. Verwenden Sie stattdessen Stream.reduce (BinaryOperator), um eine einfache Reduzierung eines Streams durchzuführen.

Grundsätzlich würden Sie es also reducing()nur verwenden, wenn Sie innerhalb eines Sammels gezwungen werden. Hier ist ein weiteres Beispiel :

 For example, given a stream of Person, to calculate the longest last name 
 of residents in each city:

    Comparator<String> byLength = Comparator.comparing(String::length);
    Map<String, String> longestLastNameByCity
        = personList.stream().collect(groupingBy(Person::getCity,
            reducing("", Person::getLastName, BinaryOperator.maxBy(byLength))));

Nach diesem Tutorial ist Reduzieren manchmal weniger effizient

Die Reduktionsoperation gibt immer einen neuen Wert zurück. Die Akkumulatorfunktion gibt jedoch auch jedes Mal einen neuen Wert zurück, wenn sie ein Element eines Streams verarbeitet. Angenommen, Sie möchten die Elemente eines Streams auf ein komplexeres Objekt wie eine Sammlung reduzieren. Dies kann die Leistung Ihrer Anwendung beeinträchtigen. Wenn Ihre Reduktionsoperation das Hinzufügen von Elementen zu einer Sammlung umfasst, erstellt jedes Mal, wenn Ihre Akkumulatorfunktion ein Element verarbeitet, eine neue Sammlung, die das ineffiziente Element enthält. Es wäre effizienter für Sie, stattdessen eine vorhandene Sammlung zu aktualisieren. Sie können dies mit der Stream.collect-Methode tun, die im nächsten Abschnitt beschrieben wird ...

Daher wird die Identität in einem reduzierten Szenario "wiederverwendet", sodass sie nach .reduceMöglichkeit etwas effizienter ist .

Rogerdpack
quelle