使用`Concurrent`的线程池执行程序:对于不同数量的工作者,没有改进。

9 浏览
0 Comments

使用`Concurrent`的线程池执行程序:对于不同数量的工作者,没有改进。

我正在尝试使用Concurrent实现并行任务。请在下面找到一个示例代码:\n

import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
import concurrent.futures
# 获取CPU数量
cpu_num = len(os.sched_getaffinity(0))
print("可用的CPU数量:", cpu_num)
# 最大工作线程数
max_Worker = 1
# 一个虚拟输入数组
n = 1000000
array = list(range(n))
results = []
# 一个虚拟的函数应用于数组中的每个元素
def task(i):
    return i**2
x = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=max_Worker) as executor:
    features = {executor.submit(task, j) for j in array}
    
    # 实际的函数很重,并且我们需要确保每次运行都完成
    for future in concurrent.futures.as_completed(features):
        results.append(future.result())
results = [future.result() for future in features]
y = time.time()
print('=========================================')
print(f"训练数据准备时间(秒):{(y-x)}")
print('=========================================')

\n现在是我的问题:\n

    \n

  1. 虽然没有错误,但这段代码是否正确/优化?
  2. \n

  3. 在调整工作线程数时,速度没有改善(例如,1个线程与16个线程之间没有区别)。那么问题出在哪里,如何解决?
  4. \n

\n提前感谢。

0
0 Comments

使用并发的线程池执行器(Thread Pool Executor using Concurrent: no improvement for various number of workers)

在上述内容中,我们可以看到通过使用并发的线程池执行器来处理任务,但是无论使用多少个工作线程,都没有提升性能。下面我们将整理出问题的出现原因和解决方法,并将内容整理成一篇文章。

问题出现的原因:

- 在之前的评论中提到的开销之外,还需要考虑创建进程池本身的开销。

- 使用默认的chunksize值为1时,对于大量的任务提交,性能并不高效。

- 虚拟机的32个虚拟CPU比本地计算机的8个CPU性能更慢。

解决方法:

- 可以尝试使用具有更智能的默认chunksize值的multiprocessing包中的multiprocessing.pool.Pool类。

- 对于大量的任务,可以尝试使用显式指定chunksize值的map方法。

- 可以尝试使用imap和imap_unordered方法,并根据适当的chunksize值使用生成器来处理任务。

下面是一段示例代码,演示了使用并发的线程池执行器处理任务的过程:

import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
from multiprocessing import Pool
# 定义一个任务函数
def task(i):
    return i**2
# required for Windows:
if __name__ == '__main__':
    # 设置任务数量
    n = 100
    # 获取CPU数量
    cpu_num = os.cpu_count()
    print("Number of CPUs available: ",cpu_num)
    # 串行执行任务
    t1 = time.time()
    results = [task(i) for i in range(n)]
    print('Non-multiprocessing time:', time.time() - t1, results[-1])
    # 使用ProcessPoolExecutor的submit方法执行任务
    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        futures = [executor.submit(task, i) for i in range(n)]
        results = [future.result() for future in futures]
    print('Multiprocessing time using submit:', time.time() - t1,  results[-1])
    # 使用ProcessPoolExecutor的map方法执行任务
    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n)))
    print('Multiprocessing time using map:', time.time() - t1, results[-1])
    # 使用ProcessPoolExecutor的map方法和指定chunksize执行任务
    chunksize = n // (4 * cpu_num)
    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n), chunksize=chunksize))
    print(f'Multiprocessing time using map: {time.time() - t1}, chunksize: {chunksize}', results[-1])
    # 使用multiprocessing.pool.Pool的map方法执行任务
    t1 = time.time()
    with Pool(cpu_num) as executor:
        results = executor.map(task, range(n))
    print('Multiprocessing time using Pool.map:', time.time() - t1, results[-1])

打印结果如下:

Non-multiprocessing time: 0.027019739151000977 9999800001
Number of CPUs available:  8
Multiprocessing time using submit: 77.34723353385925 9999800001
Multiprocessing time using map: 79.52981925010681 9999800001
Multiprocessing time using map: 0.30500149726867676, chunksize: 3125 9999800001
Multiprocessing time using Pool.map: 0.2799997329711914 9999800001

通过以上的内容,我们可以看到在使用并发的线程池执行器时,无论使用多少个工作线程,都没有提升性能。为了解决这个问题,我们可以尝试使用multiprocessing包中的multiprocessing.pool.Pool类,并指定合适的chunksize值。另外,对于大量的任务,我们也可以尝试使用imap和imap_unordered方法,并根据适当的chunksize值使用生成器来处理任务。同时,我们还可以考虑虚拟机性能较差的问题,可能需要优化虚拟机的配置或者调整任务的分配策略。

这些解决方法可以提高并行任务处理的性能,并减少执行时间。希望以上内容对您有所帮助。

0