使用队列会导致 asyncio 异常 "got Future attached to a different loop"。

5 浏览
0 Comments

使用队列会导致 asyncio 异常 "got Future attached to a different loop"。

我试图使用asyncio队列运行这段简单的代码,但是捕获异常,甚至是嵌套异常。

我希望在使用asyncio创建队列时得到一些帮助,以便正确运行:

import asyncio, logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.WARNING)
num_workers = 1
in_queue = asyncio.Queue()
out_queue = asyncio.Queue()
tasks = []
async def run():
    for request in range(1):
        await in_queue.put(request)
    # 每个任务从'in_queue'消费并向'out_queue'生产:
    for i in range(num_workers):
        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))
    # tasks.append(asyncio.create_task(saver()))
    print('waiting for queues...')
    await in_queue.join()
    # await out_queue.join()
    print('all queues done')
    for task in tasks:
        task.cancel()
    print('waiting until all tasks cancelled')
    await asyncio.gather(*tasks, return_exceptions=True)
    print('done')
async def worker(name):
    while True:
        try:
            print(f"{name} started")
            num = await in_queue.get()
            print(f'{name} got {num}')
            await asyncio.sleep(0)
            # await out_queue.put(num)
        except Exception as e:
            print(f"{name} exception {e}")
        finally:
            print(f"{name} ended")
            in_queue.task_done()
async def saver():
    while True:
        try:
            print("saver started")
            num = await out_queue.get()
            print(f'saver got {num}')
            await asyncio.sleep(0)
            print("saver ended")
        except Exception as e:
            print(f"saver exception {e}")
        finally:
            out_queue.task_done()
asyncio.run(run(), debug=True)
print('Done!')

输出:

waiting for queues...
worker-0 started
worker-0 got 0
worker-0 ended
worker-0 started
worker-0 exception 
worker-0 ended
ERROR:asyncio:unhandled exception during asyncio.run() shutdown
task:  exception=ValueError('task_done() called too many times') created at Python37\lib\asyncio\tasks.py:325>
Traceback (most recent call last):
  File "Python37\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
    return future.result()
  File "temp4.py", line 23, in run
    await in_queue.join()
  File "Python37\lib\asyncio\queues.py", line 216, in join
    await self._finished.wait()
  File "Python37\lib\asyncio\locks.py", line 293, in wait
    await fut
RuntimeError: Task  cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future  attached to a different loop
在处理上述异常期间,发生了另一个异常:
Traceback (most recent call last):
  File "temp4.py", line 46, in worker
    in_queue.task_done()
  File "Python37\lib\asyncio\queues.py", line 202, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1664, in 
    main()
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1658, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\pydevd.py", line 1068, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.1.4\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "temp4.py", line 63, in 
    asyncio.run(run(), debug=True)
  File "Python37\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "Python37\lib\asyncio\base_events.py", line 573, in run_until_complete
    return future.result()
  File "temp4.py", line 23, in run
    await in_queue.join()
  File "Python37\lib\asyncio\queues.py", line 216, in join
    await self._finished.wait()
  File "Python37\lib\asyncio\locks.py", line 293, in wait
    await fut
RuntimeError: Task  cb=[_run_until_complete_cb() at Python37\lib\asyncio\base_events.py:158] created at Python37\lib\asyncio\base_events.py:552> got Future  attached to a different loop

这是基本的流程,以后我想在更多的工作线程上运行更多的请求,每个工作线程将从'in_queue'移动数字到'out_queue',然后saver将打印来自'out_queue'的数字。

0