From 5722b1422ed6c66c50e5ae898cf8bc50dc1949bf Mon Sep 17 00:00:00 2001 From: fawney19 Date: Fri, 12 Dec 2025 09:48:17 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=90=AF=E5=8A=A8=E6=97=B6=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E5=9B=9E=E5=A1=AB=E7=BC=BA=E5=A4=B1=E7=9A=84=E7=BB=9F?= =?UTF-8?q?=E8=AE=A1=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/services/system/cleanup_scheduler.py | 75 +++++++++++++++++++++--- 1 file changed, 66 insertions(+), 9 deletions(-) diff --git a/src/services/system/cleanup_scheduler.py b/src/services/system/cleanup_scheduler.py index 13ab5ee..461fc97 100644 --- a/src/services/system/cleanup_scheduler.py +++ b/src/services/system/cleanup_scheduler.py @@ -142,7 +142,7 @@ class CleanupScheduler: """执行统计聚合任务 Args: - backfill: 是否回填历史数据(首次启动时使用) + backfill: 是否回填历史数据(启动时检查缺失的日期) """ db = create_session() try: @@ -153,12 +153,18 @@ class CleanupScheduler: logger.info("开始执行统计数据聚合...") + from src.models.database import StatsDaily, User as DBUser + + now = datetime.now(timezone.utc) + today = now.replace(hour=0, minute=0, second=0, microsecond=0) + if backfill: - # 首次启动时回填历史数据 + # 启动时检查并回填缺失的日期 from src.models.database import StatsSummary summary = db.query(StatsSummary).first() if not summary: + # 首次运行,回填所有历史数据 logger.info("检测到首次运行,开始回填历史统计数据...") days_to_backfill = SystemConfigService.get_config( db, "stats_backfill_days", 365 @@ -169,17 +175,68 @@ class CleanupScheduler: logger.info(f"历史数据回填完成,共 {count} 天") return - # 聚合昨天的数据 - now = datetime.now(timezone.utc) - yesterday = (now - timedelta(days=1)).replace( - hour=0, minute=0, second=0, microsecond=0 - ) + # 非首次运行,检查最近是否有缺失的日期需要回填 + latest_stat = ( + db.query(StatsDaily) + .order_by(StatsDaily.date.desc()) + .first() + ) + + if latest_stat: + latest_date = latest_stat.date.replace(tzinfo=timezone.utc) + # 计算缺失的天数(从最新记录的下一天到昨天) + yesterday = today - timedelta(days=1) + missing_start = latest_date + timedelta(days=1) + + if missing_start <= yesterday: + missing_days = (yesterday - missing_start).days + 1 + logger.info( + f"检测到缺失 {missing_days} 天的统计数据 " + f"({missing_start.date()} ~ {yesterday.date()}),开始回填..." + ) + + current_date = missing_start + users = db.query(DBUser.id).filter(DBUser.is_active.is_(True)).all() + + while current_date <= yesterday: + try: + StatsAggregatorService.aggregate_daily_stats(db, current_date) + # 聚合用户数据 + for (user_id,) in users: + try: + StatsAggregatorService.aggregate_user_daily_stats( + db, user_id, current_date + ) + except Exception as e: + logger.warning( + f"回填用户 {user_id} 日期 {current_date.date()} 失败: {e}" + ) + try: + db.rollback() + except Exception: + pass + except Exception as e: + logger.warning(f"回填日期 {current_date.date()} 失败: {e}") + try: + db.rollback() + except Exception: + pass + + current_date += timedelta(days=1) + + # 更新全局汇总 + StatsAggregatorService.update_summary(db) + logger.info(f"缺失数据回填完成,共 {missing_days} 天") + else: + logger.info("统计数据已是最新,无需回填") + return + + # 定时任务:聚合昨天的数据 + yesterday = (today - timedelta(days=1)) StatsAggregatorService.aggregate_daily_stats(db, yesterday) # 聚合所有用户的昨日数据 - from src.models.database import User as DBUser - users = db.query(DBUser.id).filter(DBUser.is_active.is_(True)).all() for (user_id,) in users: try: