为什么IEnumerable.ToObservable很慢?

23 浏览
0 Comments

为什么IEnumerable.ToObservable很慢?

我正在尝试枚举一个大的IEnumerable一次,并且使用不同的操作符(CountSumAverage等)连接观察枚举结果。显而易见的方法是使用ToObservable方法将其转换为IObservable,然后向其订阅一个观察者。我注意到这比其他方法要慢得多,比如简单地循环并在每次迭代时通知观察者,或者使用Observable.Create方法而不是ToObservable。这个差异很大:它的速度慢了20到30倍。它就是它,或者我做错了什么吗?

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 10_000_000;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }
    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

输出:

ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec

.NET Core 3.0,C# 8,System.Reactive 4.3.2,Windows 10,Console App,Release built


更新:这里是我想实现的实际功能的示例:

var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");

输出:

Count: 10,000,000, Sum: 49,999,995,000,000, Average: 4,999,999.5

与使用标准的LINQ运算符相比,这种方法的重要差异在于源可枚举对象只被枚举一次。


再观察: 使用ToObservable(Scheduler.Immediate)比使用ToObservable()轻微地快(大约快20%)。

admin 更改状态以发布 2023年5月23日
0
0 Comments

因为主题没有做任何事情。

看起来循环语句的表现在两种情况下是不同的:

for(int i=0;i<1000000;i++)
    total++;

或者

for(int i=0;i<1000000;i++)
    DoHeavyJob();

如果使用另一个主题,它的OnNext实现较慢,结果会更能接受。

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 100;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }
    class My_Slow_Subject : SubjectBase
    {
        public override void OnNext(int value)
        {
            //do a job which spend 3ms
            System.Threading.Thread.Sleep(3);
        }
        bool _disposed;
        public override bool IsDisposed => _disposed;
        public override void Dispose() => _disposed = true;
        public override void OnCompleted() { }
        public override void OnError(Exception error) { }
        public override bool HasObservers => false;
        public override IDisposable Subscribe(IObserver observer) 
                => throw new NotImplementedException();
    }
    static SubjectBase CreateSubject()
    {
        return new My_Slow_Subject();
    }
    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

输出

ToObservable: 434 msec
Loop & Notify: 398 msec
Observable.Create: 394 msec

ToObservable支持System.Reactive.Concurrency.IScheduler

这意味着您可以实现自己的IScheduler并决定何时运行每个任务。

希望这可以帮到您。

问候

0
0 Comments

这是一个有素质的可观察对象和一个“自己编写,因为你认为更快比较好,但实际上并不是”的可观察对象之间的区别。

当你深入到源代码中时,你会发现这个可爱的小行:

scheduler.Schedule(this, (IScheduler innerScheduler, _ @this) => @this.LoopRec(innerScheduler));

实际上,这是在每个计划的递归迭代中一次调用hasNext = enumerator.MoveNext();

这允许你选择你的.ToObservable(schedulerOfYourChoice)调用的调度程序。

使用其他选项,你创建了一系列调用.OnNext几乎什么也不做。 Method2甚至没有.Subscribe调用。

Method2Method1都使用当前线程运行,并且在订阅完成之前运行到完成。它们是阻塞调用。它们可能会引起竞争条件。

Method1是唯一一个作为可观察对象表现良好的方法。它是异步的,可以独立于订阅者运行。

请记住,可观察对象是一个随时间运行的集合。它们通常有一个异步源或计时器,或者响应外部刺激。它们通常不运行在一个普通的可枚举对象上。如果你正在处理的是一个可枚举对象,那么同步工作通常应该是更快的。

速度不是 Rx 的目标。对时间为基础的推送值执行复杂查询是目标。

0