
本文探讨了在 `asyncio` 中如何解决因直接 `await` 耗时操作导致的并发阻塞问题。通过分析一个字符流处理示例,揭示了传统 `async for` 循环中 `await` 的局限性。核心解决方案是引入 `asyncio.queue` 和 `asyncio.event`,构建生产者-消费者模式,从而实现任务的解耦与并发执行,显著提升异步应用的响应性和效率。
在 asyncio 异步编程中,我们经常需要处理数据流的生产和消费。一个常见的挑战是,当一个任务正在处理数据并遇到耗时操作时,如何确保数据生产能够持续进行,而不是被阻塞。本文将深入探讨这一问题,并提供一个基于 asyncio.Queue 和 asyncio.Event 的优雅解决方案。
考虑一个场景:我们有一个字符流生成器 stream(),它逐个生成字符;一个句子生成器 sentences_generator(),它从字符流中收集字符并生成完整的句子;以及一个句子处理器 process_sentence(),它模拟对句子的耗时处理。
初始实现可能如下所示:
import asyncio
async def stream():
char_string = "Hi. Hello. Thank you."
for char in char_string:
await asyncio.sleep(0.1) # 模拟耗时操作
print("got char:", char)
yield char
async def sentences_generator():
sentence = ""
async for char in stream():
sentence += char
if char in [".", "!", "?"]:
print("got sentence: ", sentence)
yield sentence
sentence = ""
async def process_sentence(sentence: str):
print("waiting for processing sentence: ", sentence)
await asyncio.sleep(len(sentence) * 0.1) # 模拟句子处理耗时
print("sentence processed!")
async def main():
i = 0
async for sentence in sentences_generator():
print("processing sentence: ", i)
await process_sentence(sentence) # 这里会阻塞
i += 1
# asyncio.run(main())运行上述代码,会观察到如下输出模式:
got char: H got char: i got char: . got sentence: Hi. processing sentence: 0 waiting for processing sentence: Hi. sentence processed! got char: got char: H got char: e got char: l got char: l got char: o got char: . got sentence: Hello. processing sentence: 1 waiting for processing sentence: Hello. sentence processed! ...
从输出可以看出,当 process_sentence 正在处理一个句子(即执行 await asyncio.sleep())时,字符流的生成(got char:)完全停止了。只有当前句子处理完毕后,sentences_generator 才能继续从 stream() 获取下一个字符,进而生成下一个句子。这并不是我们期望的并发行为。理想情况下,当一个句子正在被处理时,字符流应该能够持续生成,为下一个句子做准备。
问题根源:asyncio 中的 await 关键字会暂停当前协程的执行,并将控制权交还给事件循环,允许事件循环调度其他“已准备好”的协程。然而,在上述 main 函数中,async for sentence in sentences_generator(): 迭代器在每次循环中都紧跟着 await process_sentence(sentence)。这意味着 main 协程会完全等待 process_sentence 完成,才能再次从 sentences_generator 获取下一个句子。由于 sentences_generator 是在 main 内部同步迭代的,它也无法在 process_sentence 运行时继续推进。
为了实现生产者(生成句子)和消费者(处理句子)的并发执行,我们需要解耦它们,使它们能够独立运行。asyncio.Queue 和 asyncio.Event 是实现这一目标的理想工具。
生产者-消费者模式是一种经典的多线程/多进程/多任务设计模式。生产者负责生成数据并将其放入一个共享缓冲区(队列),而消费者则从缓冲区中取出数据进行处理。这种模式的关键在于,生产者和消费者可以以不同的速度运行,并且互不干扰,只要队列中有足够的空间或数据。
asyncio.Queue 是 asyncio 提供的异步队列,它具有以下特性:
asyncio.Event 是一个简单的同步原语,用于在 asyncio 任务之间进行信号通知。它主要用于:
结合 asyncio.Queue 和 asyncio.Event,我们可以构建一个健壮的生产者-消费者系统,确保消费者在队列清空且生产者已完成工作后能够优雅地退出。
我们将改造 sentences_generator 作为生产者,process_sentence 作为消费者,并使用 asyncio.gather() 来并发运行它们。
import asyncio
# 模拟全局变量,用于计数处理的句子
i = 1
async def stream():
char_string = "Hi. Hello. Thank you." # 更改了字符串以展示更长的流
for char in char_string:
await asyncio.sleep(0.1) # 模拟耗时操作
print("got char:", char)
yield char
async def sentences_generator(q: asyncio.Queue[str], flag: asyncio.Event):
"""
生产者:从字符流生成句子,并放入队列。
当所有句子生成完毕后,设置事件标志。
"""
sentence = ""
async for char in stream():
sentence += char
if char in [".", "!", "?"]:
print("got sentence: ", sentence)
await q.put(sentence) # 将句子放入队列
sentence = ""
flag.set() # 生产者完成所有工作,设置事件标志
async def process_sentence(q: asyncio.Queue[str], flag: asyncio.Event):
"""
消费者:从队列中取出句子并处理。
当队列为空且生产者已完成时,退出。
"""
global i # 使用全局计数器
while True:
# 检查退出条件:队列为空且生产者已设置完成标志
if q.empty() and flag.is_set():
break
try:
item = await asyncio.wait_for(q.get(), timeout=1.0) # 尝试从队列获取,设置超时避免无限等待
except asyncio.TimeoutError:
# 如果超时,再次检查退出条件,防止生产者完成但队列仍空的情况
if q.empty() and flag.is_set():
break
continue # 继续尝试获取
print("processing sentence: ", i)
print("waiting for processing sentence: ", item)
await asyncio.sleep(len(item) * 0.1) # 模拟句子处理耗时
print("sentence processed!")
i += 1
async def main():
global i
i = 1 # 重置计数器
event = asyncio.Event() # 创建事件对象
queue = asyncio.Queue[str]() # 创建队列对象
# 创建生产者和消费者任务
producer_task = sentences_generator(queue, event)
consumer_task = process_sentence(queue, event)
# 并发运行生产者和消费者任务
await asyncio.gather(producer_task, consumer_task)
if __name__ == "__main__":
asyncio.run(main())代码解析:
sentences_generator (生产者):
process_sentence (消费者):
main 函数:
运行优化后的代码,你将看到类似以下的输出:
got char: H got char: i got char: . got sentence: Hi. processing sentence: 1 waiting for processing sentence: Hi. got char: got char: H got char: e got char: l got char: l got char: o got char: . got sentence: Hello. sentence processed! processing sentence: 2 waiting for processing sentence: Hello. got char: got char: T got char: h got char: a got char: n got char: k got char: . got sentence: Thank. sentence processed! processing sentence: 3 waiting for processing sentence: Thank. got char: got char: y got char: o got char: u got char: . got sentence: you. sentence processed! processing sentence: 4 waiting for processing sentence: you. sentence processed!
并发分析:
从新的输出可以看出,当 process_sentence 正在等待(waiting for processing sentence: ...)时,sentences_generator 仍然在继续生成字符(got char: ...)和句子(got sentence: ...),并将它们放入队列。这正是我们期望的并发行为:生产者和消费者独立运行,通过队列进行异步通信,充分利用了 asyncio 的协作式多任务能力。
通过理解 await 的工作原理并巧妙地利用 asyncio.Queue 和 asyncio.Event,我们可以有效地构建高效、响应迅速的异步应用程序,实现复杂的任务调度和并发处理。
以上就是优化 asyncio 任务调度:使用队列实现生产者-消费者模式的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号