feat(ldap): 完善 LDAP 认证功能和安全性

- 添加 LDAP 配置类型定义,移除 any 类型
- 首次配置 LDAP 时强制要求设置绑定密码
- 根据认证类型区分登录标识验证(本地需邮箱,LDAP 允许用户名)
- 添加 LDAP 过滤器转义函数防止注入攻击
- 增加 LDAP 连接超时设置
- 添加账户来源冲突检查,防止 LDAP 覆盖本地账户
- 添加用户名冲突自动重命名机制
This commit is contained in:
RWDai
2026-01-04 11:18:28 +08:00
parent 612992fa1f
commit 64bfa955f4
8 changed files with 2147 additions and 2020 deletions

View File

@@ -117,6 +117,12 @@ class AuthService:
# 获取或创建本地用户
user = await AuthService._get_or_create_ldap_user(db, ldap_user)
if not user:
# 已有本地账号但来源不匹配等情况
return None
if not user.is_active:
logger.warning(f"登录失败 - 用户已禁用: {email}")
return None
return user
# 本地认证
@@ -154,7 +160,7 @@ class AuthService:
return user
@staticmethod
async def _get_or_create_ldap_user(db: Session, ldap_user: dict) -> User:
async def _get_or_create_ldap_user(db: Session, ldap_user: dict) -> Optional[User]:
"""获取或创建 LDAP 用户
Args:
@@ -166,17 +172,33 @@ class AuthService:
if user:
# 更新 auth_source如果是首次 LDAP 登录)
if user.auth_source != AuthSource.LDAP:
user.auth_source = AuthSource.LDAP
# 避免覆盖已有本地账户(不同来源时拒绝登录)
logger.warning(
f"LDAP 登录拒绝 - 账户来源不匹配(现有:{user.auth_source}, 请求:LDAP): {ldap_user['email']}"
)
return None
user.last_login_at = datetime.now(timezone.utc)
db.commit()
await UserCacheService.invalidate_user_cache(user.id, user.email)
logger.info(f"LDAP 用户登录成功: {ldap_user['email']} (ID: {user.id})")
return user
# 检查 username 是否已被占用
username = ldap_user["username"]
existing_user_with_username = db.query(User).filter(User.username == username).first()
if existing_user_with_username:
# 如果 username 已存在,添加后缀使其唯一
base_username = username
counter = 1
while db.query(User).filter(User.username == username).first():
username = f"{base_username}_ldap_{counter}"
counter += 1
logger.info(f"LDAP 用户名冲突,使用新用户名: {base_username} -> {username}")
# 创建新用户
user = User(
email=ldap_user["email"],
username=ldap_user["username"],
username=username,
password_hash="", # LDAP 用户无本地密码
auth_source=AuthSource.LDAP,
role=UserRole.USER,