feat: add ldap login

This commit is contained in:
RWDai
2026-01-02 16:17:24 +08:00
parent cddc22d2b3
commit 9bfb295238
18 changed files with 1007 additions and 18 deletions

View File

@@ -15,8 +15,10 @@ from sqlalchemy.orm import Session, joinedload
from src.config import config
from src.core.crypto import crypto_service
from src.core.logger import logger
from src.core.enums import AuthSource
from src.models.database import ApiKey, User, UserRole
from src.services.auth.jwt_blacklist import JWTBlacklistService
from src.services.auth.ldap import LDAPService
from src.services.cache.user_cache import UserCacheService
from src.services.user.apikey import ApiKeyService
@@ -92,8 +94,36 @@ class AuthService:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的Token")
@staticmethod
async def authenticate_user(db: Session, email: str, password: str) -> Optional[User]:
"""用户登录认证"""
async def authenticate_user(
db: Session, email: str, password: str, auth_type: str = "local"
) -> Optional[User]:
"""用户登录认证
Args:
db: 数据库会话
email: 邮箱/用户名
password: 密码
auth_type: 认证类型 ("local""ldap")
"""
if auth_type == "ldap":
# LDAP 认证
if not LDAPService.is_ldap_enabled(db):
logger.warning("登录失败 - LDAP 认证未启用")
return None
ldap_user = LDAPService.authenticate(db, email, password)
if not ldap_user:
return None
# 获取或创建本地用户
user = await AuthService._get_or_create_ldap_user(db, ldap_user)
return user
# 本地认证
if LDAPService.is_ldap_exclusive(db):
logger.warning("登录失败 - 仅允许 LDAP 登录")
return None
# 登录校验必须读取密码哈希,不能使用不包含 password_hash 的缓存对象
user = db.query(User).filter(User.email == email).first()
@@ -101,6 +131,11 @@ class AuthService:
logger.warning(f"登录失败 - 用户不存在: {email}")
return None
# 检查用户认证来源
if user.auth_source == AuthSource.LDAP:
logger.warning(f"登录失败 - 该用户使用 LDAP 认证: {email}")
return None
if not user.verify_password(password):
logger.warning(f"登录失败 - 密码错误: {email}")
return None
@@ -118,6 +153,42 @@ class AuthService:
logger.info(f"用户登录成功: {email} (ID: {user.id})")
return user
@staticmethod
async def _get_or_create_ldap_user(db: Session, ldap_user: dict) -> User:
"""获取或创建 LDAP 用户
Args:
ldap_user: LDAP 用户信息 {username, email, display_name}
"""
# 先按 email 查找
user = db.query(User).filter(User.email == ldap_user["email"]).first()
if user:
# 更新 auth_source如果是首次 LDAP 登录)
if user.auth_source != AuthSource.LDAP:
user.auth_source = AuthSource.LDAP
user.last_login_at = datetime.now(timezone.utc)
db.commit()
await UserCacheService.invalidate_user_cache(user.id, user.email)
logger.info(f"LDAP 用户登录成功: {ldap_user['email']} (ID: {user.id})")
return user
# 创建新用户
user = User(
email=ldap_user["email"],
username=ldap_user["username"],
password_hash="", # LDAP 用户无本地密码
auth_source=AuthSource.LDAP,
role=UserRole.USER,
is_active=True,
last_login_at=datetime.now(timezone.utc),
)
db.add(user)
db.commit()
db.refresh(user)
logger.info(f"LDAP 用户创建成功: {ldap_user['email']} (ID: {user.id})")
return user
@staticmethod
def authenticate_api_key(db: Session, api_key: str) -> Optional[tuple[User, ApiKey]]:
"""API密钥认证"""