FastAPI 多种认证方式(任选其一)实现指南

碧海醫心
发布: 2025-11-25 14:08:53
原创
540人浏览过

FastAPI 多种认证方式(任选其一)实现指南

本教程详细阐述了如何在 fastapi 中实现多种认证机制(如 basic auth 和 jwt auth),并允许客户端任选其一进行认证。核心方法是修改各个认证依赖项,使其在认证失败时返回 `none` 而非立即抛出异常,从而使一个组合认证依赖能够基于“或”逻辑判断任一认证是否成功,最终实现灵活的多重认证支持。

在构建现代 Web API 时,支持多种认证方式以满足不同客户端或场景的需求是常见的实践。例如,您可能希望同时支持传统的 HTTP Basic 认证和基于令牌的 JWT 认证。然而,在 FastAPI 中直接将多个认证依赖项组合使用时,默认行为可能会导致问题:FastAPI 会强制所有依赖项都通过验证,如果其中一个依赖项在解析阶段抛出 HTTPException,那么后续的依赖项将不会被执行,从而无法实现“任选其一”的逻辑。

理解 FastAPI 依赖注入的默认行为

FastAPI 的依赖注入系统在处理多个 Depends 依赖时,遵循严格的顺序和错误处理机制。当一个依赖项(例如,一个认证函数)内部抛出 HTTPException 时,FastAPI 会立即捕获该异常并返回相应的 HTTP 响应,而不会继续执行后续的依赖项或路由处理函数。这意味着,如果您尝试通过一个自定义的组合依赖函数来包装多个认证依赖,并期望它们能像逻辑“或”一样工作,那么默认情况下是行不通的。

例如,以下尝试实现“或”逻辑的组合认证依赖:

# 假设 jwt_logged_user 和 basic_logged_user 在认证失败时会抛出 HTTPException
def auth_user(jwt_auth: Any = Depends(jwt_logged_user),
              basic_auth: Any = Depends(basic_logged_user)):
    if jwt_auth or basic_auth:
        return jwt_auth or basic_auth
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid Credentials')
登录后复制

如果 jwt_logged_user 认证失败并抛出异常,那么 basic_auth 依赖将永远不会被解析,auth_user 函数体也永远不会执行,从而无法检查 Basic Auth 是否有效。

核心解决方案:优雅地处理认证失败

要实现“任选其一”的认证逻辑,关键在于修改各个独立的认证依赖项,使其在认证失败时不再立即抛出 HTTPException,而是返回一个指示失败的值(例如 None)。这样,组合认证依赖就可以接收到所有独立认证的结果,并根据这些结果进行逻辑判断。

1. 修改 HTTP Basic 认证依赖

对于 HTTP Basic 认证,FastAPI 提供了 HTTPBasic 类。通过设置 auto_error=False,可以禁用其在认证失败时自动抛出 HTTPException 的行为。

首先,更新 HTTPBasic 实例:

from fastapi.security import HTTPBasic, HTTPBasicCredentials
from typing import Annotated, Optional
import secrets

# ... 其他导入 ...

# 将 auto_error 设置为 False
security = HTTPBasic(auto_error=False)

def basic_logged_user(credentials: Annotated[Optional[HTTPBasicCredentials], Depends(security)]):
    """
    处理 HTTP Basic 认证。
    如果认证失败,返回 None 而非抛出异常。
    """
    if credentials is None:
        # 没有提供 Basic 认证凭据,或者凭据格式不正确 (由 auto_error=False 处理)
        return None

    current_username_bytes = credentials.username.encode("utf8")
    correct_username_bytes = settings.SESSION_LOGIN_USER.encode("utf8")
    is_correct_username = secrets.compare_digest(
        current_username_bytes, correct_username_bytes
    )
    current_password_bytes = credentials.password.encode("utf8")
    correct_password_bytes = settings.SESSION_LOGIN_PASS.encode("utf8")
    is_correct_password = secrets.compare_digest(
        current_password_bytes, correct_password_bytes
    )

    if not (is_correct_username and is_correct_password):
        # 凭据不匹配,返回 None
        return None

    # 认证成功,返回用户名
    return credentials.username
登录后复制

关键点:

智谱AI开放平台
智谱AI开放平台

智谱AI大模型开放平台-新一代国产自主通用AI开放平台

智谱AI开放平台 85
查看详情 智谱AI开放平台
  • security = HTTPBasic(auto_error=False):禁用 HTTPBasic 在凭据缺失或格式错误时自动抛出 401 异常。
  • credentials: Annotated[Optional[HTTPBasicCredentials], Depends(security)]:声明 credentials 为 Optional 类型,因为 security 在 auto_error=False 时可能返回 None。
  • if credentials is None::检查是否提供了凭据。
  • if not (is_correct_username and is_correct_password): return None:在自定义逻辑中,如果凭据无效,也返回 None。

2. 修改 JWT 认证依赖

对于 JWT 认证,通常会使用 OAuth2PasswordBearer。同样,通过设置 auto_error=False 来禁用其自动错误处理。此外,您的令牌验证函数 (utils.verify_token) 也应该被修改,以在验证失败时返回 None 或被 try-except 块包裹。

假设 utils.OAuth2_scheme 是 OAuth2PasswordBearer 的实例,并且 utils.verify_token 函数负责验证 JWT。

from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from typing import Annotated, Optional
from jose import JWTError, jwt # 假设您使用 jose 库
from pydantic import BaseModel

# ... 其他导入 ...

# 假设 utils 模块包含 OAuth2_scheme 和 verify_token
# utils.OAuth2_scheme 应该被初始化为 auto_error=False
# 例如:
# class TokenData(BaseModel):
#     username: Optional[str] = None
# class Utils:
#     OAuth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
#     SECRET_KEY = "your-secret-key" # 替换为实际的密钥
#     ALGORITHM = "HS256"
#     def verify_token(self, token: str) -> dict:
#         try:
#             payload = jwt.decode(token, self.SECRET_KEY, algorithms=[self.ALGORITHM])
#             username: str = payload.get("sub")
#             if username is None:
#                 raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload")
#             return {"username": username}
#         except JWTError:
#             raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials")
# utils = Utils()


def jwt_logged_user(token: Annotated[Optional[str], Depends(utils.OAuth2_scheme)],
                    db: Session = Depends(db_session)):
    """
    处理 JWT 认证。
    如果认证失败,返回 None 而非抛出异常。
    """
    if token is None:
        # 没有提供 JWT 令牌,或者令牌格式不正确 (由 auto_error=False 处理)
        return None

    try:
        # 尝试验证令牌,并从 payload 中获取用户名
        # 假设 utils.verify_token 在验证失败时会抛出 HTTPException 或 JWTError
        payload = utils.verify_token(token) # 假设返回一个包含用户信息的字典
        username = payload.get("username") # 根据实际 payload 结构调整

        if username is None:
            return None # 令牌有效但没有找到用户名信息

        # 查询数据库以验证用户是否存在
        user = db.query(User).filter(User.username == username).first()
        if user is None:
            return None # 用户不存在

        # 认证成功,返回用户名
        return user.username
    except HTTPException:
        # 捕获 verify_token 内部抛出的 HTTPException
        return None
    except JWTError: # 捕获 jose 库的 JWT 错误
        return None
    except Exception:
        # 捕获其他未知错误
        return None
登录后复制

关键点:

  • utils.OAuth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False):确保 OAuth2PasswordBearer 实例设置了 auto_error=False。
  • token: Annotated[Optional[str], Depends(utils.OAuth2_scheme)]:声明 token 为 Optional 类型。
  • if token is None::检查是否提供了令牌。
  • try...except 块:包裹 utils.verify_token 调用,捕获可能抛出的异常,并在捕获到异常时返回 None。
  • return user.username:认证成功后返回用户名,与 Basic 认证的返回类型保持一致。

3. 实现组合认证逻辑

现在,当 basic_logged_user 和 jwt_logged_user 认证失败时都返回 None,我们可以创建一个新的组合依赖项来检查任一认证是否成功。

from fastapi import HTTPException, status, Depends
from typing import Annotated, Optional, Union

# ... 其他导入 ...

def auth_user(jwt_username: Annotated[Optional[str], Depends(jwt_logged_user)],
              basic_username: Annotated[Optional[str], Depends(basic_logged_user)]) -> str:
    """
    组合认证依赖,允许通过 JWT 或 Basic Auth 中的任一种进行认证。
    如果任一认证成功,返回对应的用户名。
    如果两种认证都失败,则抛出 401 异常。
    """
    if jwt_username:
        # JWT 认证成功
        return jwt_username
    if basic_username:
        # Basic Auth 认证成功
        return basic_username

    # 如果两种认证都失败,则抛出未授权异常
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Invalid Credentials',
        headers={"WWW-Authenticate": "Basic, Bearer"}, # 提示支持的认证方式
    )

# 路由使用组合认证依赖
@router.get("/users/")
async def get_users(db: Session = Depends(db_session), 
                    logged_username: Annotated[str, Depends(auth_user)]):
    """
    获取用户列表,需要通过 JWT 或 Basic Auth 认证。
    """
    # 假设您需要根据用户名进行某些操作,例如权限检查
    print(f"Authenticated user: {logged_username}") 
    query_users = db.query(User).all()
    return query_users
登录后复制

关键点:

  • jwt_username: Annotated[Optional[str], Depends(jwt_logged_user)] 和 basic_username: Annotated[Optional[str], Depends(basic_logged_user)]:接收两个子认证依赖的结果,它们现在可以是 None。
  • if jwt_username: return jwt_username:如果 JWT 认证成功(返回了用户名),则直接返回该用户名。
  • if basic_username: return basic_username:如果 JWT 认证失败但 Basic Auth 成功,则返回 Basic Auth 的用户名。
  • raise HTTPException(...):只有当所有子认证都失败时,才由 auth_user 统一抛出 401 异常。这样确保了只有在没有任何有效凭据的情况下才拒绝访问。
  • headers={"WWW-Authenticate": "Basic, Bearer"}:在 401 响应头中明确告知客户端支持的认证方式。

完整示例代码结构

为了更好地理解,以下是一个整合了上述修改的 FastAPI 应用片段:

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials, OAuth2PasswordBearer
from sqlalchemy.orm import Session
from typing import Annotated, Optional, Union
import secrets
from jose import JWTError, jwt # 假设使用 jose 库

# 假设这些是您的配置和模型
class Settings:
    SESSION_LOGIN_USER = "admin"
    SESSION_LOGIN_PASS = "securepassword"
    JWT_SECRET_KEY = "your-super-secret-jwt-key" # 替换为实际的密钥
    JWT_ALGORITHM = "HS256"

settings = Settings()

# 假设 db_session 是您的数据库会话依赖
# 假设 User 是您的 SQLAlchemy 用户模型
class User: # 简化示例
    def __init__(self, username):
        self.username = username
    def __repr__(self):
        return f"<User {self.username}>"

class MockDBSession: # 模拟数据库会话
    def query(self, model):
        return self
    def filter(self, *args, **kwargs):
        return self
    def first(self):
        return User("testuser") # 模拟返回一个用户
    def all(self):
        return [User("user1"), User("user2")]

def get_db():
    db = MockDBSession()
    try:
        yield db
    finally:
        pass # 实际应用中会关闭会话

db_session = Depends(get_db)

# JWT 相关的工具类
class Utils:
    OAuth2_scheme = OAuth2PasswordBearer(tokenUrl="/token", auto_error=False)
    SECRET_KEY = settings.JWT_SECRET_KEY
    ALGORITHM = settings.JWT_ALGORITHM

    def verify_token(self, token: str) -> dict:
        try:
            payload = jwt.decode(token, self.SECRET_KEY, algorithms=[self.ALGORITHM])
            username: str = payload.get("sub")
            if username is None:
                raise JWTError("Invalid token payload: no subject")
            return {"username": username}
        except JWTError as e:
            # 在这里可以记录错误,但不再抛出 HTTPException
            raise e # 重新抛出 JWTError,由 jwt_logged_user 捕获
        except Exception as e:
            raise e

utils = Utils()

router = APIRouter()

# --- 独立认证依赖项 ---

# Basic Auth
security = HTTPBasic(auto_error=False)

def basic_logged_user(credentials: Annotated[Optional[HTTPBasicCredentials], Depends(security)]) -> Optional[str]:
    if credentials is None:
        return None

    current_username_bytes = credentials.username.encode("utf8")
    correct_username_bytes = settings.SESSION_LOGIN_USER.encode("utf8")
    is_correct_username = secrets.compare_digest(
        current_username_bytes, correct_username_bytes
    )
    current_password_bytes = credentials.password.encode("utf8")
    correct_password_bytes = settings.SESSION_LOGIN_PASS.encode("utf8")
    is_correct_password = secrets.compare_digest(
        current_password_bytes, correct_password_bytes
    )

    if not (is_correct_username and is_correct_password):
        return None

    return credentials.username

# JWT Auth
def jwt_logged_user(token: Annotated[Optional[str], Depends(utils.OAuth2_scheme)],
                    db: Session = Depends(db_session)) -> Optional[str]:
    if token is None:
        return None

    try:
        payload = utils.verify_token(token)
        username = payload.get("username")

        if username is None:
            return None

        user = db.query(User).filter(User.username == username).first()
        if user is None:
            return None

        return user.username
    except JWTError:
        return None
    except Exception:
        return None

# --- 组合认证依赖项 ---

def auth_user(jwt_username: Annotated[Optional[str], Depends(jwt_logged_user)],
              basic_username: Annotated[Optional[str], Depends(basic_logged_user)]) -> str:
    if jwt_username:
        return jwt_username
    if basic_username:
        return basic_username

    raise HTTPException(
        status_code=status.
登录后复制

以上就是FastAPI 多种认证方式(任选其一)实现指南的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号