Files
Aether/src/plugins/auth/jwt.py
fawney19 4d1d863916 refactor: improve authentication and user data handling
- Replace user cache queries with direct database queries to ensure data consistency
- Fix token_type parameter in verify_token calls (access token verification)
- Fix role-based permission check using dictionary ranking instead of string comparison
- Fix logout operation to use correct JWT claim name (user_id instead of sub)
- Simplify user authentication flow by removing unnecessary cache layer
- Optimize session initialization in main.py using create_session helper
- Remove unused imports and exception variables
2025-12-18 01:09:22 +08:00

104 lines
3.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
JWT认证插件
支持JWT Bearer token认证
"""
from typing import Optional
from fastapi import Request
from sqlalchemy.orm import Session
from src.core.logger import logger
from src.models.database import User
from src.services.auth.service import AuthService
from .base import AuthContext, AuthPlugin
class JwtAuthPlugin(AuthPlugin):
"""
JWT认证插件
支持从Authorization Bearer header中提取JWT token进行认证
"""
def __init__(self):
super().__init__(name="jwt", priority=20) # 高优先级优先于API Key
def get_credentials(self, request: Request) -> Optional[str]:
"""
从Authorization header中提取JWT token
支持格式: Authorization: Bearer <token>
"""
auth_header = request.headers.get("authorization")
if auth_header and auth_header.startswith("Bearer "):
return auth_header.replace("Bearer ", "")
return None
async def authenticate(self, request: Request, db: Session) -> Optional[AuthContext]:
"""
使用JWT token进行认证
"""
# 提取JWT token
token = self.get_credentials(request)
if not token:
logger.debug("未找到JWT token")
return None
# 记录认证尝试的详细信息
logger.info(f"JWT认证尝试 - 路径: {request.url.path}, Token前20位: {token[:20]}...")
try:
# 验证JWT token
payload = await AuthService.verify_token(token, token_type="access")
logger.debug(f"JWT token验证成功, payload: {payload}")
# 从payload中提取用户信息
user_id = payload.get("user_id")
if not user_id:
logger.warning("JWT token中缺少用户ID")
return None
logger.debug(f"从JWT提取user_id: {user_id}, 类型: {type(user_id)}")
# 从数据库获取用户信息
user = db.query(User).filter(User.id == user_id).first()
if not user:
logger.warning(f"JWT认证失败 - 用户不存在: {user_id}")
return None
logger.debug(f"找到用户: {user.email}, is_active: {user.is_active}")
if not user.is_active:
logger.warning(f"JWT认证失败 - 用户已禁用: {user.email}")
return None
# 创建认证上下文
auth_context = AuthContext(
user_id=user.id,
user_name=user.username,
permissions={"can_use_api": True, "is_admin": user.role.value == "admin"},
quota_info={
"quota_usd": user.quota_usd,
"used_usd": user.used_usd,
"remaining_usd": (
None if user.quota_usd is None else user.quota_usd - user.used_usd
),
"quota_ok": True, # JWT用户通常已经通过前端验证
},
metadata={
"auth_method": "jwt",
"client_ip": request.client.host if request.client else "unknown",
"token_exp": payload.get("exp"),
},
)
logger.info("JWT认证成功")
return auth_context
except Exception as e:
logger.warning(f"JWT认证失败: {str(e)}")
return None