feat: 添加 LDAP 认证支持

- 新增 LDAP 服务和 API 接口
- 添加 LDAP 配置管理页面
- 登录页面支持 LDAP/本地认证切换
- 数据库迁移支持 LDAP 相关字段
This commit is contained in:
fawney19
2026-01-06 14:38:42 +08:00
21 changed files with 3947 additions and 2037 deletions

View File

@@ -4,9 +4,9 @@ API端点请求/响应模型定义
import re
from datetime import datetime
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Literal, Optional
from pydantic import BaseModel, ConfigDict, Field, field_validator
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
from ..core.enums import UserRole
@@ -15,17 +15,9 @@ from ..core.enums import UserRole
class LoginRequest(BaseModel):
"""登录请求"""
email: str = Field(..., min_length=3, max_length=255, description="邮箱地址")
email: str = Field(..., min_length=1, max_length=255, description="邮箱/用户名")
password: str = Field(..., min_length=1, max_length=128, description="密码")
@classmethod
@field_validator("email")
def validate_email(cls, v):
"""验证邮箱格式"""
email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
if not re.match(email_pattern, v):
raise ValueError("邮箱格式无效")
return v.lower()
auth_type: Literal["local", "ldap"] = Field(default="local", description="认证类型")
@classmethod
@field_validator("password")
@@ -36,6 +28,24 @@ class LoginRequest(BaseModel):
raise ValueError("密码不能为空")
return v
@model_validator(mode="after")
def validate_login(self):
"""根据认证类型校验并规范化登录标识"""
identifier = self.email.strip()
if not identifier:
raise ValueError("用户名/邮箱不能为空")
# 本地和 LDAP 登录都支持用户名或邮箱
# 如果是邮箱格式,转换为小写
email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
if re.match(email_pattern, identifier):
self.email = identifier.lower()
else:
self.email = identifier
return self
class LoginResponse(BaseModel):
"""登录响应"""

View File

@@ -30,7 +30,7 @@ from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import declarative_base, relationship
from ..config import config
from ..core.enums import ProviderBillingType, UserRole
from ..core.enums import AuthSource, ProviderBillingType, UserRole
Base = declarative_base()
@@ -54,6 +54,20 @@ class User(Base):
default=UserRole.USER,
nullable=False,
)
auth_source = Column(
Enum(
AuthSource,
name="authsource",
create_type=False,
values_callable=lambda x: [e.value for e in x],
),
default=AuthSource.LOCAL,
nullable=False,
)
# LDAP 标识(仅 auth_source=ldap 时使用,用于在邮箱变更/用户名冲突时稳定关联本地账户)
ldap_dn = Column(String(512), nullable=True, index=True)
ldap_username = Column(String(255), nullable=True, index=True)
# 访问限制NULL 表示不限制,允许访问所有资源)
allowed_providers = Column(JSON, nullable=True) # 允许使用的提供商 ID 列表
@@ -428,6 +442,68 @@ class SystemConfig(Base):
)
class LDAPConfig(Base):
"""LDAP认证配置表 - 单行配置"""
__tablename__ = "ldap_configs"
id = Column(Integer, primary_key=True, autoincrement=True)
server_url = Column(String(255), nullable=False) # ldap://host:389 或 ldaps://host:636
bind_dn = Column(String(255), nullable=False) # 绑定账号 DN
bind_password_encrypted = Column(Text, nullable=True) # 加密的绑定密码(允许 NULL 表示已清除)
base_dn = Column(String(255), nullable=False) # 用户搜索基础 DN
user_search_filter = Column(
String(500), default="(uid={username})", nullable=False
) # 用户搜索过滤器
username_attr = Column(String(50), default="uid", nullable=False) # 用户名属性 (uid/sAMAccountName)
email_attr = Column(String(50), default="mail", nullable=False) # 邮箱属性
display_name_attr = Column(String(50), default="cn", nullable=False) # 显示名称属性
is_enabled = Column(Boolean, default=False, nullable=False) # 是否启用 LDAP 认证
is_exclusive = Column(
Boolean, default=False, nullable=False
) # 是否仅允许 LDAP 登录(禁用本地认证)
use_starttls = Column(Boolean, default=False, nullable=False) # 是否使用 STARTTLS
connect_timeout = Column(Integer, default=10, nullable=False) # 连接超时时间(秒)
# 时间戳
created_at = Column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False
)
updated_at = Column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
nullable=False,
)
def set_bind_password(self, password: str) -> None:
"""
设置并加密绑定密码
Args:
password: 明文密码
"""
from src.core.crypto import crypto_service
self.bind_password_encrypted = crypto_service.encrypt(password)
def get_bind_password(self) -> str:
"""
获取解密后的绑定密码
Returns:
str: 解密后的明文密码
Raises:
DecryptionException: 解密失败时抛出异常
"""
from src.core.crypto import crypto_service
if not self.bind_password_encrypted:
return ""
return crypto_service.decrypt(self.bind_password_encrypted)
class Provider(Base):
"""提供商配置表"""