使用asyncio.Queue实现生产者-消费者流程

8 浏览
0 Comments

使用asyncio.Queue实现生产者-消费者流程

我对如何在同时和独立操作的生产者-消费者模式中使用asyncio.Queue感到困惑。

首先,考虑一下这个例子,它紧密地遵循了asyncio.Queue文档中的示例:

import asyncio
import random
import time
async def worker(name, queue):
    while True:
        sleep_for = await queue.get()
        await asyncio.sleep(sleep_for)
        queue.task_done()
        print(f'{name} has slept for {sleep_for:0.2f} seconds')
async def main(n):
    queue = asyncio.Queue()
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)
    tasks = []
    for i in range(n):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
if __name__ == '__main__':
    import sys
    n = 3 if len(sys.argv) == 1 else sys.argv[1]
    asyncio.run(main())

这个脚本有一个细节:项目是同步放入队列中的,使用queue.put_nowait(sleep_for)在传统的for循环中。

我的目标是创建一个使用async def worker()(或consumer())和async def producer()的脚本。两者都应该被安排并发运行。没有一个消费者协程明确地与生产者绑定或链接。

我应该如何修改上面的程序,使生产者成为自己的协程,可以与消费者/工并发调度?

0