为什么IEnumerable.ToObservable很慢?
为什么IEnumerable.ToObservable很慢?
我正在尝试枚举一个大的IEnumerable
一次,并且使用不同的操作符(Count
,Sum
,Average
等)连接观察枚举结果。显而易见的方法是使用ToObservable
方法将其转换为IObservable
,然后向其订阅一个观察者。我注意到这比其他方法要慢得多,比如简单地循环并在每次迭代时通知观察者,或者使用Observable.Create
方法而不是ToObservable
。这个差异很大:它的速度慢了20到30倍。它就是它,或者我做错了什么吗?
using System; using System.Diagnostics; using System.Linq; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Reactive.Threading.Tasks; public static class Program { static void Main(string[] args) { const int COUNT = 10_000_000; Method1(COUNT); Method2(COUNT); Method3(COUNT); } static void Method1(int count) { var source = Enumerable.Range(0, count); var subject = new Subject(); var stopwatch = Stopwatch.StartNew(); source.ToObservable().Subscribe(subject); Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec"); } static void Method2(int count) { var source = Enumerable.Range(0, count); var subject = new Subject(); var stopwatch = Stopwatch.StartNew(); foreach (var item in source) subject.OnNext(item); subject.OnCompleted(); Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec"); } static void Method3(int count) { var source = Enumerable.Range(0, count); var subject = new Subject(); var stopwatch = Stopwatch.StartNew(); Observable.Create(o => { foreach (var item in source) o.OnNext(item); o.OnCompleted(); return Disposable.Empty; }).Subscribe(subject); Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec"); } }
输出:
ToObservable: 7,576 msec Loop & Notify: 273 msec Observable.Create: 511 msec
.NET Core 3.0,C# 8,System.Reactive 4.3.2,Windows 10,Console App,Release built
更新:这里是我想实现的实际功能的示例:
var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i); var subject = new Subject(); var cntTask = subject.Count().ToTask(); var sumTask = subject.Sum().ToTask(); var avgTask = subject.Average().ToTask(); source.ToObservable().Subscribe(subject); Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");
输出:
Count: 10,000,000, Sum: 49,999,995,000,000, Average: 4,999,999.5
与使用标准的LINQ运算符相比,这种方法的重要差异在于源可枚举对象只被枚举一次。
再观察: 使用ToObservable(Scheduler.Immediate)
比使用ToObservable()
轻微地快(大约快20%)。
因为主题没有做任何事情。
看起来循环语句的表现在两种情况下是不同的:
for(int i=0;i<1000000;i++) total++;
或者
for(int i=0;i<1000000;i++) DoHeavyJob();
如果使用另一个主题,它的OnNext实现较慢,结果会更能接受。
using System; using System.Diagnostics; using System.Linq; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Reactive.Threading.Tasks; public static class Program { static void Main(string[] args) { const int COUNT = 100; Method1(COUNT); Method2(COUNT); Method3(COUNT); } class My_Slow_Subject : SubjectBase{ public override void OnNext(int value) { //do a job which spend 3ms System.Threading.Thread.Sleep(3); } bool _disposed; public override bool IsDisposed => _disposed; public override void Dispose() => _disposed = true; public override void OnCompleted() { } public override void OnError(Exception error) { } public override bool HasObservers => false; public override IDisposable Subscribe(IObserver observer) => throw new NotImplementedException(); } static SubjectBase CreateSubject() { return new My_Slow_Subject(); } static void Method1(int count) { var source = Enumerable.Range(0, count); var subject = CreateSubject(); var stopwatch = Stopwatch.StartNew(); source.ToObservable().Subscribe(subject); Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec"); } static void Method2(int count) { var source = Enumerable.Range(0, count); var subject = CreateSubject(); var stopwatch = Stopwatch.StartNew(); foreach (var item in source) subject.OnNext(item); subject.OnCompleted(); Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec"); } static void Method3(int count) { var source = Enumerable.Range(0, count); var subject = CreateSubject(); var stopwatch = Stopwatch.StartNew(); Observable.Create (o => { foreach (var item in source) o.OnNext(item); o.OnCompleted(); return Disposable.Empty; }).Subscribe(subject); Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec"); } }
输出
ToObservable: 434 msec Loop & Notify: 398 msec Observable.Create: 394 msec
ToObservable支持System.Reactive.Concurrency.IScheduler
这意味着您可以实现自己的IScheduler并决定何时运行每个任务。
希望这可以帮到您。
问候
这是一个有素质的可观察对象和一个“自己编写,因为你认为更快比较好,但实际上并不是”的可观察对象之间的区别。
当你深入到源代码中时,你会发现这个可爱的小行:
scheduler.Schedule(this, (IScheduler innerScheduler, _ @this) => @this.LoopRec(innerScheduler));
实际上,这是在每个计划的递归迭代中一次调用hasNext = enumerator.MoveNext();
。
这允许你选择你的.ToObservable(schedulerOfYourChoice)
调用的调度程序。
使用其他选项,你创建了一系列调用.OnNext
几乎什么也不做。 Method2
甚至没有.Subscribe
调用。
Method2
和Method1
都使用当前线程运行,并且在订阅完成之前运行到完成。它们是阻塞调用。它们可能会引起竞争条件。
Method1
是唯一一个作为可观察对象表现良好的方法。它是异步的,可以独立于订阅者运行。
请记住,可观察对象是一个随时间运行的集合。它们通常有一个异步源或计时器,或者响应外部刺激。它们通常不运行在一个普通的可枚举对象上。如果你正在处理的是一个可枚举对象,那么同步工作通常应该是更快的。
速度不是 Rx 的目标。对时间为基础的推送值执行复杂查询是目标。