
在构建基于fastapi的异步应用时,我们经常需要在应用启动时初始化一些全局资源,例如数据库连接池、消息队列客户端或缓存连接。fastapi提供了@app.on_event("startup")装饰器来处理这些启动任务。同时,为了更好地管理资源生命周期,我们通常会使用异步生成器(asyncgenerator)来创建和关闭这些资源,并结合fastapi的依赖注入系统depends()。然而,当尝试在startup事件中直接将asyncgenerator与depends()结合使用时,可能会遇到意料之外的错误。
考虑一个场景,我们需要在FastAPI应用启动时获取一个Redis异步客户端,并将其用于初始化一个全局的任务队列。我们可能尝试编写如下代码:
import uvicorn
from fastapi import FastAPI, Depends
import redis.asyncio as redis
from redis.asyncio import Redis
from typing import AsyncGenerator
from rq import Queue # 假设rq是任务队列库
# 配置Redis连接
REDIS_HOST = "localhost"
REDIS_PORT = 6379
redis_pool = redis.ConnectionPool.from_url(f"redis://{REDIS_HOST}:{REDIS_PORT}")
async def get_async_redis_client() -> AsyncGenerator[Redis, None]:
"""
异步生成器,用于提供Redis客户端连接。
"""
async with Redis.from_pool(redis_pool) as client:
yield client
def process_data(data: str):
"""
模拟一个处理数据的函数。
"""
print(f"Processing data: {data}")
def create_app():
app = FastAPI(docs_url='/')
task_queue: Queue = None # 声明为None,稍后初始化
@app.on_event("startup")
async def startup_event(redis_conn: redis.asyncio.Redis = Depends(get_async_redis_client)):
"""
尝试在startup事件中使用Depends()注入Redis连接。
"""
nonlocal task_queue
task_queue = Queue("task_queue", connection=redis_conn)
print("Redis connection initialized in startup event.")
@app.post("/add_data")
async def add_data(data: str):
"""
添加数据到任务队列。
"""
if task_queue:
task_queue.enqueue(process_data, data)
return {"message": "Book in processing"}
return {"message": "Task queue not initialized", "status": "error"}
@app.get("/get_data")
async def get_data():
"""
示例接口。
"""
return {"data": "kek"}
return app
def main():
uvicorn.run(
f"{__name__}:create_app",
host='0.0.0.0', port=8888,
reload=True
)
if __name__ == '__main__':
main()当运行上述代码并尝试向/add_data端点发送POST请求时,会收到一个AttributeError: 'Depends' object has no attribute 'pipeline'的错误。这表明在startup_event函数中,redis_conn变量并没有被解析成实际的redis.asyncio.Redis对象,而仍然是一个Depends对象。
FastAPI的Depends()机制主要设计用于请求处理函数中的依赖解析。在请求处理的生命周期中,FastAPI会负责调用依赖函数(包括异步生成器),获取其yield出的值,并在请求结束后执行生成器中yield之后的清理代码。
然而,@app.on_event("startup")装饰器下的函数,其执行时机在整个应用开始接受请求之前,并且它不属于标准的请求-响应循环。FastAPI的依赖注入系统并不会像处理路由函数那样,自动解析startup事件函数参数中的Depends()。因此,redis_conn变量接收到的不是get_async_redis_client生成器yield出的Redis客户端实例,而是Depends(get_async_redis_client)这个Depends对象本身。当rq库尝试对这个Depends对象调用pipeline()方法时,自然会抛出AttributeError。
为了在应用启动时正确地初始化和管理异步生成器提供的资源,FastAPI推荐使用lifespan上下文管理器。lifespan是FastAPI 0.65.0版本引入的一种更现代、更灵活的应用生命周期管理方式,它允许我们定义一个异步上下文管理器,在应用启动前执行设置代码,并在应用关闭前执行清理代码。
通过lifespan,我们可以手动调用异步生成器,获取其yield出的资源,并将其存储在应用实例或全局变量中,供其他部分使用。
以下是使用lifespan解决上述问题的正确方法:
import uvicorn
from fastapi import FastAPI
import redis.asyncio as redis
from redis.asyncio import Redis
from typing import AsyncGenerator
from rq import Queue # 假设rq是任务队列库
from contextlib import asynccontextmanager
# 配置Redis连接
REDIS_HOST = "localhost"
REDIS_PORT = 6379
redis_pool = redis.ConnectionPool.from_url(f"redis://{REDIS_HOST}:{REDIS_PORT}")
async def get_async_redis_client() -> AsyncGenerator[Redis, None]:
"""
异步生成器,用于提供Redis客户端连接。
"""
print("Opening Redis connection...")
async with Redis.from_pool(redis_pool) as client:
yield client
print("Closing Redis connection...") # 应用关闭时执行
def process_data(data: str):
"""
模拟一个处理数据的函数。
"""
print(f"Processing data: {data}")
# 定义一个全局变量来存储任务队列
task_queue: Queue = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
FastAPI应用生命周期管理器。
在应用启动时初始化资源,在应用关闭时清理资源。
"""
global task_queue # 声明使用全局变量
# 手动调用异步生成器以获取Redis连接
# 注意:这里直接调用get_async_redis_client(),并迭代它
# app.dependency_overrides.get(get_async_redis_client, get_async_redis_client)
# 这一步是为了兼容可能存在的依赖覆盖,确保获取到的是最终的依赖函数
redis_generator_func = app.dependency_overrides.get(get_async_redis_client, get_async_redis_client)
async for redis_conn in redis_generator_func():
# 在这里,redis_conn已经是实际的Redis客户端对象
task_queue = Queue("task_queue", connection=redis_conn)
print("Redis connection and Task Queue initialized via lifespan.")
yield # 应用在此处启动并处理请求
# 应用关闭时,生成器会继续执行,清理Redis连接
print("Application shutdown: Resources released.")
def create_app():
app = FastAPI(
docs_url='/',
lifespan=lifespan # 将lifespan上下文管理器传递给FastAPI
)
@app.post("/add_data")
async def add_data(data: str):
"""
添加数据到任务队列。
"""
if task_queue:
task_queue.enqueue(process_data, data)
return {"message": "Book in processing"}
return {"message": "Task queue not initialized", "status": "error"}
@app.get("/get_data")
async def get_data():
"""
示例接口。
"""
return {"data": "kek"}
return app
def main():
uvicorn.run(
f"{__name__}:create_app",
host='0.0.0.0', port=8888,
reload=True
)
if __name__ == '__main__':
main()在这个修正后的代码中:
在FastAPI中,Depends()装饰器是为请求处理函数设计的依赖注入机制,不适用于@app.on_event("startup")事件。当需要在应用启动时利用AsyncGenerator初始化全局资源时,正确的做法是使用FastAPI的lifespan上下文管理器。通过手动调用异步生成器并将其结果存储在全局变量中,我们可以确保资源在应用启动时被正确初始化,并在应用关闭时被优雅地清理,从而避免因依赖解析不当导致的AttributeError。这种方法不仅解决了特定问题,也体现了FastAPI在应用生命周期管理上的灵活性和强大功能。
以上就是FastAPI启动事件中AsyncGenerator依赖注入的正确实践的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号