From 03ad16ea8a62d08746f35e6118c229d7a432ce2d Mon Sep 17 00:00:00 2001
From: fawney19
Date: Wed, 24 Dec 2025 21:50:05 +0800
Subject: [PATCH] fix: resolve migration failure on fresh installs and improve
 stats backfill logic
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Migration script fixes:
- Drop AUTOCOMMIT mode; create the indexes inside the migration's own
  transaction instead
- Check each index individually and create only the missing ones
- Fix fresh installs failing because the AUTOCOMMIT connection could not
  see tables not yet committed (#46)

Stats backfill improvements:
- Check StatsDaily and StatsDailyModel for missing dates separately
- Backfill only the dates that are actually missing, not a contiguous range
- Count failed backfills and log rollback errors
---
 ...251220_1500_add_usage_composite_indexes.py |  62 +++----
 src/services/system/cleanup_scheduler.py      | 164 +++++++++++-------
 2 files changed, 123 insertions(+), 103 deletions(-)

diff --git a/alembic/versions/20251220_1500_add_usage_composite_indexes.py b/alembic/versions/20251220_1500_add_usage_composite_indexes.py
index 01d519b..e516ba9 100644
--- a/alembic/versions/20251220_1500_add_usage_composite_indexes.py
+++ b/alembic/versions/20251220_1500_add_usage_composite_indexes.py
@@ -18,49 +18,35 @@ depends_on = None
 
 def upgrade() -> None:
     """Add composite indexes to the usage table to speed up common queries.
 
-    The indexes were created with CONCURRENTLY to avoid locking the table,
-    which requires AUTOCOMMIT mode (it cannot run inside a transaction).
-
-    Note: when run against a brand-new database (baseline has just created
-    the tables), the AUTOCOMMIT connection cannot see the still-uncommitted
-    tables, so index creation is skipped until a later run or manual step.
+    Note: these indexes are already created by the baseline migration,
+    so this migration only matters when upgrading from an older version.
     """
     conn = op.get_bind()
-    engine = conn.engine
 
-    # Use a fresh connection in AUTOCOMMIT mode so CREATE INDEX CONCURRENTLY works
-    with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as autocommit_conn:
-        # Check that the usage table exists (as visible to the AUTOCOMMIT connection);
-        # if it does not (e.g. the baseline migration is still mid-transaction), skip
-        result = autocommit_conn.execute(text(
-            "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'usage')"
+    # Check that the usage table exists
+    result = conn.execute(text(
+        "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'usage')"
+    ))
+    if not result.scalar():
+        # Table does not exist; nothing to do
+        return
+
+    # The indexes this migration is responsible for
+    indexes = [
+        ("idx_usage_user_created", "ON usage (user_id, created_at)"),
+        ("idx_usage_apikey_created", "ON usage (api_key_id, created_at)"),
+        ("idx_usage_provider_model_created", "ON usage (provider, model, created_at)"),
+    ]
+
+    # Check each index individually and create only the missing ones
+    for index_name, index_def in indexes:
+        result = conn.execute(text(
+            f"SELECT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = '{index_name}')"
         ))
-        table_exists = result.scalar()
+        if result.scalar():
+            continue  # Index already exists; skip it
 
-        if not table_exists:
-            # Table is not visible to this connection (baseline may still be
-            # mid-transaction); skip. The indexes will be created by a later
-            # migration run or by hand.
-            return
-
-        # IF NOT EXISTS avoids duplicate creation, so no per-index existence check
-
-        # 1. Composite index on user_id + created_at (per-user usage queries)
-        autocommit_conn.execute(text(
-            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_usage_user_created "
-            "ON usage (user_id, created_at)"
-        ))
-
-        # 2. Composite index on api_key_id + created_at (API-key usage queries)
-        autocommit_conn.execute(text(
-            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_usage_apikey_created "
-            "ON usage (api_key_id, created_at)"
-        ))
-
-        # 3. Composite index on provider + model + created_at (model stats queries)
-        autocommit_conn.execute(text(
-            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_usage_provider_model_created "
-            "ON usage (provider, model, created_at)"
-        ))
+        conn.execute(text(f"CREATE INDEX {index_name} {index_def}"))
 
 
 def downgrade() -> None:
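
For reference, the pattern the reworked upgrade() relies on (probe pg_indexes on the
migration's own connection, create only what is missing, all inside one transaction)
can be exercised outside Alembic. A minimal sketch, assuming a plain SQLAlchemy engine;
the DSN and the ensure_index helper are illustrative, not part of this patch. Unlike
the migration's f-string probe, it binds the index name as a parameter, which is the
safer habit:

    from sqlalchemy import create_engine, text

    def ensure_index(conn, index_name: str, index_def: str) -> bool:
        """Create index_name if absent; return True if it was created.

        Runs on the caller's connection, inside the caller's transaction,
        so tables created earlier in the same transaction are visible,
        which is exactly what fresh installs need (#46).
        """
        exists = conn.execute(
            text("SELECT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = :name)"),
            {"name": index_name},  # bound parameter instead of string interpolation
        ).scalar()
        if exists:
            return False
        conn.execute(text(f"CREATE INDEX {index_name} {index_def}"))
        return True

    engine = create_engine("postgresql+psycopg2://user:pass@localhost/app")  # illustrative DSN
    with engine.begin() as conn:  # one transaction, mirroring op.get_bind() in the migration
        ensure_index(conn, "idx_usage_user_created", "ON usage (user_id, created_at)")
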
diff --git a/src/services/system/cleanup_scheduler.py b/src/services/system/cleanup_scheduler.py
index 4588b19..f98a559 100644
--- a/src/services/system/cleanup_scheduler.py
+++ b/src/services/system/cleanup_scheduler.py
@@ -208,86 +208,120 @@ class CleanupScheduler:
                 return
 
             # Not the first run: check whether recent dates are missing and need backfill
-            latest_stat = db.query(StatsDaily).order_by(StatsDaily.date.desc()).first()
+            from src.models.database import StatsDailyModel
 
-            if latest_stat:
-                latest_date_utc = latest_stat.date
-                if latest_date_utc.tzinfo is None:
-                    latest_date_utc = latest_date_utc.replace(tzinfo=timezone.utc)
-                else:
-                    latest_date_utc = latest_date_utc.astimezone(timezone.utc)
+            yesterday_business_date = today_local.date() - timedelta(days=1)
+            max_backfill_days: int = SystemConfigService.get_config(
+                db, "max_stats_backfill_days", 30
+            ) or 30
 
-                # Compute the missing range in business dates (avoids the date
-                # shift of using UTC Y/M/D, and is safer across DST)
-                latest_business_date = latest_date_utc.astimezone(app_tz).date()
-                yesterday_business_date = today_local.date() - timedelta(days=1)
-                missing_start_date = latest_business_date + timedelta(days=1)
+            # Start date of the backfill check window
+            check_start_date = yesterday_business_date - timedelta(
+                days=max_backfill_days - 1
+            )
 
-                if missing_start_date <= yesterday_business_date:
-                    missing_days = (
-                        yesterday_business_date - missing_start_date
-                    ).days + 1
+            # Dates that already have data in StatsDaily / StatsDailyModel
+            existing_daily_dates = set()
+            existing_model_dates = set()
 
-                    # Cap the backfill window so a long outage does not trigger
-                    # a huge one-shot backfill
-                    max_backfill_days: int = SystemConfigService.get_config(
-                        db, "max_stats_backfill_days", 30
-                    ) or 30
-                    if missing_days > max_backfill_days:
-                        logger.warning(
-                            f"Missing {missing_days} day(s) exceeds the backfill limit of "
-                            f"{max_backfill_days}; only the most recent {max_backfill_days} will be backfilled"
+            daily_stats = (
+                db.query(StatsDaily.date)
+                .filter(StatsDaily.date >= check_start_date.isoformat())
+                .all()
+            )
+            for (stat_date,) in daily_stats:
+                if stat_date.tzinfo is None:
+                    stat_date = stat_date.replace(tzinfo=timezone.utc)
+                existing_daily_dates.add(stat_date.astimezone(app_tz).date())
+
+            model_stats = (
+                db.query(StatsDailyModel.date)
+                .filter(StatsDailyModel.date >= check_start_date.isoformat())
+                .distinct()
+                .all()
+            )
+            for (stat_date,) in model_stats:
+                if stat_date.tzinfo is None:
+                    stat_date = stat_date.replace(tzinfo=timezone.utc)
+                existing_model_dates.add(stat_date.astimezone(app_tz).date())
+
+            # Work out which dates need backfilling
+            all_dates = set()
+            current = check_start_date
+            while current <= yesterday_business_date:
+                all_dates.add(current)
+                current += timedelta(days=1)
+
+            # Dates missing from StatsDaily
+            missing_daily_dates = all_dates - existing_daily_dates
+            # Dates missing from StatsDailyModel
+            missing_model_dates = all_dates - existing_model_dates
+            # Union of all dates to process
+            dates_to_process = missing_daily_dates | missing_model_dates
+
+            if dates_to_process:
+                sorted_dates = sorted(dates_to_process)
+                logger.info(
+                    f"Detected {len(dates_to_process)} day(s) of stats needing backfill "
+                    f"(StatsDaily missing {len(missing_daily_dates)} day(s), "
+                    f"StatsDailyModel missing {len(missing_model_dates)} day(s))"
+                )
+
+                users = (
+                    db.query(DBUser.id).filter(DBUser.is_active.is_(True)).all()
+                )
+
+                failed_dates = 0
+                failed_users = 0
+
+                for current_date in sorted_dates:
+                    try:
+                        current_date_local = datetime.combine(
+                            current_date, datetime.min.time(), tzinfo=app_tz
                         )
-                    missing_start_date = yesterday_business_date - timedelta(
-                        days=max_backfill_days - 1
-                    )
-                    missing_days = max_backfill_days
-
-                    logger.info(
-                        f"Detected {missing_days} day(s) of missing stats "
-                        f"({missing_start_date} ~ {yesterday_business_date}); starting backfill..."
-                    )
-
-                    current_date = missing_start_date
-                    users = (
-                        db.query(DBUser.id).filter(DBUser.is_active.is_(True)).all()
-                    )
-
-                    while current_date <= yesterday_business_date:
-                        try:
-                            current_date_local = datetime.combine(
-                                current_date, datetime.min.time(), tzinfo=app_tz
-                            )
+                        # Aggregate each table only where it is actually missing
+                        if current_date in missing_daily_dates:
                             StatsAggregatorService.aggregate_daily_stats(
                                 db, current_date_local
                             )
+                        if current_date in missing_model_dates:
                             StatsAggregatorService.aggregate_daily_model_stats(
                                 db, current_date_local
                             )
-                            for (user_id,) in users:
-                                try:
-                                    StatsAggregatorService.aggregate_user_daily_stats(
-                                        db, user_id, current_date_local
-                                    )
-                                except Exception as e:
-                                    logger.warning(
-                                        f"Backfill for user {user_id} on {current_date} failed: {e}"
-                                    )
-                                    try:
-                                        db.rollback()
-                                    except Exception:
-                                        pass
-                        except Exception as e:
-                            logger.warning(f"Backfill for date {current_date} failed: {e}")
+                        # Per-user stats are backfilled whenever either table was missing
+                        for (user_id,) in users:
                             try:
-                                db.rollback()
-                            except Exception:
-                                pass
+                                StatsAggregatorService.aggregate_user_daily_stats(
+                                    db, user_id, current_date_local
+                                )
+                            except Exception as e:
+                                failed_users += 1
+                                logger.warning(
+                                    f"Backfill for user {user_id} on {current_date} failed: {e}"
+                                )
+                                try:
+                                    db.rollback()
+                                except Exception as rollback_err:
+                                    logger.error(f"Rollback failed: {rollback_err}")
+                    except Exception as e:
+                        failed_dates += 1
+                        logger.warning(f"Backfill for date {current_date} failed: {e}")
+                        try:
+                            db.rollback()
+                        except Exception as rollback_err:
+                            logger.error(f"Rollback failed: {rollback_err}")
 
-                        current_date += timedelta(days=1)
+                StatsAggregatorService.update_summary(db)
 
-                    StatsAggregatorService.update_summary(db)
-                    logger.info(f"Backfill of missing data finished: {missing_days} day(s)")
+                if failed_dates > 0 or failed_users > 0:
+                    logger.warning(
+                        f"Backfill finished: {len(dates_to_process)} day(s) processed, "
+                        f"failures: {failed_dates} day(s), {failed_users} user record(s)"
+                    )
                 else:
-                    logger.info("Stats are already up to date; no backfill needed")
+                    logger.info(f"Backfill of missing data finished: {len(dates_to_process)} day(s)")
+            else:
+                logger.info("Stats are already up to date; no backfill needed")
 
             return
 
         # Scheduled task: aggregate yesterday's data
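
The gap-detection half of the scheduler change reduces to set arithmetic over business
dates: build the full window, subtract the dates each table already covers, and union
the two gaps. A self-contained sketch of that logic; the concrete dates and the 30-day
window are illustrative, where the real code derives them from app_tz and the
max_stats_backfill_days config:

    from datetime import date, timedelta

    def missing_dates(existing: set, end: date, window_days: int = 30) -> list:
        """Dates in [end - window_days + 1, end] with no data yet, oldest first."""
        wanted = {end - timedelta(days=i) for i in range(window_days)}
        return sorted(wanted - existing)

    # A window with two holes: only those two dates come back, not a whole range.
    yesterday = date(2025, 12, 23)
    have = {yesterday - timedelta(days=i) for i in range(30)}
    have -= {date(2025, 12, 1), date(2025, 12, 10)}
    print(missing_dates(have, yesterday))
    # -> [datetime.date(2025, 12, 1), datetime.date(2025, 12, 10)]

    # With two tables, as in the patch, the union drives the processing loop:
    daily_gaps = set(missing_dates(have, yesterday))
    model_gaps = {date(2025, 12, 5)}  # pretend StatsDailyModel lacks one more day
    dates_to_process = daily_gaps | model_gaps
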
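
The loop's error handling applies one pattern at two levels: count the failure, log it,
roll back so the session is clean for the next item, and log rather than swallow a
failing rollback. The same pattern in isolation, as a sketch; Session is SQLAlchemy's,
while process_one is a hypothetical stand-in for the aggregator calls:

    import logging
    from sqlalchemy.orm import Session

    logger = logging.getLogger(__name__)

    def run_batch(db: Session, items, process_one) -> int:
        """Apply process_one(db, item) to each item; return the failure count.

        A failed item must not poison the session for later items, hence the
        rollback. The pre-patch code swallowed rollback errors with a bare
        except/pass; logging them preserves the evidence when, for example,
        the database connection itself has died.
        """
        failed = 0
        for item in items:
            try:
                process_one(db, item)
            except Exception as e:
                failed += 1
                logger.warning("processing %s failed: %s", item, e)
                try:
                    db.rollback()
                except Exception as rollback_err:
                    logger.error("rollback failed: %s", rollback_err)
        return failed
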