From d696c575e6299e0392723cd6b4c1bb521bd59744 Mon Sep 17 00:00:00 2001 From: fawney19 Date: Tue, 16 Dec 2025 17:46:38 +0800 Subject: [PATCH] refactor(migrations): add idempotency checks to migration scripts --- ...f0cbf_remove_model_mappings_add_aliases.py | 238 +++++++++++------- ...a_add_first_byte_time_ms_to_usage_table.py | 25 +- ...6f_refactor_global_model_to_use_config_.py | 82 +++--- 3 files changed, 226 insertions(+), 119 deletions(-) diff --git a/alembic/versions/20251214_1300_e9b3d63f0cbf_remove_model_mappings_add_aliases.py b/alembic/versions/20251214_1300_e9b3d63f0cbf_remove_model_mappings_add_aliases.py index 0d721f0..9049616 100644 --- a/alembic/versions/20251214_1300_e9b3d63f0cbf_remove_model_mappings_add_aliases.py +++ b/alembic/versions/20251214_1300_e9b3d63f0cbf_remove_model_mappings_add_aliases.py @@ -26,16 +26,66 @@ branch_labels = None depends_on = None +def column_exists(bind, table_name: str, column_name: str) -> bool: + """检查列是否存在""" + result = bind.execute( + sa.text( + """ + SELECT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = :table_name AND column_name = :column_name + ) + """ + ), + {"table_name": table_name, "column_name": column_name}, + ) + return result.scalar() + + +def table_exists(bind, table_name: str) -> bool: + """检查表是否存在""" + result = bind.execute( + sa.text( + """ + SELECT EXISTS ( + SELECT 1 FROM information_schema.tables + WHERE table_name = :table_name + ) + """ + ), + {"table_name": table_name}, + ) + return result.scalar() + + +def index_exists(bind, index_name: str) -> bool: + """检查索引是否存在""" + result = bind.execute( + sa.text( + """ + SELECT EXISTS ( + SELECT 1 FROM pg_indexes + WHERE indexname = :index_name + ) + """ + ), + {"index_name": index_name}, + ) + return result.scalar() + + def upgrade() -> None: """添加 provider_model_aliases 字段,迁移数据,删除 model_mappings 表""" - # 1. 添加 provider_model_aliases 字段 - op.add_column( - 'models', - sa.Column('provider_model_aliases', sa.JSON(), nullable=True) - ) - - # 2. 迁移 model_mappings 数据 bind = op.get_bind() + + # 1. 添加 provider_model_aliases 字段(如果不存在) + if not column_exists(bind, "models", "provider_model_aliases"): + op.add_column( + 'models', + sa.Column('provider_model_aliases', sa.JSON(), nullable=True) + ) + + # 2. 迁移 model_mappings 数据(如果表存在) session = Session(bind=bind) model_mappings_table = sa.table( @@ -96,104 +146,118 @@ def upgrade() -> None: # 查询所有活跃的 provider 级别 alias(只迁移 is_active=True 且 mapping_type='alias' 的) # 全局别名/映射不迁移(新架构不再支持 source_model -> GlobalModel.name 的解析) - mappings = session.execute( - sa.select( - model_mappings_table.c.source_model, - model_mappings_table.c.target_global_model_id, - model_mappings_table.c.provider_id, - ) - .where( - model_mappings_table.c.is_active.is_(True), - model_mappings_table.c.provider_id.isnot(None), - model_mappings_table.c.mapping_type == "alias", - ) - .order_by(model_mappings_table.c.provider_id, model_mappings_table.c.source_model) - ).all() - - # 按 (provider_id, target_global_model_id) 分组,收集别名 - alias_groups: dict = {} - for source_model, target_global_model_id, provider_id in mappings: - if not isinstance(source_model, str): - continue - source_model = source_model.strip() - if not source_model: - continue - if not isinstance(provider_id, str) or not provider_id: - continue - if not isinstance(target_global_model_id, str) or not target_global_model_id: - continue - - key = (provider_id, target_global_model_id) - if key not in alias_groups: - alias_groups[key] = [] - priority = len(alias_groups[key]) + 1 - alias_groups[key].append({"name": source_model, "priority": priority}) - - # 更新对应的 models 记录 - for (provider_id, global_model_id), aliases in alias_groups.items(): - model_row = session.execute( - sa.select(models_table.c.id, models_table.c.provider_model_aliases) + # 仅当 model_mappings 表存在时执行迁移 + if table_exists(bind, "model_mappings"): + mappings = session.execute( + sa.select( + model_mappings_table.c.source_model, + model_mappings_table.c.target_global_model_id, + model_mappings_table.c.provider_id, + ) .where( - models_table.c.provider_id == provider_id, - models_table.c.global_model_id == global_model_id, + model_mappings_table.c.is_active.is_(True), + model_mappings_table.c.provider_id.isnot(None), + model_mappings_table.c.mapping_type == "alias", ) - .limit(1) - ).first() + .order_by(model_mappings_table.c.provider_id, model_mappings_table.c.source_model) + ).all() - if model_row: - model_id = model_row[0] - existing_aliases = normalize_alias_list(model_row[1]) + # 按 (provider_id, target_global_model_id) 分组,收集别名 + alias_groups: dict = {} + for source_model, target_global_model_id, provider_id in mappings: + if not isinstance(source_model, str): + continue + source_model = source_model.strip() + if not source_model: + continue + if not isinstance(provider_id, str) or not provider_id: + continue + if not isinstance(target_global_model_id, str) or not target_global_model_id: + continue - existing_names = {a["name"] for a in existing_aliases} - merged_aliases = list(existing_aliases) - for alias in aliases: - name = alias.get("name") - if not isinstance(name, str): - continue - name = name.strip() - if not name or name in existing_names: - continue + key = (provider_id, target_global_model_id) + if key not in alias_groups: + alias_groups[key] = [] + priority = len(alias_groups[key]) + 1 + alias_groups[key].append({"name": source_model, "priority": priority}) - merged_aliases.append( - { - "name": name, - "priority": len(merged_aliases) + 1, - } + # 更新对应的 models 记录 + for (provider_id, global_model_id), aliases in alias_groups.items(): + model_row = session.execute( + sa.select(models_table.c.id, models_table.c.provider_model_aliases) + .where( + models_table.c.provider_id == provider_id, + models_table.c.global_model_id == global_model_id, ) - existing_names.add(name) + .limit(1) + ).first() - session.execute( - models_table.update() - .where(models_table.c.id == model_id) - .values( - provider_model_aliases=merged_aliases if merged_aliases else None, - updated_at=datetime.now(timezone.utc), + if model_row: + model_id = model_row[0] + existing_aliases = normalize_alias_list(model_row[1]) + + existing_names = {a["name"] for a in existing_aliases} + merged_aliases = list(existing_aliases) + for alias in aliases: + name = alias.get("name") + if not isinstance(name, str): + continue + name = name.strip() + if not name or name in existing_names: + continue + + merged_aliases.append( + { + "name": name, + "priority": len(merged_aliases) + 1, + } + ) + existing_names.add(name) + + session.execute( + models_table.update() + .where(models_table.c.id == model_id) + .values( + provider_model_aliases=merged_aliases if merged_aliases else None, + updated_at=datetime.now(timezone.utc), + ) ) - ) - session.commit() + session.commit() - # 3. 删除 model_mappings 表 - op.drop_table('model_mappings') + # 3. 删除 model_mappings 表 + op.drop_table('model_mappings') # 4. 添加索引优化别名解析性能 - # provider_model_name 索引(支持精确匹配) - op.create_index( - "idx_model_provider_model_name", - "models", - ["provider_model_name"], - unique=False, - postgresql_where=sa.text("is_active = true"), - ) + # provider_model_name 索引(支持精确匹配,如果不存在) + if not index_exists(bind, "idx_model_provider_model_name"): + op.create_index( + "idx_model_provider_model_name", + "models", + ["provider_model_name"], + unique=False, + postgresql_where=sa.text("is_active = true"), + ) # provider_model_aliases GIN 索引(支持 JSONB 查询,仅 PostgreSQL) if bind.dialect.name == "postgresql": # 将 json 列转为 jsonb(jsonb 性能更好且支持 GIN 索引) + # 使用 IF NOT EXISTS 风格的检查来避免重复转换 op.execute( """ - ALTER TABLE models - ALTER COLUMN provider_model_aliases TYPE jsonb - USING provider_model_aliases::jsonb + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'models' + AND column_name = 'provider_model_aliases' + AND data_type = 'json' + ) THEN + ALTER TABLE models + ALTER COLUMN provider_model_aliases TYPE jsonb + USING provider_model_aliases::jsonb; + END IF; + END $$; """ ) # 创建 GIN 索引 diff --git a/alembic/versions/20251215_1707_180e63a9c83a_add_first_byte_time_ms_to_usage_table.py b/alembic/versions/20251215_1707_180e63a9c83a_add_first_byte_time_ms_to_usage_table.py index 3d93a13..d29177a 100644 --- a/alembic/versions/20251215_1707_180e63a9c83a_add_first_byte_time_ms_to_usage_table.py +++ b/alembic/versions/20251215_1707_180e63a9c83a_add_first_byte_time_ms_to_usage_table.py @@ -5,8 +5,8 @@ Revises: e9b3d63f0cbf Create Date: 2025-12-15 17:07:44.631032+00:00 """ -from alembic import op import sqlalchemy as sa +from alembic import op # revision identifiers, used by Alembic. @@ -16,10 +16,29 @@ branch_labels = None depends_on = None +def column_exists(bind, table_name: str, column_name: str) -> bool: + """检查列是否存在""" + result = bind.execute( + sa.text( + """ + SELECT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = :table_name AND column_name = :column_name + ) + """ + ), + {"table_name": table_name, "column_name": column_name}, + ) + return result.scalar() + + def upgrade() -> None: """应用迁移:升级到新版本""" - # 添加首字时间字段到 usage 表 - op.add_column('usage', sa.Column('first_byte_time_ms', sa.Integer(), nullable=True)) + bind = op.get_bind() + + # 添加首字时间字段到 usage 表(如果不存在) + if not column_exists(bind, "usage", "first_byte_time_ms"): + op.add_column('usage', sa.Column('first_byte_time_ms', sa.Integer(), nullable=True)) def downgrade() -> None: diff --git a/alembic/versions/20251216_0311_1cc6942cf06f_refactor_global_model_to_use_config_.py b/alembic/versions/20251216_0311_1cc6942cf06f_refactor_global_model_to_use_config_.py index fd60af8..cc05705 100644 --- a/alembic/versions/20251216_0311_1cc6942cf06f_refactor_global_model_to_use_config_.py +++ b/alembic/versions/20251216_0311_1cc6942cf06f_refactor_global_model_to_use_config_.py @@ -5,8 +5,8 @@ Revises: 180e63a9c83a Create Date: 2025-12-16 03:11:32.480976+00:00 """ -from alembic import op import sqlalchemy as sa +from alembic import op from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. @@ -16,6 +16,22 @@ branch_labels = None depends_on = None +def column_exists(bind, table_name: str, column_name: str) -> bool: + """检查列是否存在""" + result = bind.execute( + sa.text( + """ + SELECT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = :table_name AND column_name = :column_name + ) + """ + ), + {"table_name": table_name, "column_name": column_name}, + ) + return result.scalar() + + def upgrade() -> None: """应用迁移:升级到新版本 @@ -23,37 +39,45 @@ def upgrade() -> None: 2. 把旧数据迁移到 config 3. 删除旧列 """ + bind = op.get_bind() + + # 检查是否已经迁移过(config 列存在且旧列不存在) + has_config = column_exists(bind, "global_models", "config") + has_old_columns = column_exists(bind, "global_models", "default_supports_streaming") + + if has_config and not has_old_columns: + # 已完成迁移,跳过 + return + # 1. 添加 config 列(使用 JSONB 类型,支持索引和更高效的查询) - op.add_column('global_models', sa.Column('config', postgresql.JSONB(), nullable=True)) + if not has_config: + op.add_column('global_models', sa.Column('config', postgresql.JSONB(), nullable=True)) - # 2. 迁移数据:把旧字段合并到 config JSON - # 注意:使用 COALESCE 为布尔字段设置默认值,避免数据丢失 - # - streaming 默认 true(大多数模型支持) - # - 其他能力默认 false - # - jsonb_strip_nulls 只移除 null 字段,不影响 false 值 - op.execute(""" - UPDATE global_models - SET config = jsonb_strip_nulls(jsonb_build_object( - 'streaming', COALESCE(default_supports_streaming, true), - 'vision', CASE WHEN COALESCE(default_supports_vision, false) THEN true ELSE NULL END, - 'function_calling', CASE WHEN COALESCE(default_supports_function_calling, false) THEN true ELSE NULL END, - 'extended_thinking', CASE WHEN COALESCE(default_supports_extended_thinking, false) THEN true ELSE NULL END, - 'image_generation', CASE WHEN COALESCE(default_supports_image_generation, false) THEN true ELSE NULL END, - 'description', description, - 'icon_url', icon_url, - 'official_url', official_url - )) - """) + # 2. 迁移数据:把旧字段合并到 config JSON(仅当旧列存在时) + if has_old_columns: + op.execute(""" + UPDATE global_models + SET config = jsonb_strip_nulls(jsonb_build_object( + 'streaming', COALESCE(default_supports_streaming, true), + 'vision', CASE WHEN COALESCE(default_supports_vision, false) THEN true ELSE NULL END, + 'function_calling', CASE WHEN COALESCE(default_supports_function_calling, false) THEN true ELSE NULL END, + 'extended_thinking', CASE WHEN COALESCE(default_supports_extended_thinking, false) THEN true ELSE NULL END, + 'image_generation', CASE WHEN COALESCE(default_supports_image_generation, false) THEN true ELSE NULL END, + 'description', description, + 'icon_url', icon_url, + 'official_url', official_url + )) + """) - # 3. 删除旧列 - op.drop_column('global_models', 'default_supports_streaming') - op.drop_column('global_models', 'default_supports_vision') - op.drop_column('global_models', 'default_supports_function_calling') - op.drop_column('global_models', 'default_supports_extended_thinking') - op.drop_column('global_models', 'default_supports_image_generation') - op.drop_column('global_models', 'description') - op.drop_column('global_models', 'icon_url') - op.drop_column('global_models', 'official_url') + # 3. 删除旧列 + op.drop_column('global_models', 'default_supports_streaming') + op.drop_column('global_models', 'default_supports_vision') + op.drop_column('global_models', 'default_supports_function_calling') + op.drop_column('global_models', 'default_supports_extended_thinking') + op.drop_column('global_models', 'default_supports_image_generation') + op.drop_column('global_models', 'description') + op.drop_column('global_models', 'icon_url') + op.drop_column('global_models', 'official_url') def downgrade() -> None: