mirror of
https://github.com/fawney19/Aether.git
synced 2026-01-03 00:02:28 +08:00
872 lines
34 KiB
Python
872 lines
34 KiB
Python
|
|
"""
|
|||
|
|
缓存监控端点
|
|||
|
|
|
|||
|
|
提供缓存亲和性统计、管理和监控功能
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
from dataclasses import dataclass
|
|||
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|||
|
|
|
|||
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Request
|
|||
|
|
from fastapi.responses import PlainTextResponse
|
|||
|
|
from sqlalchemy.orm import Session
|
|||
|
|
|
|||
|
|
from src.api.base.admin_adapter import AdminApiAdapter
|
|||
|
|
from src.api.base.pagination import PaginationMeta, build_pagination_payload, paginate_sequence
|
|||
|
|
from src.api.base.pipeline import ApiRequestPipeline
|
|||
|
|
from src.clients.redis_client import get_redis_client_sync
|
|||
|
|
from src.core.crypto import crypto_service
|
|||
|
|
from src.core.logger import logger
|
|||
|
|
from src.database import get_db
|
|||
|
|
from src.models.database import ApiKey, User
|
|||
|
|
from src.services.cache.affinity_manager import get_affinity_manager
|
|||
|
|
from src.services.cache.aware_scheduler import get_cache_aware_scheduler
|
|||
|
|
|
|||
|
|
router = APIRouter(prefix="/api/admin/monitoring/cache", tags=["Admin - Monitoring: Cache"])
|
|||
|
|
pipeline = ApiRequestPipeline()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def mask_api_key(api_key: Optional[str], prefix_len: int = 8, suffix_len: int = 4) -> Optional[str]:
|
|||
|
|
"""
|
|||
|
|
脱敏 API Key,显示前缀 + 星号 + 后缀
|
|||
|
|
例如: sk-jhiId-xxxxxxxxxxxAABB -> sk-jhiId-********AABB
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
api_key: 原始 API Key
|
|||
|
|
prefix_len: 显示的前缀长度,默认 8
|
|||
|
|
suffix_len: 显示的后缀长度,默认 4
|
|||
|
|
"""
|
|||
|
|
if not api_key:
|
|||
|
|
return None
|
|||
|
|
total_visible = prefix_len + suffix_len
|
|||
|
|
if len(api_key) <= total_visible:
|
|||
|
|
# Key 太短,直接返回部分内容 + 星号
|
|||
|
|
return api_key[:prefix_len] + "********"
|
|||
|
|
return f"{api_key[:prefix_len]}********{api_key[-suffix_len:]}"
|
|||
|
|
|
|||
|
|
|
|||
|
|
def decrypt_and_mask(encrypted_key: Optional[str], prefix_len: int = 8) -> Optional[str]:
|
|||
|
|
"""
|
|||
|
|
解密 API Key 后脱敏显示
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
encrypted_key: 加密后的 API Key
|
|||
|
|
prefix_len: 显示的前缀长度
|
|||
|
|
"""
|
|||
|
|
if not encrypted_key:
|
|||
|
|
return None
|
|||
|
|
try:
|
|||
|
|
decrypted = crypto_service.decrypt(encrypted_key)
|
|||
|
|
return mask_api_key(decrypted, prefix_len)
|
|||
|
|
except Exception:
|
|||
|
|
# 解密失败时返回 None
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def resolve_user_identifier(db: Session, identifier: str) -> Optional[str]:
|
|||
|
|
"""
|
|||
|
|
将用户标识符(username/email/user_id/api_key_id)解析为 user_id
|
|||
|
|
|
|||
|
|
支持的输入格式:
|
|||
|
|
1. User UUID (36位,带横杠)
|
|||
|
|
2. Username (用户名)
|
|||
|
|
3. Email (邮箱)
|
|||
|
|
4. API Key ID (36位UUID)
|
|||
|
|
|
|||
|
|
返回:
|
|||
|
|
- user_id (UUID字符串) 或 None
|
|||
|
|
"""
|
|||
|
|
identifier = identifier.strip()
|
|||
|
|
|
|||
|
|
# 1. 先尝试作为 User UUID 查询
|
|||
|
|
user = db.query(User).filter(User.id == identifier).first()
|
|||
|
|
if user:
|
|||
|
|
logger.debug(f"通过User ID解析: {identifier[:8]}... -> {user.username}")
|
|||
|
|
return user.id
|
|||
|
|
|
|||
|
|
# 2. 尝试作为 Username 查询
|
|||
|
|
user = db.query(User).filter(User.username == identifier).first()
|
|||
|
|
if user:
|
|||
|
|
logger.debug(f"通过Username解析: {identifier} -> {user.id[:8]}...")
|
|||
|
|
return user.id
|
|||
|
|
|
|||
|
|
# 3. 尝试作为 Email 查询
|
|||
|
|
user = db.query(User).filter(User.email == identifier).first()
|
|||
|
|
if user:
|
|||
|
|
logger.debug(f"通过Email解析: {identifier} -> {user.id[:8]}...")
|
|||
|
|
return user.id
|
|||
|
|
|
|||
|
|
# 4. 尝试作为 API Key ID 查询
|
|||
|
|
api_key = db.query(ApiKey).filter(ApiKey.id == identifier).first()
|
|||
|
|
if api_key:
|
|||
|
|
logger.debug(f"通过API Key ID解析: {identifier[:8]}... -> User ID: {api_key.user_id[:8]}...")
|
|||
|
|
return api_key.user_id
|
|||
|
|
|
|||
|
|
# 无法识别
|
|||
|
|
logger.debug(f"无法识别的用户标识符: {identifier}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.get("/stats")
|
|||
|
|
async def get_cache_stats(
|
|||
|
|
request: Request,
|
|||
|
|
db: Session = Depends(get_db),
|
|||
|
|
):
|
|||
|
|
"""
|
|||
|
|
获取缓存亲和性统计信息
|
|||
|
|
|
|||
|
|
返回:
|
|||
|
|
- 缓存命中率
|
|||
|
|
- 缓存用户数
|
|||
|
|
- Provider切换次数
|
|||
|
|
- Key切换次数
|
|||
|
|
- 缓存预留配置
|
|||
|
|
"""
|
|||
|
|
adapter = AdminCacheStatsAdapter()
|
|||
|
|
return await pipeline.run(adapter=adapter, http_request=request, db=db, mode=adapter.mode)
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.get("/affinity/{user_identifier}")
|
|||
|
|
async def get_user_affinity(
|
|||
|
|
user_identifier: str,
|
|||
|
|
request: Request,
|
|||
|
|
db: Session = Depends(get_db),
|
|||
|
|
):
|
|||
|
|
"""
|
|||
|
|
查询指定用户的所有缓存亲和性
|
|||
|
|
|
|||
|
|
参数:
|
|||
|
|
- user_identifier: 用户标识符,支持以下格式:
|
|||
|
|
* 用户名 (username),如: yuanhonghu
|
|||
|
|
* 邮箱 (email),如: user@example.com
|
|||
|
|
* 用户UUID (user_id),如: 550e8400-e29b-41d4-a716-446655440000
|
|||
|
|
* API Key ID,如: 660e8400-e29b-41d4-a716-446655440000
|
|||
|
|
|
|||
|
|
返回:
|
|||
|
|
- 用户信息
|
|||
|
|
- 所有端点的缓存亲和性列表(每个端点一条记录)
|
|||
|
|
"""
|
|||
|
|
adapter = AdminGetUserAffinityAdapter(user_identifier=user_identifier)
|
|||
|
|
return await pipeline.run(adapter=adapter, http_request=request, db=db, mode=adapter.mode)
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.get("/affinities")
|
|||
|
|
async def list_affinities(
|
|||
|
|
request: Request,
|
|||
|
|
keyword: Optional[str] = None,
|
|||
|
|
limit: int = Query(100, ge=1, le=1000, description="返回数量限制"),
|
|||
|
|
offset: int = Query(0, ge=0, description="偏移量"),
|
|||
|
|
db: Session = Depends(get_db),
|
|||
|
|
):
|
|||
|
|
"""
|
|||
|
|
获取所有缓存亲和性列表,可选按关键词过滤
|
|||
|
|
|
|||
|
|
参数:
|
|||
|
|
- keyword: 可选,支持用户名/邮箱/User ID/API Key ID 或模糊匹配
|
|||
|
|
"""
|
|||
|
|
adapter = AdminListAffinitiesAdapter(keyword=keyword, limit=limit, offset=offset)
|
|||
|
|
return await pipeline.run(adapter=adapter, http_request=request, db=db, mode=adapter.mode)
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.delete("/users/{user_identifier}")
|
|||
|
|
async def clear_user_cache(
|
|||
|
|
user_identifier: str,
|
|||
|
|
request: Request,
|
|||
|
|
db: Session = Depends(get_db),
|
|||
|
|
):
|
|||
|
|
"""
|
|||
|
|
Clear cache affinity for a specific user
|
|||
|
|
|
|||
|
|
Parameters:
|
|||
|
|
- user_identifier: User identifier (username, email, user_id, or API Key ID)
|
|||
|
|
"""
|
|||
|
|
adapter = AdminClearUserCacheAdapter(user_identifier=user_identifier)
|
|||
|
|
return await pipeline.run(adapter=adapter, http_request=request, db=db, mode=adapter.mode)
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.delete("")
|
|||
|
|
async def clear_all_cache(
|
|||
|
|
request: Request,
|
|||
|
|
db: Session = Depends(get_db),
|
|||
|
|
):
|
|||
|
|
"""
|
|||
|
|
Clear all cache affinities
|
|||
|
|
|
|||
|
|
Warning: This affects all users, use with caution
|
|||
|
|
"""
|
|||
|
|
adapter = AdminClearAllCacheAdapter()
|
|||
|
|
return await pipeline.run(adapter=adapter, http_request=request, db=db, mode=adapter.mode)
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.delete("/providers/{provider_id}")
|
|||
|
|
async def clear_provider_cache(
|
|||
|
|
provider_id: str,
|
|||
|
|
request: Request,
|
|||
|
|
db: Session = Depends(get_db),
|
|||
|
|
):
|
|||
|
|
"""
|
|||
|
|
Clear cache affinities for a specific provider
|
|||
|
|
|
|||
|
|
Parameters:
|
|||
|
|
- provider_id: Provider ID
|
|||
|
|
"""
|
|||
|
|
adapter = AdminClearProviderCacheAdapter(provider_id=provider_id)
|
|||
|
|
return await pipeline.run(adapter=adapter, http_request=request, db=db, mode=adapter.mode)
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.get("/config")
|
|||
|
|
async def get_cache_config(
|
|||
|
|
request: Request,
|
|||
|
|
db: Session = Depends(get_db),
|
|||
|
|
):
|
|||
|
|
"""
|
|||
|
|
获取缓存相关配置
|
|||
|
|
|
|||
|
|
返回:
|
|||
|
|
- 缓存TTL
|
|||
|
|
- 缓存预留比例
|
|||
|
|
"""
|
|||
|
|
adapter = AdminCacheConfigAdapter()
|
|||
|
|
return await pipeline.run(adapter=adapter, http_request=request, db=db, mode=adapter.mode)
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.get("/metrics", response_class=PlainTextResponse)
|
|||
|
|
async def get_cache_metrics(
|
|||
|
|
request: Request,
|
|||
|
|
db: Session = Depends(get_db),
|
|||
|
|
):
|
|||
|
|
"""
|
|||
|
|
以 Prometheus 文本格式暴露缓存调度指标,方便接入 Grafana。
|
|||
|
|
"""
|
|||
|
|
adapter = AdminCacheMetricsAdapter()
|
|||
|
|
return await pipeline.run(adapter=adapter, http_request=request, db=db, mode=adapter.mode)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# -------- 缓存监控适配器 --------
|
|||
|
|
|
|||
|
|
|
|||
|
|
class AdminCacheStatsAdapter(AdminApiAdapter):
|
|||
|
|
async def handle(self, context): # type: ignore[override]
|
|||
|
|
try:
|
|||
|
|
redis_client = get_redis_client_sync()
|
|||
|
|
scheduler = await get_cache_aware_scheduler(redis_client)
|
|||
|
|
stats = await scheduler.get_stats()
|
|||
|
|
logger.info("缓存统计信息查询成功")
|
|||
|
|
context.add_audit_metadata(
|
|||
|
|
action="cache_stats",
|
|||
|
|
scheduler=stats.get("scheduler"),
|
|||
|
|
total_affinities=stats.get("total_affinities"),
|
|||
|
|
cache_hit_rate=stats.get("cache_hit_rate"),
|
|||
|
|
provider_switches=stats.get("provider_switches"),
|
|||
|
|
)
|
|||
|
|
return {"status": "ok", "data": stats}
|
|||
|
|
except Exception as exc:
|
|||
|
|
logger.exception(f"获取缓存统计信息失败: {exc}")
|
|||
|
|
raise HTTPException(status_code=500, detail=f"获取缓存统计失败: {exc}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class AdminCacheMetricsAdapter(AdminApiAdapter):
|
|||
|
|
async def handle(self, context): # type: ignore[override]
|
|||
|
|
try:
|
|||
|
|
redis_client = get_redis_client_sync()
|
|||
|
|
scheduler = await get_cache_aware_scheduler(redis_client)
|
|||
|
|
stats = await scheduler.get_stats()
|
|||
|
|
payload = self._format_prometheus(stats)
|
|||
|
|
context.add_audit_metadata(
|
|||
|
|
action="cache_metrics_export",
|
|||
|
|
scheduler=stats.get("scheduler"),
|
|||
|
|
metrics_lines=payload.count("\n"),
|
|||
|
|
)
|
|||
|
|
return PlainTextResponse(payload)
|
|||
|
|
except Exception as exc:
|
|||
|
|
logger.exception(f"导出缓存指标失败: {exc}")
|
|||
|
|
raise HTTPException(status_code=500, detail=f"导出缓存指标失败: {exc}")
|
|||
|
|
|
|||
|
|
def _format_prometheus(self, stats: Dict[str, Any]) -> str:
|
|||
|
|
"""
|
|||
|
|
将 scheduler/affinity 指标转换为 Prometheus 文本格式。
|
|||
|
|
"""
|
|||
|
|
scheduler_metrics = stats.get("scheduler_metrics", {})
|
|||
|
|
affinity_stats = stats.get("affinity_stats", {})
|
|||
|
|
|
|||
|
|
metric_map: List[Tuple[str, str, float]] = [
|
|||
|
|
(
|
|||
|
|
"cache_scheduler_total_batches",
|
|||
|
|
"Total batches pulled from provider list",
|
|||
|
|
float(scheduler_metrics.get("total_batches", 0)),
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"cache_scheduler_last_batch_size",
|
|||
|
|
"Size of the latest candidate batch",
|
|||
|
|
float(scheduler_metrics.get("last_batch_size", 0)),
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"cache_scheduler_total_candidates",
|
|||
|
|
"Total candidates enumerated by scheduler",
|
|||
|
|
float(scheduler_metrics.get("total_candidates", 0)),
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"cache_scheduler_last_candidate_count",
|
|||
|
|
"Number of candidates in the most recent batch",
|
|||
|
|
float(scheduler_metrics.get("last_candidate_count", 0)),
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"cache_scheduler_cache_hits",
|
|||
|
|
"Cache hits counted during scheduling",
|
|||
|
|
float(scheduler_metrics.get("cache_hits", 0)),
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"cache_scheduler_cache_misses",
|
|||
|
|
"Cache misses counted during scheduling",
|
|||
|
|
float(scheduler_metrics.get("cache_misses", 0)),
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"cache_scheduler_cache_hit_rate",
|
|||
|
|
"Cache hit rate during scheduling",
|
|||
|
|
float(scheduler_metrics.get("cache_hit_rate", 0.0)),
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"cache_scheduler_concurrency_denied",
|
|||
|
|
"Times candidate rejected due to concurrency limits",
|
|||
|
|
float(scheduler_metrics.get("concurrency_denied", 0)),
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"cache_scheduler_avg_candidates_per_batch",
|
|||
|
|
"Average candidates per batch",
|
|||
|
|
float(scheduler_metrics.get("avg_candidates_per_batch", 0.0)),
|
|||
|
|
),
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
affinity_map: List[Tuple[str, str, float]] = [
|
|||
|
|
(
|
|||
|
|
"cache_affinity_total",
|
|||
|
|
"Total cache affinities stored",
|
|||
|
|
float(affinity_stats.get("total_affinities", 0)),
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"cache_affinity_hits",
|
|||
|
|
"Affinity cache hits",
|
|||
|
|
float(affinity_stats.get("cache_hits", 0)),
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"cache_affinity_misses",
|
|||
|
|
"Affinity cache misses",
|
|||
|
|
float(affinity_stats.get("cache_misses", 0)),
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"cache_affinity_hit_rate",
|
|||
|
|
"Affinity cache hit rate",
|
|||
|
|
float(affinity_stats.get("cache_hit_rate", 0.0)),
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"cache_affinity_invalidations",
|
|||
|
|
"Affinity invalidations",
|
|||
|
|
float(affinity_stats.get("cache_invalidations", 0)),
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"cache_affinity_provider_switches",
|
|||
|
|
"Affinity provider switches",
|
|||
|
|
float(affinity_stats.get("provider_switches", 0)),
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"cache_affinity_key_switches",
|
|||
|
|
"Affinity key switches",
|
|||
|
|
float(affinity_stats.get("key_switches", 0)),
|
|||
|
|
),
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
lines = []
|
|||
|
|
for name, help_text, value in metric_map + affinity_map:
|
|||
|
|
lines.append(f"# HELP {name} {help_text}")
|
|||
|
|
lines.append(f"# TYPE {name} gauge")
|
|||
|
|
lines.append(f"{name} {value}")
|
|||
|
|
|
|||
|
|
scheduler_name = stats.get("scheduler", "cache_aware")
|
|||
|
|
lines.append(f'cache_scheduler_info{{scheduler="{scheduler_name}"}} 1')
|
|||
|
|
|
|||
|
|
return "\n".join(lines) + "\n"
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class AdminGetUserAffinityAdapter(AdminApiAdapter):
|
|||
|
|
user_identifier: str
|
|||
|
|
|
|||
|
|
async def handle(self, context): # type: ignore[override]
|
|||
|
|
db = context.db
|
|||
|
|
try:
|
|||
|
|
user_id = resolve_user_identifier(db, self.user_identifier)
|
|||
|
|
if not user_id:
|
|||
|
|
raise HTTPException(
|
|||
|
|
status_code=404,
|
|||
|
|
detail=f"无法识别的用户标识符: {self.user_identifier}。支持用户名、邮箱、User ID或API Key ID",
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|||
|
|
redis_client = get_redis_client_sync()
|
|||
|
|
affinity_mgr = await get_affinity_manager(redis_client)
|
|||
|
|
|
|||
|
|
# 获取该用户的所有缓存亲和性
|
|||
|
|
all_affinities = await affinity_mgr.list_affinities()
|
|||
|
|
user_affinities = [aff for aff in all_affinities if aff.get("user_id") == user_id]
|
|||
|
|
|
|||
|
|
if not user_affinities:
|
|||
|
|
response = {
|
|||
|
|
"status": "not_found",
|
|||
|
|
"message": f"用户 {user.username} ({user.email}) 没有缓存亲和性",
|
|||
|
|
"user_info": {
|
|||
|
|
"user_id": user_id,
|
|||
|
|
"username": user.username,
|
|||
|
|
"email": user.email,
|
|||
|
|
},
|
|||
|
|
"affinities": [],
|
|||
|
|
}
|
|||
|
|
context.add_audit_metadata(
|
|||
|
|
action="cache_user_affinity",
|
|||
|
|
user_identifier=self.user_identifier,
|
|||
|
|
resolved_user_id=user_id,
|
|||
|
|
affinity_count=0,
|
|||
|
|
status="not_found",
|
|||
|
|
)
|
|||
|
|
return response
|
|||
|
|
|
|||
|
|
response = {
|
|||
|
|
"status": "ok",
|
|||
|
|
"user_info": {
|
|||
|
|
"user_id": user_id,
|
|||
|
|
"username": user.username,
|
|||
|
|
"email": user.email,
|
|||
|
|
},
|
|||
|
|
"affinities": [
|
|||
|
|
{
|
|||
|
|
"provider_id": aff["provider_id"],
|
|||
|
|
"endpoint_id": aff["endpoint_id"],
|
|||
|
|
"key_id": aff["key_id"],
|
|||
|
|
"api_format": aff.get("api_format"),
|
|||
|
|
"model_name": aff.get("model_name"),
|
|||
|
|
"created_at": aff["created_at"],
|
|||
|
|
"expire_at": aff["expire_at"],
|
|||
|
|
"request_count": aff["request_count"],
|
|||
|
|
}
|
|||
|
|
for aff in user_affinities
|
|||
|
|
],
|
|||
|
|
"total_endpoints": len(user_affinities),
|
|||
|
|
}
|
|||
|
|
context.add_audit_metadata(
|
|||
|
|
action="cache_user_affinity",
|
|||
|
|
user_identifier=self.user_identifier,
|
|||
|
|
resolved_user_id=user_id,
|
|||
|
|
affinity_count=len(user_affinities),
|
|||
|
|
status="ok",
|
|||
|
|
)
|
|||
|
|
return response
|
|||
|
|
except HTTPException:
|
|||
|
|
raise
|
|||
|
|
except Exception as exc:
|
|||
|
|
logger.exception(f"查询用户缓存亲和性失败: {exc}")
|
|||
|
|
raise HTTPException(status_code=500, detail=f"查询失败: {exc}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class AdminListAffinitiesAdapter(AdminApiAdapter):
|
|||
|
|
keyword: Optional[str]
|
|||
|
|
limit: int
|
|||
|
|
offset: int
|
|||
|
|
|
|||
|
|
async def handle(self, context): # type: ignore[override]
|
|||
|
|
db = context.db
|
|||
|
|
redis_client = get_redis_client_sync()
|
|||
|
|
if not redis_client:
|
|||
|
|
raise HTTPException(status_code=503, detail="Redis未初始化,无法获取缓存亲和性")
|
|||
|
|
|
|||
|
|
affinity_mgr = await get_affinity_manager(redis_client)
|
|||
|
|
matched_user_id = None
|
|||
|
|
matched_api_key_id = None
|
|||
|
|
raw_affinities: List[Dict[str, Any]] = []
|
|||
|
|
|
|||
|
|
if self.keyword:
|
|||
|
|
# 首先检查是否是 API Key ID(affinity_key)
|
|||
|
|
api_key = db.query(ApiKey).filter(ApiKey.id == self.keyword).first()
|
|||
|
|
if api_key:
|
|||
|
|
# 直接通过 affinity_key 过滤
|
|||
|
|
matched_api_key_id = str(api_key.id)
|
|||
|
|
matched_user_id = str(api_key.user_id)
|
|||
|
|
all_affinities = await affinity_mgr.list_affinities()
|
|||
|
|
raw_affinities = [
|
|||
|
|
aff for aff in all_affinities if aff.get("affinity_key") == matched_api_key_id
|
|||
|
|
]
|
|||
|
|
else:
|
|||
|
|
# 尝试解析为用户标识
|
|||
|
|
user_id = resolve_user_identifier(db, self.keyword)
|
|||
|
|
if user_id:
|
|||
|
|
matched_user_id = user_id
|
|||
|
|
# 获取该用户所有的 API Key ID
|
|||
|
|
user_api_keys = db.query(ApiKey).filter(ApiKey.user_id == user_id).all()
|
|||
|
|
user_api_key_ids = {str(k.id) for k in user_api_keys}
|
|||
|
|
# 过滤出该用户所有 API Key 的亲和性
|
|||
|
|
all_affinities = await affinity_mgr.list_affinities()
|
|||
|
|
raw_affinities = [
|
|||
|
|
aff for aff in all_affinities if aff.get("affinity_key") in user_api_key_ids
|
|||
|
|
]
|
|||
|
|
else:
|
|||
|
|
# 关键词不是有效标识,返回所有亲和性(后续会进行模糊匹配)
|
|||
|
|
raw_affinities = await affinity_mgr.list_affinities()
|
|||
|
|
else:
|
|||
|
|
raw_affinities = await affinity_mgr.list_affinities()
|
|||
|
|
|
|||
|
|
# 收集所有 affinity_key (API Key ID)
|
|||
|
|
affinity_keys = {
|
|||
|
|
item.get("affinity_key") for item in raw_affinities if item.get("affinity_key")
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 批量查询用户 API Key 信息
|
|||
|
|
user_api_key_map: Dict[str, ApiKey] = {}
|
|||
|
|
if affinity_keys:
|
|||
|
|
user_api_keys = db.query(ApiKey).filter(ApiKey.id.in_(list(affinity_keys))).all()
|
|||
|
|
user_api_key_map = {str(k.id): k for k in user_api_keys}
|
|||
|
|
|
|||
|
|
# 收集所有 user_id
|
|||
|
|
user_ids = {str(k.user_id) for k in user_api_key_map.values()}
|
|||
|
|
user_map: Dict[str, User] = {}
|
|||
|
|
if user_ids:
|
|||
|
|
users = db.query(User).filter(User.id.in_(list(user_ids))).all()
|
|||
|
|
user_map = {str(user.id): user for user in users}
|
|||
|
|
|
|||
|
|
# 收集所有provider_id、endpoint_id、key_id
|
|||
|
|
provider_ids = {
|
|||
|
|
item.get("provider_id") for item in raw_affinities if item.get("provider_id")
|
|||
|
|
}
|
|||
|
|
endpoint_ids = {
|
|||
|
|
item.get("endpoint_id") for item in raw_affinities if item.get("endpoint_id")
|
|||
|
|
}
|
|||
|
|
key_ids = {item.get("key_id") for item in raw_affinities if item.get("key_id")}
|
|||
|
|
|
|||
|
|
# 批量查询Provider、Endpoint、Key信息
|
|||
|
|
from src.models.database import Provider, ProviderAPIKey, ProviderEndpoint
|
|||
|
|
|
|||
|
|
provider_map = {}
|
|||
|
|
if provider_ids:
|
|||
|
|
providers = db.query(Provider).filter(Provider.id.in_(list(provider_ids))).all()
|
|||
|
|
provider_map = {p.id: p for p in providers}
|
|||
|
|
|
|||
|
|
endpoint_map = {}
|
|||
|
|
if endpoint_ids:
|
|||
|
|
endpoints = (
|
|||
|
|
db.query(ProviderEndpoint).filter(ProviderEndpoint.id.in_(list(endpoint_ids))).all()
|
|||
|
|
)
|
|||
|
|
endpoint_map = {e.id: e for e in endpoints}
|
|||
|
|
|
|||
|
|
key_map = {}
|
|||
|
|
if key_ids:
|
|||
|
|
keys = db.query(ProviderAPIKey).filter(ProviderAPIKey.id.in_(list(key_ids))).all()
|
|||
|
|
key_map = {k.id: k for k in keys}
|
|||
|
|
|
|||
|
|
# 收集所有 model_name(实际存储的是 global_model_id)并批量查询 GlobalModel
|
|||
|
|
from src.models.database import GlobalModel
|
|||
|
|
|
|||
|
|
global_model_ids = {
|
|||
|
|
item.get("model_name") for item in raw_affinities if item.get("model_name")
|
|||
|
|
}
|
|||
|
|
global_model_map: Dict[str, GlobalModel] = {}
|
|||
|
|
if global_model_ids:
|
|||
|
|
# model_name 可能是 UUID 格式的 global_model_id,也可能是原始模型名称
|
|||
|
|
global_models = db.query(GlobalModel).filter(
|
|||
|
|
GlobalModel.id.in_(list(global_model_ids))
|
|||
|
|
).all()
|
|||
|
|
global_model_map = {str(gm.id): gm for gm in global_models}
|
|||
|
|
|
|||
|
|
keyword_lower = self.keyword.lower() if self.keyword else None
|
|||
|
|
items = []
|
|||
|
|
for affinity in raw_affinities:
|
|||
|
|
affinity_key = affinity.get("affinity_key")
|
|||
|
|
if not affinity_key:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 通过 affinity_key(API Key ID)找到用户 API Key 和用户
|
|||
|
|
user_api_key = user_api_key_map.get(affinity_key)
|
|||
|
|
user = user_map.get(str(user_api_key.user_id)) if user_api_key else None
|
|||
|
|
user_id = str(user_api_key.user_id) if user_api_key else None
|
|||
|
|
|
|||
|
|
provider_id = affinity.get("provider_id")
|
|||
|
|
endpoint_id = affinity.get("endpoint_id")
|
|||
|
|
key_id = affinity.get("key_id")
|
|||
|
|
|
|||
|
|
provider = provider_map.get(provider_id)
|
|||
|
|
endpoint = endpoint_map.get(endpoint_id)
|
|||
|
|
key = key_map.get(key_id)
|
|||
|
|
|
|||
|
|
# 用户 API Key 脱敏显示(解密 key_encrypted 后脱敏)
|
|||
|
|
user_api_key_masked = None
|
|||
|
|
if user_api_key and user_api_key.key_encrypted:
|
|||
|
|
user_api_key_masked = decrypt_and_mask(user_api_key.key_encrypted)
|
|||
|
|
|
|||
|
|
# Provider Key 脱敏显示(解密 api_key 后脱敏)
|
|||
|
|
provider_key_masked = None
|
|||
|
|
if key and key.api_key:
|
|||
|
|
provider_key_masked = decrypt_and_mask(key.api_key)
|
|||
|
|
|
|||
|
|
item = {
|
|||
|
|
"affinity_key": affinity_key,
|
|||
|
|
"user_api_key_name": user_api_key.name if user_api_key else None,
|
|||
|
|
"user_api_key_prefix": user_api_key_masked,
|
|||
|
|
"is_standalone": user_api_key.is_standalone if user_api_key else False,
|
|||
|
|
"user_id": user_id,
|
|||
|
|
"username": user.username if user else None,
|
|||
|
|
"email": user.email if user else None,
|
|||
|
|
"provider_id": provider_id,
|
|||
|
|
"provider_name": provider.display_name if provider else None,
|
|||
|
|
"endpoint_id": endpoint_id,
|
|||
|
|
"endpoint_api_format": (
|
|||
|
|
endpoint.api_format if endpoint and endpoint.api_format else None
|
|||
|
|
),
|
|||
|
|
"endpoint_url": endpoint.base_url if endpoint else None,
|
|||
|
|
"key_id": key_id,
|
|||
|
|
"key_name": key.name if key else None,
|
|||
|
|
"key_prefix": provider_key_masked,
|
|||
|
|
"rate_multiplier": key.rate_multiplier if key else 1.0,
|
|||
|
|
"model_name": (
|
|||
|
|
global_model_map.get(affinity.get("model_name")).name
|
|||
|
|
if affinity.get("model_name") and global_model_map.get(affinity.get("model_name"))
|
|||
|
|
else affinity.get("model_name") # 如果找不到 GlobalModel,显示原始值
|
|||
|
|
),
|
|||
|
|
"model_display_name": (
|
|||
|
|
global_model_map.get(affinity.get("model_name")).display_name
|
|||
|
|
if affinity.get("model_name") and global_model_map.get(affinity.get("model_name"))
|
|||
|
|
else None
|
|||
|
|
),
|
|||
|
|
"api_format": affinity.get("api_format"),
|
|||
|
|
"created_at": affinity.get("created_at"),
|
|||
|
|
"expire_at": affinity.get("expire_at"),
|
|||
|
|
"request_count": affinity.get("request_count", 0),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
if keyword_lower and not matched_user_id and not matched_api_key_id:
|
|||
|
|
searchable = [
|
|||
|
|
item["affinity_key"],
|
|||
|
|
item["user_api_key_name"] or "",
|
|||
|
|
item["user_id"] or "",
|
|||
|
|
item["username"] or "",
|
|||
|
|
item["email"] or "",
|
|||
|
|
item["provider_id"] or "",
|
|||
|
|
item["key_id"] or "",
|
|||
|
|
]
|
|||
|
|
if not any(keyword_lower in str(value).lower() for value in searchable if value):
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
items.append(item)
|
|||
|
|
|
|||
|
|
items.sort(key=lambda x: x.get("expire_at") or 0, reverse=True)
|
|||
|
|
paged_items, meta = paginate_sequence(items, self.limit, self.offset)
|
|||
|
|
payload = build_pagination_payload(
|
|||
|
|
paged_items,
|
|||
|
|
meta,
|
|||
|
|
matched_user_id=matched_user_id,
|
|||
|
|
)
|
|||
|
|
response = {
|
|||
|
|
"status": "ok",
|
|||
|
|
"data": payload,
|
|||
|
|
}
|
|||
|
|
result_count = meta.count if hasattr(meta, "count") else len(paged_items)
|
|||
|
|
context.add_audit_metadata(
|
|||
|
|
action="cache_affinity_list",
|
|||
|
|
keyword=self.keyword,
|
|||
|
|
matched_user_id=matched_user_id,
|
|||
|
|
matched_api_key_id=matched_api_key_id,
|
|||
|
|
limit=self.limit,
|
|||
|
|
offset=self.offset,
|
|||
|
|
result_count=result_count,
|
|||
|
|
)
|
|||
|
|
return response
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class AdminClearUserCacheAdapter(AdminApiAdapter):
|
|||
|
|
user_identifier: str
|
|||
|
|
|
|||
|
|
async def handle(self, context): # type: ignore[override]
|
|||
|
|
db = context.db
|
|||
|
|
try:
|
|||
|
|
redis_client = get_redis_client_sync()
|
|||
|
|
affinity_mgr = await get_affinity_manager(redis_client)
|
|||
|
|
|
|||
|
|
# 首先检查是否直接是 API Key ID (affinity_key)
|
|||
|
|
api_key = db.query(ApiKey).filter(ApiKey.id == self.user_identifier).first()
|
|||
|
|
if api_key:
|
|||
|
|
# 直接按 affinity_key 清除
|
|||
|
|
affinity_key = str(api_key.id)
|
|||
|
|
user = db.query(User).filter(User.id == api_key.user_id).first()
|
|||
|
|
|
|||
|
|
all_affinities = await affinity_mgr.list_affinities()
|
|||
|
|
target_affinities = [
|
|||
|
|
aff for aff in all_affinities if aff.get("affinity_key") == affinity_key
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
count = 0
|
|||
|
|
for aff in target_affinities:
|
|||
|
|
api_format = aff.get("api_format")
|
|||
|
|
model_name = aff.get("model_name")
|
|||
|
|
endpoint_id = aff.get("endpoint_id")
|
|||
|
|
if api_format and model_name:
|
|||
|
|
await affinity_mgr.invalidate_affinity(
|
|||
|
|
affinity_key, api_format, model_name, endpoint_id=endpoint_id
|
|||
|
|
)
|
|||
|
|
count += 1
|
|||
|
|
|
|||
|
|
logger.info(f"已清除API Key缓存亲和性: api_key_name={api_key.name}, affinity_key={affinity_key[:8]}..., 清除数量={count}")
|
|||
|
|
|
|||
|
|
response = {
|
|||
|
|
"status": "ok",
|
|||
|
|
"message": f"已清除 API Key {api_key.name} 的缓存亲和性",
|
|||
|
|
"user_info": {
|
|||
|
|
"user_id": str(api_key.user_id),
|
|||
|
|
"username": user.username if user else None,
|
|||
|
|
"email": user.email if user else None,
|
|||
|
|
"api_key_id": affinity_key,
|
|||
|
|
"api_key_name": api_key.name,
|
|||
|
|
},
|
|||
|
|
}
|
|||
|
|
context.add_audit_metadata(
|
|||
|
|
action="cache_clear_api_key",
|
|||
|
|
user_identifier=self.user_identifier,
|
|||
|
|
resolved_api_key_id=affinity_key,
|
|||
|
|
cleared_count=count,
|
|||
|
|
)
|
|||
|
|
return response
|
|||
|
|
|
|||
|
|
# 如果不是 API Key ID,尝试解析为用户标识
|
|||
|
|
user_id = resolve_user_identifier(db, self.user_identifier)
|
|||
|
|
if not user_id:
|
|||
|
|
raise HTTPException(
|
|||
|
|
status_code=404,
|
|||
|
|
detail=f"无法识别的标识符: {self.user_identifier}。支持用户名、邮箱、User ID或API Key ID",
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|||
|
|
|
|||
|
|
# 获取该用户所有的 API Key
|
|||
|
|
user_api_keys = db.query(ApiKey).filter(ApiKey.user_id == user_id).all()
|
|||
|
|
user_api_key_ids = {str(k.id) for k in user_api_keys}
|
|||
|
|
|
|||
|
|
# 获取该用户所有 API Key 的缓存亲和性并逐个失效
|
|||
|
|
all_affinities = await affinity_mgr.list_affinities()
|
|||
|
|
user_affinities = [
|
|||
|
|
aff for aff in all_affinities if aff.get("affinity_key") in user_api_key_ids
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
count = 0
|
|||
|
|
for aff in user_affinities:
|
|||
|
|
affinity_key = aff.get("affinity_key")
|
|||
|
|
api_format = aff.get("api_format")
|
|||
|
|
model_name = aff.get("model_name")
|
|||
|
|
endpoint_id = aff.get("endpoint_id")
|
|||
|
|
if affinity_key and api_format and model_name:
|
|||
|
|
await affinity_mgr.invalidate_affinity(
|
|||
|
|
affinity_key, api_format, model_name, endpoint_id=endpoint_id
|
|||
|
|
)
|
|||
|
|
count += 1
|
|||
|
|
|
|||
|
|
logger.info(f"已清除用户缓存亲和性: username={user.username}, user_id={user_id[:8]}..., 清除数量={count}")
|
|||
|
|
|
|||
|
|
response = {
|
|||
|
|
"status": "ok",
|
|||
|
|
"message": f"已清除用户 {user.username} 的所有缓存亲和性",
|
|||
|
|
"user_info": {"user_id": user_id, "username": user.username, "email": user.email},
|
|||
|
|
}
|
|||
|
|
context.add_audit_metadata(
|
|||
|
|
action="cache_clear_user",
|
|||
|
|
user_identifier=self.user_identifier,
|
|||
|
|
resolved_user_id=user_id,
|
|||
|
|
cleared_count=count,
|
|||
|
|
)
|
|||
|
|
return response
|
|||
|
|
except HTTPException:
|
|||
|
|
raise
|
|||
|
|
except Exception as exc:
|
|||
|
|
logger.exception(f"清除用户缓存亲和性失败: {exc}")
|
|||
|
|
raise HTTPException(status_code=500, detail=f"清除失败: {exc}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class AdminClearAllCacheAdapter(AdminApiAdapter):
|
|||
|
|
async def handle(self, context): # type: ignore[override]
|
|||
|
|
try:
|
|||
|
|
redis_client = get_redis_client_sync()
|
|||
|
|
affinity_mgr = await get_affinity_manager(redis_client)
|
|||
|
|
count = await affinity_mgr.clear_all()
|
|||
|
|
logger.warning(f"已清除所有缓存亲和性(管理员操作): {count} 个")
|
|||
|
|
context.add_audit_metadata(
|
|||
|
|
action="cache_clear_all",
|
|||
|
|
cleared_count=count,
|
|||
|
|
)
|
|||
|
|
return {"status": "ok", "message": "已清除所有缓存亲和性", "count": count}
|
|||
|
|
except Exception as exc:
|
|||
|
|
logger.exception(f"清除所有缓存亲和性失败: {exc}")
|
|||
|
|
raise HTTPException(status_code=500, detail=f"清除失败: {exc}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class AdminClearProviderCacheAdapter(AdminApiAdapter):
|
|||
|
|
provider_id: str
|
|||
|
|
|
|||
|
|
async def handle(self, context): # type: ignore[override]
|
|||
|
|
try:
|
|||
|
|
redis_client = get_redis_client_sync()
|
|||
|
|
affinity_mgr = await get_affinity_manager(redis_client)
|
|||
|
|
count = await affinity_mgr.invalidate_all_for_provider(self.provider_id)
|
|||
|
|
logger.info(f"已清除Provider缓存亲和性: provider_id={self.provider_id[:8]}..., count={count}")
|
|||
|
|
context.add_audit_metadata(
|
|||
|
|
action="cache_clear_provider",
|
|||
|
|
provider_id=self.provider_id,
|
|||
|
|
cleared_count=count,
|
|||
|
|
)
|
|||
|
|
return {
|
|||
|
|
"status": "ok",
|
|||
|
|
"message": "已清除Provider的缓存亲和性",
|
|||
|
|
"provider_id": self.provider_id,
|
|||
|
|
"count": count,
|
|||
|
|
}
|
|||
|
|
except Exception as exc:
|
|||
|
|
logger.exception(f"清除Provider缓存亲和性失败: {exc}")
|
|||
|
|
raise HTTPException(status_code=500, detail=f"清除失败: {exc}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class AdminCacheConfigAdapter(AdminApiAdapter):
|
|||
|
|
async def handle(self, context): # type: ignore[override]
|
|||
|
|
from src.services.cache.affinity_manager import CacheAffinityManager
|
|||
|
|
from src.services.cache.aware_scheduler import CacheAwareScheduler
|
|||
|
|
from src.services.rate_limit.adaptive_reservation import get_adaptive_reservation_manager
|
|||
|
|
|
|||
|
|
# 获取动态预留管理器的配置
|
|||
|
|
reservation_manager = get_adaptive_reservation_manager()
|
|||
|
|
reservation_stats = reservation_manager.get_stats()
|
|||
|
|
|
|||
|
|
response = {
|
|||
|
|
"status": "ok",
|
|||
|
|
"data": {
|
|||
|
|
"cache_ttl_seconds": CacheAffinityManager.DEFAULT_CACHE_TTL,
|
|||
|
|
"cache_reservation_ratio": CacheAwareScheduler.CACHE_RESERVATION_RATIO,
|
|||
|
|
"dynamic_reservation": {
|
|||
|
|
"enabled": True,
|
|||
|
|
"config": reservation_stats["config"],
|
|||
|
|
"description": {
|
|||
|
|
"probe_phase_requests": "探测阶段请求数阈值",
|
|||
|
|
"probe_reservation": "探测阶段预留比例",
|
|||
|
|
"stable_min_reservation": "稳定阶段最小预留比例",
|
|||
|
|
"stable_max_reservation": "稳定阶段最大预留比例",
|
|||
|
|
"low_load_threshold": "低负载阈值(低于此值使用最小预留)",
|
|||
|
|
"high_load_threshold": "高负载阈值(高于此值根据置信度使用较高预留)",
|
|||
|
|
},
|
|||
|
|
},
|
|||
|
|
"description": {
|
|||
|
|
"cache_ttl": "缓存亲和性有效期(秒)",
|
|||
|
|
"cache_reservation_ratio": "静态预留比例(已被动态预留替代)",
|
|||
|
|
"dynamic_reservation": "动态预留机制配置",
|
|||
|
|
},
|
|||
|
|
},
|
|||
|
|
}
|
|||
|
|
context.add_audit_metadata(
|
|||
|
|
action="cache_config",
|
|||
|
|
cache_ttl_seconds=CacheAffinityManager.DEFAULT_CACHE_TTL,
|
|||
|
|
cache_reservation_ratio=CacheAwareScheduler.CACHE_RESERVATION_RATIO,
|
|||
|
|
dynamic_reservation_enabled=True,
|
|||
|
|
)
|
|||
|
|
return response
|