Ich versuche , eine große aufzuzählen IEnumerable
einmal, und beobachten Sie die Aufzählung mit verschiedenen Betreibern angebracht ( Count
, Sum
, Average
usw.). Der naheliegende Weg besteht darin, es IObservable
mit der Methode in eine umzuwandeln ToObservable
und dann einen Beobachter zu abonnieren. Mir ist aufgefallen, dass dies viel langsamer ist als andere Methoden, z. B. eine einfache Schleife ausführen und den Beobachter bei jeder Iteration benachrichtigen oder stattdessen die Observable.Create
Methode verwenden ToObservable
. Der Unterschied ist erheblich: Es ist 20-30 mal langsamer. Es ist was es ist oder mache ich etwas falsch?
using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
public static class Program
{
static void Main(string[] args)
{
const int COUNT = 10_000_000;
Method1(COUNT);
Method2(COUNT);
Method3(COUNT);
}
static void Method1(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method2(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
foreach (var item in source) subject.OnNext(item);
subject.OnCompleted();
Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method3(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
Observable.Create<int>(o =>
{
foreach (var item in source) o.OnNext(item);
o.OnCompleted();
return Disposable.Empty;
}).Subscribe(subject);
Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
}
Ausgabe:
ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec
.NET Core 3.0, C # 8, System.Reactive 4.3.2, Windows 10, Konsolen-App, Release erstellt
Update: Hier ist ein Beispiel für die tatsächliche Funktionalität, die ich erreichen möchte:
var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");
Ausgabe:
Anzahl: 10.000.000, Summe: 49.999.995.000.000, Durchschnitt: 4.999.999,5
Der wichtige Unterschied dieses Ansatzes gegenüber der Verwendung von Standard- LINQ- Operatoren besteht darin, dass die aufzählbare Quelle nur einmal aufgelistet wird.
Noch eine Beobachtung: Die Verwendung ToObservable(Scheduler.Immediate)
ist etwas schneller (ca. 20%) als ToObservable()
.
quelle
Antworten:
Dies ist der Unterschied zwischen einem gut erzogenen Beobachtbaren und einem "Roll-your-own-weil-du-denkst-schneller-ist-besser-aber-es-ist-nicht-beobachtbar".
Wenn Sie weit genug in die Quelle eintauchen, entdecken Sie diese schöne kleine Linie:
Der Aufruf erfolgt effektiv
hasNext = enumerator.MoveNext();
einmal pro geplanter rekursiver Iteration.Auf diese Weise können Sie den Scheduler für Ihre auswählen
.ToObservable(schedulerOfYourChoice)
Anruf .Mit den anderen Optionen, die Sie ausgewählt haben, haben Sie eine Reihe von Aufrufen erstellt,
.OnNext
die praktisch nichts bewirken.Method2
hat nicht einmal eine.Subscribe
Anruf.Sowohl von
Method2
als auchMethod1
lief mit dem aktuellen Thread und beide laufen bis zum Abschluss , bevor das Abonnement beendet ist. Sie blockieren Anrufe. Sie können Rennbedingungen verursachen.Method1
ist der einzige, der sich gut als beobachtbar verhält. Es ist asynchron und kann unabhängig vom Teilnehmer ausgeführt werden.Denken Sie daran, dass Observables Sammlungen sind, die im Laufe der Zeit ausgeführt werden. Sie haben normalerweise eine asynchrone Quelle oder einen Timer oder reagieren auf externe Reize. Sie laufen nicht oft von einer einfachen Aufzählung ab. Wenn Sie mit einer Aufzählung arbeiten, sollte erwartet werden, dass die synchrone Arbeit schneller ausgeführt wird.
Geschwindigkeit ist nicht das Ziel von Rx. Das Ziel ist es, komplexe Abfragen mit zeitbasierten, Push-Werten durchzuführen.
quelle
source.Aggregate(new { count = 0, sum = 0L }, (a, x) => new { count = a.count + 1, sum = a.sum + x }, a => new { a.count, a.sum, average = (double)a.sum / a.count })
. Nur eine Iteration und über 10x schneller als Rx.ToObservable
). Dies ist das andere Extrem, bei dem ich die beste Leistung habe, aber gezwungen bin, jeden LINQ-Operator in einem komplexen Lambda-Ausdruck erneut zu implementieren. Es ist fehleranfällig und weniger wartbar, wenn man bedenkt, dass meine tatsächlichen Berechnungen noch mehr Operatoren und Kombinationen davon beinhalten. Ich denke, dass es ziemlich verlockend ist, einen x2-Leistungspreis für eine klare und lesbare Lösung zu zahlen. Auf der anderen Seite x10 oder x20 bezahlen, nicht so viel!Weil das Subjekt nichts tut.
Es sieht so aus, als ob die Leistung der Schleifenanweisung in zwei Fällen unterschiedlich ist:
oder
Wenn Sie einen anderen Betreff mit einer langsamen OnNext-Implementierung verwenden, ist das Ergebnis akzeptabler
Ausgabe
Der ToObservable unterstützt System.Reactive.Concurrency.IScheduler
Das heißt, Sie können Ihren eigenen IScheduler implementieren und entscheiden, wann die einzelnen Aufgaben ausgeführt werden sollen
Hoffe das hilft
Grüße
quelle
subject
wird von anderen Operatoren beobachtet, die Berechnungen durchführen, also tut es nichts. Der Leistungsverlust bei der VerwendungToObservable
ist immer noch erheblich, da die Berechnungen sehr leicht sind.