feat: 优化用量查询分页和热力图性能

- 用量查询接口添加 limit/offset 分页参数支持
- 热力图统计从实时查询 Usage 表改为读取预计算的 StatsDaily/StatsUserDaily 表
- 修复 avg_response_time_ms 为 0 时被错误跳过的问题
This commit is contained in:
fawney19
2026-01-04 18:02:47 +08:00
parent c02ac56da8
commit c3a5878b1b
3 changed files with 142 additions and 49 deletions

View File

@@ -75,6 +75,16 @@ export interface ModelSummary {
actual_total_cost_usd?: number // 倍率消耗(仅管理员可见)
}
// 提供商统计接口
export interface ProviderSummary {
provider: string
requests: number
total_tokens: number
total_cost_usd: number
success_rate: number | null
avg_response_time_ms: number | null
}
// 使用统计响应接口
export interface UsageResponse {
total_requests: number
@@ -87,6 +97,13 @@ export interface UsageResponse {
quota_usd: number | null
used_usd: number
summary_by_model: ModelSummary[]
summary_by_provider?: ProviderSummary[]
pagination?: {
total: number
limit: number
offset: number
has_more: boolean
}
records: UsageRecordDetail[]
activity_heatmap?: ActivityHeatmap | null
}
@@ -175,6 +192,8 @@ export const meApi = {
async getUsage(params?: {
start_date?: string
end_date?: string
limit?: number
offset?: number
}): Promise<UsageResponse> {
const response = await apiClient.get<UsageResponse>('/api/users/me/usage', { params })
return response.data

View File

@@ -104,9 +104,11 @@ async def get_my_usage(
request: Request,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
limit: int = Query(100, ge=1, le=200, description="每页记录数默认100最大200"),
offset: int = Query(0, ge=0, le=2000, description="偏移量用于分页最大2000"),
db: Session = Depends(get_db),
):
adapter = GetUsageAdapter(start_date=start_date, end_date=end_date)
adapter = GetUsageAdapter(start_date=start_date, end_date=end_date, limit=limit, offset=offset)
return await pipeline.run(adapter=adapter, http_request=request, db=db, mode=adapter.mode)
@@ -471,6 +473,8 @@ class ToggleMyApiKeyAdapter(AuthenticatedApiAdapter):
class GetUsageAdapter(AuthenticatedApiAdapter):
start_date: Optional[datetime]
end_date: Optional[datetime]
limit: int = 100
offset: int = 0
async def handle(self, context): # type: ignore[override]
db = context.db
@@ -553,7 +557,7 @@ class GetUsageAdapter(AuthenticatedApiAdapter):
stats["total_cost_usd"] += item["total_cost_usd"]
# 假设 summary 中的都是成功的请求
stats["success_count"] += item["requests"]
if item.get("avg_response_time_ms"):
if item.get("avg_response_time_ms") is not None:
stats["total_response_time_ms"] += item["avg_response_time_ms"] * item["requests"]
stats["response_time_count"] += item["requests"]
@@ -582,7 +586,10 @@ class GetUsageAdapter(AuthenticatedApiAdapter):
query = query.filter(Usage.created_at >= self.start_date)
if self.end_date:
query = query.filter(Usage.created_at <= self.end_date)
usage_records = query.order_by(Usage.created_at.desc()).limit(100).all()
# 计算总数用于分页
total_records = query.count()
usage_records = query.order_by(Usage.created_at.desc()).offset(self.offset).limit(self.limit).all()
avg_resp_query = db.query(func.avg(Usage.response_time_ms)).filter(
Usage.user_id == user.id,
@@ -608,6 +615,13 @@ class GetUsageAdapter(AuthenticatedApiAdapter):
"used_usd": user.used_usd,
"summary_by_model": summary_by_model,
"summary_by_provider": summary_by_provider,
# 分页信息
"pagination": {
"total": total_records,
"limit": self.limit,
"offset": self.offset,
"has_more": self.offset + self.limit < total_records,
},
"records": [
{
"id": r.id,

View File

@@ -1027,7 +1027,12 @@ class UsageService:
window_days: int = 365,
include_actual_cost: bool = False,
) -> Dict[str, Any]:
"""按天统计请求活跃度,用于渲染热力图。"""
"""按天统计请求活跃度,用于渲染热力图。
优化策略:
- 历史数据从预计算的 StatsDaily/StatsUserDaily 表读取
- 只有"今天"的数据才实时查询 Usage 表
"""
def ensure_timezone(value: datetime) -> datetime:
if value.tzinfo is None:
@@ -1041,54 +1046,109 @@ class UsageService:
ensure_timezone(start_date) if start_date else end_dt - timedelta(days=window_days - 1)
)
# 对齐到自然日的开始/结束,避免遗漏边界数据
start_dt = start_dt.replace(hour=0, minute=0, second=0, microsecond=0)
end_dt = end_dt.replace(hour=23, minute=59, second=59, microsecond=999999)
from src.utils.database_helpers import date_trunc_portable
bind = db.get_bind()
dialect = bind.dialect.name if bind is not None else "sqlite"
day_bucket = date_trunc_portable(dialect, "day", Usage.created_at).label("day")
columns = [
day_bucket,
func.count(Usage.id).label("requests"),
func.sum(Usage.total_tokens).label("total_tokens"),
func.sum(Usage.total_cost_usd).label("total_cost_usd"),
]
if include_actual_cost:
columns.append(func.sum(Usage.actual_total_cost_usd).label("actual_total_cost_usd"))
query = db.query(*columns).filter(Usage.created_at >= start_dt, Usage.created_at <= end_dt)
if user_id:
query = query.filter(Usage.user_id == user_id)
query = query.group_by(day_bucket).order_by(day_bucket)
rows = query.all()
def normalize_period(value: Any) -> str:
if value is None:
return ""
if isinstance(value, str):
return value[:10]
if isinstance(value, datetime):
return value.date().isoformat()
return str(value)
# 对齐到自然日的开始/结束
start_dt = datetime.combine(start_dt.date(), datetime.min.time(), tzinfo=timezone.utc)
end_dt = datetime.combine(end_dt.date(), datetime.max.time(), tzinfo=timezone.utc)
today = now.date()
today_start_dt = datetime.combine(today, datetime.min.time(), tzinfo=timezone.utc)
aggregated: Dict[str, Dict[str, Any]] = {}
for row in rows:
key = normalize_period(row.day)
aggregated[key] = {
"requests": int(row.requests or 0),
"total_tokens": int(row.total_tokens or 0),
"total_cost_usd": float(row.total_cost_usd or 0.0),
}
if include_actual_cost:
aggregated[key]["actual_total_cost_usd"] = float(row.actual_total_cost_usd or 0.0)
# 1. 从预计算表读取历史数据(不包括今天)
if user_id:
from src.models.database import StatsUserDaily
hist_query = db.query(StatsUserDaily).filter(
StatsUserDaily.user_id == user_id,
StatsUserDaily.date >= start_dt,
StatsUserDaily.date < today_start_dt,
)
for row in hist_query.all():
key = (
row.date.date().isoformat()
if isinstance(row.date, datetime)
else str(row.date)[:10]
)
aggregated[key] = {
"requests": row.total_requests or 0,
"total_tokens": (
(row.input_tokens or 0)
+ (row.output_tokens or 0)
+ (row.cache_creation_tokens or 0)
+ (row.cache_read_tokens or 0)
),
"total_cost_usd": float(row.total_cost or 0.0),
}
# StatsUserDaily 没有 actual_total_cost 字段,用户视图不需要倍率成本
else:
from src.models.database import StatsDaily
hist_query = db.query(StatsDaily).filter(
StatsDaily.date >= start_dt,
StatsDaily.date < today_start_dt,
)
for row in hist_query.all():
key = (
row.date.date().isoformat()
if isinstance(row.date, datetime)
else str(row.date)[:10]
)
aggregated[key] = {
"requests": row.total_requests or 0,
"total_tokens": (
(row.input_tokens or 0)
+ (row.output_tokens or 0)
+ (row.cache_creation_tokens or 0)
+ (row.cache_read_tokens or 0)
),
"total_cost_usd": float(row.total_cost or 0.0),
}
if include_actual_cost:
aggregated[key]["actual_total_cost_usd"] = float(
row.actual_total_cost or 0.0 # type: ignore[attr-defined]
)
# 2. 实时查询今天的数据(如果在查询范围内)
if today >= start_dt.date() and today <= end_dt.date():
today_start = datetime.combine(today, datetime.min.time(), tzinfo=timezone.utc)
today_end = datetime.combine(today, datetime.max.time(), tzinfo=timezone.utc)
if include_actual_cost:
today_query = db.query(
func.count(Usage.id).label("requests"),
func.sum(Usage.total_tokens).label("total_tokens"),
func.sum(Usage.total_cost_usd).label("total_cost_usd"),
func.sum(Usage.actual_total_cost_usd).label("actual_total_cost_usd"),
).filter(
Usage.created_at >= today_start,
Usage.created_at <= today_end,
)
else:
today_query = db.query(
func.count(Usage.id).label("requests"),
func.sum(Usage.total_tokens).label("total_tokens"),
func.sum(Usage.total_cost_usd).label("total_cost_usd"),
).filter(
Usage.created_at >= today_start,
Usage.created_at <= today_end,
)
if user_id:
today_query = today_query.filter(Usage.user_id == user_id)
today_row = today_query.first()
if today_row and today_row.requests:
aggregated[today.isoformat()] = {
"requests": int(today_row.requests or 0),
"total_tokens": int(today_row.total_tokens or 0),
"total_cost_usd": float(today_row.total_cost_usd or 0.0),
}
if include_actual_cost:
aggregated[today.isoformat()]["actual_total_cost_usd"] = float(
today_row.actual_total_cost_usd or 0.0
)
# 3. 构建返回结果
days: List[Dict[str, Any]] = []
cursor = start_dt.date()
end_date_only = end_dt.date()