feat: 实现邮箱验证注册功能

添加完整的邮箱验证注册系统,包括验证码发送、验证和限流控制:
  - 新增邮箱验证服务模块(email_sender, email_template, email_verification)
  - 更新认证API支持邮箱验证注册流程
  - 添加注册对话框和验证码输入组件
  - 完善IP限流器支持邮箱验证场景
  - 修复前端类型定义问题,升级esbuild依赖

  🤖 Generated with [Claude Code](https://claude.com/claude-code)

  Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
RWDai
2025-12-30 17:15:48 +08:00
parent b89a4af0cf
commit 6bd8cdb9cf
21 changed files with 2464 additions and 404 deletions

View File

@@ -28,6 +28,8 @@ class IPRateLimiter:
"register": 3, # 注册接口
"api": 60, # API 接口
"public": 60, # 公共接口
"verification_send": 3, # 发送验证码接口
"verification_verify": 10, # 验证验证码接口
}
@staticmethod

View File

@@ -0,0 +1,9 @@
"""
邮箱验证服务模块
"""
from .email_sender import EmailSenderService
from .email_template import EmailTemplate
from .email_verification import EmailVerificationService
__all__ = ["EmailVerificationService", "EmailSenderService", "EmailTemplate"]

View File

@@ -0,0 +1,365 @@
"""
邮件发送服务
提供 SMTP 邮件发送功能
"""
import asyncio
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Optional, Tuple
try:
import aiosmtplib
AIOSMTPLIB_AVAILABLE = True
except ImportError:
AIOSMTPLIB_AVAILABLE = False
aiosmtplib = None
from sqlalchemy.orm import Session
from src.core.logger import logger
from src.services.system.config import ConfigService
from .email_template import EmailTemplate
class EmailSenderService:
"""邮件发送服务"""
@staticmethod
def _get_smtp_config(db: Session) -> dict:
"""
从数据库获取 SMTP 配置
Args:
db: 数据库会话
Returns:
SMTP 配置字典
"""
config = {
"smtp_host": ConfigService.get_config(db, "smtp_host"),
"smtp_port": ConfigService.get_config(db, "smtp_port", default=587),
"smtp_user": ConfigService.get_config(db, "smtp_user"),
"smtp_password": ConfigService.get_config(db, "smtp_password"),
"smtp_use_tls": ConfigService.get_config(db, "smtp_use_tls", default=True),
"smtp_use_ssl": ConfigService.get_config(db, "smtp_use_ssl", default=False),
"smtp_from_email": ConfigService.get_config(db, "smtp_from_email"),
"smtp_from_name": ConfigService.get_config(db, "smtp_from_name", default="Aether"),
}
return config
@staticmethod
def _validate_smtp_config(config: dict) -> Tuple[bool, Optional[str]]:
"""
验证 SMTP 配置
Args:
config: SMTP 配置字典
Returns:
(是否有效, 错误信息)
"""
required_fields = ["smtp_host", "smtp_from_email"]
for field in required_fields:
if not config.get(field):
return False, f"缺少必要的 SMTP 配置: {field}"
return True, None
@staticmethod
async def send_verification_code(
db: Session, to_email: str, code: str, expire_minutes: int = 30
) -> Tuple[bool, Optional[str]]:
"""
发送验证码邮件
Args:
db: 数据库会话
to_email: 收件人邮箱
code: 验证码
expire_minutes: 过期时间(分钟)
Returns:
(是否发送成功, 错误信息)
"""
# 获取 SMTP 配置
config = EmailSenderService._get_smtp_config(db)
# 验证配置
valid, error = EmailSenderService._validate_smtp_config(config)
if not valid:
logger.error(f"SMTP 配置无效: {error}")
return False, error
# 生成邮件内容
app_name = ConfigService.get_config(db, "smtp_from_name", default="Aether")
support_email = ConfigService.get_config(db, "smtp_support_email")
html_body = EmailTemplate.get_verification_code_html(
code=code, expire_minutes=expire_minutes, app_name=app_name, support_email=support_email
)
text_body = EmailTemplate.get_verification_code_text(
code=code, expire_minutes=expire_minutes, app_name=app_name, support_email=support_email
)
subject = EmailTemplate.get_subject("verification")
# 发送邮件
return await EmailSenderService._send_email(
config=config, to_email=to_email, subject=subject, html_body=html_body, text_body=text_body
)
@staticmethod
async def _send_email(
config: dict,
to_email: str,
subject: str,
html_body: Optional[str] = None,
text_body: Optional[str] = None,
) -> Tuple[bool, Optional[str]]:
"""
发送邮件(内部方法)
Args:
config: SMTP 配置
to_email: 收件人邮箱
subject: 邮件主题
html_body: HTML 邮件内容
text_body: 纯文本邮件内容
Returns:
(是否发送成功, 错误信息)
"""
if AIOSMTPLIB_AVAILABLE:
return await EmailSenderService._send_email_async(
config, to_email, subject, html_body, text_body
)
else:
return await EmailSenderService._send_email_sync_wrapper(
config, to_email, subject, html_body, text_body
)
@staticmethod
async def _send_email_async(
config: dict,
to_email: str,
subject: str,
html_body: Optional[str] = None,
text_body: Optional[str] = None,
) -> Tuple[bool, Optional[str]]:
"""
异步发送邮件(使用 aiosmtplib
Args:
config: SMTP 配置
to_email: 收件人邮箱
subject: 邮件主题
html_body: HTML 邮件内容
text_body: 纯文本邮件内容
Returns:
(是否发送成功, 错误信息)
"""
try:
# 构建邮件
message = MIMEMultipart("alternative")
message["Subject"] = subject
message["From"] = f"{config['smtp_from_name']} <{config['smtp_from_email']}>"
message["To"] = to_email
# 添加纯文本部分
if text_body:
message.attach(MIMEText(text_body, "plain", "utf-8"))
# 添加 HTML 部分
if html_body:
message.attach(MIMEText(html_body, "html", "utf-8"))
# 发送邮件
if config["smtp_use_ssl"]:
await aiosmtplib.send(
message,
hostname=config["smtp_host"],
port=config["smtp_port"],
use_tls=True,
username=config["smtp_user"],
password=config["smtp_password"],
)
else:
await aiosmtplib.send(
message,
hostname=config["smtp_host"],
port=config["smtp_port"],
start_tls=config["smtp_use_tls"],
username=config["smtp_user"],
password=config["smtp_password"],
)
logger.info(f"验证码邮件发送成功: {to_email}")
return True, None
except Exception as e:
error_msg = f"发送邮件失败: {str(e)}"
logger.error(error_msg)
return False, error_msg
@staticmethod
async def _send_email_sync_wrapper(
config: dict,
to_email: str,
subject: str,
html_body: Optional[str] = None,
text_body: Optional[str] = None,
) -> Tuple[bool, Optional[str]]:
"""
同步邮件发送的异步包装器
Args:
config: SMTP 配置
to_email: 收件人邮箱
subject: 邮件主题
html_body: HTML 邮件内容
text_body: 纯文本邮件内容
Returns:
(是否发送成功, 错误信息)
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, EmailSenderService._send_email_sync, config, to_email, subject, html_body, text_body
)
@staticmethod
def _send_email_sync(
config: dict,
to_email: str,
subject: str,
html_body: Optional[str] = None,
text_body: Optional[str] = None,
) -> Tuple[bool, Optional[str]]:
"""
同步发送邮件(使用标准库 smtplib
Args:
config: SMTP 配置
to_email: 收件人邮箱
subject: 邮件主题
html_body: HTML 邮件内容
text_body: 纯文本邮件内容
Returns:
(是否发送成功, 错误信息)
"""
try:
# 构建邮件
message = MIMEMultipart("alternative")
message["Subject"] = subject
message["From"] = f"{config['smtp_from_name']} <{config['smtp_from_email']}>"
message["To"] = to_email
# 添加纯文本部分
if text_body:
message.attach(MIMEText(text_body, "plain", "utf-8"))
# 添加 HTML 部分
if html_body:
message.attach(MIMEText(html_body, "html", "utf-8"))
# 连接 SMTP 服务器
server = None
try:
if config["smtp_use_ssl"]:
server = smtplib.SMTP_SSL(config["smtp_host"], config["smtp_port"])
else:
server = smtplib.SMTP(config["smtp_host"], config["smtp_port"])
if config["smtp_use_tls"]:
server.starttls()
# 登录
if config["smtp_user"] and config["smtp_password"]:
server.login(config["smtp_user"], config["smtp_password"])
# 发送邮件
server.send_message(message)
logger.info(f"验证码邮件发送成功(同步方式): {to_email}")
return True, None
finally:
# 确保服务器连接被关闭
if server is not None:
try:
server.quit()
except Exception as quit_error:
logger.warning(f"关闭 SMTP 连接时出错: {quit_error}")
except Exception as e:
error_msg = f"发送邮件失败: {str(e)}"
logger.error(error_msg)
return False, error_msg
@staticmethod
async def test_smtp_connection(
db: Session, override_config: Optional[dict] = None
) -> Tuple[bool, Optional[str]]:
"""
测试 SMTP 连接
Args:
db: 数据库会话
override_config: 可选的覆盖配置(通常来自未保存的前端表单)
Returns:
(是否连接成功, 错误信息)
"""
config = EmailSenderService._get_smtp_config(db)
# 用外部传入的配置覆盖(仅覆盖提供的字段)
if override_config:
config.update({k: v for k, v in override_config.items() if v is not None})
# 验证配置
valid, error = EmailSenderService._validate_smtp_config(config)
if not valid:
return False, error
try:
if AIOSMTPLIB_AVAILABLE:
# 使用异步方式测试
smtp = aiosmtplib.SMTP(
hostname=config["smtp_host"],
port=config["smtp_port"],
use_tls=config["smtp_use_ssl"],
)
await smtp.connect()
if config["smtp_use_tls"] and not config["smtp_use_ssl"]:
await smtp.starttls()
if config["smtp_user"] and config["smtp_password"]:
await smtp.login(config["smtp_user"], config["smtp_password"])
await smtp.quit()
else:
# 使用同步方式测试
if config["smtp_use_ssl"]:
server = smtplib.SMTP_SSL(config["smtp_host"], config["smtp_port"])
else:
server = smtplib.SMTP(config["smtp_host"], config["smtp_port"])
if config["smtp_use_tls"]:
server.starttls()
if config["smtp_user"] and config["smtp_password"]:
server.login(config["smtp_user"], config["smtp_password"])
server.quit()
logger.info("SMTP 连接测试成功")
return True, None
except Exception as e:
error_msg = f"SMTP 连接测试失败: {str(e)}"
logger.error(error_msg)
return False, error_msg

View File

@@ -0,0 +1,238 @@
"""
邮件模板
提供验证码邮件的 HTML 和纯文本模板
"""
from typing import Dict
class EmailTemplate:
"""邮件模板类"""
@staticmethod
def get_verification_code_html(code: str, expire_minutes: int = 30, **kwargs) -> str:
"""
获取验证码邮件 HTML 模板
Args:
code: 验证码
expire_minutes: 过期时间(分钟)
**kwargs: 其他模板变量
Returns:
HTML 邮件内容
"""
app_name = kwargs.get("app_name", "Aether")
support_email = kwargs.get("support_email", "")
html = f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>邮箱验证码</title>
<style>
body {{
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif;
line-height: 1.6;
color: #333;
background-color: #f5f5f5;
margin: 0;
padding: 0;
}}
.container {{
max-width: 600px;
margin: 40px auto;
background-color: #ffffff;
border-radius: 8px;
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1);
overflow: hidden;
}}
.header {{
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: #ffffff;
padding: 30px;
text-align: center;
}}
.header h1 {{
margin: 0;
font-size: 28px;
font-weight: 600;
}}
.content {{
padding: 40px 30px;
}}
.greeting {{
font-size: 18px;
margin-bottom: 20px;
color: #333;
}}
.message {{
font-size: 16px;
color: #666;
margin-bottom: 30px;
line-height: 1.8;
}}
.code-container {{
background-color: #f8f9fa;
border: 2px dashed #667eea;
border-radius: 8px;
padding: 30px;
text-align: center;
margin: 30px 0;
}}
.code-label {{
font-size: 14px;
color: #666;
margin-bottom: 10px;
text-transform: uppercase;
letter-spacing: 1px;
}}
.code {{
font-size: 36px;
font-weight: 700;
color: #667eea;
letter-spacing: 8px;
font-family: 'Courier New', Courier, monospace;
margin: 10px 0;
}}
.expire-info {{
font-size: 14px;
color: #999;
margin-top: 10px;
}}
.warning {{
background-color: #fff3cd;
border-left: 4px solid #ffc107;
padding: 15px;
margin: 20px 0;
font-size: 14px;
color: #856404;
}}
.footer {{
background-color: #f8f9fa;
padding: 20px 30px;
text-align: center;
font-size: 14px;
color: #999;
}}
.footer a {{
color: #667eea;
text-decoration: none;
}}
.divider {{
height: 1px;
background-color: #e9ecef;
margin: 30px 0;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>{app_name}</h1>
</div>
<div class="content">
<div class="greeting">您好!</div>
<div class="message">
感谢您注册 {app_name}。为了验证您的邮箱地址,请使用以下验证码完成注册流程:
</div>
<div class="code-container">
<div class="code-label">验证码</div>
<div class="code">{code}</div>
<div class="expire-info">
验证码有效期:{expire_minutes} 分钟
</div>
</div>
<div class="warning">
<strong>安全提示:</strong>
<ul style="margin: 10px 0; padding-left: 20px;">
<li>请勿将此验证码透露给任何人</li>
<li>如果您没有请求此验证码,请忽略此邮件</li>
<li>验证码在 {expire_minutes} 分钟后自动失效</li>
</ul>
</div>
<div class="divider"></div>
<div class="message" style="font-size: 14px;">
如果您在注册过程中遇到任何问题,请随时联系我们的支持团队。
</div>
</div>
<div class="footer">
<p>此邮件由系统自动发送,请勿直接回复。</p>
{f'<p>需要帮助?联系我们:<a href="mailto:{support_email}">{support_email}</a></p>' if support_email else ''}
<p>&copy; {app_name}. All rights reserved.</p>
</div>
</div>
</body>
</html>
"""
return html.strip()
@staticmethod
def get_verification_code_text(code: str, expire_minutes: int = 30, **kwargs) -> str:
"""
获取验证码邮件纯文本模板
Args:
code: 验证码
expire_minutes: 过期时间(分钟)
**kwargs: 其他模板变量
Returns:
纯文本邮件内容
"""
app_name = kwargs.get("app_name", "Aether")
support_email = kwargs.get("support_email", "")
text = f"""
{app_name} - 邮箱验证码
{'=' * 50}
您好!
感谢您注册 {app_name}。为了验证您的邮箱地址,请使用以下验证码完成注册流程:
验证码:{code}
验证码有效期:{expire_minutes} 分钟
{'=' * 50}
安全提示:
- 请勿将此验证码透露给任何人
- 如果您没有请求此验证码,请忽略此邮件
- 验证码在 {expire_minutes} 分钟后自动失效
{'=' * 50}
如果您在注册过程中遇到任何问题,请随时联系我们的支持团队。
{f'联系邮箱:{support_email}' if support_email else ''}
此邮件由系统自动发送,请勿直接回复。
&copy; {app_name}. All rights reserved.
"""
return text.strip()
@staticmethod
def get_subject(template_type: str = "verification") -> str:
"""
获取邮件主题
Args:
template_type: 模板类型
Returns:
邮件主题
"""
subjects = {
"verification": "邮箱验证码 - 请完成验证",
"welcome": "欢迎加入 Aether",
"password_reset": "密码重置验证码",
}
return subjects.get(template_type, "Aether 通知")

View File

@@ -0,0 +1,281 @@
"""
邮箱验证服务
提供验证码生成、发送、验证等功能
"""
import json
import secrets
from datetime import datetime, timezone
from typing import Optional, Tuple
from src.clients.redis_client import get_redis_client
from src.core.logger import logger
class EmailVerificationService:
"""邮箱验证码服务"""
# Redis key 前缀
VERIFICATION_PREFIX = "email:verification:"
SEND_LIMIT_PREFIX = "email:send_limit:"
VERIFIED_PREFIX = "email:verified:"
# 默认配置
DEFAULT_CODE_EXPIRE_MINUTES = 30
DEFAULT_MAX_ATTEMPTS = 5
SEND_COOLDOWN_SECONDS = 60
SEND_LIMIT_PER_HOUR = 5
@staticmethod
def _generate_code() -> str:
"""
生成 6 位数字验证码
Returns:
6 位数字字符串
"""
# 使用 secrets 模块生成安全的随机数
code = secrets.randbelow(1000000)
return f"{code:06d}"
@staticmethod
async def send_verification_code(
email: str, expire_minutes: Optional[int] = None
) -> Tuple[bool, str, Optional[str]]:
"""
发送验证码(生成并存储到 Redis
Args:
email: 目标邮箱地址
expire_minutes: 验证码过期时间分钟None 则使用默认值
Returns:
(是否成功, 验证码/错误信息, 错误详情)
"""
redis_client = await get_redis_client(require_redis=False)
if redis_client is None:
logger.error("Redis 不可用,无法发送验证码")
return False, "系统错误", "Redis 服务不可用"
try:
# 检查发送频率限制
send_limit_key = f"{EmailVerificationService.SEND_LIMIT_PREFIX}{email}"
send_count = await redis_client.get(send_limit_key)
if send_count:
send_count = int(send_count)
if send_count >= EmailVerificationService.SEND_LIMIT_PER_HOUR:
logger.warning(f"邮箱 {email} 发送验证码次数超限: {send_count}")
return False, "发送次数过多", "每小时最多发送 5 次验证码"
# 检查冷却时间
verification_key = f"{EmailVerificationService.VERIFICATION_PREFIX}{email}"
existing_data = await redis_client.get(verification_key)
if existing_data:
data = json.loads(existing_data)
created_at = datetime.fromisoformat(data["created_at"])
elapsed = (datetime.now(timezone.utc) - created_at).total_seconds()
if elapsed < EmailVerificationService.SEND_COOLDOWN_SECONDS:
remaining = int(EmailVerificationService.SEND_COOLDOWN_SECONDS - elapsed)
logger.warning(f"邮箱 {email} 请求验证码过于频繁,需等待 {remaining}")
return False, "请求过于频繁", f"请在 {remaining} 秒后重试"
# 生成验证码
code = EmailVerificationService._generate_code()
expire_time = expire_minutes or EmailVerificationService.DEFAULT_CODE_EXPIRE_MINUTES
# 存储验证码数据
verification_data = {
"code": code,
"attempts": 0,
"created_at": datetime.now(timezone.utc).isoformat(),
}
# 存储到 Redis设置过期时间
await redis_client.setex(
verification_key, expire_time * 60, json.dumps(verification_data)
)
# 更新发送计数器1 小时过期)- 使用原子操作
current_count = await redis_client.incr(send_limit_key)
# 如果是第一次设置,需要设置过期时间
if current_count == 1:
await redis_client.expire(send_limit_key, 3600)
logger.info(f"验证码已生成并存储: {email}, 有效期: {expire_time} 分钟")
return True, code, None
except Exception as e:
logger.error(f"发送验证码失败: {e}")
return False, "系统错误", str(e)
@staticmethod
async def verify_code(email: str, code: str) -> Tuple[bool, str]:
"""
验证验证码
Args:
email: 邮箱地址
code: 用户输入的验证码
Returns:
(是否验证成功, 错误信息)
"""
redis_client = await get_redis_client(require_redis=False)
if redis_client is None:
logger.error("Redis 不可用,无法验证验证码")
return False, "系统错误"
try:
verification_key = f"{EmailVerificationService.VERIFICATION_PREFIX}{email}"
data_str = await redis_client.get(verification_key)
if not data_str:
logger.warning(f"验证码不存在或已过期: {email}")
return False, "验证码不存在或已过期"
data = json.loads(data_str)
# 检查尝试次数
if data["attempts"] >= EmailVerificationService.DEFAULT_MAX_ATTEMPTS:
logger.warning(f"验证码尝试次数过多: {email}")
await redis_client.delete(verification_key)
return False, "验证码尝试次数过多,请重新发送"
# 增加尝试次数
data["attempts"] += 1
# 验证码比对 - 使用常量时间比较防止时序攻击
if not secrets.compare_digest(code, data["code"]):
# 更新尝试次数
ttl = await redis_client.ttl(verification_key)
if ttl > 0:
await redis_client.setex(verification_key, ttl, json.dumps(data))
remaining_attempts = EmailVerificationService.DEFAULT_MAX_ATTEMPTS - data["attempts"]
logger.warning(f"验证码错误: {email}, 剩余尝试次数: {remaining_attempts}")
return False, f"验证码错误,剩余尝试次数: {remaining_attempts}"
# 验证成功:删除验证码,标记邮箱已验证
await redis_client.delete(verification_key)
verified_key = f"{EmailVerificationService.VERIFIED_PREFIX}{email}"
# 已验证标记保留 1 小时,足够完成注册流程
await redis_client.setex(verified_key, 3600, "verified")
logger.info(f"验证码验证成功: {email}")
return True, "验证成功"
except Exception as e:
logger.error(f"验证验证码失败: {e}")
return False, "系统错误"
@staticmethod
async def is_email_verified(email: str) -> bool:
"""
检查邮箱是否已验证
Args:
email: 邮箱地址
Returns:
是否已验证
"""
redis_client = await get_redis_client(require_redis=False)
if redis_client is None:
logger.warning("Redis 不可用,跳过邮箱验证检查")
return False
try:
verified_key = f"{EmailVerificationService.VERIFIED_PREFIX}{email}"
verified = await redis_client.exists(verified_key)
return bool(verified)
except Exception as e:
logger.error(f"检查邮箱验证状态失败: {e}")
return False
@staticmethod
async def clear_verification(email: str) -> bool:
"""
清除邮箱验证状态(注册成功后调用)
Args:
email: 邮箱地址
Returns:
是否清除成功
"""
redis_client = await get_redis_client(require_redis=False)
if redis_client is None:
logger.warning("Redis 不可用,无法清除验证状态")
return False
try:
verified_key = f"{EmailVerificationService.VERIFIED_PREFIX}{email}"
verification_key = f"{EmailVerificationService.VERIFICATION_PREFIX}{email}"
# 删除已验证标记和验证码(如果还存在)
await redis_client.delete(verified_key)
await redis_client.delete(verification_key)
logger.info(f"邮箱验证状态已清除: {email}")
return True
except Exception as e:
logger.error(f"清除邮箱验证状态失败: {e}")
return False
@staticmethod
async def get_verification_status(email: str) -> dict:
"""
获取邮箱验证状态(用于调试和管理)
Args:
email: 邮箱地址
Returns:
验证状态信息
"""
redis_client = await get_redis_client(require_redis=False)
if redis_client is None:
return {"error": "Redis 不可用"}
try:
verification_key = f"{EmailVerificationService.VERIFICATION_PREFIX}{email}"
verified_key = f"{EmailVerificationService.VERIFIED_PREFIX}{email}"
send_limit_key = f"{EmailVerificationService.SEND_LIMIT_PREFIX}{email}"
# 获取各个状态
verification_data = await redis_client.get(verification_key)
is_verified = await redis_client.exists(verified_key)
send_count = await redis_client.get(send_limit_key)
verification_ttl = await redis_client.ttl(verification_key)
verified_ttl = await redis_client.ttl(verified_key)
status = {
"email": email,
"has_pending_code": bool(verification_data),
"is_verified": bool(is_verified),
"send_count_this_hour": int(send_count) if send_count else 0,
"code_expires_in": verification_ttl if verification_ttl > 0 else None,
"verified_expires_in": verified_ttl if verified_ttl > 0 else None,
}
if verification_data:
data = json.loads(verification_data)
status["attempts"] = data.get("attempts", 0)
status["created_at"] = data.get("created_at")
return status
except Exception as e:
logger.error(f"获取邮箱验证状态失败: {e}")
return {"error": str(e)}