"Concurrent.futures vs Multiprocessing in Python 3"

8 浏览
0 Comments

"Concurrent.futures vs Multiprocessing in Python 3"

Python 3.2引入了Concurrent Futures,它似乎是旧的线程和multiprocessing模块的一些高级组合。

在执行CPU绑定任务时,使用它相比旧的多进程模块有什么优缺点?

这篇文章认为使用这个模块更容易,这是真的吗?

admin 更改状态以发布 2023年5月24日
0
0 Comments

在大多数需要并行处理的情况下,您会发现并发模块中的ProcessPoolExecutor类或多进程包中的Pool类提供了同等的功能,这归结于个人偏好。但是它们各自提供了一些使某些处理更加方便的工具。我想我会简单指出一些:

提交多个任务

每个包都有类似于内置mapitertools.starmap函数的模拟体。如果您有一个接受单个参数的工作函数,那么可以使用任一包的map方法提交多个任务:

def worker_function(x):
    # Return the square of the passed argument:
    return x ** 2
# multiprocessing.pool example:
from multiprocessing import Pool
with Pool() as pool:
    squares = pool.map(worker_function, (1, 2, 3, 4, 5, 6))
# concurrent.futures example:
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    squares = list(executor.map(worker_function, (1, 2, 3, 4, 5, 6)))

请注意,multiprocessing.pool.Pool.map方法返回一个列表,而concurrent.futures.ProcessPoolExecutor.map方法返回一个迭代器,就像内置的map方法一样。

两种map方法都接受一个chunksize参数,该参数将提交的任务分批处理为“块”,然后从任务输入队列中取出该“块”,以便池进程在获取队列中的下一个“块”之前处理完所有任务。这会导致更少但输出和读入更大的任务输入队列。对于传递给map方法的大型可迭代对象,将任务分块可以极大地提高性能。

如果未指定,则concurrent.futures.ProcessPoolExecutor的默认chunksize值为1,即不分块。对于multiprocessing.pool.Pool,默认值为None,这会导致类基于池大小和传递的可迭代对象的元素数计算出一个“适当”的chunksize值。在撰写本文时,chunksize值的计算更多或多少是计算为int(math.ceil(iterable_size / (4 * pool_size)))。当使用这些包进行多线程处理(稍后简要讨论),这两个包的默认chunksize值均为1。

如果工作函数需要多个参数,则使用concurrent.futures包要容易一些,因为它的map方法可以传递多个可迭代对象:

def worker_function(x, y):
    return x * y
x_values = (1, 2, 3)
y_values = (9, -2, -8)
with concurrent.futures.ProcessPoolExecutor() as executor:
    results = list(executor.map(worker_function, x_values, y_values))

如果每个参数都有单独的迭代对象,则必须使用multiprocessing包的starmap方法,并将参数“压缩”在一起:

def worker_function(x, y):
    return x * y
x_values = (1, 2, 3)
y_values = (9, -2, -8)
with multiprocessing.Pool() as pool:
    results = pool.starmap(worker_function, zip(x_values, y_values))

如果参数已经被组合在一起,则不必使用内置的zip函数,如下所示:

def worker_function(x, y):
    return x * y
args = (
    (1, 9), # first x, y pair of arguments
    (2, -2),
    (3, -8)
)
with multiprocessing.Pool() as pool:
    results = pool.starmap(worker_function, args)

尽快获取任务结果

当提交一批任务时,有时想要在结果(即返回值)可用时尽快获取任务结果。这两个工具都提供回调机制,以便通知提交的任务的结果可用:

使用 multiprocessing.Pool

import multiprocessing as mp
def worker_process(i):
    return i * i # square the argument
def process_result(return_value):
    print(return_value)
def main():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(worker_process, args=(i,), callback=process_result)
    pool.close()
    pool.join()
if __name__ == '__main__':
    main()

使用回调函数和 concurrent.futures 也可以实现同样的功能,但方式相对繁琐:

import concurrent.futures
def worker_process(i):
    return i * i # square the argument
def process_result(future):
    print(future.result())
def main():
    executor = concurrent.futures.ProcessPoolExecutor()
    futures = [executor.submit(worker_process, i) for i in range(10)]
    for future in futures:
        future.add_done_callback(process_result)
    executor.shutdown()
if __name__ == '__main__':
    main()

在这种方式下,每个任务都需要单独提交,返回一个 Future 实例,然后需要将回调函数添加到该 Future 实例中。最终,当回调函数被调用时,传递的参数是已经完成的任务对应的 Future 实例,必须调用方法 result 获取实际的返回值。但实际上,使用 concurrent.futures 模块时,完全没有必要使用回调函数。可以使用 as_completed 方法实现:

import concurrent.futures
def worker_process(i):
    return i * i # square the argument
def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker_process, i) for i in range(10)]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
if __name__ == '__main__':
    main()

通过使用字典来保存 Future 实例,容易将返回值与传递给 worker_process 的参数关联起来:

import concurrent.futures
def worker_process(i):
    return i * i # square the argument
def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = {executor.submit(worker_process, i): i for i in range(10)}
        for future in concurrent.futures.as_completed(futures):
            i = futures[future] # retrieve the value that was squared
            print(i, future.result())
if __name__ == '__main__':
    main()

multiprocessing.Pool 提供了 imapimap_unordered 两个方法,其中后者允许任务结果以任意顺序返回,但不保证按完成顺序返回。这些方法被认为是 map 方法的较懒惰版本。对于方法 map,如果传递的可迭代对象没有 __len__ 属性,则首先将其转换为一个 list,并使用其长度计算有效的 chunksize 值(如果没有提供 chunksize 参数,则使用默认值 None)。因此,使用生成器或生成器表达式作为可迭代对象不能实现任何存储优化。但在使用方法 imapimap_unordered 时,可迭代对象可以是生成器或生成器表达式;它将按需迭代以产生新的待提交任务。但这需要默认的 chunksize 参数为 1,因为通常不知道可迭代对象的长度。但是,如果您对可迭代对象的长度有很好的近似值(或者与下面的示例中的确切大小相同),那么可以使用与 multiprocessing.Pool 类相同的算法提供合理的 chunksize 值:

import multiprocessing as mp
def worker_process(i):
    return i * i # square the argument
def compute_chunksize(pool_size, iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize
def main():
    cpu_count = mp.cpu_count()
    N = 100
    chunksize = compute_chunksize(cpu_count, N)
    with mp.Pool() as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunksize):
            print(result)
if __name__ == '__main__':
    main()

但是,使用 imap_unordered 没有一种简单的方式将结果与提交的作业联系起来,除非工作进程返回原始调用参数和返回值。另一方面,为 imap_unorderedimap 指定一个 chunksize 能够以可预测的顺序返回结果,应该比反复调用 apply_async 方法更有效,后者基本上相当于使用 chunksize 值为 1。但是,如果确实需要按完成顺序处理结果,则必须使用具有回调函数的方法 apply_async。然而,根据实验结果,如果使用值为 1 的 chunksize 来调用 imap_unordered,则结果将按完成顺序返回。

ProcessPoolExecutormap方法最好与multiprocessing包中的Pool.imap方法进行比较,因为它们有几个相似之处和一个重要的区别。相似之处:首先,该方法不会将其传递的生成器表达式的输入可迭代参数转换为列表,以计算有效的块大小值,这就是为什么块大小参数默认为1并且如果您传递大的可迭代对象,则应考虑指定适当的块大小值。其次,ProcessPoolExecutor.map方法返回一个结果可迭代对象,需要迭代以检索来自工作函数的所有返回值,这些结果可以在生成后立即使用,除了一个区别:与Pool.imap方法不同,ProcessPoolExecuter.map方法直到已迭代传递给它的输入可迭代对象的所有元素并将它们放在任务队列中后才返回其结果可迭代对象,即输入可迭代对象不懒惰评估。因此,直到发生这种情况,您无法开始从工作函数检索结果,即使在迭代和排队所有输入任务之前可能已生成许多结果。它还表明,如果您有一个情况,可以更快地生成输入,而工作函数的结果生成可能会导致输入任务队列存储需求非常大。

提交任务并阻塞,直到完成

multiprocessing.Pool类具有一种方法apply,它将任务提交到池中并阻塞,直到结果准备就绪。返回值仅为传递给apply函数的工作函数的返回值。例如:

import multiprocessing as mp
def worker_process(i):
    return i * i # square the argument
def main():
    with mp.Pool() as pool:
        print(pool.apply(worker_process, args=(6,)))
        print(pool.apply(worker_process, args=(4,)))
if __name__ == '__main__':
    main()

concurrent.futures.ProcessPoolExecutor类没有这样的等效项。您必须先发出submit,然后对返回的Future实例调用result。这并不是什么困难,但Pool.apply方法更方便,它适用于需要阻塞任务提交的用例。这种情况发生在需要线程处理的处理中,因为线程中大部分工作都是严重依赖I/O的,除了可能一个非常CPU绑定的函数。创建线程的主程序首先创建一个multiprocessing.Pool实例,并将其作为参数传递给所有线程。当线程需要调用严重CPU绑定的函数时,现在使用Pool.apply方法运行该函数,从而在另一个进程中运行代码并释放当前进程以允许其他线程运行。

多处理还是多线程?

很多人关注了 concurrent.futures 模块有两个拥有相同接口的类,ProcessPoolExecutorThreadPoolExecutor。这是一个很好的特性。但是 multiprocessing 模块也有一个未记录文档的 ThreadPool 类,其接口与 Pool 相同:

>>> from multiprocessing.pool import Pool
>>> from multiprocessing.pool import ThreadPool
>>> dir(Pool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>> dir(ThreadPool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>>

注意,你也可以用以下方法执行多线程操作:

# This Pool is a function with the same interface as the
# multiprocessing.pool.ThreadPool.__init__ initializer and returns a
# mulitprocessing.pool.ThreadPool instance:
from multiprocessing.dummy import Pool

提交单个任务可以使用 ProcessPoolExecutor.submit,该方法返回一个 Future 实例,或者使用 Pool.apply_async,该方法返回一个 AsyncResult 实例,并指定超时值以获取结果:

from concurrent.futures import ProcessPoolExecutor, TimeoutError
from time import sleep
def worker_1():
    while True:
        print('hanging')
        sleep(1)
def main():
    with ProcessPoolExecutor(1) as pool:
        future = pool.submit(worker_1)
        try:
            future.result(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')
if __name__ == '__main__':
    main()
    print("return from main()")

输出:

hanging
hanging
hanging
timeout
hanging
hanging
hanging
hanging
hanging
hanging
hanging
etc.

当主进程在调用 future.result(3) 后3秒后任务还未完成,则会抛出 TimeoutError 异常。但是任务仍然在运行,占用着进程。因此 with ProcessPoolExecutor(1) as pool: 无法退出,程序也无法停止。

from multiprocessing import Pool, TimeoutError
from time import sleep
def worker_1():
    while True:
        print('hanging')
        sleep(1)
def main():
    with Pool(1) as pool:
        result = pool.apply_async(worker_1, args=())
        try:
            result.get(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')
if __name__ == '__main__':
    main()
    print("return from main()")

输出:

hanging
hanging
hanging
timeout
return from main()

然而,这次即使已经超时的任务仍在运行且占用着进程,with 语句块也能正常退出,程序正常终止。原因在于,Pool 实例的上下文管理器在块退出时会执行 terminate 调用,从而立即终止池中所有进程。这与 ProcessPoolExecutor 实例的上下文管理器形成对比,其在管理块退出时执行调用 shutdown(wait=True),在等待池中所有进程终止。如果你正在使用上下文管理器来处理池终止和超时可能存在的情况,看起来更适用于 multiprocessing.Pool。更新:在 Python 3.9 中,shutdown 方法新加了一个参数 `cancel_futures`。因此,如果显式调用 shutdown(wait=False, cancel_futures=True),就可以终止所有正在运行和等待运行的任务,而不必依赖使用上下文管理器时隐式调用的 shutdown 的默认行为。

但由于 multiprocessing.Pool 的上下文管理器只调用 terminate,而不是先调用 close,再调用 join,所以在退出 with 块之前,你必须确保所有已提交的作业都已完成。例如,通过使用阻塞同步调用(例如 map)提交作业,或通过在调用 apply_async 返回的 AsyncResult 对象上调用 get,迭代调用 imap 的结果,或在池实例上调用 close 后再调用 join 确保完成所有作业。

尽管使用ProcessPoolExecutor时无法在超时任务完成之前退出,但是您可以取消尚未运行的提交任务的启动。在以下演示中,我们有一个大小为1的池,因此作业只能连续运行。我们依次提交3个作业,其中前两个作业由于调用time.sleep(3)而需要3秒才能运行。我们立即尝试取消前两个作业。第一次尝试取消失败,因为第一个作业已经在运行。但是由于池只有一个进程,第二个作业必须等待3秒钟才能开始运行,直到第一个作业完成,因此取消成功。最后,作业3将在作业1完成后几乎立即开始并结束,这将是我们开始作业提交后约3秒钟:

from concurrent.futures import ProcessPoolExecutor
import time
def worker1(i):
    time.sleep(3)
    print('Done', i)
def worker2():
    print('Hello')
def main():
    with ProcessPoolExecutor(max_workers=1) as executor:
        t = time.time()
        future1 = executor.submit(worker1, 1)
        future2 = executor.submit(worker1, 2)
        future3 = executor.submit(worker2)
        # this will fail since this task is already running:
        print(future1.cancel())
        # this will succeed since this task hasn't started (it's waiting for future1 to complete):
        print(future2.cancel())
        future3.result() # wait for completion
        print(time.time() - t)
if __name__ == '__main__':
    main()

打印:

False
True
Done 1
Hello
3.1249606609344482

0
0 Comments

我不认为concurrent.futures更"高级" - 它是一个简单的接口,无论您使用多个线程还是多个进程作为底层并行化策略,它都可以很好地工作。

所以,就像几乎所有的"简单接口"一样,涉及到很多相同的权衡:它的学习曲线较浅,主要是因为要学习的东西太少;但是,因为它提供的选项较少,可能最终会以富有经验的接口无法达到的方式使你感到沮丧。

就CPU密集型任务而言,这要求得更加具体才能说出有意义的东西。对于CPU密集型任务在CPython下,你需要多进程而不是多线程才有可能获得加速。但是,你能获得多少(如果有的话)加速取决于你的硬件、你的操作系统,特别是你的具体任务所需的进程间通信量。在底层,所有的进程间并行化策略都依赖于相同的操作系统原语 - 你用来获取这些原语的高级API不是底线速度的主要因素。

编辑:例子

这是你引用文章中显示的最终代码,但我添加了一个需要使它工作的导入语句:

from concurrent.futures import ProcessPoolExecutor
def pool_factorizer_map(nums, nprocs):
    # Let the executor divide the work among processes by using 'map'.
    with ProcessPoolExecutor(max_workers=nprocs) as executor:
        return {num:factors for num, factors in
                                zip(nums,
                                    executor.map(factorize_naive, nums))}

这是完全相同的,只是使用multiprocessing而不是:

import multiprocessing as mp
def mp_factorizer_map(nums, nprocs):
    with mp.Pool(nprocs) as pool:
        return {num:factors for num, factors in
                                zip(nums,
                                    pool.map(factorize_naive, nums))}

请注意,使用multiprocessing.Pool对象作为上下文管理器是在Python 3.3中添加的。

至于哪个更容易使用,它们本质上是完全相同的。

一个区别是Pool支持很多不同的做事方式,你可能意识不到它有多么容易,直到你爬上了学习曲线。

再次强调,所有这些不同的方式既是一种优点,也是一种缺点。它们的优点在于在某些情况下可能需要灵活性。它们的缺点在于"最好只有一种明显的方法来做它"。一个完全(如果有可能)只使用concurrent.futures的项目在长期维护方面可能会更容易,因为它的最小API如何使用缺乏不必要的新颖性。

0