Wie wird in RxJava eine Variable beim Verketten von Observablen weitergegeben?

79

Ich verkette asynchrone Operationen mit RxJava und möchte eine Variable nachgeschaltet übergeben:

Observable
   .from(modifications)
   .flatmap( (data1) -> { return op1(data1); })
   ...
   .flatmap( (data2) -> { 
       // How to access data1 here ?
       return op2(data2);
   })

Es scheint ein allgemeines Muster zu sein, aber ich konnte keine Informationen darüber finden.

Julian Go
quelle

Antworten:

64

Der Rat, den ich vom Couchbase-Forum erhalten habe, ist die Verwendung verschachtelter Observablen:

Observable
   .from(modifications)
   .flatmap( (data1) -> { 
       return op1(data1)
           ...
           .flatmap( (data2) -> { 
               // I can access data1 here
               return op2(data2);
           })
   });

EDIT: Ich werde dies als akzeptierte Antwort markieren, da es am meisten empfohlen wird. Wenn Ihre Verarbeitung zu komplex ist, um alles zu verschachteln, können Sie die Lösung auch mit Funktionsaufrufen überprüfen.

Julian Go
quelle
Dies ist, was ich oft tue und bisher gut funktioniert hat.
Will
Hm das ist nicht viel anders als wenn ich "Daten außerhalb des Streams speichere".
oder
18

Eine andere Möglichkeit besteht darin, das Ergebnis von a op1auf eine org.apache.commons.lang3.tuple.PairVariable abzubilden , die die Variable enthält, und diese weiterzugeben:

Observable
   .from(modifications)
   .flatmap( (data1) -> {
       return op1(data1).map( obj -> { return Pair.of(data1,obj); });
   })
   ...
   .flatmap( (dataPair) -> { 
       // data1 is dataPair.getLeft()
       return op2(dataPair.getRight());
   })

Es funktioniert, aber es ist etwas unangenehm, Variablen in einem Pair / Triple / ... versteckt zu haben, und es wird sehr ausführlich, wenn Sie die Java 6-Notation verwenden.

Ich frage mich, ob es eine bessere Lösung gibt, vielleicht könnte ein RxJava-Operator helfen?

Julian Go
quelle
2
Ich glaube nicht, dass daran etwas falsch ist, aber wenn ich das Gefühl habe, nach der Pair-Klasse greifen zu müssen, habe ich auch das Gefühl, dass ich etwas falsch mache. Die Häufigkeit, mit der ich es verwendet und später wieder herausgerechnet habe, sobald ich ein besseres Verständnis für meine Domain habe.
Will
Dies ist die Lösung , die zu mir kam, obwohl es eine Tonne Müll Herstellung schaffen würde PairInstanzen nur zu halten , data1neben obj. Ich frage mich, ob ein combinefunktionieren würde.
Scorpiodawg
7

Flatmap kann ein zweites Argument dauern:

Observable.just("foo")
                .flatMap(foo -> Observable.range(1, 5), Pair::of)
                .subscribe(pair -> System.out.println("Result: " + pair.getFirst() + " Foo: " + pair.getSecond()));

Quelle: https://medium.com/rxjava-tidbits/rxjava-tidbits-1-use-flatmap-and-retain-original-source-value-4ec6a2de52d4

Sisyphus
quelle
Wie importiere ich ein Paar?
Dyno Cris
@ DynoCris Sie können Ihre eigenen schreiben oder es aus einigen Bibliotheken importieren (nur Google Java + Paar)
Sisyphus
6

Eine Möglichkeit wäre, einen Funktionsaufruf zu verwenden:

private static Observable<T> myFunc(final Object data1) {
    return op1(data1)
        ...
        .flatmap( (data2) -> { 
            // I can access data1 here
            return op2(data2);
        });
}

Observable
   .from(modifications)
   .flatmap( (data1) -> { return myFunc(data1); })

ABER: Korrigieren Sie mich, wenn ich falsch liege, aber es fühlt sich nicht nach reaktiver Programmierung an

Julian Go
quelle
2
Ist das viel anders als Ihre erste Option?
Will
@Will ja, ich denke, es ist funktional ähnlich wie die Verwendung von verschachtelten Aufrufen. Für eine komplexe Verarbeitung ist es wahrscheinlich doch keine schlechte Wahl.
Julian Go
3

Eigentlich haben wir eine Bibliothek, die Anrufketten vereinfacht.

https://github.com/pakoito/Komprehensions

Hinzufügen als Gradle-Abhängigkeit:

implementation 'io.reactivex.rxjava2:rxjava:2.2.1'
implementation 'com.github.pakoito.Komprehensions:komprehensions-rx2:1.3.2'

Verwendung (Kotlin):

val observable = doFlatMap(
    { Observable.from(modifications) },
    { data1 -> op1(data1) },
    { data1, data2 -> op2(data2) },
    { data1, data2, data3 -> op3(data1, data2, data3) }
)
Andrew
quelle
0

Die Lösung für diesen Thread funktioniert, aber für komplexe Ketten ist das Lesen von Code schwierig. Ich musste mehrere Werte übergeben. Ich habe eine private Klasse mit allen Parametern erstellt. Ich finde, dass Code auf diese Weise besser lesbar ist.

private class CommonData{
   private string data1;
   private string data2;

   *getters and setters*
}
...
final CommonData data = new CommonData();
Observable
   .from(modifications)
   .flatmap( (data1) -> { 
       data.setData1(data1);
       return op1(data1); 
   })
   ...
   .flatmap( (data2) -> { 
       data2 = data.getData1() + "data 2... ";
       data.setData2(data2);
       return op2(data2);
   })

ich hoffe es hilft

josesuero
quelle
2
Leider ist dies wahrscheinlich nicht threadsicher (abhängig von Ihrem Code führt RxJava die Berechnung für mehrere Threads aus)
Julian Go
Ich bin verwirrt, warum sollte eine private, nicht statische Variable thread- "unsicher" sein
josesuero
1
Wenn RxJava mehrere Threads erstellt, um Ihre Daten mit flatmap () zu versehen, wird die CommonData-Instanz von den Threads gemeinsam genutzt, auch wenn sie nicht statisch ist. (Dies sollte mit einigen Protokollierungen testbar sein, die den aktuellen Thread und die CommonData-Werte anzeigen)
Julian Go
Ich muss eine andere Lösung finden, denn wenn Sie etwas tun müssen wie: wenn (boolescher) Beobachter1 zurückgeben, sonst Beobachter2 zurückgeben; Sie müssten auf beiden Flatmaps, was wäre .. schlecht
josesuero
0

Ich weiß, dass dies eine alte Frage ist, aber mit RxJava2 & Lambda können Sie Folgendes verwenden:

Observable
.from(modifications)
.flatMap((Function<Data1, ObservableSource<Data2>>) data1 -> {
                        //Get data 2 obeservable

                            return Observable.just(new Data2())
                        }
                    }, Pair::of)

Beim nächsten Fluss (Flatmap / Map) lautet Ihr Ausgabepaar (Daten1, Daten2).

Ndivhuwo
quelle
-2

Sie können die "globale" Variable verwenden, um dies zu erreichen:

 Object[] data1Wrapper = new Object[]{null};
 Object[] data2Wrapper = new Object[]{null};
 Observable
    .from(modifications)
    .flatmap(data1 -> {
        data1Wrapper[0] = data1;
        return op1(data1)
     })
      ...
    .flatmap(data2 -> { 
        // I can access data1 here use data1Wrapper[0]
        Object data1 = data1Wrapper[0];
        data2Wrapper[0] = data2;
        return op2(data2);
     })
happyyangyuan
quelle
4
Dies ist ein schlechter Rat. Speichern Sie keine Daten außerhalb des Streams, verwenden Sie SwitchMap oder einen ähnlichen Ansatz, um Daten zu übergeben. Das ist Antimuster.
Ab dem