From 39defce71c6dc20d0e4e34187f076bf3f4c6d8b5 Mon Sep 17 00:00:00 2001 From: fawney19 Date: Fri, 12 Dec 2025 10:06:07 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E7=BB=9F=E8=AE=A1?= =?UTF-8?q?=E8=81=9A=E5=90=88=E7=9A=84=E6=97=B6=E5=8C=BA=E9=97=AE=E9=A2=98?= =?UTF-8?q?=EF=BC=8C=E5=90=AF=E5=8A=A8=E6=97=B6=E8=87=AA=E5=8A=A8=E5=9B=9E?= =?UTF-8?q?=E5=A1=AB=E7=BC=BA=E5=A4=B1=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 统计聚合使用业务时区(APP_TIMEZONE)计算日期,而非UTC - 新增 _get_business_day_range() 将业务日期转换为UTC时间范围 - 启动时检查并自动回填因容器重启等原因缺失的统计数据 - 修复 aggregate_daily_stats/update_summary/get_today_realtime_stats 等方法的时区计算 --- src/services/system/cleanup_scheduler.py | 11 ++- src/services/system/stats_aggregator.py | 90 ++++++++++++++++++------ 2 files changed, 79 insertions(+), 22 deletions(-) diff --git a/src/services/system/cleanup_scheduler.py b/src/services/system/cleanup_scheduler.py index 461fc97..62d6f64 100644 --- a/src/services/system/cleanup_scheduler.py +++ b/src/services/system/cleanup_scheduler.py @@ -154,9 +154,16 @@ class CleanupScheduler: logger.info("开始执行统计数据聚合...") from src.models.database import StatsDaily, User as DBUser + from src.services.system.scheduler import APP_TIMEZONE + from zoneinfo import ZoneInfo - now = datetime.now(timezone.utc) - today = now.replace(hour=0, minute=0, second=0, microsecond=0) + # 使用业务时区计算日期,确保与定时任务触发时间一致 + # 定时任务在 Asia/Shanghai 凌晨 1 点触发,此时应聚合 Asia/Shanghai 的"昨天" + app_tz = ZoneInfo(APP_TIMEZONE) + now_local = datetime.now(app_tz) + today_local = now_local.replace(hour=0, minute=0, second=0, microsecond=0) + # 转换为 UTC 用于数据库查询(stats_daily.date 存储的是 UTC) + today = today_local.astimezone(timezone.utc).replace(tzinfo=timezone.utc) if backfill: # 启动时检查并回填缺失的日期 diff --git a/src/services/system/stats_aggregator.py b/src/services/system/stats_aggregator.py index c73c075..d0b59a2 100644 --- a/src/services/system/stats_aggregator.py +++ b/src/services/system/stats_aggregator.py @@ -3,6 +3,7 @@ 实现预聚合统计,避免每次请求都全表扫描。 """ +import os import uuid from datetime import datetime, timedelta, timezone from typing import Optional @@ -21,6 +22,35 @@ from src.models.database import ( ) from src.models.database import User as DBUser +# 业务时区配置 +APP_TIMEZONE = os.getenv("APP_TIMEZONE", "Asia/Shanghai") + + +def _get_business_day_range(date: datetime) -> tuple[datetime, datetime]: + """将业务时区的日期转换为 UTC 时间范围 + + Args: + date: 业务时区的日期(只使用日期部分) + + Returns: + (day_start_utc, day_end_utc): UTC 时间范围 + """ + from zoneinfo import ZoneInfo + + app_tz = ZoneInfo(APP_TIMEZONE) + + # 取日期部分,构造业务时区的当天 00:00:00 + day_start_local = datetime( + date.year, date.month, date.day, 0, 0, 0, tzinfo=app_tz + ) + day_end_local = day_start_local + timedelta(days=1) + + # 转换为 UTC + day_start_utc = day_start_local.astimezone(timezone.utc) + day_end_utc = day_end_local.astimezone(timezone.utc) + + return day_start_utc, day_end_utc + class StatsAggregatorService: """统计数据聚合服务""" @@ -31,15 +61,15 @@ class StatsAggregatorService: Args: db: 数据库会话 - date: 要聚合的日期 (会自动转为 UTC 当天开始) + date: 要聚合的业务日期(使用 APP_TIMEZONE 时区) Returns: StatsDaily 记录 """ - # 确保日期是 UTC 当天开始 - day_start = date.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc) - day_end = day_start + timedelta(days=1) + # 将业务日期转换为 UTC 时间范围 + day_start, day_end = _get_business_day_range(date) + # stats_daily.date 存储的是业务日期对应的 UTC 开始时间 # 检查是否已存在该日期的记录 existing = db.query(StatsDaily).filter(StatsDaily.date == day_start).first() if existing: @@ -172,8 +202,8 @@ class StatsAggregatorService: db: Session, user_id: str, date: datetime ) -> StatsUserDaily: """聚合指定用户指定日期的统计数据""" - day_start = date.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc) - day_end = day_start + timedelta(days=1) + # 将业务日期转换为 UTC 时间范围 + day_start, day_end = _get_business_day_range(date) existing = ( db.query(StatsUserDaily) @@ -256,9 +286,15 @@ class StatsAggregatorService: 汇总截止到昨天的所有数据。 """ - now = datetime.now(timezone.utc) - today = now.replace(hour=0, minute=0, second=0, microsecond=0) - cutoff_date = today # 不含今天 + from zoneinfo import ZoneInfo + + app_tz = ZoneInfo(APP_TIMEZONE) + + # 使用业务时区计算今天 + now_local = datetime.now(app_tz) + today_local = now_local.replace(hour=0, minute=0, second=0, microsecond=0) + # 转换为 UTC 用于与 stats_daily.date 比较 + cutoff_date = today_local.astimezone(timezone.utc) # 获取或创建 summary 记录 summary = db.query(StatsSummary).first() @@ -311,16 +347,23 @@ class StatsAggregatorService: db.add(summary) db.commit() - logger.info(f"[StatsAggregator] 更新全局汇总完成,截止日期: {cutoff_date.date()}") + logger.info(f"[StatsAggregator] 更新全局汇总完成,截止日期: {today_local.date()}") return summary @staticmethod def get_today_realtime_stats(db: Session) -> dict: """获取今日实时统计(用于与预聚合数据合并)""" - now = datetime.now(timezone.utc) - today = now.replace(hour=0, minute=0, second=0, microsecond=0) + from zoneinfo import ZoneInfo - base_query = db.query(Usage).filter(Usage.created_at >= today) + app_tz = ZoneInfo(APP_TIMEZONE) + + # 使用业务时区计算今天的开始时间 + now_local = datetime.now(app_tz) + today_local = now_local.replace(hour=0, minute=0, second=0, microsecond=0) + # 转换为 UTC 用于查询 + today_utc = today_local.astimezone(timezone.utc) + + base_query = db.query(Usage).filter(Usage.created_at >= today_utc) total_requests = base_query.count() @@ -352,7 +395,7 @@ class StatsAggregatorService: func.sum(Usage.total_cost_usd).label("total_cost"), func.sum(Usage.actual_total_cost_usd).label("actual_total_cost"), ) - .filter(Usage.created_at >= today) + .filter(Usage.created_at >= today_utc) .first() ) @@ -408,8 +451,13 @@ class StatsAggregatorService: Returns: 回填的天数 """ - now = datetime.now(timezone.utc) - today = now.replace(hour=0, minute=0, second=0, microsecond=0) + from zoneinfo import ZoneInfo + + app_tz = ZoneInfo(APP_TIMEZONE) + + # 使用业务时区计算今天 + now_local = datetime.now(app_tz) + today_local = now_local.replace(hour=0, minute=0, second=0, microsecond=0) # 找到最早的 Usage 记录 earliest = db.query(func.min(Usage.created_at)).scalar() @@ -417,13 +465,15 @@ class StatsAggregatorService: logger.info("[StatsAggregator] 没有历史数据需要回填") return 0 - # 计算需要回填的日期范围 - earliest_date = earliest.replace(hour=0, minute=0, second=0, microsecond=0) - start_date = max(earliest_date, today - timedelta(days=days)) + # 将最早记录时间转换为业务时区的日期 + earliest_local = earliest.astimezone(app_tz).replace( + hour=0, minute=0, second=0, microsecond=0 + ) + start_date = max(earliest_local, today_local - timedelta(days=days)) count = 0 current_date = start_date - while current_date < today: + while current_date < today_local: StatsAggregatorService.aggregate_daily_stats(db, current_date) count += 1 current_date += timedelta(days=1)