使用多进程队列、线程池和锁的简单示例

10 浏览
0 Comments

使用多进程队列、线程池和锁的简单示例

我尝试阅读http://docs.python.org/dev/library/multiprocessing.html上的文档,但我仍然对多进程队列、池和锁的概念感到困惑。目前,我能够构建下面的示例。\n关于队列和池,我不确定我是否正确理解了这个概念,如果我理解错了,请纠正我。我想要实现的目标是同时处理两个请求(在这个示例中,数据列表有8个),那么我应该使用什么?使用池创建两个进程,每个进程可以处理两个不同的队列(最多2个),还是只使用队列每次处理两个输入?锁的作用是正确地打印输出。\n

import multiprocessing
import time
data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)
def mp_handler(var1):
    for indata in var1:
        p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
        p.start()
def mp_worker(inputs, the_time):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs
if __name__ == '__main__':
    mp_handler(data)

0
0 Comments

这是一个使用多进程队列进行并行处理的简单示例。这个问题的出现是因为作者在搜索使用多进程和队列的示例时,这篇文章在谷歌搜索结果中排名第一。下面是解决这个问题的方法。

问题的解决方法是创建一个名为Renderer的类,该类可以实例化,并将项目放入队列中,然后等待队列完成。代码如下:

from multiprocessing import JoinableQueue
from multiprocessing.context import Process
class Renderer:
    queue = None
    
    def __init__(self, nb_workers=2):
        self.queue = JoinableQueue()
        self.processes = [Process(target=self.upload) for i in range(nb_workers)]
        for p in self.processes:
            p.start()
    
    def render(self, item):
        self.queue.put(item)
    
    def upload(self):
        while True:
            item = self.queue.get()
            if item is None:
                break
            # process your item here
            self.queue.task_done()
    
    def terminate(self):
        """ wait until queue is empty and terminate processes """
        self.queue.join()
        for p in self.processes:
            p.terminate()
r = Renderer()
r.render(item1)
r.render(item2)
r.terminate()

在代码中,item1item2是任务或输入参数,它们将在两个不同的进程中并行执行。

0
0 Comments

多进程编程是一种常见的并发编程技术,可以有效地提高程序的执行效率。然而,在实际应用中,多进程编程也会遇到一些问题,比如多个进程同时输出到终端可能导致输出混乱的情况。本文将介绍一个使用多进程队列、进程池和锁的简单示例,并解决输出混乱的问题。

首先,我们来看一下示例代码:

import multiprocessing
import sys
THREADS = 3
# Used to prevent multiple threads from mixing thier output
GLOBALLOCK = multiprocessing.Lock()
def func_worker(args):
    """This function will be called by each thread.
    This function can not be a class method.
    """
    # Expand list of args into named args.
    str1, str2 = args
    del args
    # Work
    # ...
    # Serial-only Portion
    GLOBALLOCK.acquire()
    print(str1)
    print(str2)
    GLOBALLOCK.release()
def main(argp=None):
    """Multiprocessing Spawn Example
    """
    # Create the number of threads you want
    pool = multiprocessing.Pool(THREADS)
    # Define two jobs, each with two args.
    func_args = [
        ('Hello', 'World',), 
        ('Goodbye', 'World',), 
    ]
    try:
        pool.map_async(func_worker, func_args).get()
    except KeyboardInterrupt:
        # Allow ^C to interrupt from any thread.
        sys.stdout.write('\033[0m')
        sys.stdout.write('User Interupt\n')
    pool.close()
if __name__ == '__main__':
    main()

在这个例子中,我们创建了一个进程池,然后定义了两个任务,每个任务有两个参数。每个任务的执行函数是`func_worker`,这个函数会被每个进程调用。

`func_worker`函数首先将传入的参数解包,并进行一些工作。然后,为了防止多个进程同时输出导致输出混乱,我们使用了一个全局锁`GLOBALLOCK`。在输出之前,我们先获取锁,然后输出参数的值,最后释放锁。

在`main`函数中,我们使用`pool.map_async(func_worker, func_args).get()`来启动任务的执行。`map_async`函数将任务分配给进程池中的进程,并返回一个异步结果对象。我们通过调用`get()`方法来等待所有任务执行完成,并获取结果。

为了防止程序无法退出,我们在捕获到`KeyboardInterrupt`异常时,调用`pool.close()`方法关闭进程池。

至此,我们已经完成了一个简单的使用多进程队列、进程池和锁的示例。通过使用锁来控制输出的顺序,我们避免了输出混乱的问题。

关于`.map_async()`函数,有一些问题需要解答。`.map_async()`和`.map()`函数有什么区别?`.get()`方法的参数`timeout`是用来做什么的?是否可以只调用`.get()`方法来获取结果?

`.map_async()`函数和`.map()`函数的区别在于返回值的类型不同。`.map_async()`返回的是一个异步结果对象,而`.map()`直接返回结果列表。`.get()`方法用于等待异步结果对象的执行完成,并返回结果。

`.get()`方法的参数`timeout`是设置等待的超时时间的。如果设置了超时时间,那么在超过指定时间后,`get()`方法会抛出一个`TimeoutError`异常。如果没有设置超时时间,则`get()`方法会一直等待直到所有结果都可用,并返回结果列表。

因此,可以使用`.get()`方法来获取完成的结果列表,也可以使用轮询循环来检查结果是否可用。此外,还可以在`map_async()`调用中传递一个回调函数,当结果可用时会调用该函数。

通过使用多进程队列、进程池和锁,我们可以简单地解决多进程输出混乱的问题。这种方法可以提高程序的执行效率,并确保输出的准确性。

0
0 Comments

使用多进程队列(Multiprocessing Queue)、进程池(Pool)和锁(Lock)是解决问题的最佳方案。通过使用进程池,可以最大程度地减少对原始代码的修改。下面是稍作修改后的程序,使用了进程池,并将进程数限制为2:

import multiprocessing
import time
data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)
def mp_worker(inputs, the_time):
    print "Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print "Process %s\tDONE" % inputs
def mp_handler():
    p = multiprocessing.Pool(2)
    p.map(mp_worker, data)
if __name__ == '__main__':
    mp_handler()

注意,`mp_worker()`函数现在接受一个参数(一个包含两个先前参数的元组),因为`map()`函数将输入数据分块为子列表,每个子列表作为单个参数传递给工作函数。

输出结果为:

Processs a  Waiting 2 seconds
Processs b  Waiting 4 seconds
Process a   DONE
Processs c  Waiting 6 seconds
Process b   DONE
Processs d  Waiting 8 seconds
Process c   DONE
Processs e  Waiting 1 seconds
Process e   DONE
Processs f  Waiting 3 seconds
Process d   DONE
Processs g  Waiting 5 seconds
Process f   DONE
Processs h  Waiting 7 seconds
Process g   DONE
Process h   DONE

如果想要每个进程池都有一个锁,使得进程成对运行,例如:

A waiting B waiting | A done , B done | C waiting , D waiting | C done, D done | ...

可以将处理函数更改为针对每对数据启动进程池(2个进程):

def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        p = multiprocessing.Pool(2)
        p.map(mp_worker, (task1, task2))

现在输出结果为:

Processs a Waiting 2 seconds
Processs b Waiting 4 seconds
Process a  DONE
Process b  DONE
Processs c Waiting 6 seconds
Processs d Waiting 8 seconds
Process c  DONE
Process d  DONE
Processs e Waiting 1 seconds
Processs f Waiting 3 seconds
Process e  DONE
Process f  DONE
Processs g Waiting 5 seconds
Processs h Waiting 7 seconds
Process g  DONE
Process h  DONE

如果想要处理的数据不是成对的,而是任意个数,可以修改代码来适应不同的进程池大小:

def mp_handler():
    pool_size = 2
    subdata = [data[i:i+pool_size] for i in range(0, len(data), pool_size)]
    for chunk in subdata:
        p = multiprocessing.Pool(pool_size)
        p.map(mp_worker, chunk)

这样,无论数据列表中有多少个值,都可以调整进程池的大小来处理列表。

希望这些代码能够帮助你解决问题。如果你有其他问题,请在Stack Overflow上提问,可能会有其他人能够给出更好的答案。

0