feat: 用户用量页面支持分页、搜索和密钥信息展示

- 用户用量API增加search参数支持密钥名、模型名搜索
- 用户用量API返回api_key信息(id、name、display)
- 用户页面记录表格增加密钥列显示
- 前端统一管理员和用户页面的分页/搜索逻辑
- 后端LIKE查询增加特殊字符转义防止SQL注入
- 添加escape_like_pattern和safe_truncate_escaped工具函数
This commit is contained in:
fawney19
2026-01-05 19:32:57 +08:00
parent 142e15bbcc
commit 431c6de8d2
10 changed files with 262 additions and 109 deletions

View File

@@ -92,9 +92,9 @@ async def get_usage_records(
request: Request,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
search: Optional[str] = None, # 通用搜索:用户名、密钥名、模型名、提供商名
user_id: Optional[str] = None,
username: Optional[str] = None,
user_api_key_name: Optional[str] = None,
model: Optional[str] = None,
provider: Optional[str] = None,
status: Optional[str] = None, # stream, standard, error
@@ -105,9 +105,9 @@ async def get_usage_records(
adapter = AdminUsageRecordsAdapter(
start_date=start_date,
end_date=end_date,
search=search,
user_id=user_id,
username=username,
user_api_key_name=user_api_key_name,
model=model,
provider=provider,
status=status,
@@ -502,9 +502,9 @@ class AdminUsageRecordsAdapter(AdminApiAdapter):
self,
start_date: Optional[datetime],
end_date: Optional[datetime],
search: Optional[str],
user_id: Optional[str],
username: Optional[str],
user_api_key_name: Optional[str],
model: Optional[str],
provider: Optional[str],
status: Optional[str],
@@ -513,9 +513,9 @@ class AdminUsageRecordsAdapter(AdminApiAdapter):
):
self.start_date = start_date
self.end_date = end_date
self.search = search
self.user_id = user_id
self.username = username
self.user_api_key_name = user_api_key_name
self.model = model
self.provider = provider
self.status = status
@@ -523,6 +523,10 @@ class AdminUsageRecordsAdapter(AdminApiAdapter):
self.offset = offset
async def handle(self, context): # type: ignore[override]
from sqlalchemy import or_
from src.utils.database_helpers import escape_like_pattern, safe_truncate_escaped
db = context.db
query = (
db.query(Usage, User, ProviderEndpoint, ProviderAPIKey, ApiKey)
@@ -531,21 +535,42 @@ class AdminUsageRecordsAdapter(AdminApiAdapter):
.outerjoin(ProviderAPIKey, Usage.provider_api_key_id == ProviderAPIKey.id)
.outerjoin(ApiKey, Usage.api_key_id == ApiKey.id)
)
# 如果需要按 Provider 名称搜索/筛选,统一在这里 JOIN
if self.search or self.provider:
query = query.join(Provider, Usage.provider_id == Provider.id, isouter=True)
# 通用搜索:用户名、密钥名、模型名、提供商名
# 支持空格分隔的组合搜索,多个关键词之间是 AND 关系
# 限制:最多 10 个关键词,转义后每个关键词最长 100 字符
if self.search:
keywords = [kw for kw in self.search.strip().split() if kw][:10]
for keyword in keywords:
escaped = safe_truncate_escaped(escape_like_pattern(keyword), 100)
search_pattern = f"%{escaped}%"
query = query.filter(
or_(
User.username.ilike(search_pattern, escape="\\"),
ApiKey.name.ilike(search_pattern, escape="\\"),
Usage.model.ilike(search_pattern, escape="\\"),
Provider.name.ilike(search_pattern, escape="\\"),
)
)
if self.user_id:
query = query.filter(Usage.user_id == self.user_id)
if self.username:
# 支持用户名模糊搜索
query = query.filter(User.username.ilike(f"%{self.username}%"))
if self.user_api_key_name:
# 支持用户 API Key 名称模糊搜索(注意:不是 Provider Key
query = query.filter(ApiKey.name.ilike(f"%{self.user_api_key_name}%"))
escaped = escape_like_pattern(self.username)
query = query.filter(User.username.ilike(f"%{escaped}%", escape="\\"))
if self.model:
# 支持模型名模糊搜索
query = query.filter(Usage.model.ilike(f"%{self.model}%"))
escaped = escape_like_pattern(self.model)
query = query.filter(Usage.model.ilike(f"%{escaped}%", escape="\\"))
if self.provider:
# 支持提供商名称搜索(通过 Provider 表)
query = query.join(Provider, Usage.provider_id == Provider.id, isouter=True)
query = query.filter(Provider.name.ilike(f"%{self.provider}%"))
# 支持提供商名称搜索
escaped = escape_like_pattern(self.provider)
query = query.filter(Provider.name.ilike(f"%{escaped}%", escape="\\"))
if self.status:
# 状态筛选
# 旧的筛选值(基于 is_stream 和 status_codestream, standard, error
@@ -603,9 +628,9 @@ class AdminUsageRecordsAdapter(AdminApiAdapter):
action="usage_records",
start_date=self.start_date.isoformat() if self.start_date else None,
end_date=self.end_date.isoformat() if self.end_date else None,
search=self.search,
user_id=self.user_id,
username=self.username,
user_api_key_name=self.user_api_key_name,
model=self.model,
provider=self.provider,
status=self.status,

View File

@@ -104,11 +104,14 @@ async def get_my_usage(
request: Request,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
search: Optional[str] = None, # 通用搜索:密钥名、模型名
limit: int = Query(100, ge=1, le=200, description="每页记录数默认100最大200"),
offset: int = Query(0, ge=0, le=2000, description="偏移量用于分页最大2000"),
db: Session = Depends(get_db),
):
adapter = GetUsageAdapter(start_date=start_date, end_date=end_date, limit=limit, offset=offset)
adapter = GetUsageAdapter(
start_date=start_date, end_date=end_date, search=search, limit=limit, offset=offset
)
return await pipeline.run(adapter=adapter, http_request=request, db=db, mode=adapter.mode)
@@ -487,10 +490,15 @@ class ToggleMyApiKeyAdapter(AuthenticatedApiAdapter):
class GetUsageAdapter(AuthenticatedApiAdapter):
start_date: Optional[datetime]
end_date: Optional[datetime]
search: Optional[str] = None
limit: int = 100
offset: int = 0
async def handle(self, context): # type: ignore[override]
from sqlalchemy import or_
from src.utils.database_helpers import escape_like_pattern, safe_truncate_escaped
db = context.db
user = context.user
summary_list = UsageService.get_usage_summary(
@@ -595,12 +603,30 @@ class GetUsageAdapter(AuthenticatedApiAdapter):
})
summary_by_provider = sorted(summary_by_provider, key=lambda x: x["requests"], reverse=True)
query = db.query(Usage).filter(Usage.user_id == user.id)
query = (
db.query(Usage, ApiKey)
.outerjoin(ApiKey, Usage.api_key_id == ApiKey.id)
.filter(Usage.user_id == user.id)
)
if self.start_date:
query = query.filter(Usage.created_at >= self.start_date)
if self.end_date:
query = query.filter(Usage.created_at <= self.end_date)
# 通用搜索:密钥名、模型名
# 支持空格分隔的组合搜索,多个关键词之间是 AND 关系
if self.search and self.search.strip():
keywords = [kw for kw in self.search.strip().split() if kw][:10]
for keyword in keywords:
escaped = safe_truncate_escaped(escape_like_pattern(keyword), 100)
search_pattern = f"%{escaped}%"
query = query.filter(
or_(
ApiKey.name.ilike(search_pattern, escape="\\"),
Usage.model.ilike(search_pattern, escape="\\"),
)
)
# 计算总数用于分页
total_records = query.count()
usage_records = query.order_by(Usage.created_at.desc()).offset(self.offset).limit(self.limit).all()
@@ -659,8 +685,17 @@ class GetUsageAdapter(AuthenticatedApiAdapter):
"output_price_per_1m": r.output_price_per_1m,
"cache_creation_price_per_1m": r.cache_creation_price_per_1m,
"cache_read_price_per_1m": r.cache_read_price_per_1m,
"api_key": (
{
"id": str(api_key.id),
"name": api_key.name,
"display": api_key.get_display_key(),
}
if api_key
else None
),
}
for r in usage_records
for r, api_key in usage_records
],
}
@@ -668,7 +703,7 @@ class GetUsageAdapter(AuthenticatedApiAdapter):
if user.role == "admin":
response_data["total_actual_cost"] = total_actual_cost
# 为每条记录添加真实成本和倍率信息
for i, r in enumerate(usage_records):
for i, (r, _) in enumerate(usage_records):
# 确保字段有值,避免前端显示 -
actual_cost = (
r.actual_total_cost_usd if r.actual_total_cost_usd is not None else 0.0