Zippen von Streams mit JDK8 mit Lambda (java.util.stream.Streams.zip)

149

In JDK 8 mit Lambda b93 gab es in b93 eine Klasse java.util.stream.Streams.zip, mit der Streams komprimiert werden konnten (dies wird im Tutorial Exploring Java8 Lambdas. Teil 1 von Dhananjay Nene veranschaulicht ). Diese Funktion:

Erstellt einen faulen und sequentiellen kombinierten Stream, dessen Elemente das Ergebnis der Kombination der Elemente zweier Streams sind.

In b98 ist dies jedoch verschwunden. Tatsächlich ist die StreamsKlasse in bava in java.util.stream nicht einmal zugänglich .

Wurde diese Funktionalität verschoben und wenn ja, wie kann ich Streams mit b98 präzise komprimieren?

Die Anwendung, an die ich denke, befindet sich in dieser Java-Implementierung von Shen , in der ich die Zip-Funktionalität in der ersetzt habe

  • static <T> boolean every(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)
  • static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)

Funktionen mit ziemlich ausführlichem Code (der keine Funktionalität von b98 verwendet).

Artella
quelle
3
Ich habe gerade herausgefunden, dass es anscheinend vollständig entfernt wurde: mail.openjdk.java.net/pipermail/lambda-libs-spec-observers/…
artella
"Exploring Java8 Lambdas. Part 1" - neuer Link für diesen Artikel ist blog.dhananjaynene.com/2013/02/exploring-java8-lambdas-part-1
Aleksei Egorov

Antworten:

77

Ich brauchte das auch, also habe ich einfach den Quellcode von b93 genommen und ihn in eine "util" -Klasse eingeordnet. Ich musste es leicht ändern, um mit der aktuellen API zu arbeiten.

Als Referenz hier ist der Arbeitscode (auf eigenes Risiko ...):

public static<A, B, C> Stream<C> zip(Stream<? extends A> a,
                                     Stream<? extends B> b,
                                     BiFunction<? super A, ? super B, ? extends C> zipper) {
    Objects.requireNonNull(zipper);
    Spliterator<? extends A> aSpliterator = Objects.requireNonNull(a).spliterator();
    Spliterator<? extends B> bSpliterator = Objects.requireNonNull(b).spliterator();

    // Zipping looses DISTINCT and SORTED characteristics
    int characteristics = aSpliterator.characteristics() & bSpliterator.characteristics() &
            ~(Spliterator.DISTINCT | Spliterator.SORTED);

    long zipSize = ((characteristics & Spliterator.SIZED) != 0)
            ? Math.min(aSpliterator.getExactSizeIfKnown(), bSpliterator.getExactSizeIfKnown())
            : -1;

    Iterator<A> aIterator = Spliterators.iterator(aSpliterator);
    Iterator<B> bIterator = Spliterators.iterator(bSpliterator);
    Iterator<C> cIterator = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return aIterator.hasNext() && bIterator.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(aIterator.next(), bIterator.next());
        }
    };

    Spliterator<C> split = Spliterators.spliterator(cIterator, zipSize, characteristics);
    return (a.isParallel() || b.isParallel())
           ? StreamSupport.stream(split, true)
           : StreamSupport.stream(split, false);
}
Siki
quelle
1
Sollte der resultierende Stream nicht sein, SIZEDwenn einer der Streams ist SIZED, nicht beide?
Didier L
5
Das glaube ich nicht. Beide Streams müssen vorhanden sein, damit SIZEDdiese Implementierung funktioniert. Es hängt tatsächlich davon ab, wie Sie das Zippen definieren. Sollten Sie beispielsweise zwei Streams unterschiedlicher Größe komprimieren können? Wie würde der resultierende Stream dann aussehen? Ich glaube, aus diesem Grund wurde diese Funktion in der API tatsächlich weggelassen. Es gibt viele Möglichkeiten, dies zu tun, und der Benutzer kann entscheiden, welches Verhalten das "richtige" sein soll. Würden Sie die Elemente aus dem längeren Stream verwerfen oder die kürzere Liste auffüllen? Wenn ja, mit welchen Werten?
Siki
Sofern mir nichts fehlt, ist keine Besetzung erforderlich (z Spliterator<A>. B. zu ).
Jub0bs
Gibt es eine Website, auf der der Java 8 b93-Quellcode gehostet wird? Ich habe Probleme, es zu finden.
Starwarswii
42

zip ist eine der Funktionen der Protonpack-Bibliothek .

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");

List<String> zipped = StreamUtils.zip(streamA,
                                      streamB,
                                      (a, b) -> a + " is for " + b)
                                 .collect(Collectors.toList());

assertThat(zipped,
           contains("A is for Apple", "B is for Banana", "C is for Carrot"));
Dominic Fox
quelle
1
auch in StreamEx gefunden: amaembo.github.io/streamex/javadoc/one/util/streamex/…
tokland
33

Wenn Sie Guava in Ihrem Projekt haben, können Sie die Streams.zip- Methode verwenden (wurde in Guava 21 hinzugefügt):

Gibt einen Stream zurück, in dem jedes Element das Ergebnis der Übergabe des entsprechenden Elements von StreamA und StreamB an die Funktion ist. Der resultierende Stream ist nur so lang wie der kürzere der beiden Eingabeströme. Wenn ein Stream länger ist, werden seine zusätzlichen Elemente ignoriert. Der resultierende Strom ist nicht effizient spaltbar. Dies kann die parallele Leistung beeinträchtigen.

 public class Streams {
     ...

     public static <A, B, R> Stream<R> zip(Stream<A> streamA,
             Stream<B> streamB, BiFunction<? super A, ? super B, R> function) {
         ...
     }
 }
ZhekaKozlov
quelle
26

Zippen von zwei Streams mit JDK8 mit Lambda ( Kern ).

public static <A, B, C> Stream<C> zip(Stream<A> streamA, Stream<B> streamB, BiFunction<A, B, C> zipper) {
    final Iterator<A> iteratorA = streamA.iterator();
    final Iterator<B> iteratorB = streamB.iterator();
    final Iterator<C> iteratorC = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return iteratorA.hasNext() && iteratorB.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(iteratorA.next(), iteratorB.next());
        }
    };
    final boolean parallel = streamA.isParallel() || streamB.isParallel();
    return iteratorToFiniteStream(iteratorC, parallel);
}

public static <T> Stream<T> iteratorToFiniteStream(Iterator<T> iterator, boolean parallel) {
    final Iterable<T> iterable = () -> iterator;
    return StreamSupport.stream(iterable.spliterator(), parallel);
}
Karol Król
quelle
2
Schöne Lösung und (relativ) kompakt! Erfordert, dass Sie import java.util.function.*;und import java.util.stream.*;oben in Ihrer Datei setzen.
sffc
Beachten Sie, dass dies eine Terminaloperation im Stream ist. Dies bedeutet, dass für unendliche Streams diese Methode zusammenbricht
smac89
2
So viele nutzlose Wrapper: Hier () -> iteratorund hier wieder : iterable.spliterator(). Warum nicht direkt ein Spliteratoranstatt ein implementieren Iterator? Überprüfen Sie @
Miguel Gamboa
20

Da ich mir keine Verwendung des Zippens für andere als indizierte Sammlungen (Listen) vorstellen kann und ein großer Fan von Einfachheit bin, wäre dies meine Lösung:

<A,B,C>  Stream<C> zipped(List<A> lista, List<B> listb, BiFunction<A,B,C> zipper){
     int shortestLength = Math.min(lista.size(),listb.size());
     return IntStream.range(0,shortestLength).mapToObj( i -> {
          return zipper.apply(lista.get(i), listb.get(i));
     });        
}
Rafael
quelle
1
Ich denke mapToObjectsollte sein mapToObj.
Seanf
Wenn die Liste nicht ist RandomAccess(z. B. auf verknüpften Listen), ist dies sehr langsam
avmohan
Bestimmt. Die meisten Java-Entwickler sind sich jedoch bewusst, dass LinkedList eine schlechte Leistung für Indexzugriffsvorgänge aufweist.
Rafael
11

Die Methoden der von Ihnen genannten Klasse wurden Streamzugunsten der Standardmethoden in die Schnittstelle selbst verschoben . Aber es scheint, dass die zipMethode entfernt wurde. Möglicherweise, weil nicht klar ist, wie das Standardverhalten für Streams unterschiedlicher Größe aussehen soll. Die Implementierung des gewünschten Verhaltens ist jedoch unkompliziert:

static <T> boolean every(
  Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().allMatch(x->!it.hasNext()||pred.test(x, it.next()));
}
static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().filter(x->it.hasNext()&&pred.test(x, it.next()))
      .findFirst().orElse(null);
}
Holger
quelle
Ist das, was predicateSie an den Filter übergeben haben, nicht zustandsbehaftet ? Dies verstößt gegen den Methodenvertrag und funktioniert insbesondere bei paralleler Verarbeitung des Streams nicht.
Andreas
2
@Andreas: Keine der Lösungen unterstützt die Parallelverarbeitung. Da meine Methoden keinen Stream zurückgeben, stellen sie sicher, dass die Streams nicht parallel ausgeführt werden. In ähnlicher Weise gibt der Code der akzeptierten Antwort einen Stream zurück, der parallel geschaltet werden kann, aber tatsächlich nichts parallel macht. Staatliche Prädikate werden jedoch entmutigt, verstoßen jedoch nicht gegen den Vertrag. Sie können sogar im parallelen Kontext verwendet werden, wenn Sie sicherstellen, dass die Statusaktualisierung threadsicher ist. In einigen Situationen sind sie unvermeidbar, z. B. ist das Verwandeln eines Streams in ein eindeutiges Prädikat per se ein Statefull-Prädikat .
Holger
2
@ Andrew: Sie können erraten, warum diese Operationen aus der Java-API entfernt wurden ...
Holger
8

Ich schlage diese Implementierung demütig vor. Der resultierende Stream wird auf den kürzeren der beiden Eingabestreams abgeschnitten.

public static <L, R, T> Stream<T> zip(Stream<L> leftStream, Stream<R> rightStream, BiFunction<L, R, T> combiner) {
    Spliterator<L> lefts = leftStream.spliterator();
    Spliterator<R> rights = rightStream.spliterator();
    return StreamSupport.stream(new AbstractSpliterator<T>(Long.min(lefts.estimateSize(), rights.estimateSize()), lefts.characteristics() & rights.characteristics()) {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            return lefts.tryAdvance(left->rights.tryAdvance(right->action.accept(combiner.apply(left, right))));
        }
    }, leftStream.isParallel() || rightStream.isParallel());
}
Doradus
quelle
Ich mag Ihren Vorschlag. Aber ich stimme dem nicht ganz zu .., leftStream.isParallel() || rightStream.isParallel(). Ich denke, es hat keine Auswirkung, da es AbstractSpliteratorstandardmäßig eine begrenzte Parallelität bietet. Ich denke also, dass das Endergebnis das gleiche sein wird wie das Bestehen false.
Miguel Gamboa
@ MiguelGamboa - danke für deinen Kommentar. Ich bin mir nicht sicher, was Sie unter "standardmäßig eingeschränkte Parallelität" verstehen. Haben Sie einen Link zu einigen Dokumenten?
Doradus
6

Die Lazy-Seq-Bibliothek bietet Zip-Funktionen.

https://github.com/nurkiewicz/LazySeq

Diese Bibliothek ist stark inspiriert scala.collection.immutable.Streamund zielt darauf ab, eine unveränderliche, threadsichere und einfach zu verwendende Implementierung von verzögerten Sequenzen bereitzustellen, möglicherweise unendlich.

Nick Siderakis
quelle
5

Mit der neuesten Guava-Bibliothek (für die StreamsKlasse) sollten Sie dazu in der Lage sein

final Map<String, String> result = 
    Streams.zip(
        collection1.stream(), 
        collection2.stream(), 
        AbstractMap.SimpleEntry::new)
    .collect(Collectors.toMap(e -> e.getKey(), e  -> e.getValue()));
Dan Borza
quelle
2

Würde das für dich funktionieren? Es ist eine kurze Funktion, die träge über die zu komprimierenden Streams auswertet, sodass Sie sie mit unendlichen Streams versorgen können (es muss nicht die Größe der zu komprimierenden Streams annehmen).

Wenn die Streams endlich sind, stoppt sie, sobald einem der Streams die Elemente ausgehen.

import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Stream;

class StreamUtils {
    static <ARG1, ARG2, RESULT> Stream<RESULT> zip(
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner) {
        final var i2 = s2.iterator();
        return s1.map(x1 -> i2.hasNext() ? combiner.apply(x1, i2.next()) : null)
                .takeWhile(Objects::nonNull);
    }
}

Hier ist ein Unit-Test-Code (viel länger als der Code selbst!)

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;

class StreamUtilsTest {
    @ParameterizedTest
    @MethodSource("shouldZipTestCases")
    <ARG1, ARG2, RESULT>
    void shouldZip(
            String testName,
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner,
            Stream<RESULT> expected) {
        var actual = StreamUtils.zip(s1, s2, combiner);

        assertEquals(
                expected.collect(Collectors.toList()),
                actual.collect(Collectors.toList()),
                testName);
    }

    private static Stream<Arguments> shouldZipTestCases() {
        return Stream.of(
                Arguments.of(
                        "Two empty streams",
                        Stream.empty(),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One singleton and one empty stream",
                        Stream.of(1),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One empty and one singleton stream",
                        Stream.empty(),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "Two singleton streams",
                        Stream.of("blah"),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blah", 1))),
                Arguments.of(
                        "One singleton, one multiple stream",
                        Stream.of("blob"),
                        Stream.of(2, 3),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blob", 2))),
                Arguments.of(
                        "One multiple, one singleton stream",
                        Stream.of("foo", "bar"),
                        Stream.of(4),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("foo", 4))),
                Arguments.of(
                        "Two multiple streams",
                        Stream.of("nine", "eleven"),
                        Stream.of(10, 12),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("nine", 10), pair("eleven", 12)))
        );
    }

    private static List<Object> pair(Object o1, Object o2) {
        return List.of(o1, o2);
    }

    static private <T1, T2> List<Object> combine(T1 o1, T2 o2) {
        return List.of(o1, o2);
    }

    @Test
    void shouldLazilyEvaluateInZip() {
        final var a = new AtomicInteger();
        final var b = new AtomicInteger();
        final var zipped = StreamUtils.zip(
                Stream.generate(a::incrementAndGet),
                Stream.generate(b::decrementAndGet),
                (xa, xb) -> xb + 3 * xa);

        assertEquals(0, a.get(), "Should not have evaluated a at start");
        assertEquals(0, b.get(), "Should not have evaluated b at start");

        final var takeTwo = zipped.limit(2);

        assertEquals(0, a.get(), "Should not have evaluated a at take");
        assertEquals(0, b.get(), "Should not have evaluated b at take");

        final var list = takeTwo.collect(Collectors.toList());

        assertEquals(2, a.get(), "Should have evaluated a after collect");
        assertEquals(-2, b.get(), "Should have evaluated b after collect");
        assertEquals(List.of(2, 4), list);
    }
}
dominant
quelle
Ich musste das takeWhileam Ende fallen lassen, das scheint nicht in Java8 zu sein, aber es ist kein Problem, da der Angerufene alle Nullen herausfiltern kann, die auftreten, wenn die komprimierten Streams nicht die gleiche Größe haben. Ich denke, dass diese Antwort die Antwort Nummer 1 sein sollte, da sie konsistent und verständlich ist. Nochmals tolle Arbeit, danke.
Simbo1905
1
public class Tuple<S,T> {
    private final S object1;
    private final T object2;

    public Tuple(S object1, T object2) {
        this.object1 = object1;
        this.object2 = object2;
    }

    public S getObject1() {
        return object1;
    }

    public T getObject2() {
        return object2;
    }
}


public class StreamUtils {

    private StreamUtils() {
    }

    public static <T> Stream<Tuple<Integer,T>> zipWithIndex(Stream<T> stream) {
        Stream<Integer> integerStream = IntStream.range(0, Integer.MAX_VALUE).boxed();
        Iterator<Integer> integerIterator = integerStream.iterator();
        return stream.map(x -> new Tuple<>(integerIterator.next(), x));
    }
}
robby_pelssers
quelle
1

Die Cyclops-Reaktion von AOL , zu der ich beitrage, bietet auch Zipping-Funktionen, sowohl über eine erweiterte Stream-Implementierung , die auch die ReactiveSeq-Schnittstelle für reaktive Streams implementiert, als auch über StreamUtils, die Standard-Java-Streams über statische Methoden weitgehend dieselbe Funktionalität bietet.

 List<Tuple2<Integer,Integer>> list =  ReactiveSeq.of(1,2,3,4,5,6)
                                                  .zip(Stream.of(100,200,300,400));


  List<Tuple2<Integer,Integer>> list = StreamUtils.zip(Stream.of(1,2,3,4,5,6),
                                                  Stream.of(100,200,300,400));

Es bietet auch allgemeinere anwendungsbezogene Reißverschlüsse. Z.B

   ReactiveSeq.of("a","b","c")
              .ap3(this::concat)
              .ap(of("1","2","3"))
              .ap(of(".","?","!"))
              .toList();

   //List("a1.","b2?","c3!");

   private String concat(String a, String b, String c){
    return a+b+c;
   }

Und sogar die Möglichkeit, jedes Element in einem Stream mit jedem Element in einem anderen zu koppeln

   ReactiveSeq.of("a","b","c")
              .forEach2(str->Stream.of(str+"!","2"), a->b->a+"_"+b);

   //ReactiveSeq("a_a!","a_2","b_b!","b_2","c_c!","c2")
John McClean
quelle
0

Wenn jemand dies noch braucht, gibt es eine StreamEx.zipWithFunktion in der Streamex- Bibliothek:

StreamEx<String> givenNames = StreamEx.of("Leo", "Fyodor")
StreamEx<String> familyNames = StreamEx.of("Tolstoy", "Dostoevsky")
StreamEx<String> fullNames = givenNames.zipWith(familyNames, (gn, fn) -> gn + " " + fn);

fullNames.forEach(System.out::println);  // prints: "Leo Tolstoy\nFyodor Dostoevsky\n"
const.grigoryev
quelle
-1

Das ist toll. Ich musste zwei Streams in eine Map komprimieren, wobei ein Stream der Schlüssel und der andere der Wert war

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");    
final Stream<Map.Entry<String, String>> s = StreamUtils.zip(streamA,
                    streamB,
                    (a, b) -> {
                        final Map.Entry<String, String> entry = new AbstractMap.SimpleEntry<String, String>(a, b);
                        return entry;
                    });

System.out.println(s.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));

Ausgabe: {A = Apfel, B = Banane, C = Karotte}

Gnana
quelle