mirror of
https://github.com/fawney19/Aether.git
synced 2026-01-02 15:52:26 +08:00
fix: 启动时自动回填缺失的统计数据
This commit is contained in:
@@ -142,7 +142,7 @@ class CleanupScheduler:
|
||||
"""执行统计聚合任务
|
||||
|
||||
Args:
|
||||
backfill: 是否回填历史数据(首次启动时使用)
|
||||
backfill: 是否回填历史数据(启动时检查缺失的日期)
|
||||
"""
|
||||
db = create_session()
|
||||
try:
|
||||
@@ -153,12 +153,18 @@ class CleanupScheduler:
|
||||
|
||||
logger.info("开始执行统计数据聚合...")
|
||||
|
||||
from src.models.database import StatsDaily, User as DBUser
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
today = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
|
||||
if backfill:
|
||||
# 首次启动时回填历史数据
|
||||
# 启动时检查并回填缺失的日期
|
||||
from src.models.database import StatsSummary
|
||||
|
||||
summary = db.query(StatsSummary).first()
|
||||
if not summary:
|
||||
# 首次运行,回填所有历史数据
|
||||
logger.info("检测到首次运行,开始回填历史统计数据...")
|
||||
days_to_backfill = SystemConfigService.get_config(
|
||||
db, "stats_backfill_days", 365
|
||||
@@ -169,17 +175,68 @@ class CleanupScheduler:
|
||||
logger.info(f"历史数据回填完成,共 {count} 天")
|
||||
return
|
||||
|
||||
# 聚合昨天的数据
|
||||
now = datetime.now(timezone.utc)
|
||||
yesterday = (now - timedelta(days=1)).replace(
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
# 非首次运行,检查最近是否有缺失的日期需要回填
|
||||
latest_stat = (
|
||||
db.query(StatsDaily)
|
||||
.order_by(StatsDaily.date.desc())
|
||||
.first()
|
||||
)
|
||||
|
||||
if latest_stat:
|
||||
latest_date = latest_stat.date.replace(tzinfo=timezone.utc)
|
||||
# 计算缺失的天数(从最新记录的下一天到昨天)
|
||||
yesterday = today - timedelta(days=1)
|
||||
missing_start = latest_date + timedelta(days=1)
|
||||
|
||||
if missing_start <= yesterday:
|
||||
missing_days = (yesterday - missing_start).days + 1
|
||||
logger.info(
|
||||
f"检测到缺失 {missing_days} 天的统计数据 "
|
||||
f"({missing_start.date()} ~ {yesterday.date()}),开始回填..."
|
||||
)
|
||||
|
||||
current_date = missing_start
|
||||
users = db.query(DBUser.id).filter(DBUser.is_active.is_(True)).all()
|
||||
|
||||
while current_date <= yesterday:
|
||||
try:
|
||||
StatsAggregatorService.aggregate_daily_stats(db, current_date)
|
||||
# 聚合用户数据
|
||||
for (user_id,) in users:
|
||||
try:
|
||||
StatsAggregatorService.aggregate_user_daily_stats(
|
||||
db, user_id, current_date
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"回填用户 {user_id} 日期 {current_date.date()} 失败: {e}"
|
||||
)
|
||||
try:
|
||||
db.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.warning(f"回填日期 {current_date.date()} 失败: {e}")
|
||||
try:
|
||||
db.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
current_date += timedelta(days=1)
|
||||
|
||||
# 更新全局汇总
|
||||
StatsAggregatorService.update_summary(db)
|
||||
logger.info(f"缺失数据回填完成,共 {missing_days} 天")
|
||||
else:
|
||||
logger.info("统计数据已是最新,无需回填")
|
||||
return
|
||||
|
||||
# 定时任务:聚合昨天的数据
|
||||
yesterday = (today - timedelta(days=1))
|
||||
|
||||
StatsAggregatorService.aggregate_daily_stats(db, yesterday)
|
||||
|
||||
# 聚合所有用户的昨日数据
|
||||
from src.models.database import User as DBUser
|
||||
|
||||
users = db.query(DBUser.id).filter(DBUser.is_active.is_(True)).all()
|
||||
for (user_id,) in users:
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user