Warum ist IEnumerable.ToObservable so langsam?

9

Ich versuche , eine große aufzuzählen IEnumerableeinmal, und beobachten Sie die Aufzählung mit verschiedenen Betreibern angebracht ( Count, Sum, Averageusw.). Der naheliegende Weg besteht darin, es IObservablemit der Methode in eine umzuwandeln ToObservableund dann einen Beobachter zu abonnieren. Mir ist aufgefallen, dass dies viel langsamer ist als andere Methoden, z. B. eine einfache Schleife ausführen und den Beobachter bei jeder Iteration benachrichtigen oder stattdessen die Observable.CreateMethode verwenden ToObservable. Der Unterschied ist erheblich: Es ist 20-30 mal langsamer. Es ist was es ist oder mache ich etwas falsch?

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 10_000_000;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

Ausgabe:

ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec

.NET Core 3.0, C # 8, System.Reactive 4.3.2, Windows 10, Konsolen-App, Release erstellt


Update: Hier ist ein Beispiel für die tatsächliche Funktionalität, die ich erreichen möchte:

var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");

Ausgabe:

Anzahl: 10.000.000, Summe: 49.999.995.000.000, Durchschnitt: 4.999.999,5

Der wichtige Unterschied dieses Ansatzes gegenüber der Verwendung von Standard- LINQ- Operatoren besteht darin, dass die aufzählbare Quelle nur einmal aufgelistet wird.


Noch eine Beobachtung: Die Verwendung ToObservable(Scheduler.Immediate)ist etwas schneller (ca. 20%) als ToObservable().

Theodor Zoulias
quelle
2
Eine einmalige Messung ist nicht allzu zuverlässig. Ziehen Sie beispielsweise in Betracht, einen Benchmark mit BenchmarkDotNet einzurichten . (Nicht verbunden)
Fildor
1
@TheodorZoulias Es steckt noch mehr dahinter. Ich würde zum Beispiel Ihren Benchmark in Frage stellen, da die Ausführungsreihenfolge innerhalb dieses einzelnen Laufs große Unterschiede verursachen kann.
Oliver
1
Stoppuhr kann ausreichen, wenn Sie Statistiken gesammelt haben. Nicht nur eine einzige Probe.
Fildor
2
@Fildor - Fair genug. Ich meine, die Zahlen sind repräsentativ für das, was man erwarten sollte.
Rätselhaftigkeit
2
@ TheodorZoulias - Gute Frage, übrigens.
Rätselhaftigkeit

Antworten:

6

Dies ist der Unterschied zwischen einem gut erzogenen Beobachtbaren und einem "Roll-your-own-weil-du-denkst-schneller-ist-besser-aber-es-ist-nicht-beobachtbar".

Wenn Sie weit genug in die Quelle eintauchen, entdecken Sie diese schöne kleine Linie:

scheduler.Schedule(this, (IScheduler innerScheduler, _ @this) => @this.LoopRec(innerScheduler));

Der Aufruf erfolgt effektiv hasNext = enumerator.MoveNext();einmal pro geplanter rekursiver Iteration.

Auf diese Weise können Sie den Scheduler für Ihre auswählen .ToObservable(schedulerOfYourChoice) Anruf .

Mit den anderen Optionen, die Sie ausgewählt haben, haben Sie eine Reihe von Aufrufen erstellt, .OnNextdie praktisch nichts bewirken. Method2hat nicht einmal eine.Subscribe Anruf.

Sowohl von Method2als auchMethod1 lief mit dem aktuellen Thread und beide laufen bis zum Abschluss , bevor das Abonnement beendet ist. Sie blockieren Anrufe. Sie können Rennbedingungen verursachen.

Method1ist der einzige, der sich gut als beobachtbar verhält. Es ist asynchron und kann unabhängig vom Teilnehmer ausgeführt werden.

Denken Sie daran, dass Observables Sammlungen sind, die im Laufe der Zeit ausgeführt werden. Sie haben normalerweise eine asynchrone Quelle oder einen Timer oder reagieren auf externe Reize. Sie laufen nicht oft von einer einfachen Aufzählung ab. Wenn Sie mit einer Aufzählung arbeiten, sollte erwartet werden, dass die synchrone Arbeit schneller ausgeführt wird.

Geschwindigkeit ist nicht das Ziel von Rx. Das Ziel ist es, komplexe Abfragen mit zeitbasierten, Push-Werten durchzuführen.

Rätselhaftigkeit
quelle
2
"Roll-your-own-weil-du-denkst-schneller-ist-besser-aber-es-ist-nicht" - ausgezeichnet !!
Fildor
Danke Enigmativity für die ausführliche Antwort! Ich habe meine Frage mit einem Beispiel aktualisiert, was ich tatsächlich erreichen möchte, nämlich einer synchronen Berechnung. Denken Sie, dass ich anstelle von reaktiven Erweiterungen nach einem anderen Tool suchen sollte, da die Leistung in meinem Fall von entscheidender Bedeutung ist?
Theodor Zoulias
@TheodorZoulias - Hier ist die unzählige Möglichkeit, Ihr Beispiel in Ihrer Frage zu machen : source.Aggregate(new { count = 0, sum = 0L }, (a, x) => new { count = a.count + 1, sum = a.sum + x }, a => new { a.count, a.sum, average = (double)a.sum / a.count }). Nur eine Iteration und über 10x schneller als Rx.
Rätselhaftigkeit
Ich habe es gerade getestet und es ist zwar schneller, aber nur etwa x2 schneller (im Vergleich zu RX ohne ToObservable). Dies ist das andere Extrem, bei dem ich die beste Leistung habe, aber gezwungen bin, jeden LINQ-Operator in einem komplexen Lambda-Ausdruck erneut zu implementieren. Es ist fehleranfällig und weniger wartbar, wenn man bedenkt, dass meine tatsächlichen Berechnungen noch mehr Operatoren und Kombinationen davon beinhalten. Ich denke, dass es ziemlich verlockend ist, einen x2-Leistungspreis für eine klare und lesbare Lösung zu zahlen. Auf der anderen Seite x10 oder x20 bezahlen, nicht so viel!
Theodor Zoulias
Wenn Sie genau das posten, was Sie versuchen, könnte ich vielleicht eine Alternative vorschlagen?
Rätselhaftigkeit
-1

Weil das Subjekt nichts tut.

Es sieht so aus, als ob die Leistung der Schleifenanweisung in zwei Fällen unterschiedlich ist:

for(int i=0;i<1000000;i++)
    total++;

oder

for(int i=0;i<1000000;i++)
    DoHeavyJob();

Wenn Sie einen anderen Betreff mit einer langsamen OnNext-Implementierung verwenden, ist das Ergebnis akzeptabler

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 100;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    class My_Slow_Subject : SubjectBase<int>
    {

        public override void OnNext(int value)
        {
            //do a job which spend 3ms
            System.Threading.Thread.Sleep(3);
        }


        bool _disposed;
        public override bool IsDisposed => _disposed;
        public override void Dispose() => _disposed = true;
        public override void OnCompleted() { }
        public override void OnError(Exception error) { }
        public override bool HasObservers => false;
        public override IDisposable Subscribe(IObserver<int> observer) 
                => throw new NotImplementedException();
    }

    static SubjectBase<int> CreateSubject()
    {
        return new My_Slow_Subject();
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

Ausgabe

ToObservable: 434 msec
Loop & Notify: 398 msec
Observable.Create: 394 msec

Der ToObservable unterstützt System.Reactive.Concurrency.IScheduler

Das heißt, Sie können Ihren eigenen IScheduler implementieren und entscheiden, wann die einzelnen Aufgaben ausgeführt werden sollen

Hoffe das hilft

Grüße

BlazorPlus
quelle
Sie wissen, dass OP explizit von COUNT-Werten spricht, die 100.000x höher sind?
Fildor
Danke BlazorPlus für die Antwort. Ich habe meine Frage aktualisiert und ein realistischeres Beispiel für meinen Anwendungsfall hinzugefügt. Das subjectwird von anderen Operatoren beobachtet, die Berechnungen durchführen, also tut es nichts. Der Leistungsverlust bei der Verwendung ToObservableist immer noch erheblich, da die Berechnungen sehr leicht sind.
Theodor Zoulias