使用多进程队列、线程池和锁的简单示例
使用多进程队列、线程池和锁的简单示例
我尝试阅读http://docs.python.org/dev/library/multiprocessing.html上的文档,但我仍然对多进程队列、池和锁的概念感到困惑。目前,我能够构建下面的示例。\n关于队列和池,我不确定我是否正确理解了这个概念,如果我理解错了,请纠正我。我想要实现的目标是同时处理两个请求(在这个示例中,数据列表有8个),那么我应该使用什么?使用池创建两个进程,每个进程可以处理两个不同的队列(最多2个),还是只使用队列每次处理两个输入?锁的作用是正确地打印输出。\n
import multiprocessing import time data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'], ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'] ) def mp_handler(var1): for indata in var1: p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1])) p.start() def mp_worker(inputs, the_time): print " Processs %s\tWaiting %s seconds" % (inputs, the_time) time.sleep(int(the_time)) print " Process %s\tDONE" % inputs if __name__ == '__main__': mp_handler(data)
这是一个使用多进程队列进行并行处理的简单示例。这个问题的出现是因为作者在搜索使用多进程和队列的示例时,这篇文章在谷歌搜索结果中排名第一。下面是解决这个问题的方法。
问题的解决方法是创建一个名为Renderer的类,该类可以实例化,并将项目放入队列中,然后等待队列完成。代码如下:
from multiprocessing import JoinableQueue from multiprocessing.context import Process class Renderer: queue = None def __init__(self, nb_workers=2): self.queue = JoinableQueue() self.processes = [Process(target=self.upload) for i in range(nb_workers)] for p in self.processes: p.start() def render(self, item): self.queue.put(item) def upload(self): while True: item = self.queue.get() if item is None: break # process your item here self.queue.task_done() def terminate(self): """ wait until queue is empty and terminate processes """ self.queue.join() for p in self.processes: p.terminate() r = Renderer() r.render(item1) r.render(item2) r.terminate()
在代码中,item1
和item2
是任务或输入参数,它们将在两个不同的进程中并行执行。
多进程编程是一种常见的并发编程技术,可以有效地提高程序的执行效率。然而,在实际应用中,多进程编程也会遇到一些问题,比如多个进程同时输出到终端可能导致输出混乱的情况。本文将介绍一个使用多进程队列、进程池和锁的简单示例,并解决输出混乱的问题。
首先,我们来看一下示例代码:
import multiprocessing import sys THREADS = 3 # Used to prevent multiple threads from mixing thier output GLOBALLOCK = multiprocessing.Lock() def func_worker(args): """This function will be called by each thread. This function can not be a class method. """ # Expand list of args into named args. str1, str2 = args del args # Work # ... # Serial-only Portion GLOBALLOCK.acquire() print(str1) print(str2) GLOBALLOCK.release() def main(argp=None): """Multiprocessing Spawn Example """ # Create the number of threads you want pool = multiprocessing.Pool(THREADS) # Define two jobs, each with two args. func_args = [ ('Hello', 'World',), ('Goodbye', 'World',), ] try: pool.map_async(func_worker, func_args).get() except KeyboardInterrupt: # Allow ^C to interrupt from any thread. sys.stdout.write('\033[0m') sys.stdout.write('User Interupt\n') pool.close() if __name__ == '__main__': main()
在这个例子中,我们创建了一个进程池,然后定义了两个任务,每个任务有两个参数。每个任务的执行函数是`func_worker`,这个函数会被每个进程调用。
`func_worker`函数首先将传入的参数解包,并进行一些工作。然后,为了防止多个进程同时输出导致输出混乱,我们使用了一个全局锁`GLOBALLOCK`。在输出之前,我们先获取锁,然后输出参数的值,最后释放锁。
在`main`函数中,我们使用`pool.map_async(func_worker, func_args).get()`来启动任务的执行。`map_async`函数将任务分配给进程池中的进程,并返回一个异步结果对象。我们通过调用`get()`方法来等待所有任务执行完成,并获取结果。
为了防止程序无法退出,我们在捕获到`KeyboardInterrupt`异常时,调用`pool.close()`方法关闭进程池。
至此,我们已经完成了一个简单的使用多进程队列、进程池和锁的示例。通过使用锁来控制输出的顺序,我们避免了输出混乱的问题。
关于`.map_async()`函数,有一些问题需要解答。`.map_async()`和`.map()`函数有什么区别?`.get()`方法的参数`timeout`是用来做什么的?是否可以只调用`.get()`方法来获取结果?
`.map_async()`函数和`.map()`函数的区别在于返回值的类型不同。`.map_async()`返回的是一个异步结果对象,而`.map()`直接返回结果列表。`.get()`方法用于等待异步结果对象的执行完成,并返回结果。
`.get()`方法的参数`timeout`是设置等待的超时时间的。如果设置了超时时间,那么在超过指定时间后,`get()`方法会抛出一个`TimeoutError`异常。如果没有设置超时时间,则`get()`方法会一直等待直到所有结果都可用,并返回结果列表。
因此,可以使用`.get()`方法来获取完成的结果列表,也可以使用轮询循环来检查结果是否可用。此外,还可以在`map_async()`调用中传递一个回调函数,当结果可用时会调用该函数。
通过使用多进程队列、进程池和锁,我们可以简单地解决多进程输出混乱的问题。这种方法可以提高程序的执行效率,并确保输出的准确性。
使用多进程队列(Multiprocessing Queue)、进程池(Pool)和锁(Lock)是解决问题的最佳方案。通过使用进程池,可以最大程度地减少对原始代码的修改。下面是稍作修改后的程序,使用了进程池,并将进程数限制为2:
import multiprocessing import time data = ( ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'], ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'] ) def mp_worker(inputs, the_time): print "Processs %s\tWaiting %s seconds" % (inputs, the_time) time.sleep(int(the_time)) print "Process %s\tDONE" % inputs def mp_handler(): p = multiprocessing.Pool(2) p.map(mp_worker, data) if __name__ == '__main__': mp_handler()
注意,`mp_worker()`函数现在接受一个参数(一个包含两个先前参数的元组),因为`map()`函数将输入数据分块为子列表,每个子列表作为单个参数传递给工作函数。
输出结果为:
Processs a Waiting 2 seconds Processs b Waiting 4 seconds Process a DONE Processs c Waiting 6 seconds Process b DONE Processs d Waiting 8 seconds Process c DONE Processs e Waiting 1 seconds Process e DONE Processs f Waiting 3 seconds Process d DONE Processs g Waiting 5 seconds Process f DONE Processs h Waiting 7 seconds Process g DONE Process h DONE
如果想要每个进程池都有一个锁,使得进程成对运行,例如:
A waiting B waiting | A done , B done | C waiting , D waiting | C done, D done | ...
可以将处理函数更改为针对每对数据启动进程池(2个进程):
def mp_handler(): subdata = zip(data[0::2], data[1::2]) for task1, task2 in subdata: p = multiprocessing.Pool(2) p.map(mp_worker, (task1, task2))
现在输出结果为:
Processs a Waiting 2 seconds Processs b Waiting 4 seconds Process a DONE Process b DONE Processs c Waiting 6 seconds Processs d Waiting 8 seconds Process c DONE Process d DONE Processs e Waiting 1 seconds Processs f Waiting 3 seconds Process e DONE Process f DONE Processs g Waiting 5 seconds Processs h Waiting 7 seconds Process g DONE Process h DONE
如果想要处理的数据不是成对的,而是任意个数,可以修改代码来适应不同的进程池大小:
def mp_handler(): pool_size = 2 subdata = [data[i:i+pool_size] for i in range(0, len(data), pool_size)] for chunk in subdata: p = multiprocessing.Pool(pool_size) p.map(mp_worker, chunk)
这样,无论数据列表中有多少个值,都可以调整进程池的大小来处理列表。
希望这些代码能够帮助你解决问题。如果你有其他问题,请在Stack Overflow上提问,可能会有其他人能够给出更好的答案。