使用工作线程的 asyncio.Queues 的等效方式

11 浏览
0 Comments

使用工作线程的 asyncio.Queues 的等效方式

我正在尝试弄清楚如何将一个使用asyncio的线程程序进行移植。我有很多代码围绕着一些标准库的Queues进行同步,基本上像这样:

import queue, random, threading, time
q = queue.Queue()
def produce():
    while True:
        time.sleep(0.5 + random.random())  # 睡眠0.5 - 1.5秒
        q.put(random.random())
def consume():
    while True: 
        value = q.get(block=True)
        print("已消费", value)
threading.Thread(target=produce).start()
threading.Thread(target=consume).start()

一个线程创建值(可能是用户输入),另一个线程对其进行处理。关键是这些线程在没有新数据时处于空闲状态,一旦有新数据,它们就会被唤醒并对其进行处理。

我正在尝试使用asyncio来实现这种模式,但似乎无法让它正常工作。

我的尝试看起来类似于这样(完全没有作用)。

import asyncio, random
q = asyncio.Queue()
@asyncio.coroutine
def produce():
    while True: 
        q.put(random.random())
        yield from asyncio.sleep(0.5 + random.random())
@asyncio.coroutine
def consume():
    while True:
        value = yield from q.get()
        print("已消费", value)
# 在此处进行一些操作以启动协程。asyncio.Task()?
loop = asyncio.get_event_loop()
loop.run_forever()

我尝试过使用协程的变化,不使用协程,将代码包装在Tasks中,尝试让它们创建或返回futures等等。

我开始觉得自己对如何使用asyncio有错误的理解(也许应该以我不知道的其他方式来实现这种模式)。

如果有任何指导意见,将不胜感激。

0