Mirror of https://github.com/fawney19/Aether.git, synced 2026-01-03 16:22:27 +08:00

Compare commits: d44cfaddf6...v0.1.22 (7 commits)

- 03ad16ea8a
- 2fa64b98e3
- 75d7e89cbb
- d73a443484
- 15a9b88fc8
- 03eb7203ec
- e38cd6819b
```diff
@@ -105,7 +105,7 @@ RUN printf '%s\n' \
     'stderr_logfile=/var/log/nginx/error.log' \
     '' \
     '[program:app]' \
-    'command=gunicorn src.main:app -w %(ENV_GUNICORN_WORKERS)s -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:%(ENV_PORT)s --timeout 120 --access-logfile - --error-logfile - --log-level info' \
+    'command=gunicorn src.main:app --preload -w %(ENV_GUNICORN_WORKERS)s -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:%(ENV_PORT)s --timeout 120 --access-logfile - --error-logfile - --log-level info' \
    'directory=/app' \
    'autostart=true' \
    'autorestart=true' \
```
```diff
@@ -106,7 +106,7 @@ RUN printf '%s\n' \
     'stderr_logfile=/var/log/nginx/error.log' \
     '' \
     '[program:app]' \
-    'command=gunicorn src.main:app -w %(ENV_GUNICORN_WORKERS)s -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:%(ENV_PORT)s --timeout 120 --access-logfile - --error-logfile - --log-level info' \
+    'command=gunicorn src.main:app --preload -w %(ENV_GUNICORN_WORKERS)s -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:%(ENV_PORT)s --timeout 120 --access-logfile - --error-logfile - --log-level info' \
    'directory=/app' \
    'autostart=true' \
    'autorestart=true' \
```
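Both Dockerfile hunks make the same change: adding `--preload` to the Gunicorn command. With `--preload`, Gunicorn imports the application once in the master process before forking workers, so import-time failures abort startup immediately and read-only module-level state is shared with workers via copy-on-write instead of being duplicated per worker. A minimal sketch of what that means for a module like `src.main` (the module body below is hypothetical, not Aether's actual code):

```python
# Hypothetical module body illustrating --preload semantics: with --preload,
# everything at module level runs once in the Gunicorn master before fork;
# without it, each worker re-imports (and re-executes) this on its own.
import os

print(f"app module imported in pid {os.getpid()}")  # printed once with --preload

# Large read-only structures built here are shared with forked workers
# via copy-on-write instead of being rebuilt per worker.
ROUTING_TABLE = {f"/v1/route-{i}": i for i in range(10_000)}
```

The usual caveat applies: anything opened at import time (sockets, database connections) would be shared across forked workers, so preload is typically paired with lazily created clients.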
```diff
@@ -394,6 +394,10 @@ def upgrade() -> None:
             index=True,
         ),
     )
+    # Composite indexes on the usage table (optimize common queries)
+    op.create_index("idx_usage_user_created", "usage", ["user_id", "created_at"])
+    op.create_index("idx_usage_apikey_created", "usage", ["api_key_id", "created_at"])
+    op.create_index("idx_usage_provider_model_created", "usage", ["provider", "model", "created_at"])

     # ==================== user_quotas ====================
     op.create_table(
```
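The three indexes line up with the query shapes named in the migration comments: per-user usage, per-API-key usage, and per-provider/model statistics, each filtered or ordered by `created_at`. A rough sketch of the first shape (the `Usage` model and `session` here are hypothetical stand-ins, not Aether's actual declarations): a leading-column equality plus a range on `created_at` is exactly what a `(user_id, created_at)` btree serves well.

```python
# Hypothetical model/session names; sketches the query shape that
# idx_usage_user_created (user_id, created_at) is built for.
from datetime import datetime, timedelta, timezone

from sqlalchemy import select


def recent_user_usage(session, Usage, user_id: int):
    since = datetime.now(timezone.utc) - timedelta(days=7)
    stmt = (
        select(Usage)
        .where(Usage.user_id == user_id)     # equality on the leading column
        .where(Usage.created_at >= since)    # range on the second column
        .order_by(Usage.created_at.desc())   # same column, so no extra sort step
    )
    return session.execute(stmt).scalars().all()
```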
```diff
@@ -18,33 +18,35 @@ depends_on = None

 def upgrade() -> None:
     """Add composite indexes to the usage table to optimize common queries

-    Uses CONCURRENTLY to create the indexes without locking the table,
-    which requires AUTOCOMMIT mode (cannot run inside a transaction)
+    Note: these indexes are already created in the baseline migration.
+    This migration only applies when upgrading from an older version;
+    fresh installs skip it.
     """
     conn = op.get_bind()
-    engine = conn.engine

-    # Open a new connection in AUTOCOMMIT mode so CREATE INDEX CONCURRENTLY is allowed
-    with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as autocommit_conn:
-        # IF NOT EXISTS avoids duplicate creation, so no separate existence check is needed
-
-        # 1. Composite index on user_id + created_at (per-user usage queries)
-        autocommit_conn.execute(text(
-            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_usage_user_created "
-            "ON usage (user_id, created_at)"
-        ))
-
-        # 2. Composite index on api_key_id + created_at (per-API-key usage queries)
-        autocommit_conn.execute(text(
-            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_usage_apikey_created "
-            "ON usage (api_key_id, created_at)"
-        ))
-
-        # 3. Composite index on provider + model + created_at (model statistics queries)
-        autocommit_conn.execute(text(
-            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_usage_provider_model_created "
-            "ON usage (provider, model, created_at)"
-        ))
+    # Check whether the usage table exists
+    result = conn.execute(text(
+        "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'usage')"
+    ))
+    if not result.scalar():
+        # Table does not exist, skip
+        return
+
+    # Define the indexes to create
+    indexes = [
+        ("idx_usage_user_created", "ON usage (user_id, created_at)"),
+        ("idx_usage_apikey_created", "ON usage (api_key_id, created_at)"),
+        ("idx_usage_provider_model_created", "ON usage (provider, model, created_at)"),
+    ]
+
+    # Check and create each index individually
+    for index_name, index_def in indexes:
+        result = conn.execute(text(
+            f"SELECT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = '{index_name}')"
+        ))
+        if result.scalar():
+            continue  # Index already exists, skip
+
+        conn.execute(text(f"CREATE INDEX {index_name} {index_def}"))


 def downgrade() -> None:
```
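For context on why the old version needed the extra connection: PostgreSQL rejects `CREATE INDEX CONCURRENTLY` inside a transaction block, and Alembic runs each migration in one by default, hence the separate AUTOCOMMIT connection. The rewrite sidesteps all of that because the baseline migration now creates the indexes; it only probes `information_schema.tables` and `pg_indexes`, as in this standalone sketch (the DSN is a placeholder):

```python
# Standalone sketch of the two existence probes; the DSN is a placeholder.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://localhost/aether")  # placeholder

with engine.connect() as conn:
    table_exists = conn.execute(text(
        "SELECT EXISTS (SELECT FROM information_schema.tables "
        "WHERE table_name = 'usage')"
    )).scalar()

    index_exists = conn.execute(text(
        "SELECT EXISTS (SELECT 1 FROM pg_indexes "
        "WHERE indexname = 'idx_usage_user_created')"
    )).scalar()

    print(f"table: {table_exists}, index: {index_exists}")
```

Note that the f-string interpolation of `index_name` in the migration is only safe because the names come from a hard-coded list, never from user input.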
```diff
@@ -26,10 +26,13 @@ calc_deps_hash() {
     cat pyproject.toml frontend/package.json frontend/package-lock.json Dockerfile.base.local 2>/dev/null | md5sum | cut -d' ' -f1
 }

-# Hash the code files
+# Hash the code files (including Dockerfile.app.local)
 calc_code_hash() {
-    find src -type f -name "*.py" 2>/dev/null | sort | xargs cat 2>/dev/null | md5sum | cut -d' ' -f1
-    find frontend/src -type f \( -name "*.vue" -o -name "*.ts" -o -name "*.tsx" -o -name "*.js" \) 2>/dev/null | sort | xargs cat 2>/dev/null | md5sum | cut -d' ' -f1
+    {
+        cat Dockerfile.app.local 2>/dev/null
+        find src -type f -name "*.py" 2>/dev/null | sort | xargs cat 2>/dev/null
+        find frontend/src -type f \( -name "*.vue" -o -name "*.ts" -o -name "*.tsx" -o -name "*.js" \) 2>/dev/null | sort | xargs cat 2>/dev/null
+    } | md5sum | cut -d' ' -f1
 }

 # Hash the migration files
```
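Worth noting about this shell change: the old `calc_code_hash` ended each `find` pipeline in its own `md5sum`, so the function emitted two digests instead of one; the new version routes all content through a single brace group into one `md5sum`, and folds `Dockerfile.app.local` into the fingerprint so editing it triggers a rebuild. The same single-digest idea in Python, with the file discovery simplified:

```python
# Single-digest fingerprint over many files, mirroring the fixed shell function.
# File selection is simplified relative to the script (no frontend suffix filter).
import hashlib
from pathlib import Path


def code_hash(paths: list[Path]) -> str:
    digest = hashlib.md5()
    for path in sorted(paths):  # stable order, like `find ... | sort`
        if path.is_file():
            digest.update(path.read_bytes())
    return digest.hexdigest()


if __name__ == "__main__":
    files = [Path("Dockerfile.app.local"), *Path("src").rglob("*.py")]
    print(code_hash(files))
```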
```diff
@@ -484,9 +484,8 @@ class ChatHandlerBase(BaseMessageHandler, ABC):

         stream_response.raise_for_status()

-        # Use a byte-stream iterator (avoids the performance issues of aiter_lines)
-        # aiter_raw() returns raw chunks with no buffering, for true streaming
-        byte_iterator = stream_response.aiter_raw()
+        # Use a byte-stream iterator (avoids the performance issues of aiter_lines;
+        # aiter_bytes automatically decompresses gzip/deflate)
+        byte_iterator = stream_response.aiter_bytes()

         # Prefetch to detect embedded errors
         prefetched_chunks = await stream_processor.prefetch_and_check_error(
```
```diff
@@ -476,8 +476,8 @@ class CliMessageHandlerBase(BaseMessageHandler):

         stream_response.raise_for_status()

-        # Use a byte-stream iterator (avoids the performance issues of aiter_lines)
-        byte_iterator = stream_response.aiter_raw()
+        # Use a byte-stream iterator (avoids the performance issues of aiter_lines;
+        # aiter_bytes automatically decompresses gzip/deflate)
+        byte_iterator = stream_response.aiter_bytes()

         # Prefetch the first chunk to detect embedded errors (HTTP 200 but an error in the body)
         prefetched_chunks = await self._prefetch_and_check_embedded_error(
```
```diff
@@ -531,7 +531,7 @@ class CliMessageHandlerBase(BaseMessageHandler):
         # Check whether format conversion is needed
         needs_conversion = self._needs_format_conversion(ctx)

-        async for chunk in stream_response.aiter_raw():
+        async for chunk in stream_response.aiter_bytes():
             # Update status to streaming before the first output chunk
             if not streaming_status_updated:
                 self._update_usage_to_streaming_with_ctx(ctx)
```
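The three `aiter_raw()` → `aiter_bytes()` call sites all make the same fix: in httpx, `aiter_raw()` yields the body exactly as it arrived on the wire, so a gzip- or deflate-encoded upstream response would be forwarded still compressed, while `aiter_bytes()` applies content decoding per the `Content-Encoding` header before yielding. A minimal sketch of the decoded path (the URL is a placeholder, not an Aether upstream):

```python
# Minimal httpx streaming sketch: aiter_bytes() yields decoded (decompressed)
# chunks, whereas aiter_raw() would yield the raw, possibly gzipped stream.
import asyncio

import httpx


async def main() -> None:
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "GET", "https://example.com/", headers={"Accept-Encoding": "gzip"}
        ) as response:
            async for chunk in response.aiter_bytes():
                print(f"decoded chunk of {len(chunk)} bytes")


asyncio.run(main())
```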
```diff
@@ -4,17 +4,28 @@ Handler base utility functions

 from typing import Any, Dict, Optional

+from src.core.logger import logger
+

 def extract_cache_creation_tokens(usage: Dict[str, Any]) -> int:
     """
-    Extract cache-creation tokens (compatible with old and new formats)
+    Extract cache-creation tokens (compatible with three formats)

-    The Claude API has used different field names for cache-creation tokens
-    across versions:
-    - New format (post-2024): claude_cache_creation_5_m_tokens and
-      claude_cache_creation_1_h_tokens for the 5-minute and 1-hour caches
-    - Old format: cache_creation_input_tokens for the total cache-creation tokens
-
-    This function detects and adapts to both formats, preferring the new one.
+    Per the Anthropic API docs, three formats are supported (in priority order):
+
+    1. **Nested format (highest priority)**:
+       usage.cache_creation.ephemeral_5m_input_tokens
+       usage.cache_creation.ephemeral_1h_input_tokens
+
+    2. **Flat new format (second priority)**:
+       usage.claude_cache_creation_5_m_tokens
+       usage.claude_cache_creation_1_h_tokens
+
+    3. **Old format (third priority)**:
+       usage.cache_creation_input_tokens
+
+    The nested format is preferred; if nested fields exist but are 0, fall back
+    to the old format. Flat and nested formats are mutually exclusive and are
+    checked in order.

     Args:
         usage: the usage dict from the API response
```
```diff
@@ -22,20 +33,63 @@ def extract_cache_creation_tokens(usage: Dict[str, Any]) -> int:
     Returns:
         Total cache-creation tokens
     """
-    # Check whether the new-format fields exist (not whether their value is 0)
-    # If the fields exist, a value of 0 is legitimate; do not fall back to the old format
-    has_new_format = (
+    # 1. Check the nested format (newest)
+    cache_creation = usage.get("cache_creation")
+    if isinstance(cache_creation, dict):
+        cache_5m = int(cache_creation.get("ephemeral_5m_input_tokens", 0))
+        cache_1h = int(cache_creation.get("ephemeral_1h_input_tokens", 0))
+        total = cache_5m + cache_1h
+
+        if total > 0:
+            logger.debug(
+                f"Using nested cache_creation: 5m={cache_5m}, 1h={cache_1h}, total={total}"
+            )
+            return total
+
+        # Nested format present but 0, fall back to the old format
+        old_format = int(usage.get("cache_creation_input_tokens", 0))
+        if old_format > 0:
+            logger.debug(
+                f"Nested cache_creation is 0, using old format: {old_format}"
+            )
+            return old_format
+
+        # Everything is 0, return 0
+        return 0
+
+    # 2. Check the flat new format
+    has_flat_format = (
         "claude_cache_creation_5_m_tokens" in usage
         or "claude_cache_creation_1_h_tokens" in usage
     )

-    if has_new_format:
-        cache_5m = usage.get("claude_cache_creation_5_m_tokens", 0)
-        cache_1h = usage.get("claude_cache_creation_1_h_tokens", 0)
-        return int(cache_5m) + int(cache_1h)
+    if has_flat_format:
+        cache_5m = int(usage.get("claude_cache_creation_5_m_tokens", 0))
+        cache_1h = int(usage.get("claude_cache_creation_1_h_tokens", 0))
+        total = cache_5m + cache_1h
+
+        if total > 0:
+            logger.debug(
+                f"Using flat new format: 5m={cache_5m}, 1h={cache_1h}, total={total}"
+            )
+            return total
+
+        # Flat format present but 0, fall back to the old format
+        old_format = int(usage.get("cache_creation_input_tokens", 0))
+        if old_format > 0:
+            logger.debug(
+                f"Flat cache_creation is 0, using old format: {old_format}"
+            )
+            return old_format
+
+        # Everything is 0, return 0
+        return 0

-    # Fall back to the old format
-    return int(usage.get("cache_creation_input_tokens", 0))
+    # 3. Fall back to the old format
+    old_format = int(usage.get("cache_creation_input_tokens", 0))
+    if old_format > 0:
+        logger.debug(f"Using old format: cache_creation_input_tokens={old_format}")
+    return old_format


 def build_sse_headers(extra_headers: Optional[Dict[str, str]] = None) -> Dict[str, str]:
```
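Taken together with the tests further down, the resolution order is easiest to see with small payloads (token counts borrowed from the test fixtures):

```python
# Payloads illustrating the priority order implemented above.
from src.api.handlers.base.utils import extract_cache_creation_tokens

nested = {
    "cache_creation": {
        "ephemeral_5m_input_tokens": 456,
        "ephemeral_1h_input_tokens": 100,
    }
}
flat = {
    "claude_cache_creation_5_m_tokens": 100,
    "claude_cache_creation_1_h_tokens": 200,
}
legacy = {"cache_creation_input_tokens": 549}

assert extract_cache_creation_tokens(nested) == 556   # nested format wins
assert extract_cache_creation_tokens(flat) == 300     # flat format if no nested dict
assert extract_cache_creation_tokens(legacy) == 549   # old field as the last resort
```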
```diff
@@ -208,86 +208,120 @@ class CleanupScheduler:
             return

         # Not the first run: check whether recent dates are missing and need backfilling
-        latest_stat = db.query(StatsDaily).order_by(StatsDaily.date.desc()).first()
-
-        if latest_stat:
-            latest_date_utc = latest_stat.date
-            if latest_date_utc.tzinfo is None:
-                latest_date_utc = latest_date_utc.replace(tzinfo=timezone.utc)
-            else:
-                latest_date_utc = latest_date_utc.astimezone(timezone.utc)
-
-            # Compute the missing range using business dates (avoids day shifts from
-            # UTC year/month/day arithmetic, and is safer across DST)
-            latest_business_date = latest_date_utc.astimezone(app_tz).date()
-            yesterday_business_date = today_local.date() - timedelta(days=1)
-            missing_start_date = latest_business_date + timedelta(days=1)
-
-            if missing_start_date <= yesterday_business_date:
-                missing_days = (
-                    yesterday_business_date - missing_start_date
-                ).days + 1
-
-                # Cap the number of backfill days so a long outage does not
-                # trigger a huge one-shot backfill
-                max_backfill_days: int = SystemConfigService.get_config(
-                    db, "max_stats_backfill_days", 30
-                ) or 30
-                if missing_days > max_backfill_days:
-                    logger.warning(
-                        f"Missing {missing_days} days exceeds the backfill cap of "
-                        f"{max_backfill_days} days; only backfilling the most recent {max_backfill_days} days"
-                    )
-                    missing_start_date = yesterday_business_date - timedelta(
-                        days=max_backfill_days - 1
-                    )
-                    missing_days = max_backfill_days
-
-                logger.info(
-                    f"Detected {missing_days} days of missing statistics "
-                    f"({missing_start_date} ~ {yesterday_business_date}); starting backfill..."
-                )
-
-                current_date = missing_start_date
-                users = (
-                    db.query(DBUser.id).filter(DBUser.is_active.is_(True)).all()
-                )
-
-                while current_date <= yesterday_business_date:
-                    try:
-                        current_date_local = datetime.combine(
-                            current_date, datetime.min.time(), tzinfo=app_tz
-                        )
-                        StatsAggregatorService.aggregate_daily_stats(
-                            db, current_date_local
-                        )
-                        StatsAggregatorService.aggregate_daily_model_stats(
-                            db, current_date_local
-                        )
-                        for (user_id,) in users:
-                            try:
-                                StatsAggregatorService.aggregate_user_daily_stats(
-                                    db, user_id, current_date_local
-                                )
-                            except Exception as e:
-                                logger.warning(
-                                    f"Backfill for user {user_id} on {current_date} failed: {e}"
-                                )
-                                try:
-                                    db.rollback()
-                                except Exception:
-                                    pass
-                    except Exception as e:
-                        logger.warning(f"Backfill for date {current_date} failed: {e}")
-                        try:
-                            db.rollback()
-                        except Exception:
-                            pass
-
-                    current_date += timedelta(days=1)
-
-                StatsAggregatorService.update_summary(db)
-                logger.info(f"Missing-data backfill finished, {missing_days} days in total")
-            else:
-                logger.info("Statistics are up to date, no backfill needed")
+        from src.models.database import StatsDailyModel
+
+        yesterday_business_date = today_local.date() - timedelta(days=1)
+        max_backfill_days: int = SystemConfigService.get_config(
+            db, "max_stats_backfill_days", 30
+        ) or 30
+
+        # Compute the start date of the backfill check window
+        check_start_date = yesterday_business_date - timedelta(
+            days=max_backfill_days - 1
+        )
+
+        # Collect the dates that already have data in StatsDaily and StatsDailyModel
+        existing_daily_dates = set()
+        existing_model_dates = set()
+
+        daily_stats = (
+            db.query(StatsDaily.date)
+            .filter(StatsDaily.date >= check_start_date.isoformat())
+            .all()
+        )
+        for (stat_date,) in daily_stats:
+            if stat_date.tzinfo is None:
+                stat_date = stat_date.replace(tzinfo=timezone.utc)
+            existing_daily_dates.add(stat_date.astimezone(app_tz).date())
+
+        model_stats = (
+            db.query(StatsDailyModel.date)
+            .filter(StatsDailyModel.date >= check_start_date.isoformat())
+            .distinct()
+            .all()
+        )
+        for (stat_date,) in model_stats:
+            if stat_date.tzinfo is None:
+                stat_date = stat_date.replace(tzinfo=timezone.utc)
+            existing_model_dates.add(stat_date.astimezone(app_tz).date())
+
+        # Work out which dates need backfilling
+        all_dates = set()
+        current = check_start_date
+        while current <= yesterday_business_date:
+            all_dates.add(current)
+            current += timedelta(days=1)
+
+        # Dates missing from StatsDaily
+        missing_daily_dates = all_dates - existing_daily_dates
+        # Dates missing from StatsDailyModel
+        missing_model_dates = all_dates - existing_model_dates
+        # Union of all dates to process
+        dates_to_process = missing_daily_dates | missing_model_dates
+
+        if dates_to_process:
+            sorted_dates = sorted(dates_to_process)
+            logger.info(
+                f"Detected {len(dates_to_process)} days of statistics needing backfill "
+                f"(StatsDaily missing {len(missing_daily_dates)} days, "
+                f"StatsDailyModel missing {len(missing_model_dates)} days)"
+            )
+
+            users = (
+                db.query(DBUser.id).filter(DBUser.is_active.is_(True)).all()
+            )
+
+            failed_dates = 0
+            failed_users = 0
+
+            for current_date in sorted_dates:
+                try:
+                    current_date_local = datetime.combine(
+                        current_date, datetime.min.time(), tzinfo=app_tz
+                    )
+                    # Only aggregate the table(s) that are actually missing
+                    if current_date in missing_daily_dates:
+                        StatsAggregatorService.aggregate_daily_stats(
+                            db, current_date_local
+                        )
+                    if current_date in missing_model_dates:
+                        StatsAggregatorService.aggregate_daily_model_stats(
+                            db, current_date_local
+                        )
+                    # User statistics are backfilled whenever either table was missing
+                    for (user_id,) in users:
+                        try:
+                            StatsAggregatorService.aggregate_user_daily_stats(
+                                db, user_id, current_date_local
+                            )
+                        except Exception as e:
+                            failed_users += 1
+                            logger.warning(
+                                f"Backfill for user {user_id} on {current_date} failed: {e}"
+                            )
+                            try:
+                                db.rollback()
+                            except Exception as rollback_err:
+                                logger.error(f"Rollback failed: {rollback_err}")
+                except Exception as e:
+                    failed_dates += 1
+                    logger.warning(f"Backfill for date {current_date} failed: {e}")
+                    try:
+                        db.rollback()
+                    except Exception as rollback_err:
+                        logger.error(f"Rollback failed: {rollback_err}")
+
+            StatsAggregatorService.update_summary(db)
+
+            if failed_dates > 0 or failed_users > 0:
+                logger.warning(
+                    f"Backfill finished: processed {len(dates_to_process)} days, "
+                    f"failures: {failed_dates} days, {failed_users} user records"
+                )
+            else:
+                logger.info(f"Missing-data backfill finished, {len(dates_to_process)} days processed")
+        else:
+            logger.info("Statistics are up to date, no backfill needed")
         return

         # Scheduled task: aggregate yesterday's data
```
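The key behavioral change: the old code scanned forward from the latest StatsDaily row, so a day that had StatsDaily data but was missing StatsDailyModel rows was never repaired; the new code computes a per-table set difference over the whole check window. The bookkeeping in isolation (dates invented for illustration):

```python
# Isolated sketch of the set arithmetic behind the new backfill (dates invented).
from datetime import date, timedelta

yesterday = date(2025, 1, 10)
window = 5  # stands in for max_stats_backfill_days
check_start = yesterday - timedelta(days=window - 1)

all_dates = {check_start + timedelta(days=i) for i in range(window)}
existing_daily = {date(2025, 1, 7), date(2025, 1, 9), date(2025, 1, 10)}
existing_model = {date(2025, 1, 9)}  # model stats lag behind

missing_daily = all_dates - existing_daily   # needs aggregate_daily_stats
missing_model = all_dates - existing_model   # needs aggregate_daily_model_stats

for day in sorted(missing_daily | missing_model):
    needs = [name for name, missing in (("daily", missing_daily),
                                        ("model", missing_model)) if day in missing]
    print(day, "->", ", ".join(needs))
```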
```diff
@@ -8,86 +8,116 @@ from src.api.handlers.base.utils import build_sse_headers, extract_cache_creatio
 class TestExtractCacheCreationTokens:
     """Tests for extract_cache_creation_tokens"""

-    def test_new_format_only(self) -> None:
-        """Only the new-format fields are present"""
+    # === Nested format tests (highest priority) ===
+
+    def test_nested_cache_creation_format(self) -> None:
+        """Nested format, normal case"""
+        usage = {
+            "cache_creation": {
+                "ephemeral_5m_input_tokens": 456,
+                "ephemeral_1h_input_tokens": 100,
+            }
+        }
+        assert extract_cache_creation_tokens(usage) == 556
+
+    def test_nested_cache_creation_with_old_format_fallback(self) -> None:
+        """Falls back to the old format when the nested format is 0"""
+        usage = {
+            "cache_creation": {
+                "ephemeral_5m_input_tokens": 0,
+                "ephemeral_1h_input_tokens": 0,
+            },
+            "cache_creation_input_tokens": 549,
+        }
+        assert extract_cache_creation_tokens(usage) == 549
+
+    def test_nested_has_priority_over_flat(self) -> None:
+        """The nested format takes priority over the flat format"""
+        usage = {
+            "cache_creation": {
+                "ephemeral_5m_input_tokens": 100,
+                "ephemeral_1h_input_tokens": 200,
+            },
+            "claude_cache_creation_5_m_tokens": 999,  # should be ignored
+            "claude_cache_creation_1_h_tokens": 888,  # should be ignored
+            "cache_creation_input_tokens": 777,  # should be ignored
+        }
+        assert extract_cache_creation_tokens(usage) == 300
+
+    # === Flat format tests (second priority) ===
+
+    def test_flat_new_format_still_works(self) -> None:
+        """The flat new format keeps working"""
         usage = {
             "claude_cache_creation_5_m_tokens": 100,
             "claude_cache_creation_1_h_tokens": 200,
         }
         assert extract_cache_creation_tokens(usage) == 300

-    def test_new_format_5m_only(self) -> None:
-        """Only the 5-minute cache"""
+    def test_flat_new_format_with_old_format_fallback(self) -> None:
+        """Falls back to the old format when the flat format is 0"""
+        usage = {
+            "claude_cache_creation_5_m_tokens": 0,
+            "claude_cache_creation_1_h_tokens": 0,
+            "cache_creation_input_tokens": 549,
+        }
+        assert extract_cache_creation_tokens(usage) == 549
+
+    def test_flat_new_format_5m_only(self) -> None:
+        """Only the 5-minute flat cache"""
         usage = {
             "claude_cache_creation_5_m_tokens": 150,
             "claude_cache_creation_1_h_tokens": 0,
         }
         assert extract_cache_creation_tokens(usage) == 150

-    def test_new_format_1h_only(self) -> None:
-        """Only the 1-hour cache"""
+    def test_flat_new_format_1h_only(self) -> None:
+        """Only the 1-hour flat cache"""
         usage = {
             "claude_cache_creation_5_m_tokens": 0,
             "claude_cache_creation_1_h_tokens": 250,
         }
         assert extract_cache_creation_tokens(usage) == 250

+    # === Old format tests (third priority) ===
+
     def test_old_format_only(self) -> None:
-        """Only the old-format field"""
+        """Only the old format"""
         usage = {
-            "cache_creation_input_tokens": 500,
+            "cache_creation_input_tokens": 549,
         }
-        assert extract_cache_creation_tokens(usage) == 500
+        assert extract_cache_creation_tokens(usage) == 549

-    def test_both_formats_prefers_new(self) -> None:
-        """Prefers the new format when both are present"""
-        usage = {
-            "claude_cache_creation_5_m_tokens": 100,
-            "claude_cache_creation_1_h_tokens": 200,
-            "cache_creation_input_tokens": 999,  # should be ignored
-        }
-        assert extract_cache_creation_tokens(usage) == 300
+    # === Edge case tests ===

-    def test_empty_usage(self) -> None:
-        """Empty dict"""
+    def test_no_cache_creation_tokens(self) -> None:
+        """No cache fields at all"""
         usage = {}
         assert extract_cache_creation_tokens(usage) == 0

-    def test_all_zeros(self) -> None:
-        """All fields are 0"""
+    def test_all_formats_zero(self) -> None:
+        """All formats are 0"""
         usage = {
+            "cache_creation": {
+                "ephemeral_5m_input_tokens": 0,
+                "ephemeral_1h_input_tokens": 0,
+            },
             "claude_cache_creation_5_m_tokens": 0,
             "claude_cache_creation_1_h_tokens": 0,
             "cache_creation_input_tokens": 0,
         }
         assert extract_cache_creation_tokens(usage) == 0

-    def test_partial_new_format_with_old_format_fallback(self) -> None:
-        """Falls back to the old format when the new-format fields are absent"""
-        usage = {
-            "cache_creation_input_tokens": 123,
-        }
-        assert extract_cache_creation_tokens(usage) == 123
-
-    def test_new_format_zero_should_not_fallback(self) -> None:
-        """When new-format fields exist but are 0, must not fall back to the old format"""
-        usage = {
-            "claude_cache_creation_5_m_tokens": 0,
-            "claude_cache_creation_1_h_tokens": 0,
-            "cache_creation_input_tokens": 456,
-        }
-        # The new-format fields exist, so even a value of 0 should use the new
-        # format (return 0) rather than fall back to the old format (return 456)
-        assert extract_cache_creation_tokens(usage) == 0
-
     def test_unrelated_fields_ignored(self) -> None:
         """Unrelated fields are ignored"""
         usage = {
             "input_tokens": 1000,
             "output_tokens": 2000,
             "cache_read_input_tokens": 300,
-            "claude_cache_creation_5_m_tokens": 50,
-            "claude_cache_creation_1_h_tokens": 75,
+            "cache_creation": {
+                "ephemeral_5m_input_tokens": 50,
+                "ephemeral_1h_input_tokens": 75,
+            },
         }
         assert extract_cache_creation_tokens(usage) == 125
```