mirror of
https://github.com/fawney19/Aether.git
synced 2026-01-03 00:02:28 +08:00
@@ -142,7 +142,7 @@ class CleanupScheduler:
|
||||
"""执行统计聚合任务
|
||||
|
||||
Args:
|
||||
backfill: 是否回填历史数据(首次启动时使用)
|
||||
backfill: 是否回填历史数据(启动时检查缺失的日期)
|
||||
"""
|
||||
db = create_session()
|
||||
try:
|
||||
@@ -153,12 +153,25 @@ class CleanupScheduler:
|
||||
|
||||
logger.info("开始执行统计数据聚合...")
|
||||
|
||||
from src.models.database import StatsDaily, User as DBUser
|
||||
from src.services.system.scheduler import APP_TIMEZONE
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
# 使用业务时区计算日期,确保与定时任务触发时间一致
|
||||
# 定时任务在 Asia/Shanghai 凌晨 1 点触发,此时应聚合 Asia/Shanghai 的"昨天"
|
||||
app_tz = ZoneInfo(APP_TIMEZONE)
|
||||
now_local = datetime.now(app_tz)
|
||||
today_local = now_local.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
# 转换为 UTC 用于数据库查询(stats_daily.date 存储的是 UTC)
|
||||
today = today_local.astimezone(timezone.utc).replace(tzinfo=timezone.utc)
|
||||
|
||||
if backfill:
|
||||
# 首次启动时回填历史数据
|
||||
# 启动时检查并回填缺失的日期
|
||||
from src.models.database import StatsSummary
|
||||
|
||||
summary = db.query(StatsSummary).first()
|
||||
if not summary:
|
||||
# 首次运行,回填所有历史数据
|
||||
logger.info("检测到首次运行,开始回填历史统计数据...")
|
||||
days_to_backfill = SystemConfigService.get_config(
|
||||
db, "stats_backfill_days", 365
|
||||
@@ -169,17 +182,68 @@ class CleanupScheduler:
|
||||
logger.info(f"历史数据回填完成,共 {count} 天")
|
||||
return
|
||||
|
||||
# 聚合昨天的数据
|
||||
now = datetime.now(timezone.utc)
|
||||
yesterday = (now - timedelta(days=1)).replace(
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
# 非首次运行,检查最近是否有缺失的日期需要回填
|
||||
latest_stat = (
|
||||
db.query(StatsDaily)
|
||||
.order_by(StatsDaily.date.desc())
|
||||
.first()
|
||||
)
|
||||
|
||||
if latest_stat:
|
||||
latest_date = latest_stat.date.replace(tzinfo=timezone.utc)
|
||||
# 计算缺失的天数(从最新记录的下一天到昨天)
|
||||
yesterday = today - timedelta(days=1)
|
||||
missing_start = latest_date + timedelta(days=1)
|
||||
|
||||
if missing_start <= yesterday:
|
||||
missing_days = (yesterday - missing_start).days + 1
|
||||
logger.info(
|
||||
f"检测到缺失 {missing_days} 天的统计数据 "
|
||||
f"({missing_start.date()} ~ {yesterday.date()}),开始回填..."
|
||||
)
|
||||
|
||||
current_date = missing_start
|
||||
users = db.query(DBUser.id).filter(DBUser.is_active.is_(True)).all()
|
||||
|
||||
while current_date <= yesterday:
|
||||
try:
|
||||
StatsAggregatorService.aggregate_daily_stats(db, current_date)
|
||||
# 聚合用户数据
|
||||
for (user_id,) in users:
|
||||
try:
|
||||
StatsAggregatorService.aggregate_user_daily_stats(
|
||||
db, user_id, current_date
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"回填用户 {user_id} 日期 {current_date.date()} 失败: {e}"
|
||||
)
|
||||
try:
|
||||
db.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.warning(f"回填日期 {current_date.date()} 失败: {e}")
|
||||
try:
|
||||
db.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
current_date += timedelta(days=1)
|
||||
|
||||
# 更新全局汇总
|
||||
StatsAggregatorService.update_summary(db)
|
||||
logger.info(f"缺失数据回填完成,共 {missing_days} 天")
|
||||
else:
|
||||
logger.info("统计数据已是最新,无需回填")
|
||||
return
|
||||
|
||||
# 定时任务:聚合昨天的数据
|
||||
yesterday = (today - timedelta(days=1))
|
||||
|
||||
StatsAggregatorService.aggregate_daily_stats(db, yesterday)
|
||||
|
||||
# 聚合所有用户的昨日数据
|
||||
from src.models.database import User as DBUser
|
||||
|
||||
users = db.query(DBUser.id).filter(DBUser.is_active.is_(True)).all()
|
||||
for (user_id,) in users:
|
||||
try:
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
实现预聚合统计,避免每次请求都全表扫描。
|
||||
"""
|
||||
|
||||
import os
|
||||
import uuid
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Optional
|
||||
@@ -21,6 +22,35 @@ from src.models.database import (
|
||||
)
|
||||
from src.models.database import User as DBUser
|
||||
|
||||
# Business time zone (IANA name); can be overridden with the APP_TIMEZONE
# environment variable.
APP_TIMEZONE = os.getenv("APP_TIMEZONE", "Asia/Shanghai")


def _get_business_day_range(date: datetime) -> tuple[datetime, datetime]:
    """Return the UTC half-open range [start, end) of one business day.

    The business day is the calendar day, in ``APP_TIMEZONE``, that ``date``
    falls on.

    Args:
        date: Identifies the business day.
            * A naive datetime is taken to be expressed in ``APP_TIMEZONE``
              already; only its year/month/day are used.
            * An aware datetime is first converted to ``APP_TIMEZONE`` and the
              resulting calendar date is used. This matters when the caller
              hands in a business-day midnight that was converted to UTC
              (e.g. 2026-01-01 16:00 UTC is 2026-01-02 00:00 in
              Asia/Shanghai): reading the UTC calendar date directly would
              select the previous day.

    Returns:
        ``(day_start_utc, day_end_utc)``: aware UTC datetimes for local
        midnight of the business day and local midnight of the following day.
    """
    from zoneinfo import ZoneInfo

    app_tz = ZoneInfo(APP_TIMEZONE)

    # Normalise aware inputs into the business zone so the calendar date we
    # read below is the business date, not the date in the caller's zone.
    if date.tzinfo is not None and date.utcoffset() is not None:
        date = date.astimezone(app_tz)

    # Local midnight of the business day; adding one day is wall-clock
    # arithmetic, so the end is the next local midnight even across DST.
    day_start_local = datetime(date.year, date.month, date.day, tzinfo=app_tz)
    day_end_local = day_start_local + timedelta(days=1)

    # Convert both bounds to UTC for database comparisons.
    day_start_utc = day_start_local.astimezone(timezone.utc)
    day_end_utc = day_end_local.astimezone(timezone.utc)

    return day_start_utc, day_end_utc
|
||||
|
||||
|
||||
class StatsAggregatorService:
|
||||
"""统计数据聚合服务"""
|
||||
@@ -31,15 +61,15 @@ class StatsAggregatorService:
|
||||
|
||||
Args:
|
||||
db: 数据库会话
|
||||
date: 要聚合的日期 (会自动转为 UTC 当天开始)
|
||||
date: 要聚合的业务日期(使用 APP_TIMEZONE 时区)
|
||||
|
||||
Returns:
|
||||
StatsDaily 记录
|
||||
"""
|
||||
# 确保日期是 UTC 当天开始
|
||||
day_start = date.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc)
|
||||
day_end = day_start + timedelta(days=1)
|
||||
# 将业务日期转换为 UTC 时间范围
|
||||
day_start, day_end = _get_business_day_range(date)
|
||||
|
||||
# stats_daily.date 存储的是业务日期对应的 UTC 开始时间
|
||||
# 检查是否已存在该日期的记录
|
||||
existing = db.query(StatsDaily).filter(StatsDaily.date == day_start).first()
|
||||
if existing:
|
||||
@@ -172,8 +202,8 @@ class StatsAggregatorService:
|
||||
db: Session, user_id: str, date: datetime
|
||||
) -> StatsUserDaily:
|
||||
"""聚合指定用户指定日期的统计数据"""
|
||||
day_start = date.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc)
|
||||
day_end = day_start + timedelta(days=1)
|
||||
# 将业务日期转换为 UTC 时间范围
|
||||
day_start, day_end = _get_business_day_range(date)
|
||||
|
||||
existing = (
|
||||
db.query(StatsUserDaily)
|
||||
@@ -256,9 +286,15 @@ class StatsAggregatorService:
|
||||
|
||||
汇总截止到昨天的所有数据。
|
||||
"""
|
||||
now = datetime.now(timezone.utc)
|
||||
today = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
cutoff_date = today # 不含今天
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
app_tz = ZoneInfo(APP_TIMEZONE)
|
||||
|
||||
# 使用业务时区计算今天
|
||||
now_local = datetime.now(app_tz)
|
||||
today_local = now_local.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
# 转换为 UTC 用于与 stats_daily.date 比较
|
||||
cutoff_date = today_local.astimezone(timezone.utc)
|
||||
|
||||
# 获取或创建 summary 记录
|
||||
summary = db.query(StatsSummary).first()
|
||||
@@ -311,16 +347,23 @@ class StatsAggregatorService:
|
||||
db.add(summary)
|
||||
db.commit()
|
||||
|
||||
logger.info(f"[StatsAggregator] 更新全局汇总完成,截止日期: {cutoff_date.date()}")
|
||||
logger.info(f"[StatsAggregator] 更新全局汇总完成,截止日期: {today_local.date()}")
|
||||
return summary
|
||||
|
||||
@staticmethod
|
||||
def get_today_realtime_stats(db: Session) -> dict:
|
||||
"""获取今日实时统计(用于与预聚合数据合并)"""
|
||||
now = datetime.now(timezone.utc)
|
||||
today = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
base_query = db.query(Usage).filter(Usage.created_at >= today)
|
||||
app_tz = ZoneInfo(APP_TIMEZONE)
|
||||
|
||||
# 使用业务时区计算今天的开始时间
|
||||
now_local = datetime.now(app_tz)
|
||||
today_local = now_local.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
# 转换为 UTC 用于查询
|
||||
today_utc = today_local.astimezone(timezone.utc)
|
||||
|
||||
base_query = db.query(Usage).filter(Usage.created_at >= today_utc)
|
||||
|
||||
total_requests = base_query.count()
|
||||
|
||||
@@ -352,7 +395,7 @@ class StatsAggregatorService:
|
||||
func.sum(Usage.total_cost_usd).label("total_cost"),
|
||||
func.sum(Usage.actual_total_cost_usd).label("actual_total_cost"),
|
||||
)
|
||||
.filter(Usage.created_at >= today)
|
||||
.filter(Usage.created_at >= today_utc)
|
||||
.first()
|
||||
)
|
||||
|
||||
@@ -408,8 +451,13 @@ class StatsAggregatorService:
|
||||
Returns:
|
||||
回填的天数
|
||||
"""
|
||||
now = datetime.now(timezone.utc)
|
||||
today = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
app_tz = ZoneInfo(APP_TIMEZONE)
|
||||
|
||||
# 使用业务时区计算今天
|
||||
now_local = datetime.now(app_tz)
|
||||
today_local = now_local.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
|
||||
# 找到最早的 Usage 记录
|
||||
earliest = db.query(func.min(Usage.created_at)).scalar()
|
||||
@@ -417,13 +465,15 @@ class StatsAggregatorService:
|
||||
logger.info("[StatsAggregator] 没有历史数据需要回填")
|
||||
return 0
|
||||
|
||||
# 计算需要回填的日期范围
|
||||
earliest_date = earliest.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
start_date = max(earliest_date, today - timedelta(days=days))
|
||||
# 将最早记录时间转换为业务时区的日期
|
||||
earliest_local = earliest.astimezone(app_tz).replace(
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
)
|
||||
start_date = max(earliest_local, today_local - timedelta(days=days))
|
||||
|
||||
count = 0
|
||||
current_date = start_date
|
||||
while current_date < today:
|
||||
while current_date < today_local:
|
||||
StatsAggregatorService.aggregate_daily_stats(db, current_date)
|
||||
count += 1
|
||||
current_date += timedelta(days=1)
|
||||
|
||||
Reference in New Issue
Block a user