fix: enhance proxy configuration with password preservation and UI improvements

- Add 'enabled' field to ProxyConfig for preserving config when disabled
- Mask proxy password in API responses (return '***' instead of actual password)
- Preserve existing password on update when new password not provided
- Add URL encoding for proxy credentials (handle special chars like @, :, /)
- Enhanced URL validation: block SOCKS4, require valid host, forbid embedded auth
- UI improvements: use Switch component, dynamic password placeholder
- Add confirmation dialog for orphaned credentials (URL empty but has username/password)
- Prevent browser password autofill with randomized IDs and CSS text-security
- Unify ProxyConfig type definition in types.ts
This commit is contained in:
fawney19
2025-12-18 16:14:37 +08:00
parent 3e50c157be
commit 293bb592dc
7 changed files with 224 additions and 58 deletions

View File

@@ -5,7 +5,7 @@ ProviderEndpoint CRUD 管理 API
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List
from typing import List, Optional
from fastapi import APIRouter, Depends, Query, Request
from sqlalchemy import and_, func
@@ -27,6 +27,16 @@ router = APIRouter(tags=["Endpoint Management"])
pipeline = ApiRequestPipeline()
def mask_proxy_password(proxy_config: Optional[dict]) -> Optional[dict]:
"""对代理配置中的密码进行脱敏处理"""
if not proxy_config:
return None
masked = dict(proxy_config)
if masked.get("password"):
masked["password"] = "***"
return masked
@router.get("/providers/{provider_id}/endpoints", response_model=List[ProviderEndpointResponse])
async def list_provider_endpoints(
provider_id: str,
@@ -153,6 +163,7 @@ class AdminListProviderEndpointsAdapter(AdminApiAdapter):
"api_format": endpoint.api_format,
"total_keys": total_keys_map.get(endpoint.id, 0),
"active_keys": active_keys_map.get(endpoint.id, 0),
"proxy": mask_proxy_password(endpoint.proxy),
}
endpoint_dict.pop("_sa_instance_state", None)
result.append(ProviderEndpointResponse(**endpoint_dict))
@@ -216,12 +227,13 @@ class AdminCreateProviderEndpointAdapter(AdminApiAdapter):
endpoint_dict = {
k: v
for k, v in new_endpoint.__dict__.items()
if k not in {"api_format", "_sa_instance_state"}
if k not in {"api_format", "_sa_instance_state", "proxy"}
}
return ProviderEndpointResponse(
**endpoint_dict,
provider_name=provider.name,
api_format=new_endpoint.api_format,
proxy=mask_proxy_password(new_endpoint.proxy),
total_keys=0,
active_keys=0,
)
@@ -260,12 +272,13 @@ class AdminGetProviderEndpointAdapter(AdminApiAdapter):
endpoint_dict = {
k: v
for k, v in endpoint_obj.__dict__.items()
if k not in {"api_format", "_sa_instance_state"}
if k not in {"api_format", "_sa_instance_state", "proxy"}
}
return ProviderEndpointResponse(
**endpoint_dict,
provider_name=provider.name,
api_format=endpoint_obj.api_format,
proxy=mask_proxy_password(endpoint_obj.proxy),
total_keys=total_keys,
active_keys=active_keys,
)
@@ -285,9 +298,17 @@ class AdminUpdateProviderEndpointAdapter(AdminApiAdapter):
raise NotFoundException(f"Endpoint {self.endpoint_id} 不存在")
update_data = self.endpoint_data.model_dump(exclude_unset=True)
# 把 proxy 转换为 dict 存储
if "proxy" in update_data and update_data["proxy"] is not None:
update_data["proxy"] = dict(update_data["proxy"])
# 把 proxy 转换为 dict 存储,支持显式设置为 None 清除代理
if "proxy" in update_data:
if update_data["proxy"] is not None:
new_proxy = dict(update_data["proxy"])
# 只有当密码字段未提供时才保留原密码(空字符串视为显式清除)
if "password" not in new_proxy and endpoint.proxy:
old_password = endpoint.proxy.get("password")
if old_password:
new_proxy["password"] = old_password
update_data["proxy"] = new_proxy
# proxy 为 None 时保留,用于清除代理配置
for field, value in update_data.items():
setattr(endpoint, field, value)
endpoint.updated_at = datetime.now(timezone.utc)
@@ -315,12 +336,13 @@ class AdminUpdateProviderEndpointAdapter(AdminApiAdapter):
endpoint_dict = {
k: v
for k, v in endpoint.__dict__.items()
if k not in {"api_format", "_sa_instance_state"}
if k not in {"api_format", "_sa_instance_state", "proxy"}
}
return ProviderEndpointResponse(
**endpoint_dict,
provider_name=provider.name if provider else "Unknown",
api_format=endpoint.api_format,
proxy=mask_proxy_password(endpoint.proxy),
total_keys=total_keys,
active_keys=active_keys,
)

View File

@@ -5,7 +5,7 @@
from contextlib import asynccontextmanager
from typing import Any, Dict, Optional
from urllib.parse import urlparse
from urllib.parse import quote, urlparse
import httpx
@@ -17,14 +17,19 @@ def build_proxy_url(proxy_config: Dict[str, Any]) -> Optional[str]:
根据代理配置构建完整的代理 URL
Args:
proxy_config: 代理配置字典,包含 url, username, password
proxy_config: 代理配置字典,包含 url, username, password, enabled
Returns:
完整的代理 URL如 socks5://user:pass@host:port
如果 enabled=False 或无配置,返回 None
"""
if not proxy_config:
return None
# 检查 enabled 字段,默认为 True兼容旧数据
if not proxy_config.get("enabled", True):
return None
proxy_url = proxy_config.get("url")
if not proxy_url:
return None
@@ -32,10 +37,17 @@ def build_proxy_url(proxy_config: Dict[str, Any]) -> Optional[str]:
username = proxy_config.get("username")
password = proxy_config.get("password")
if username and password:
# 只要有用户名就添加认证信息(密码可以为空)
if username:
parsed = urlparse(proxy_url)
# URL 编码用户名和密码,处理特殊字符(如 @, :, /
encoded_username = quote(username, safe="")
encoded_password = quote(password, safe="") if password else ""
# 重新构建带认证的代理 URL
auth_proxy = f"{parsed.scheme}://{username}:{password}@{parsed.netloc}"
if encoded_password:
auth_proxy = f"{parsed.scheme}://{encoded_username}:{encoded_password}@{parsed.netloc}"
else:
auth_proxy = f"{parsed.scheme}://{encoded_username}@{parsed.netloc}"
if parsed.path:
auth_proxy += parsed.path
return auth_proxy

View File

@@ -19,14 +19,33 @@ class ProxyConfig(BaseModel):
url: str = Field(..., description="代理 URL (http://, https://, socks5://)")
username: Optional[str] = Field(None, max_length=255, description="代理用户名")
password: Optional[str] = Field(None, max_length=500, description="代理密码")
enabled: bool = Field(True, description="是否启用代理false 时保留配置但不使用)")
@field_validator("url")
@classmethod
def validate_proxy_url(cls, v: str) -> str:
"""验证代理 URL 格式"""
from urllib.parse import urlparse
v = v.strip()
if not re.match(r"^(http|https|socks5|socks4)://", v, re.IGNORECASE):
raise ValueError("代理 URL 必须以 http://, https://, socks5:// 或 socks4:// 开头")
# 检查禁止的字符(防止注入)
if "\n" in v or "\r" in v:
raise ValueError("代理 URL 包含非法字符")
# 验证协议(不支持 SOCKS4
if not re.match(r"^(http|https|socks5)://", v, re.IGNORECASE):
raise ValueError("代理 URL 必须以 http://, https:// 或 socks5:// 开头")
# 验证 URL 结构
parsed = urlparse(v)
if not parsed.netloc:
raise ValueError("代理 URL 必须包含有效的 host")
# 禁止 URL 中内嵌认证信息,强制使用独立字段
if parsed.username or parsed.password:
raise ValueError("请勿在 URL 中包含用户名和密码,请使用独立的认证字段")
return v

View File

@@ -110,8 +110,8 @@ class ProviderEndpointResponse(BaseModel):
# 额外配置
config: Optional[Dict[str, Any]] = None
# 代理配置
proxy: Optional[Dict[str, Any]] = Field(default=None, description="代理配置")
# 代理配置(响应中密码已脱敏)
proxy: Optional[Dict[str, Any]] = Field(default=None, description="代理配置(密码已脱敏)")
# 统计(从 Keys 聚合)
total_keys: int = Field(default=0, description="总 Key 数量")