使用`Concurrent`的线程池执行程序:对于不同数量的工作者,没有改进。
使用`Concurrent`的线程池执行程序:对于不同数量的工作者,没有改进。
我正在尝试使用Concurrent实现并行任务。请在下面找到一个示例代码:\n
import os import time from concurrent.futures import ProcessPoolExecutor as PE import concurrent.futures # 获取CPU数量 cpu_num = len(os.sched_getaffinity(0)) print("可用的CPU数量:", cpu_num) # 最大工作线程数 max_Worker = 1 # 一个虚拟输入数组 n = 1000000 array = list(range(n)) results = [] # 一个虚拟的函数应用于数组中的每个元素 def task(i): return i**2 x = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=max_Worker) as executor: features = {executor.submit(task, j) for j in array} # 实际的函数很重,并且我们需要确保每次运行都完成 for future in concurrent.futures.as_completed(features): results.append(future.result()) results = [future.result() for future in features] y = time.time() print('=========================================') print(f"训练数据准备时间(秒):{(y-x)}") print('=========================================')
\n现在是我的问题:\n
- \n
- 虽然没有错误,但这段代码是否正确/优化?
- 在调整工作线程数时,速度没有改善(例如,1个线程与16个线程之间没有区别)。那么问题出在哪里,如何解决?
\n
\n
\n提前感谢。
使用并发的线程池执行器(Thread Pool Executor using Concurrent: no improvement for various number of workers)
在上述内容中,我们可以看到通过使用并发的线程池执行器来处理任务,但是无论使用多少个工作线程,都没有提升性能。下面我们将整理出问题的出现原因和解决方法,并将内容整理成一篇文章。
问题出现的原因:
- 在之前的评论中提到的开销之外,还需要考虑创建进程池本身的开销。
- 使用默认的chunksize值为1时,对于大量的任务提交,性能并不高效。
- 虚拟机的32个虚拟CPU比本地计算机的8个CPU性能更慢。
解决方法:
- 可以尝试使用具有更智能的默认chunksize值的multiprocessing包中的multiprocessing.pool.Pool类。
- 对于大量的任务,可以尝试使用显式指定chunksize值的map方法。
- 可以尝试使用imap和imap_unordered方法,并根据适当的chunksize值使用生成器来处理任务。
下面是一段示例代码,演示了使用并发的线程池执行器处理任务的过程:
import os import time from concurrent.futures import ProcessPoolExecutor as PE from multiprocessing import Pool # 定义一个任务函数 def task(i): return i**2 # required for Windows: if __name__ == '__main__': # 设置任务数量 n = 100 # 获取CPU数量 cpu_num = os.cpu_count() print("Number of CPUs available: ",cpu_num) # 串行执行任务 t1 = time.time() results = [task(i) for i in range(n)] print('Non-multiprocessing time:', time.time() - t1, results[-1]) # 使用ProcessPoolExecutor的submit方法执行任务 t1 = time.time() with PE(max_workers=cpu_num) as executor: futures = [executor.submit(task, i) for i in range(n)] results = [future.result() for future in futures] print('Multiprocessing time using submit:', time.time() - t1, results[-1]) # 使用ProcessPoolExecutor的map方法执行任务 t1 = time.time() with PE(max_workers=cpu_num) as executor: results = list(executor.map(task, range(n))) print('Multiprocessing time using map:', time.time() - t1, results[-1]) # 使用ProcessPoolExecutor的map方法和指定chunksize执行任务 chunksize = n // (4 * cpu_num) t1 = time.time() with PE(max_workers=cpu_num) as executor: results = list(executor.map(task, range(n), chunksize=chunksize)) print(f'Multiprocessing time using map: {time.time() - t1}, chunksize: {chunksize}', results[-1]) # 使用multiprocessing.pool.Pool的map方法执行任务 t1 = time.time() with Pool(cpu_num) as executor: results = executor.map(task, range(n)) print('Multiprocessing time using Pool.map:', time.time() - t1, results[-1])
打印结果如下:
Non-multiprocessing time: 0.027019739151000977 9999800001 Number of CPUs available: 8 Multiprocessing time using submit: 77.34723353385925 9999800001 Multiprocessing time using map: 79.52981925010681 9999800001 Multiprocessing time using map: 0.30500149726867676, chunksize: 3125 9999800001 Multiprocessing time using Pool.map: 0.2799997329711914 9999800001
通过以上的内容,我们可以看到在使用并发的线程池执行器时,无论使用多少个工作线程,都没有提升性能。为了解决这个问题,我们可以尝试使用multiprocessing包中的multiprocessing.pool.Pool类,并指定合适的chunksize值。另外,对于大量的任务,我们也可以尝试使用imap和imap_unordered方法,并根据适当的chunksize值使用生成器来处理任务。同时,我们还可以考虑虚拟机性能较差的问题,可能需要优化虚拟机的配置或者调整任务的分配策略。
这些解决方法可以提高并行任务处理的性能,并减少执行时间。希望以上内容对您有所帮助。