Unterschied zwischen CompletableFuture, Future und RxJava's Observable

193

Ich möchte den Unterschied zwischen wissen CompletableFuture, Futureund Observable RxJava.

Was ich weiß ist, dass alle asynchron sind, aber

Future.get() blockiert den Thread

CompletableFuture gibt die Rückrufmethoden an

RxJava Observable--- ähnlich wie CompletableFuturebei anderen Vorteilen (nicht sicher)

Zum Beispiel: Wenn der Client mehrere Serviceaufrufe tätigen muss und wenn wir Futures(Java) verwenden, Future.get()wird er nacheinander ausgeführt ... möchte wissen, wie es in RxJava besser ist.

Und die Dokumentation http://reactivex.io/intro.html sagt

Es ist schwierig, Futures zu verwenden, um bedingte asynchrone Ausführungsflüsse optimal zusammenzustellen (oder unmöglich, da die Latenzen jeder Anforderung zur Laufzeit variieren). Dies ist natürlich möglich, wird jedoch schnell kompliziert (und damit fehleranfällig) oder blockiert Future.get () vorzeitig, wodurch der Vorteil einer asynchronen Ausführung entfällt.

Wirklich interessiert zu wissen, wie RxJavadieses Problem gelöst wird. Ich fand es schwierig, aus der Dokumentation zu verstehen.

shiv455
quelle
Haben Sie die Dokumentation für jeden gelesen? Ich bin mit RxJava völlig unbekannt, aber die Dokumentation erscheint auf einen Blick äußerst gründlich. Es scheint nicht besonders vergleichbar mit den beiden Futures.
FThompson
Ich habe durchgemacht, aber nicht in der Lage zu bekommen, wie unterschiedlich es von Java-Futures ist ... korrigiere mich, wenn ich falsch
liege
Wie ähneln Observables Futures?
FThompson
2
Möchten Sie wissen, wo es anders ist, wie ist es anders in der Thread-Verwaltung? EX: Future.get () blockiert den Thread .... wie wird er in Observable behandelt ???
Shiv455
2
Zumindest ist es ein bisschen verwirrend für mich ... ein hoher Unterschied wäre wirklich hilfreich !!
Shiv455

Antworten:

278

Futures

Futures wurden in Java 5 (2004) eingeführt. Sie sind im Grunde genommen Platzhalter für ein Ergebnis einer Operation, die noch nicht abgeschlossen ist. Sobald der Vorgang abgeschlossen ist, Futureenthält der das Ergebnis. Eine Operation kann beispielsweise eine ausführbare oder aufrufbare Instanz sein, die an einen ExecutorService gesendet wird . Der Übermittler der Operation kann das FutureObjekt verwenden, um zu überprüfen, ob die Operation fertig ist () , oder mit der blockierenden Methode get () warten, bis sie abgeschlossen ist .

Beispiel:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

CompletableFutures

CompletableFutures wurden in Java 8 (2014) eingeführt. Sie sind in der Tat eine Weiterentwicklung der regulären Futures, inspiriert von Googles Listenable Futures , einem Teil der Guava- Bibliothek. Es handelt sich um Futures, mit denen Sie auch Aufgaben in einer Kette aneinanderreihen können. Sie können sie verwenden, um einem Arbeitsthread anzuweisen, "eine Aufgabe X zu erledigen, und wenn Sie fertig sind, diese andere Sache mit dem Ergebnis von X zu erledigen". Mit CompletableFutures können Sie mit dem Ergebnis der Operation etwas anfangen, ohne einen Thread zu blockieren, um auf das Ergebnis zu warten. Hier ist ein einfaches Beispiel:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJava ist eine vollständige Bibliothek für reaktive Programmierung, die bei Netflix erstellt wurde. Auf den ersten Blick scheint es den Streams von Java 8 ähnlich zu sein . Es ist, außer es ist viel mächtiger.

Ähnlich wie bei Futures kann RxJava verwendet werden, um eine Reihe synchroner oder asynchroner Aktionen zu einer Verarbeitungspipeline zusammenzufügen. Im Gegensatz zu Futures, die zum Einmalgebrauch bestimmt sind, arbeitet RxJava mit Streams mit null oder mehr Elementen. Einschließlich nie endender Streams mit einer unendlichen Anzahl von Elementen. Es ist auch viel flexibler und leistungsfähiger dank einer unglaublich reichen Anzahl von Bedienern .

Im Gegensatz zu den Streams von Java 8 verfügt RxJava auch über einen Gegendruckmechanismus , mit dem Fälle behandelt werden können, in denen verschiedene Teile Ihrer Verarbeitungspipeline in unterschiedlichen Threads mit unterschiedlichen Raten arbeiten .

Der Nachteil von RxJava ist, dass es trotz der soliden Dokumentation aufgrund des damit verbundenen Paradigmenwechsels eine herausfordernde Bibliothek ist, etwas zu lernen. Rx-Code kann auch ein Albtraum beim Debuggen sein, insbesondere wenn mehrere Threads beteiligt sind, und noch schlimmer - wenn Gegendruck erforderlich ist.

Wenn Sie sich darauf einlassen möchten, finden Sie auf der offiziellen Website eine ganze Seite mit verschiedenen Tutorials sowie die offizielle Dokumentation und Javadoc . Sie können sich auch einige Videos wie dieses ansehen, die eine kurze Einführung in Rx geben und auch über die Unterschiede zwischen Rx und Futures sprechen.

Bonus: Java 9 Reactive Streams

Java 9s Reactive Streams aka Flow - API sind eine Reihe von Schnittstellen von verschiedenen implementierten reaktiven Strom Bibliotheken wie RxJava 2 , Akka Bäche und Vertx . Sie ermöglichen die Verbindung dieser reaktiven Bibliotheken unter Beibehaltung des wichtigen Gegendrucks.

Malz
quelle
Wäre schön, Beispielcode zu geben, wie Rx dies tut
Zinan Xing
Mit Reactive Streams können wir also RxJava, Akka und Vertx in einer Anwendung mischen?
IgorGanapolsky
1
@IgorGanapolsky Ja.
Malt
In CompletableFutures verwenden wir Rückrufmethoden. Diese Rückrufmethoden werden auch blockiert, wenn die Ausgabe einer Methode die Eingabe eines anderen Rückrufs ist. Als zukünftiger Block mit Future.get () aufrufen. Warum es heißt, dass Future.get () den Anruf blockiert, während CompletableFutures nicht blockiert. Bitte erklären Sie
Deepak
1
@ Federico Sicher. Jedes Futureist ein Platzhalter für ein einzelnes Ergebnis, das möglicherweise noch nicht abgeschlossen ist. Wenn Sie denselben Vorgang erneut ausführen, erhalten Sie eine neue FutureInstanz. RxJava befasst sich mit Strömen von Ergebnissen , die jederzeit kommen kann. Eine Reihe von Operationen kann daher eine einzelne beobachtbare RxJava zurückgeben, die eine Reihe von Ergebnissen hervorbringt. Es ist ein bisschen wie der Unterschied zwischen einem einzelnen Briefumschlag und einem Luftschlauch, der immer wieder Post herauspumpt.
Malt
20

Ich arbeite seit 0.9 mit Rx Java, jetzt bei 1.3.2, und migriere bald zu 2.x. Ich verwende dies in einem privaten Projekt, an dem ich bereits seit 8 Jahren arbeite.

Ohne diese Bibliothek würde ich überhaupt nicht mehr programmieren. Am Anfang war ich skeptisch, aber es ist ein völlig anderer Geisteszustand, den Sie schaffen müssen. Am Anfang ruhig schwierig. Ich habe manchmal stundenlang auf die Murmeln geschaut .. lol

Es ist nur eine Frage der Übung und des tatsächlichen Kennenlernens des Flusses (auch bekannt als Vertrag von Observablen und Beobachtern). Wenn Sie dort angekommen sind, werden Sie es hassen, es anders zu machen.

Für mich hat diese Bibliothek keinen wirklichen Nachteil.

Anwendungsfall: Ich habe eine Monitoransicht, die 9 Anzeigen enthält (CPU, Mem, Netzwerk usw.). Beim Starten der Ansicht abonniert die Ansicht eine Systemüberwachungsklasse, die ein beobachtbares Intervall (Intervall) zurückgibt, das alle Daten für die 9 Meter enthält. Es wird jede Sekunde ein neues Ergebnis in die Ansicht verschoben (also keine Abfrage !!!). Dieses Observable verwendet eine Flatmap, um gleichzeitig (asynchron!) Daten aus 9 verschiedenen Quellen abzurufen und das Ergebnis in ein neues Modell zu komprimieren, das Ihre Ansicht auf onNext () erhält.

Wie zum Teufel machst du das mit Futures, Completables usw. ... Viel Glück! :) :)

Rx Java löst viele Probleme bei der Programmierung für mich und macht es in gewisser Weise viel einfacher ...

Vorteile:

  • Statelss !!! (wichtig zu erwähnen, am wichtigsten vielleicht)
  • Thread-Management sofort einsatzbereit
  • Erstellen Sie Sequenzen mit einem eigenen Lebenszyklus
  • Alles ist beobachtbar, so dass die Verkettung einfach ist
  • Weniger Code zum Schreiben
  • Einzelglas auf Klassenweg (sehr leicht)
  • Sehr gleichzeitig
  • Keine Rückrufhölle mehr
  • Abonnentenbasiert (enger Vertrag zwischen Verbraucher und Hersteller)
  • Gegendruckstrategien (Leistungsschalter ähnlich)
  • Hervorragende Fehlerbehandlung und -behebung
  • Sehr schöne Dokumentation (Murmeln <3)
  • Komplette Kontrolle
  • Viel mehr ...

Nachteile: - Schwer zu testen

Kristoffer
quelle
13
~ " Ohne diese Bibliothek würde ich überhaupt nicht mehr programmieren. " Also ist RxJava das A und O für alle Softwareprojekte?
IgorGanapolsky
Ist es nützlich, auch wenn ich keinen Stream von asynchronen Ereignissen habe?
Mukesh Verma