并行使用异步 lambda 的 foreach
并行使用异步 lambda 的 foreach
我想要并行处理一个集合,但是在实现时遇到了困难,因此希望能得到一些帮助。
问题出在我想要在并行循环的lambda中调用C#中标记为async的方法。例如:
var bag = new ConcurrentBag
问题是计数为0,因为所有创建的线程实际上只是后台线程,并且Parallel.ForEach
不会等待完成。如果我去掉async关键字,方法会变成这样:
var bag = new ConcurrentBag
它可以工作,但是完全禁用了await的智能性,而且我必须手动处理一些异常。(为了简洁起见已移除)。
我如何实现一个在lambda中使用await关键字的Parallel.ForEach
循环?这可行吗?
Parallel.ForEach方法的原型接受一个Action
作为参数,但我希望它等待我的异步lambda。
并行foreach与异步lambda的问题出现的原因是在并行处理集合元素时,使用异步Lambda表达式时会遇到一些困难。当使用并行foreach循环时,每个元素都会以同步的方式进行处理,这可能导致性能下降,特别是当处理的任务非常耗时时。为了解决这个问题,可以使用AsyncEnumerator NuGet包中的ParallelForEachAsync扩展方法。这个方法可以让我们在处理每个元素时使用异步的Lambda表达式。
解决方法如下:
1. 首先,需要安装AsyncEnumerator NuGet包。
2. 在代码中引入Dasync.Collections命名空间。
3. 创建一个ConcurrentBag对象,用于存储处理每个元素的结果。
4. 使用myCollection.ParallelForEachAsync方法,将异步Lambda表达式作为参数传递进去。在Lambda表达式中,可以执行一些预处理操作、调用异步方法获取数据、将结果添加到ConcurrentBag中,以及执行一些后处理操作。
5. 可以通过maxDegreeOfParallelism参数指定并行处理的最大程度。
6. 最后,可以通过bag.Count获取处理完所有元素后ConcurrentBag中元素的数量。
这个解决方法可以提高并行处理集合元素的效率,特别是在处理耗时任务时。它通过使用异步Lambda表达式,充分利用了系统资源,从而加快了处理速度。同时,使用AsyncEnumerator库还可以简化代码的编写,并提供了更好的可读性和可维护性。
需要注意的是,由于AsyncEnumerator库是开源的,使用MIT许可证,所以可以在项目中免费使用。
问题出现的原因:
在并行处理异步任务时,可能会遇到以下问题:
1. 没有限制并发任务的数量,导致同时发起大量网络请求,造成系统资源的浪费和性能问题。
2. 当使用Task.Run()方法时,任务可能会以并发的方式执行,导致输出结果的顺序混乱。
解决方法:
1. 可以使用限流机制来控制并发任务的数量。可以将列表分成小批次,对每个小批次使用Task.WhenAll()方法,以避免同时创建大量任务。
2. 可以使用Stephen Toub提供的ForEachAsync扩展方法来处理更复杂的并发任务,该方法提供了错误处理的功能。
代码示例:
var bag = new ConcurrentBag
参考链接:
- stackoverflow.com/questions/10806951/…
- medium.com/.puiu/parallel-foreach-async-in-c-36756f8ebe62
在.NET 6中引入了一个新的API,即Parallel.ForEachAsync,它是一种调度异步工作的方式,可以控制并行度。上面给出了一个使用Parallel.ForEachAsync的示例代码,该代码使用HttpClient异步获取多个URL的内容,并将其保存到本地文件中。下面是这个问题的出现的原因以及解决方法。
问题的出现的原因:
在并行处理异步任务时,可能会遇到一些问题。在上面的示例代码中,使用了一个异步Lambda表达式作为参数传递给Parallel.ForEachAsync方法。这个Lambda表达式是异步执行的,它在每个URL上执行HTTP请求并将结果保存到本地文件。然而,由于并行处理的特性,可能会出现以下问题:
1. 并行度过高:如果设置的并行度过高,即同时处理的任务数量过多,可能会导致系统资源不足,导致性能下降甚至系统崩溃。
2. 异常处理困难:在并行处理任务时,可能会出现异常情况。如果没有适当的异常处理机制,异常可能会导致整个并行处理过程中断,而不是只中断出现异常的任务。
解决方法:
针对以上问题,可以采取以下解决方法:
1. 控制并行度:可以使用ParallelOptions类的MaxDegreeOfParallelism属性来控制并行度。在示例代码中,options对象设置了MaxDegreeOfParallelism属性为2,表示最多同时处理2个任务。通过适当调整并行度,可以避免系统资源过度占用的问题。
2. 异常处理:为了处理并行处理过程中的异常,可以使用try-catch语句来捕获异常并进行处理。在示例代码中,可以将try-catch语句包裹在异步Lambda表达式中,以捕获并处理每个任务可能出现的异常。此外,还可以使用Task.WhenAll方法来等待所有任务完成,并在任务完成后处理可能出现的异常。
通过使用Parallel.ForEachAsync方法和适当的并行度控制以及异常处理机制,可以实现并行处理异步任务的需求,并提高系统的性能和稳定性。在实际应用中,需要根据具体的场景和需求来选择合适的并行度和异常处理策略,以达到最佳的效果。