diff --git a/src/services/system/cleanup_scheduler.py b/src/services/system/cleanup_scheduler.py index 13ab5ee..62d6f64 100644 --- a/src/services/system/cleanup_scheduler.py +++ b/src/services/system/cleanup_scheduler.py @@ -142,7 +142,7 @@ class CleanupScheduler: """执行统计聚合任务 Args: - backfill: 是否回填历史数据(首次启动时使用) + backfill: 是否回填历史数据(启动时检查缺失的日期) """ db = create_session() try: @@ -153,12 +153,25 @@ class CleanupScheduler: logger.info("开始执行统计数据聚合...") + from src.models.database import StatsDaily, User as DBUser + from src.services.system.scheduler import APP_TIMEZONE + from zoneinfo import ZoneInfo + + # 使用业务时区计算日期,确保与定时任务触发时间一致 + # 定时任务在 Asia/Shanghai 凌晨 1 点触发,此时应聚合 Asia/Shanghai 的"昨天" + app_tz = ZoneInfo(APP_TIMEZONE) + now_local = datetime.now(app_tz) + today_local = now_local.replace(hour=0, minute=0, second=0, microsecond=0) + # 转换为 UTC 用于数据库查询(stats_daily.date 存储的是 UTC) + today = today_local.astimezone(timezone.utc).replace(tzinfo=timezone.utc) + if backfill: - # 首次启动时回填历史数据 + # 启动时检查并回填缺失的日期 from src.models.database import StatsSummary summary = db.query(StatsSummary).first() if not summary: + # 首次运行,回填所有历史数据 logger.info("检测到首次运行,开始回填历史统计数据...") days_to_backfill = SystemConfigService.get_config( db, "stats_backfill_days", 365 @@ -169,17 +182,68 @@ class CleanupScheduler: logger.info(f"历史数据回填完成,共 {count} 天") return - # 聚合昨天的数据 - now = datetime.now(timezone.utc) - yesterday = (now - timedelta(days=1)).replace( - hour=0, minute=0, second=0, microsecond=0 - ) + # 非首次运行,检查最近是否有缺失的日期需要回填 + latest_stat = ( + db.query(StatsDaily) + .order_by(StatsDaily.date.desc()) + .first() + ) + + if latest_stat: + latest_date = latest_stat.date.replace(tzinfo=timezone.utc) + # 计算缺失的天数(从最新记录的下一天到昨天) + yesterday = today - timedelta(days=1) + missing_start = latest_date + timedelta(days=1) + + if missing_start <= yesterday: + missing_days = (yesterday - missing_start).days + 1 + logger.info( + f"检测到缺失 {missing_days} 天的统计数据 " + f"({missing_start.date()} ~ {yesterday.date()}),开始回填..." + ) + + current_date = missing_start + users = db.query(DBUser.id).filter(DBUser.is_active.is_(True)).all() + + while current_date <= yesterday: + try: + StatsAggregatorService.aggregate_daily_stats(db, current_date) + # 聚合用户数据 + for (user_id,) in users: + try: + StatsAggregatorService.aggregate_user_daily_stats( + db, user_id, current_date + ) + except Exception as e: + logger.warning( + f"回填用户 {user_id} 日期 {current_date.date()} 失败: {e}" + ) + try: + db.rollback() + except Exception: + pass + except Exception as e: + logger.warning(f"回填日期 {current_date.date()} 失败: {e}") + try: + db.rollback() + except Exception: + pass + + current_date += timedelta(days=1) + + # 更新全局汇总 + StatsAggregatorService.update_summary(db) + logger.info(f"缺失数据回填完成,共 {missing_days} 天") + else: + logger.info("统计数据已是最新,无需回填") + return + + # 定时任务:聚合昨天的数据 + yesterday = (today - timedelta(days=1)) StatsAggregatorService.aggregate_daily_stats(db, yesterday) # 聚合所有用户的昨日数据 - from src.models.database import User as DBUser - users = db.query(DBUser.id).filter(DBUser.is_active.is_(True)).all() for (user_id,) in users: try: diff --git a/src/services/system/stats_aggregator.py b/src/services/system/stats_aggregator.py index c73c075..d0b59a2 100644 --- a/src/services/system/stats_aggregator.py +++ b/src/services/system/stats_aggregator.py @@ -3,6 +3,7 @@ 实现预聚合统计,避免每次请求都全表扫描。 """ +import os import uuid from datetime import datetime, timedelta, timezone from typing import Optional @@ -21,6 +22,35 @@ from src.models.database import ( ) from src.models.database import User as DBUser +# 业务时区配置 +APP_TIMEZONE = os.getenv("APP_TIMEZONE", "Asia/Shanghai") + + +def _get_business_day_range(date: datetime) -> tuple[datetime, datetime]: + """将业务时区的日期转换为 UTC 时间范围 + + Args: + date: 业务时区的日期(只使用日期部分) + + Returns: + (day_start_utc, day_end_utc): UTC 时间范围 + """ + from zoneinfo import ZoneInfo + + app_tz = ZoneInfo(APP_TIMEZONE) + + # 取日期部分,构造业务时区的当天 00:00:00 + day_start_local = datetime( + date.year, date.month, date.day, 0, 0, 0, tzinfo=app_tz + ) + day_end_local = day_start_local + timedelta(days=1) + + # 转换为 UTC + day_start_utc = day_start_local.astimezone(timezone.utc) + day_end_utc = day_end_local.astimezone(timezone.utc) + + return day_start_utc, day_end_utc + class StatsAggregatorService: """统计数据聚合服务""" @@ -31,15 +61,15 @@ class StatsAggregatorService: Args: db: 数据库会话 - date: 要聚合的日期 (会自动转为 UTC 当天开始) + date: 要聚合的业务日期(使用 APP_TIMEZONE 时区) Returns: StatsDaily 记录 """ - # 确保日期是 UTC 当天开始 - day_start = date.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc) - day_end = day_start + timedelta(days=1) + # 将业务日期转换为 UTC 时间范围 + day_start, day_end = _get_business_day_range(date) + # stats_daily.date 存储的是业务日期对应的 UTC 开始时间 # 检查是否已存在该日期的记录 existing = db.query(StatsDaily).filter(StatsDaily.date == day_start).first() if existing: @@ -172,8 +202,8 @@ class StatsAggregatorService: db: Session, user_id: str, date: datetime ) -> StatsUserDaily: """聚合指定用户指定日期的统计数据""" - day_start = date.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc) - day_end = day_start + timedelta(days=1) + # 将业务日期转换为 UTC 时间范围 + day_start, day_end = _get_business_day_range(date) existing = ( db.query(StatsUserDaily) @@ -256,9 +286,15 @@ class StatsAggregatorService: 汇总截止到昨天的所有数据。 """ - now = datetime.now(timezone.utc) - today = now.replace(hour=0, minute=0, second=0, microsecond=0) - cutoff_date = today # 不含今天 + from zoneinfo import ZoneInfo + + app_tz = ZoneInfo(APP_TIMEZONE) + + # 使用业务时区计算今天 + now_local = datetime.now(app_tz) + today_local = now_local.replace(hour=0, minute=0, second=0, microsecond=0) + # 转换为 UTC 用于与 stats_daily.date 比较 + cutoff_date = today_local.astimezone(timezone.utc) # 获取或创建 summary 记录 summary = db.query(StatsSummary).first() @@ -311,16 +347,23 @@ class StatsAggregatorService: db.add(summary) db.commit() - logger.info(f"[StatsAggregator] 更新全局汇总完成,截止日期: {cutoff_date.date()}") + logger.info(f"[StatsAggregator] 更新全局汇总完成,截止日期: {today_local.date()}") return summary @staticmethod def get_today_realtime_stats(db: Session) -> dict: """获取今日实时统计(用于与预聚合数据合并)""" - now = datetime.now(timezone.utc) - today = now.replace(hour=0, minute=0, second=0, microsecond=0) + from zoneinfo import ZoneInfo - base_query = db.query(Usage).filter(Usage.created_at >= today) + app_tz = ZoneInfo(APP_TIMEZONE) + + # 使用业务时区计算今天的开始时间 + now_local = datetime.now(app_tz) + today_local = now_local.replace(hour=0, minute=0, second=0, microsecond=0) + # 转换为 UTC 用于查询 + today_utc = today_local.astimezone(timezone.utc) + + base_query = db.query(Usage).filter(Usage.created_at >= today_utc) total_requests = base_query.count() @@ -352,7 +395,7 @@ class StatsAggregatorService: func.sum(Usage.total_cost_usd).label("total_cost"), func.sum(Usage.actual_total_cost_usd).label("actual_total_cost"), ) - .filter(Usage.created_at >= today) + .filter(Usage.created_at >= today_utc) .first() ) @@ -408,8 +451,13 @@ class StatsAggregatorService: Returns: 回填的天数 """ - now = datetime.now(timezone.utc) - today = now.replace(hour=0, minute=0, second=0, microsecond=0) + from zoneinfo import ZoneInfo + + app_tz = ZoneInfo(APP_TIMEZONE) + + # 使用业务时区计算今天 + now_local = datetime.now(app_tz) + today_local = now_local.replace(hour=0, minute=0, second=0, microsecond=0) # 找到最早的 Usage 记录 earliest = db.query(func.min(Usage.created_at)).scalar() @@ -417,13 +465,15 @@ class StatsAggregatorService: logger.info("[StatsAggregator] 没有历史数据需要回填") return 0 - # 计算需要回填的日期范围 - earliest_date = earliest.replace(hour=0, minute=0, second=0, microsecond=0) - start_date = max(earliest_date, today - timedelta(days=days)) + # 将最早记录时间转换为业务时区的日期 + earliest_local = earliest.astimezone(app_tz).replace( + hour=0, minute=0, second=0, microsecond=0 + ) + start_date = max(earliest_local, today_local - timedelta(days=days)) count = 0 current_date = start_date - while current_date < today: + while current_date < today_local: StatsAggregatorService.aggregate_daily_stats(db, current_date) count += 1 current_date += timedelta(days=1)