使用asyncio.Queue实现生产者-消费者流程
使用asyncio.Queue实现生产者-消费者流程
我对如何在同时和独立操作的生产者-消费者模式中使用asyncio.Queue
感到困惑。
首先,考虑一下这个例子,它紧密地遵循了asyncio.Queue
文档中的示例:
import asyncio import random import time async def worker(name, queue): while True: sleep_for = await queue.get() await asyncio.sleep(sleep_for) queue.task_done() print(f'{name} has slept for {sleep_for:0.2f} seconds') async def main(n): queue = asyncio.Queue() total_sleep_time = 0 for _ in range(20): sleep_for = random.uniform(0.05, 1.0) total_sleep_time += sleep_for queue.put_nowait(sleep_for) tasks = [] for i in range(n): task = asyncio.create_task(worker(f'worker-{i}', queue)) tasks.append(task) started_at = time.monotonic() await queue.join() total_slept_for = time.monotonic() - started_at for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) print('====') print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds') print(f'total expected sleep time: {total_sleep_time:.2f} seconds') if __name__ == '__main__': import sys n = 3 if len(sys.argv) == 1 else sys.argv[1] asyncio.run(main())
这个脚本有一个细节:项目是同步放入队列中的,使用queue.put_nowait(sleep_for)
在传统的for循环中。
我的目标是创建一个使用async def worker()
(或consumer()
)和async def producer()
的脚本。两者都应该被安排并发运行。没有一个消费者协程明确地与生产者绑定或链接。
我应该如何修改上面的程序,使生产者成为自己的协程,可以与消费者/工并发调度?