refactor(migrations): add idempotency checks to migration scripts

This commit is contained in:
fawney19
2025-12-16 17:46:38 +08:00
parent 46ff5a1a50
commit d696c575e6
3 changed files with 226 additions and 119 deletions

View File

@@ -26,16 +26,66 @@ branch_labels = None
depends_on = None depends_on = None
def column_exists(bind, table_name: str, column_name: str) -> bool:
"""检查列是否存在"""
result = bind.execute(
sa.text(
"""
SELECT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = :table_name AND column_name = :column_name
)
"""
),
{"table_name": table_name, "column_name": column_name},
)
return result.scalar()
def table_exists(bind, table_name: str) -> bool:
"""检查表是否存在"""
result = bind.execute(
sa.text(
"""
SELECT EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_name = :table_name
)
"""
),
{"table_name": table_name},
)
return result.scalar()
def index_exists(bind, index_name: str) -> bool:
"""检查索引是否存在"""
result = bind.execute(
sa.text(
"""
SELECT EXISTS (
SELECT 1 FROM pg_indexes
WHERE indexname = :index_name
)
"""
),
{"index_name": index_name},
)
return result.scalar()
def upgrade() -> None: def upgrade() -> None:
"""添加 provider_model_aliases 字段,迁移数据,删除 model_mappings 表""" """添加 provider_model_aliases 字段,迁移数据,删除 model_mappings 表"""
# 1. 添加 provider_model_aliases 字段
op.add_column(
'models',
sa.Column('provider_model_aliases', sa.JSON(), nullable=True)
)
# 2. 迁移 model_mappings 数据
bind = op.get_bind() bind = op.get_bind()
# 1. 添加 provider_model_aliases 字段(如果不存在)
if not column_exists(bind, "models", "provider_model_aliases"):
op.add_column(
'models',
sa.Column('provider_model_aliases', sa.JSON(), nullable=True)
)
# 2. 迁移 model_mappings 数据(如果表存在)
session = Session(bind=bind) session = Session(bind=bind)
model_mappings_table = sa.table( model_mappings_table = sa.table(
@@ -96,104 +146,118 @@ def upgrade() -> None:
# 查询所有活跃的 provider 级别 alias只迁移 is_active=True 且 mapping_type='alias' 的) # 查询所有活跃的 provider 级别 alias只迁移 is_active=True 且 mapping_type='alias' 的)
# 全局别名/映射不迁移(新架构不再支持 source_model -> GlobalModel.name 的解析) # 全局别名/映射不迁移(新架构不再支持 source_model -> GlobalModel.name 的解析)
mappings = session.execute( # 仅当 model_mappings 表存在时执行迁移
sa.select( if table_exists(bind, "model_mappings"):
model_mappings_table.c.source_model, mappings = session.execute(
model_mappings_table.c.target_global_model_id, sa.select(
model_mappings_table.c.provider_id, model_mappings_table.c.source_model,
) model_mappings_table.c.target_global_model_id,
.where( model_mappings_table.c.provider_id,
model_mappings_table.c.is_active.is_(True), )
model_mappings_table.c.provider_id.isnot(None),
model_mappings_table.c.mapping_type == "alias",
)
.order_by(model_mappings_table.c.provider_id, model_mappings_table.c.source_model)
).all()
# 按 (provider_id, target_global_model_id) 分组,收集别名
alias_groups: dict = {}
for source_model, target_global_model_id, provider_id in mappings:
if not isinstance(source_model, str):
continue
source_model = source_model.strip()
if not source_model:
continue
if not isinstance(provider_id, str) or not provider_id:
continue
if not isinstance(target_global_model_id, str) or not target_global_model_id:
continue
key = (provider_id, target_global_model_id)
if key not in alias_groups:
alias_groups[key] = []
priority = len(alias_groups[key]) + 1
alias_groups[key].append({"name": source_model, "priority": priority})
# 更新对应的 models 记录
for (provider_id, global_model_id), aliases in alias_groups.items():
model_row = session.execute(
sa.select(models_table.c.id, models_table.c.provider_model_aliases)
.where( .where(
models_table.c.provider_id == provider_id, model_mappings_table.c.is_active.is_(True),
models_table.c.global_model_id == global_model_id, model_mappings_table.c.provider_id.isnot(None),
model_mappings_table.c.mapping_type == "alias",
) )
.limit(1) .order_by(model_mappings_table.c.provider_id, model_mappings_table.c.source_model)
).first() ).all()
if model_row: # 按 (provider_id, target_global_model_id) 分组,收集别名
model_id = model_row[0] alias_groups: dict = {}
existing_aliases = normalize_alias_list(model_row[1]) for source_model, target_global_model_id, provider_id in mappings:
if not isinstance(source_model, str):
continue
source_model = source_model.strip()
if not source_model:
continue
if not isinstance(provider_id, str) or not provider_id:
continue
if not isinstance(target_global_model_id, str) or not target_global_model_id:
continue
existing_names = {a["name"] for a in existing_aliases} key = (provider_id, target_global_model_id)
merged_aliases = list(existing_aliases) if key not in alias_groups:
for alias in aliases: alias_groups[key] = []
name = alias.get("name") priority = len(alias_groups[key]) + 1
if not isinstance(name, str): alias_groups[key].append({"name": source_model, "priority": priority})
continue
name = name.strip()
if not name or name in existing_names:
continue
merged_aliases.append( # 更新对应的 models 记录
{ for (provider_id, global_model_id), aliases in alias_groups.items():
"name": name, model_row = session.execute(
"priority": len(merged_aliases) + 1, sa.select(models_table.c.id, models_table.c.provider_model_aliases)
} .where(
models_table.c.provider_id == provider_id,
models_table.c.global_model_id == global_model_id,
) )
existing_names.add(name) .limit(1)
).first()
session.execute( if model_row:
models_table.update() model_id = model_row[0]
.where(models_table.c.id == model_id) existing_aliases = normalize_alias_list(model_row[1])
.values(
provider_model_aliases=merged_aliases if merged_aliases else None, existing_names = {a["name"] for a in existing_aliases}
updated_at=datetime.now(timezone.utc), merged_aliases = list(existing_aliases)
for alias in aliases:
name = alias.get("name")
if not isinstance(name, str):
continue
name = name.strip()
if not name or name in existing_names:
continue
merged_aliases.append(
{
"name": name,
"priority": len(merged_aliases) + 1,
}
)
existing_names.add(name)
session.execute(
models_table.update()
.where(models_table.c.id == model_id)
.values(
provider_model_aliases=merged_aliases if merged_aliases else None,
updated_at=datetime.now(timezone.utc),
)
) )
)
session.commit() session.commit()
# 3. 删除 model_mappings 表 # 3. 删除 model_mappings 表
op.drop_table('model_mappings') op.drop_table('model_mappings')
# 4. 添加索引优化别名解析性能 # 4. 添加索引优化别名解析性能
# provider_model_name 索引(支持精确匹配) # provider_model_name 索引(支持精确匹配,如果不存在
op.create_index( if not index_exists(bind, "idx_model_provider_model_name"):
"idx_model_provider_model_name", op.create_index(
"models", "idx_model_provider_model_name",
["provider_model_name"], "models",
unique=False, ["provider_model_name"],
postgresql_where=sa.text("is_active = true"), unique=False,
) postgresql_where=sa.text("is_active = true"),
)
# provider_model_aliases GIN 索引(支持 JSONB 查询,仅 PostgreSQL # provider_model_aliases GIN 索引(支持 JSONB 查询,仅 PostgreSQL
if bind.dialect.name == "postgresql": if bind.dialect.name == "postgresql":
# 将 json 列转为 jsonbjsonb 性能更好且支持 GIN 索引) # 将 json 列转为 jsonbjsonb 性能更好且支持 GIN 索引)
# 使用 IF NOT EXISTS 风格的检查来避免重复转换
op.execute( op.execute(
""" """
ALTER TABLE models DO $$
ALTER COLUMN provider_model_aliases TYPE jsonb BEGIN
USING provider_model_aliases::jsonb IF EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'models'
AND column_name = 'provider_model_aliases'
AND data_type = 'json'
) THEN
ALTER TABLE models
ALTER COLUMN provider_model_aliases TYPE jsonb
USING provider_model_aliases::jsonb;
END IF;
END $$;
""" """
) )
# 创建 GIN 索引 # 创建 GIN 索引

View File

@@ -5,8 +5,8 @@ Revises: e9b3d63f0cbf
Create Date: 2025-12-15 17:07:44.631032+00:00 Create Date: 2025-12-15 17:07:44.631032+00:00
""" """
from alembic import op
import sqlalchemy as sa import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
@@ -16,10 +16,29 @@ branch_labels = None
depends_on = None depends_on = None
def column_exists(bind, table_name: str, column_name: str) -> bool:
"""检查列是否存在"""
result = bind.execute(
sa.text(
"""
SELECT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = :table_name AND column_name = :column_name
)
"""
),
{"table_name": table_name, "column_name": column_name},
)
return result.scalar()
def upgrade() -> None: def upgrade() -> None:
"""应用迁移:升级到新版本""" """应用迁移:升级到新版本"""
# 添加首字时间字段到 usage 表 bind = op.get_bind()
op.add_column('usage', sa.Column('first_byte_time_ms', sa.Integer(), nullable=True))
# 添加首字时间字段到 usage 表(如果不存在)
if not column_exists(bind, "usage", "first_byte_time_ms"):
op.add_column('usage', sa.Column('first_byte_time_ms', sa.Integer(), nullable=True))
def downgrade() -> None: def downgrade() -> None:

View File

@@ -5,8 +5,8 @@ Revises: 180e63a9c83a
Create Date: 2025-12-16 03:11:32.480976+00:00 Create Date: 2025-12-16 03:11:32.480976+00:00
""" """
from alembic import op
import sqlalchemy as sa import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
@@ -16,6 +16,22 @@ branch_labels = None
depends_on = None depends_on = None
def column_exists(bind, table_name: str, column_name: str) -> bool:
"""检查列是否存在"""
result = bind.execute(
sa.text(
"""
SELECT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = :table_name AND column_name = :column_name
)
"""
),
{"table_name": table_name, "column_name": column_name},
)
return result.scalar()
def upgrade() -> None: def upgrade() -> None:
"""应用迁移:升级到新版本 """应用迁移:升级到新版本
@@ -23,37 +39,45 @@ def upgrade() -> None:
2. 把旧数据迁移到 config 2. 把旧数据迁移到 config
3. 删除旧列 3. 删除旧列
""" """
bind = op.get_bind()
# 检查是否已经迁移过config 列存在且旧列不存在)
has_config = column_exists(bind, "global_models", "config")
has_old_columns = column_exists(bind, "global_models", "default_supports_streaming")
if has_config and not has_old_columns:
# 已完成迁移,跳过
return
# 1. 添加 config 列(使用 JSONB 类型,支持索引和更高效的查询) # 1. 添加 config 列(使用 JSONB 类型,支持索引和更高效的查询)
op.add_column('global_models', sa.Column('config', postgresql.JSONB(), nullable=True)) if not has_config:
op.add_column('global_models', sa.Column('config', postgresql.JSONB(), nullable=True))
# 2. 迁移数据:把旧字段合并到 config JSON # 2. 迁移数据:把旧字段合并到 config JSON(仅当旧列存在时)
# 注意:使用 COALESCE 为布尔字段设置默认值,避免数据丢失 if has_old_columns:
# - streaming 默认 true大多数模型支持 op.execute("""
# - 其他能力默认 false UPDATE global_models
# - jsonb_strip_nulls 只移除 null 字段,不影响 false 值 SET config = jsonb_strip_nulls(jsonb_build_object(
op.execute(""" 'streaming', COALESCE(default_supports_streaming, true),
UPDATE global_models 'vision', CASE WHEN COALESCE(default_supports_vision, false) THEN true ELSE NULL END,
SET config = jsonb_strip_nulls(jsonb_build_object( 'function_calling', CASE WHEN COALESCE(default_supports_function_calling, false) THEN true ELSE NULL END,
'streaming', COALESCE(default_supports_streaming, true), 'extended_thinking', CASE WHEN COALESCE(default_supports_extended_thinking, false) THEN true ELSE NULL END,
'vision', CASE WHEN COALESCE(default_supports_vision, false) THEN true ELSE NULL END, 'image_generation', CASE WHEN COALESCE(default_supports_image_generation, false) THEN true ELSE NULL END,
'function_calling', CASE WHEN COALESCE(default_supports_function_calling, false) THEN true ELSE NULL END, 'description', description,
'extended_thinking', CASE WHEN COALESCE(default_supports_extended_thinking, false) THEN true ELSE NULL END, 'icon_url', icon_url,
'image_generation', CASE WHEN COALESCE(default_supports_image_generation, false) THEN true ELSE NULL END, 'official_url', official_url
'description', description, ))
'icon_url', icon_url, """)
'official_url', official_url
))
""")
# 3. 删除旧列 # 3. 删除旧列
op.drop_column('global_models', 'default_supports_streaming') op.drop_column('global_models', 'default_supports_streaming')
op.drop_column('global_models', 'default_supports_vision') op.drop_column('global_models', 'default_supports_vision')
op.drop_column('global_models', 'default_supports_function_calling') op.drop_column('global_models', 'default_supports_function_calling')
op.drop_column('global_models', 'default_supports_extended_thinking') op.drop_column('global_models', 'default_supports_extended_thinking')
op.drop_column('global_models', 'default_supports_image_generation') op.drop_column('global_models', 'default_supports_image_generation')
op.drop_column('global_models', 'description') op.drop_column('global_models', 'description')
op.drop_column('global_models', 'icon_url') op.drop_column('global_models', 'icon_url')
op.drop_column('global_models', 'official_url') op.drop_column('global_models', 'official_url')
def downgrade() -> None: def downgrade() -> None: