使用队列会导致 asyncio 异常 "got Future attached to a different loop"。
使用队列会导致 asyncio 异常 "got Future attached to a different loop"。
我试图使用asyncio队列运行这段简单的代码,但是捕获异常,甚至是嵌套异常。
我希望在使用asyncio创建队列时得到一些帮助,以便正确运行:
import asyncio, logging logging.basicConfig(level=logging.DEBUG) logging.getLogger("asyncio").setLevel(logging.WARNING) num_workers = 1 in_queue = asyncio.Queue() out_queue = asyncio.Queue() tasks = [] async def run(): for request in range(1): await in_queue.put(request) # 每个任务从'in_queue'消费并向'out_queue'生产: for i in range(num_workers): tasks.append(asyncio.create_task(worker(name=f'worker-{i}'))) # tasks.append(asyncio.create_task(saver())) print('waiting for queues...') await in_queue.join() # await out_queue.join() print('all queues done') for task in tasks: task.cancel() print('waiting until all tasks cancelled') await asyncio.gather(*tasks, return_exceptions=True) print('done') async def worker(name): while True: try: print(f"{name} started") num = await in_queue.get() print(f'{name} got {num}') await asyncio.sleep(0) # await out_queue.put(num) except Exception as e: print(f"{name} exception {e}") finally: print(f"{name} ended") in_queue.task_done() async def saver(): while True: try: print("saver started") num = await out_queue.get() print(f'saver got {num}') await asyncio.sleep(0) print("saver ended") except Exception as e: print(f"saver exception {e}") finally: out_queue.task_done() asyncio.run(run(), debug=True) print('Done!')
输出:
waiting for queues... worker-0 started worker-0 got 0 worker-0 ended worker-0 started worker-0 exception worker-0 ended ERROR:asyncio:unhandled exception during asyncio.run() shutdown task:exception=ValueError('task_done() called too many times') created at Python37\lib\asyncio\tasks.py:325> Traceback (most recent call last): File "Python37\lib\asyncio\runners.py", line 43, in run return loop.run_until_complete(main) File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete return future.result() File "temp4.py", line 23, in run await in_queue.join() File "Python37\lib\asyncio\queues.py", line 216, in join await self._finished.wait() File "Python37\lib\asyncio\locks.py", line 293, in wait await fut RuntimeError: Task cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future attached to a different loop 在处理上述异常期间,发生了另一个异常: Traceback (most recent call last): File "temp4.py", line 46, in worker in_queue.task_done() File "Python37\lib\asyncio\queues.py", line 202, in task_done raise ValueError('task_done() called too many times') ValueError: task_done() called too many times Traceback (most recent call last): File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1664, in main() File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1658, in main globals = debugger.run(setup['file'], None, None, is_module) File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1068, in run pydev_imports.execfile(file, globals, locals) # execute the script File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile exec(compile(contents+"\n", file, 'exec'), glob, loc) File "temp4.py", line 63, in asyncio.run(run(), debug=True) File "Python37\lib\asyncio\runners.py", line 43, in run return loop.run_until_complete(main) File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete return future.result() File "temp4.py", line 23, in run await in_queue.join() File "Python37\lib\asyncio\queues.py", line 216, in join await self._finished.wait() File "Python37\lib\asyncio\locks.py", line 293, in wait await fut RuntimeError: Task cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future attached to a different loop
这是基本的流程,以后我想在更多的工作线程上运行更多的请求,每个工作线程将从'in_queue'移动数字到'out_queue',然后saver将打印来自'out_queue'的数字。