26 Commits

Author SHA1 Message Date
fawney19
7553b0da80 fix: improve auto-refresh interaction and ESC-close styling
- Switch auto-refresh to a toggle button and remove the standalone Switch control
- Change the auto-refresh interval from 30s to 10s
- Blur focus after closing a dialog with ESC to avoid residual styling
2025-12-19 18:47:14 +08:00
fawney19
8f30bf0bef Merge pull request #32 from htmambo/master
Personalization changes
2025-12-19 18:46:26 +08:00
hoping
8c12174521 Personalization changes
1. Add ESC-key close support to all drawers and dialogs;
2. Add an auto-refresh toggle to the usage-records table;
3. Add a User-Agent header to backend API requests;
4. Update the startup command to read database and Redis configuration from .env.
2025-12-19 17:31:15 +08:00
fawney19
6aa1876955 feat: add Dockerfile.app.local for China mirror support 2025-12-19 16:20:02 +08:00
fawney19
7f07122aea refactor: separate frontend build from base image for faster incremental builds 2025-12-19 16:02:38 +08:00
fawney19
c2ddc6bd3c refactor: optimize Docker build with multi-stage and slim runtime base image 2025-12-19 15:51:21 +08:00
fawney19
af476ff21e feat: enhance error logging and upstream response tracking for provider failures 2025-12-19 15:29:48 +08:00
fawney19
3bbc1c6b66 feat: add provider compatibility error detection for intelligent failover
- Introduce ProviderCompatibilityException for unsupported parameter/feature errors
- Add COMPATIBILITY_ERROR_PATTERNS to detect provider-specific limitations
- Implement _is_compatibility_error() method in ErrorClassifier
- Prioritize compatibility error checking before client error validation
- Remove 'max_tokens' from CLIENT_ERROR_PATTERNS as it can indicate compatibility issues
- Enable automatic failover when provider doesn't support requested features
- Improve error classification accuracy with pattern matching for common compatibility issues
2025-12-19 13:28:26 +08:00
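A minimal sketch of the pattern-based classification this commit describes (the class and method names follow the commit message; the patterns and surrounding structure here are illustrative assumptions, not the project's actual code):

import re

# Hypothetical patterns; the real list ships in the ErrorClassifier module.
COMPATIBILITY_ERROR_PATTERNS = [
    r"does not support",
    r"unsupported (parameter|feature|model)",
    r"max_tokens.*(invalid|too large|not supported)",
]

class ErrorClassifier:
    def _is_compatibility_error(self, message: str) -> bool:
        """True when an upstream error looks like a provider limitation."""
        text = message.lower()
        return any(re.search(p, text) for p in COMPATIBILITY_ERROR_PATTERNS)

    def classify(self, message: str) -> str:
        # Compatibility checks run before client-error checks, so that
        # "unsupported parameter" messages trigger failover instead of
        # being treated as the caller's fault.
        if self._is_compatibility_error(message):
            return "compatibility_error"  # eligible for automatic failover
        return "client_error"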
fawney19
c69a0a8506 refactor: remove stream smoothing config from system settings and improve base image caching
- Remove stream_smoothing configuration from SystemConfigService (moved to handler default)
- Remove stream smoothing UI controls from admin settings page
- Add AdminClearSingleAffinityAdapter for targeted cache invalidation
- Add clearSingleAffinity API endpoint to clear specific affinity cache entries
- Include global_model_id in affinity list response for UI deletion support
- Improve CI/CD workflow with hash-based base image change detection
- Add hash label to base image for reliable cache invalidation detection
- Use remote image inspection to determine if base image rebuild is needed
- Include Dockerfile.base in hash calculation for proper dependency tracking
2025-12-19 13:09:56 +08:00
fawney19
1fae202bde Merge pull request #30 from AAEE86/master
chore: Modify the order of API format enumeration
2025-12-19 12:34:22 +08:00
fawney19
b9a26c4550 fix: add SETUPTOOLS_SCM_PRETEND_VERSION for CI builds 2025-12-19 12:01:19 +08:00
AAEE86
e42bd35d48 chore: Modify the order of API format enumeration - Move CLAUDE_CLI before OPENAI 2025-12-19 11:44:10 +08:00
fawney19
f22a073fd9 fix: rebuild app image when base image changes during deployment
- Track BASE_REBUILT flag to detect base image rebuilds
- Force app image rebuild when base image is rebuilt
- Prevents stale app images built with outdated base images
- Ensures consistent deployment when base dependencies change
2025-12-19 11:32:43 +08:00
fawney19
5c7ad089d2 fix: disable nginx buffering for streaming responses
- Add X-Accel-Buffering: no header to prevent nginx from buffering streamed content
- Ensures immediate delivery of each chunk without proxy buffering delays
- Improves real-time streaming performance and responsiveness
- Applies to both production and local Dockerfiles
2025-12-19 11:26:15 +08:00
fawney19
97425ac68f refactor: make stream smoothing parameters configurable and add models cache invalidation
- Move stream smoothing parameters (chunk_size, delay_ms) to database config
- Remove hardcoded stream smoothing constants from StreamProcessor
- Simplify dynamic delay calculation by using config values directly
- Add invalidate_models_list_cache() function to clear /v1/models endpoint cache
- Call cache invalidation on model create, update, delete, and bulk operations
- Update admin UI to allow runtime configuration of smoothing parameters
- Improve model listing freshness when models are modified
2025-12-19 11:03:46 +08:00
fawney19
912f6643e2 tune: adjust stream smoothing parameters for better user experience
- Increase chunk size from 5 to 20 characters for fewer delays
- Reduce min delay from 15ms to 8ms for faster playback
- Reduce max delay from 24ms to 15ms for better responsiveness
- Adjust text thresholds to better differentiate content types
- Apply parameter tuning to both StreamProcessor and _LightweightSmoother
2025-12-19 09:51:09 +08:00
fawney19
6c0373fda6 refactor: simplify text splitting logic in stream processor
- Remove complex conditional logic for short/medium/long text differentiation
- Unify text splitting to always use consistent CHUNK_SIZE-based splitting
- Rely on dynamic delay calculation for output speed adjustment
- Reduce code complexity in both main smoother and lightweight smoother
2025-12-19 09:48:11 +08:00
fawney19
070121717d refactor: consolidate stream smoothing into StreamProcessor with intelligent timing
- Move StreamSmoother functionality directly into StreamProcessor for better integration
- Create ContentExtractor strategy pattern for format-agnostic content extraction
- Implement intelligent dynamic delay calculation based on text length
- Support three text length tiers: short (char-by-char), medium (chunked), long (chunked)
- Remove manual chunk_size and delay_ms configuration - now auto-calculated
- Simplify admin UI to single toggle switch with auto timing adjustment
- Extract format detection logic to reusable content_extractors module
- Improve code maintainability with cleaner architecture
2025-12-19 09:46:22 +08:00
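A plausible shape for the three-tier dynamic delay described above (the thresholds and delay values below are invented for illustration; the real numbers live in StreamProcessor):

def dynamic_delay_ms(text_len: int) -> float:
    """Choose a per-piece delay from the pending text length: short text
    plays back almost character by character, long text in faster chunks."""
    if text_len < 50:    # short tier: char-by-char feel
        return 15.0
    if text_len < 500:   # medium tier: chunked, moderate pace
        return 10.0
    return 5.0           # long tier: chunked, fastest pace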
fawney19
85fafeacb8 feat: add stream smoothing feature for improved user experience
- Implement StreamSmoother class to split large content chunks into smaller pieces with delay
- Support OpenAI, Claude, and Gemini API response formats for smooth playback
- Add stream smoothing configuration to system settings (enable, chunk size, delay)
- Create streamlined API for stream smoothing with StreamSmoothingConfig dataclass
- Add admin UI controls for configuring stream smoothing parameters
- Use batch configuration loading to minimize database queries
- Enable typing effect simulation for better user experience in streaming responses
2025-12-19 03:15:19 +08:00
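The core mechanism amounts to re-chunking plus a small sleep; a minimal sketch, assuming an async SSE pipeline (the function name is illustrative; the defaults match the pre-tuning values implied by the later tuning commit, a 5-character chunk and a 15ms minimum delay):

import asyncio
from typing import AsyncIterator

async def smooth_text(text: str, chunk_size: int = 5, delay_ms: float = 15.0) -> AsyncIterator[str]:
    """Split one large content chunk into small pieces, pausing between
    them so the client sees a steady typing effect."""
    for i in range(0, len(text), chunk_size):
        yield text[i:i + chunk_size]
        await asyncio.sleep(delay_ms / 1000)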
fawney19
daf8b870f0 fix: include Dockerfile.base.local in dependency hash calculation
- Add Dockerfile.base.local to deps hash to detect Docker configuration changes
- Ensures deployment rebuilds when nginx proxy settings are modified
- Prevents stale Docker images from being reused after config changes
2025-12-19 02:38:46 +08:00
fawney19
880fb61c66 fix: disable gzip compression in nginx proxy configuration
- Add gzip off directive to prevent nginx from compressing proxied responses
- Ensures stream integrity for chunked transfer encoding
- Applies to both production and local Dockerfiles
2025-12-19 02:17:07 +08:00
fawney19
7e792dabfc refactor: use background task for client disconnection monitoring
- Replace time-based throttling with background task for disconnect checks
- Remove time.monotonic() and related throttling logic
- Prevent blocking of stream transmission during connection checks
- Properly clean up background task with try/finally block
- Improve throughput and responsiveness of stream processing
2025-12-19 01:59:56 +08:00
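A sketch of the background-task pattern this commit describes, assuming a Starlette/FastAPI-style request object (function names and the poll interval are assumptions):

import asyncio

async def _watch_disconnect(request, disconnected: asyncio.Event) -> None:
    """Poll the client connection off the hot path so the streaming loop
    never blocks on a disconnect check."""
    while not disconnected.is_set():
        if await request.is_disconnected():
            disconnected.set()
            return
        await asyncio.sleep(0.5)

async def stream_with_monitor(request, upstream):
    disconnected = asyncio.Event()
    task = asyncio.create_task(_watch_disconnect(request, disconnected))
    try:
        async for chunk in upstream:
            if disconnected.is_set():
                break
            yield chunk
    finally:
        task.cancel()  # matches the commit's try/finally cleanup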
fawney19
cd06169b2f fix: detect OpenAI format stream completion via finish_reason
- Add detection of finish_reason in OpenAI API responses to mark stream completion
- Ensures OpenAI API streams are properly marked as complete even without explicit completion events
- Complements existing completion event detection for other API formats
2025-12-19 01:44:35 +08:00
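In OpenAI's streaming format, completion shows up as a non-null finish_reason inside a choices entry; a minimal detection sketch (how this hooks into the stream processor is assumed):

import json

def openai_chunk_is_complete(sse_data: str) -> bool:
    """True when an OpenAI-format SSE data payload signals completion."""
    if sse_data.strip() == "[DONE]":
        return True
    try:
        payload = json.loads(sse_data)
    except json.JSONDecodeError:
        return False
    return any(choice.get("finish_reason") for choice in payload.get("choices", []))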
fawney19
50ffd47546 fix: handle client disconnection after stream completion gracefully
- Check has_completion flag before marking client disconnection as failure
- Allow graceful termination if response already completed when client disconnects
- Change logging level to info for post-completion disconnections
- Prevent false error reporting when client closes connection after receiving full response
2025-12-19 01:36:20 +08:00
fawney19
5f0c1fb347 refactor: remove unused response normalizer module
- Delete unused ResponseNormalizer class and its initialization logic
- Remove response_normalizer and enable_response_normalization parameters from handlers
- Simplify chat adapter base initialization by removing normalizer setup
- Clean up unused imports in handler modules
2025-12-19 01:20:30 +08:00
fawney19
7b932d7afb refactor: optimize middleware with pure ASGI implementation and enhance security measures
- Replace BaseHTTPMiddleware with pure ASGI implementation in plugin middleware for better streaming response handling
- Add trusted proxy count configuration for client IP extraction in reverse proxy environments
- Implement audit log cleanup scheduler with configurable retention period
- Replace plaintext token logging with SHA256 hash fingerprints for security
- Fix database session lifecycle management in middleware
- Improve request tracing and error logging throughout the system
- Add comprehensive tests for pipeline architecture
2025-12-18 19:07:20 +08:00
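For the token-fingerprint change, the idea is to log a short, irreversible digest rather than the bearer token itself; a minimal sketch (the helper name and digest length are assumptions):

import hashlib

def token_fingerprint(token: str) -> str:
    """Short SHA256 digest of a token: safe to write to logs, useless
    for replaying the credential."""
    return hashlib.sha256(token.encode("utf-8")).hexdigest()[:16]

# e.g. logger.info(f"authenticated, token={token_fingerprint(raw_token)}")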
60 changed files with 1962 additions and 636 deletions

View File

@@ -1,8 +1,16 @@
# ==================== Required configuration (before startup) ====================
# The following settings must be set before the project starts
# Database password
# Database configuration
DB_HOST=localhost
DB_PORT=5432
DB_USER=postgres
DB_NAME=aether
DB_PASSWORD=your_secure_password_here
# Redis configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your_redis_password_here
# JWT secret (generate with: python generate_keys.py)

View File

@@ -15,6 +15,8 @@ env:
REGISTRY: ghcr.io
BASE_IMAGE_NAME: fawney19/aether-base
APP_IMAGE_NAME: fawney19/aether
# Files that affect base image - used for hash calculation
BASE_FILES: "Dockerfile.base pyproject.toml frontend/package.json frontend/package-lock.json"
jobs:
check-base-changes:
@@ -23,8 +25,13 @@ jobs:
base_changed: ${{ steps.check.outputs.base_changed }}
steps:
- uses: actions/checkout@v4
- name: Log in to Container Registry
uses: docker/login-action@v3
with:
fetch-depth: 2
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Check if base image needs rebuild
id: check
@@ -34,10 +41,26 @@ jobs:
exit 0
fi
# Check if base-related files changed
if git diff --name-only HEAD~1 HEAD | grep -qE '^(Dockerfile\.base|pyproject\.toml|frontend/package.*\.json)$'; then
# Calculate current hash of base-related files
CURRENT_HASH=$(cat ${{ env.BASE_FILES }} 2>/dev/null | sha256sum | cut -d' ' -f1)
echo "Current base files hash: $CURRENT_HASH"
# Try to get hash label from remote image config
# Pull the image config and extract labels
REMOTE_HASH=""
if docker pull ${{ env.REGISTRY }}/${{ env.BASE_IMAGE_NAME }}:latest 2>/dev/null; then
REMOTE_HASH=$(docker inspect ${{ env.REGISTRY }}/${{ env.BASE_IMAGE_NAME }}:latest --format '{{ index .Config.Labels "org.opencontainers.image.base.hash" }}' 2>/dev/null) || true
fi
if [ -z "$REMOTE_HASH" ] || [ "$REMOTE_HASH" == "<no value>" ]; then
# No remote image or no hash label, need to rebuild
echo "No remote base image or hash label found, need rebuild"
echo "base_changed=true" >> $GITHUB_OUTPUT
elif [ "$CURRENT_HASH" != "$REMOTE_HASH" ]; then
echo "Hash mismatch: remote=$REMOTE_HASH, current=$CURRENT_HASH"
echo "base_changed=true" >> $GITHUB_OUTPUT
else
echo "Hash matches, no rebuild needed"
echo "base_changed=false" >> $GITHUB_OUTPUT
fi
@@ -61,6 +84,12 @@ jobs:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Calculate base files hash
id: hash
run: |
HASH=$(cat ${{ env.BASE_FILES }} 2>/dev/null | sha256sum | cut -d' ' -f1)
echo "hash=$HASH" >> $GITHUB_OUTPUT
- name: Extract metadata for base image
id: meta
uses: docker/metadata-action@v5
@@ -69,6 +98,8 @@ jobs:
tags: |
type=raw,value=latest
type=sha,prefix=
labels: |
org.opencontainers.image.base.hash=${{ steps.hash.outputs.hash }}
- name: Build and push base image
uses: docker/build-push-action@v5
@@ -117,7 +148,7 @@ jobs:
- name: Update Dockerfile.app to use registry base image
run: |
sed -i "s|FROM aether-base:latest|FROM ${{ env.REGISTRY }}/${{ env.BASE_IMAGE_NAME }}:latest|g" Dockerfile.app
sed -i "s|FROM aether-base:latest AS builder|FROM ${{ env.REGISTRY }}/${{ env.BASE_IMAGE_NAME }}:latest AS builder|g" Dockerfile.app
- name: Build and push app image
uses: docker/build-push-action@v5

View File

@@ -1,16 +1,134 @@
# App image: based on the base image, copies code only (builds in seconds)
# Runtime image: extract build artifacts from base into a slim runtime
# Build command: docker build -f Dockerfile.app -t aether-app:latest .
FROM aether-base:latest
# For GitHub Actions CI (official package sources)
FROM aether-base:latest AS builder
WORKDIR /app
# Copy frontend sources and build
COPY frontend/ ./frontend/
RUN cd frontend && npm run build
# ==================== Runtime image ====================
FROM python:3.12-slim
WORKDIR /app
# Runtime dependencies (no gcc/nodejs/npm)
RUN apt-get update && apt-get install -y \
nginx \
supervisor \
libpq5 \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy Python packages from the base image
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
# Copy only the required Python executables
COPY --from=builder /usr/local/bin/gunicorn /usr/local/bin/
COPY --from=builder /usr/local/bin/uvicorn /usr/local/bin/
COPY --from=builder /usr/local/bin/alembic /usr/local/bin/
# Copy frontend build artifacts from the builder stage
COPY --from=builder /app/frontend/dist /usr/share/nginx/html
# Copy backend code
COPY src/ ./src/
COPY alembic.ini ./
COPY alembic/ ./alembic/
# Build frontend (using node_modules already installed in the base image)
COPY frontend/ /tmp/frontend/
RUN cd /tmp/frontend && npm run build && \
cp -r dist/* /usr/share/nginx/html/ && \
rm -rf /tmp/frontend
# Nginx config template
RUN printf '%s\n' \
'server {' \
' listen 80;' \
' server_name _;' \
' root /usr/share/nginx/html;' \
' index index.html;' \
' client_max_body_size 100M;' \
'' \
' location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg|woff|woff2|ttf|eot)$ {' \
' expires 1y;' \
' add_header Cache-Control "public, no-transform";' \
' try_files $uri =404;' \
' }' \
'' \
' location ~ ^/(src|node_modules)/ {' \
' deny all;' \
' return 404;' \
' }' \
'' \
' location ~ ^/(dashboard|admin|login)(/|$) {' \
' try_files $uri $uri/ /index.html;' \
' }' \
'' \
' location / {' \
' try_files $uri $uri/ @backend;' \
' }' \
'' \
' location @backend {' \
' proxy_pass http://127.0.0.1:PORT_PLACEHOLDER;' \
' proxy_http_version 1.1;' \
' proxy_set_header Host $host;' \
' proxy_set_header X-Real-IP $remote_addr;' \
' proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;' \
' proxy_set_header X-Forwarded-Proto $scheme;' \
' proxy_set_header Connection "";' \
' proxy_set_header Accept $http_accept;' \
' proxy_set_header Content-Type $content_type;' \
' proxy_set_header Authorization $http_authorization;' \
' proxy_set_header X-Api-Key $http_x_api_key;' \
' proxy_buffering off;' \
' proxy_cache off;' \
' proxy_request_buffering off;' \
' chunked_transfer_encoding on;' \
' gzip off;' \
' add_header X-Accel-Buffering no;' \
' proxy_connect_timeout 600s;' \
' proxy_send_timeout 600s;' \
' proxy_read_timeout 600s;' \
' }' \
'}' > /etc/nginx/sites-available/default.template
# Supervisor config
RUN printf '%s\n' \
'[supervisord]' \
'nodaemon=true' \
'logfile=/var/log/supervisor/supervisord.log' \
'pidfile=/var/run/supervisord.pid' \
'' \
'[program:nginx]' \
'command=/bin/bash -c "sed \"s/PORT_PLACEHOLDER/${PORT:-8084}/g\" /etc/nginx/sites-available/default.template > /etc/nginx/sites-available/default && /usr/sbin/nginx -g \"daemon off;\""' \
'autostart=true' \
'autorestart=true' \
'stdout_logfile=/var/log/nginx/access.log' \
'stderr_logfile=/var/log/nginx/error.log' \
'' \
'[program:app]' \
'command=gunicorn src.main:app -w %(ENV_GUNICORN_WORKERS)s -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:%(ENV_PORT)s --timeout 120 --access-logfile - --error-logfile - --log-level info' \
'directory=/app' \
'autostart=true' \
'autorestart=true' \
'stdout_logfile=/dev/stdout' \
'stdout_logfile_maxbytes=0' \
'stderr_logfile=/dev/stderr' \
'stderr_logfile_maxbytes=0' \
'environment=PYTHONUNBUFFERED=1,PYTHONIOENCODING=utf-8,LANG=C.UTF-8,LC_ALL=C.UTF-8,DOCKER_CONTAINER=true' > /etc/supervisor/conf.d/supervisord.conf
# Create directories
RUN mkdir -p /var/log/supervisor /app/logs /app/data
# Environment variables
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONIOENCODING=utf-8 \
LANG=C.UTF-8 \
LC_ALL=C.UTF-8 \
PORT=8084
EXPOSE 80
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost/health || exit 1
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]

Dockerfile.app.local (new file, 135 lines)
View File

@@ -0,0 +1,135 @@
# Runtime image: extract build artifacts from base into a slim runtime (China mirror version)
# Build command: docker build -f Dockerfile.app.local -t aether-app:latest .
# For local / China-based server deployments
FROM aether-base:latest AS builder
WORKDIR /app
# Copy frontend sources and build
COPY frontend/ ./frontend/
RUN cd frontend && npm run build
# ==================== Runtime image ====================
FROM python:3.12-slim
WORKDIR /app
# Runtime dependencies (using the Tsinghua mirror)
RUN sed -i 's/deb.debian.org/mirrors.tuna.tsinghua.edu.cn/g' /etc/apt/sources.list.d/debian.sources && \
apt-get update && apt-get install -y \
nginx \
supervisor \
libpq5 \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy Python packages from the base image
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
# Copy only the required Python executables
COPY --from=builder /usr/local/bin/gunicorn /usr/local/bin/
COPY --from=builder /usr/local/bin/uvicorn /usr/local/bin/
COPY --from=builder /usr/local/bin/alembic /usr/local/bin/
# Copy frontend build artifacts from the builder stage
COPY --from=builder /app/frontend/dist /usr/share/nginx/html
# Copy backend code
COPY src/ ./src/
COPY alembic.ini ./
COPY alembic/ ./alembic/
# Nginx config template
RUN printf '%s\n' \
'server {' \
' listen 80;' \
' server_name _;' \
' root /usr/share/nginx/html;' \
' index index.html;' \
' client_max_body_size 100M;' \
'' \
' location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg|woff|woff2|ttf|eot)$ {' \
' expires 1y;' \
' add_header Cache-Control "public, no-transform";' \
' try_files $uri =404;' \
' }' \
'' \
' location ~ ^/(src|node_modules)/ {' \
' deny all;' \
' return 404;' \
' }' \
'' \
' location ~ ^/(dashboard|admin|login)(/|$) {' \
' try_files $uri $uri/ /index.html;' \
' }' \
'' \
' location / {' \
' try_files $uri $uri/ @backend;' \
' }' \
'' \
' location @backend {' \
' proxy_pass http://127.0.0.1:PORT_PLACEHOLDER;' \
' proxy_http_version 1.1;' \
' proxy_set_header Host $host;' \
' proxy_set_header X-Real-IP $remote_addr;' \
' proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;' \
' proxy_set_header X-Forwarded-Proto $scheme;' \
' proxy_set_header Connection "";' \
' proxy_set_header Accept $http_accept;' \
' proxy_set_header Content-Type $content_type;' \
' proxy_set_header Authorization $http_authorization;' \
' proxy_set_header X-Api-Key $http_x_api_key;' \
' proxy_buffering off;' \
' proxy_cache off;' \
' proxy_request_buffering off;' \
' chunked_transfer_encoding on;' \
' gzip off;' \
' add_header X-Accel-Buffering no;' \
' proxy_connect_timeout 600s;' \
' proxy_send_timeout 600s;' \
' proxy_read_timeout 600s;' \
' }' \
'}' > /etc/nginx/sites-available/default.template
# Supervisor config
RUN printf '%s\n' \
'[supervisord]' \
'nodaemon=true' \
'logfile=/var/log/supervisor/supervisord.log' \
'pidfile=/var/run/supervisord.pid' \
'' \
'[program:nginx]' \
'command=/bin/bash -c "sed \"s/PORT_PLACEHOLDER/${PORT:-8084}/g\" /etc/nginx/sites-available/default.template > /etc/nginx/sites-available/default && /usr/sbin/nginx -g \"daemon off;\""' \
'autostart=true' \
'autorestart=true' \
'stdout_logfile=/var/log/nginx/access.log' \
'stderr_logfile=/var/log/nginx/error.log' \
'' \
'[program:app]' \
'command=gunicorn src.main:app -w %(ENV_GUNICORN_WORKERS)s -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:%(ENV_PORT)s --timeout 120 --access-logfile - --error-logfile - --log-level info' \
'directory=/app' \
'autostart=true' \
'autorestart=true' \
'stdout_logfile=/dev/stdout' \
'stdout_logfile_maxbytes=0' \
'stderr_logfile=/dev/stderr' \
'stderr_logfile_maxbytes=0' \
'environment=PYTHONUNBUFFERED=1,PYTHONIOENCODING=utf-8,LANG=C.UTF-8,LC_ALL=C.UTF-8,DOCKER_CONTAINER=true' > /etc/supervisor/conf.d/supervisord.conf
# Create directories
RUN mkdir -p /var/log/supervisor /app/logs /app/data
# Environment variables
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONIOENCODING=utf-8 \
LANG=C.UTF-8 \
LC_ALL=C.UTF-8 \
PORT=8084
EXPOSE 80
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost/health || exit 1
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]

View File

@@ -1,122 +1,25 @@
# Base image: contains all dependencies; only needs rebuilding when dependencies change
# Build image: compile toolchain + precompiled dependencies
# For GitHub Actions CI builds (no China mirrors)
# Build command: docker build -f Dockerfile.base -t aether-base:latest .
# Only needs rebuilding when pyproject.toml or frontend/package*.json changes
FROM python:3.12-slim
WORKDIR /app
# System dependencies
# Build tools
RUN apt-get update && apt-get install -y \
nginx \
supervisor \
libpq-dev \
gcc \
curl \
gettext-base \
nodejs \
npm \
&& rm -rf /var/lib/apt/lists/*
# Python dependencies (installed system-wide, not in -e mode)
# Python dependencies
COPY pyproject.toml README.md ./
RUN mkdir -p src && touch src/__init__.py && \
pip install --no-cache-dir .
SETUPTOOLS_SCM_PRETEND_VERSION=0.1.0 pip install --no-cache-dir . && \
pip cache purge
# Frontend dependencies
COPY frontend/package*.json /tmp/frontend/
WORKDIR /tmp/frontend
RUN npm ci
# Nginx config template
RUN printf '%s\n' \
'server {' \
' listen 80;' \
' server_name _;' \
' root /usr/share/nginx/html;' \
' index index.html;' \
' client_max_body_size 100M;' \
'' \
' location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg|woff|woff2|ttf|eot)$ {' \
' expires 1y;' \
' add_header Cache-Control "public, no-transform";' \
' try_files $uri =404;' \
' }' \
'' \
' location ~ ^/(src|node_modules)/ {' \
' deny all;' \
' return 404;' \
' }' \
'' \
' location ~ ^/(dashboard|admin|login)(/|$) {' \
' try_files $uri $uri/ /index.html;' \
' }' \
'' \
' location / {' \
' try_files $uri $uri/ @backend;' \
' }' \
'' \
' location @backend {' \
' proxy_pass http://127.0.0.1:PORT_PLACEHOLDER;' \
' proxy_http_version 1.1;' \
' proxy_set_header Host $host;' \
' proxy_set_header X-Real-IP $remote_addr;' \
' proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;' \
' proxy_set_header X-Forwarded-Proto $scheme;' \
' proxy_set_header Connection "";' \
' proxy_set_header Accept $http_accept;' \
' proxy_set_header Content-Type $content_type;' \
' proxy_set_header Authorization $http_authorization;' \
' proxy_set_header X-Api-Key $http_x_api_key;' \
' proxy_buffering off;' \
' proxy_cache off;' \
' proxy_request_buffering off;' \
' chunked_transfer_encoding on;' \
' proxy_connect_timeout 600s;' \
' proxy_send_timeout 600s;' \
' proxy_read_timeout 600s;' \
' }' \
'}' > /etc/nginx/sites-available/default.template
# Supervisor config
RUN printf '%s\n' \
'[supervisord]' \
'nodaemon=true' \
'logfile=/var/log/supervisor/supervisord.log' \
'pidfile=/var/run/supervisord.pid' \
'' \
'[program:nginx]' \
'command=/bin/bash -c "sed \"s/PORT_PLACEHOLDER/${PORT:-8084}/g\" /etc/nginx/sites-available/default.template > /etc/nginx/sites-available/default && /usr/sbin/nginx -g \"daemon off;\""' \
'autostart=true' \
'autorestart=true' \
'stdout_logfile=/var/log/nginx/access.log' \
'stderr_logfile=/var/log/nginx/error.log' \
'' \
'[program:app]' \
'command=gunicorn src.main:app -w %(ENV_GUNICORN_WORKERS)s -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:%(ENV_PORT)s --timeout 120 --access-logfile - --error-logfile - --log-level info' \
'directory=/app' \
'autostart=true' \
'autorestart=true' \
'stdout_logfile=/dev/stdout' \
'stdout_logfile_maxbytes=0' \
'stderr_logfile=/dev/stderr' \
'stderr_logfile_maxbytes=0' \
'environment=PYTHONUNBUFFERED=1,PYTHONIOENCODING=utf-8,LANG=C.UTF-8,LC_ALL=C.UTF-8,DOCKER_CONTAINER=true' > /etc/supervisor/conf.d/supervisord.conf
# Create directories
RUN mkdir -p /var/log/supervisor /app/logs /app/data /usr/share/nginx/html
WORKDIR /app
# Environment variables
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONIOENCODING=utf-8 \
LANG=C.UTF-8 \
LC_ALL=C.UTF-8 \
PORT=8084
EXPOSE 80
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost/health || exit 1
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]
# Frontend dependencies (install only, no build)
COPY frontend/package*.json ./frontend/
RUN cd frontend && npm ci

View File

@@ -1,18 +1,15 @@
# Base image: contains all dependencies; only needs rebuilding when dependencies change
# Build command: docker build -f Dockerfile.base -t aether-base:latest .
# Build image: compile toolchain + precompiled dependencies (China mirror version)
# Build command: docker build -f Dockerfile.base.local -t aether-base:latest .
# Only needs rebuilding when pyproject.toml or frontend/package*.json changes
FROM python:3.12-slim
WORKDIR /app
# System dependencies
# Build tools (using the Tsinghua mirror)
RUN sed -i 's/deb.debian.org/mirrors.tuna.tsinghua.edu.cn/g' /etc/apt/sources.list.d/debian.sources && \
apt-get update && apt-get install -y \
nginx \
supervisor \
libpq-dev \
gcc \
curl \
gettext-base \
nodejs \
npm \
&& rm -rf /var/lib/apt/lists/*
@@ -20,107 +17,12 @@ RUN sed -i 's/deb.debian.org/mirrors.tuna.tsinghua.edu.cn/g' /etc/apt/sources.li
# pip mirror
RUN pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
# Python dependencies (installed system-wide, not in -e mode)
# Python dependencies
COPY pyproject.toml README.md ./
RUN mkdir -p src && touch src/__init__.py && \
SETUPTOOLS_SCM_PRETEND_VERSION=0.1.0 pip install --no-cache-dir .
SETUPTOOLS_SCM_PRETEND_VERSION=0.1.0 pip install --no-cache-dir . && \
pip cache purge
# Frontend dependencies
COPY frontend/package*.json /tmp/frontend/
WORKDIR /tmp/frontend
RUN npm config set registry https://registry.npmmirror.com && npm ci
# Nginx config template
RUN printf '%s\n' \
'server {' \
' listen 80;' \
' server_name _;' \
' root /usr/share/nginx/html;' \
' index index.html;' \
' client_max_body_size 100M;' \
'' \
' location ~* \.(js|css|png|jpg|jpeg|gif|ico|svg|woff|woff2|ttf|eot)$ {' \
' expires 1y;' \
' add_header Cache-Control "public, no-transform";' \
' try_files $uri =404;' \
' }' \
'' \
' location ~ ^/(src|node_modules)/ {' \
' deny all;' \
' return 404;' \
' }' \
'' \
' location ~ ^/(dashboard|admin|login)(/|$) {' \
' try_files $uri $uri/ /index.html;' \
' }' \
'' \
' location / {' \
' try_files $uri $uri/ @backend;' \
' }' \
'' \
' location @backend {' \
' proxy_pass http://127.0.0.1:PORT_PLACEHOLDER;' \
' proxy_http_version 1.1;' \
' proxy_set_header Host $host;' \
' proxy_set_header X-Real-IP $remote_addr;' \
' proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;' \
' proxy_set_header X-Forwarded-Proto $scheme;' \
' proxy_set_header Connection "";' \
' proxy_set_header Accept $http_accept;' \
' proxy_set_header Content-Type $content_type;' \
' proxy_set_header Authorization $http_authorization;' \
' proxy_set_header X-Api-Key $http_x_api_key;' \
' proxy_buffering off;' \
' proxy_cache off;' \
' proxy_request_buffering off;' \
' chunked_transfer_encoding on;' \
' proxy_connect_timeout 600s;' \
' proxy_send_timeout 600s;' \
' proxy_read_timeout 600s;' \
' }' \
'}' > /etc/nginx/sites-available/default.template
# Supervisor config
RUN printf '%s\n' \
'[supervisord]' \
'nodaemon=true' \
'logfile=/var/log/supervisor/supervisord.log' \
'pidfile=/var/run/supervisord.pid' \
'' \
'[program:nginx]' \
'command=/bin/bash -c "sed \"s/PORT_PLACEHOLDER/${PORT:-8084}/g\" /etc/nginx/sites-available/default.template > /etc/nginx/sites-available/default && /usr/sbin/nginx -g \"daemon off;\""' \
'autostart=true' \
'autorestart=true' \
'stdout_logfile=/var/log/nginx/access.log' \
'stderr_logfile=/var/log/nginx/error.log' \
'' \
'[program:app]' \
'command=gunicorn src.main:app -w %(ENV_GUNICORN_WORKERS)s -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:%(ENV_PORT)s --timeout 120 --access-logfile - --error-logfile - --log-level info' \
'directory=/app' \
'autostart=true' \
'autorestart=true' \
'stdout_logfile=/dev/stdout' \
'stdout_logfile_maxbytes=0' \
'stderr_logfile=/dev/stderr' \
'stderr_logfile_maxbytes=0' \
'environment=PYTHONUNBUFFERED=1,PYTHONIOENCODING=utf-8,LANG=C.UTF-8,LC_ALL=C.UTF-8,DOCKER_CONTAINER=true' > /etc/supervisor/conf.d/supervisord.conf
# Create directories
RUN mkdir -p /var/log/supervisor /app/logs /app/data /usr/share/nginx/html
WORKDIR /app
# Environment variables
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONIOENCODING=utf-8 \
LANG=C.UTF-8 \
LC_ALL=C.UTF-8 \
PORT=8084
EXPOSE 80
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost/health || exit 1
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]
# Frontend dependencies (install only, no build, using the npmmirror registry)
COPY frontend/package*.json ./frontend/
RUN cd frontend && npm config set registry https://registry.npmmirror.com && npm ci

View File

@@ -21,9 +21,9 @@ HASH_FILE=".deps-hash"
CODE_HASH_FILE=".code-hash"
MIGRATION_HASH_FILE=".migration-hash"
# Compute the hash of dependency files
# Compute the hash of dependency files (including Dockerfile.base.local)
calc_deps_hash() {
cat pyproject.toml frontend/package.json frontend/package-lock.json 2>/dev/null | md5sum | cut -d' ' -f1
cat pyproject.toml frontend/package.json frontend/package-lock.json Dockerfile.base.local 2>/dev/null | md5sum | cut -d' ' -f1
}
# Compute the hash of code files
@@ -88,7 +88,7 @@ build_base() {
# Build the app image
build_app() {
echo ">>> Building app image (code only)..."
docker build -f Dockerfile.app -t aether-app:latest .
docker build -f Dockerfile.app.local -t aether-app:latest .
save_code_hash
}
@@ -162,25 +162,32 @@ git pull
# Track whether a restart is needed
NEED_RESTART=false
BASE_REBUILT=false
# Check whether the base image exists and whether dependencies changed
if ! docker image inspect aether-base:latest >/dev/null 2>&1; then
echo ">>> Base image not found, building..."
build_base
BASE_REBUILT=true
NEED_RESTART=true
elif check_deps_changed; then
echo ">>> Dependencies changed, rebuilding base image..."
build_base
BASE_REBUILT=true
NEED_RESTART=true
else
echo ">>> Dependencies unchanged."
fi
# Check whether code changed
# Check whether code changed, or whether base was rebuilt (app depends on base)
if ! docker image inspect aether-app:latest >/dev/null 2>&1; then
echo ">>> App image not found, building..."
build_app
NEED_RESTART=true
elif [ "$BASE_REBUILT" = true ]; then
echo ">>> Base image rebuilt, rebuilding app image..."
build_app
NEED_RESTART=true
elif check_code_changed; then
echo ">>> Code changed, rebuilding app image..."
build_app

dev.sh (3 lines changed)
View File

@@ -8,7 +8,8 @@ source .env
set +a
# Build DATABASE_URL
export DATABASE_URL="postgresql://postgres:${DB_PASSWORD}@localhost:5432/aether"
export DATABASE_URL="postgresql://${DB_USER:-postgres}:${DB_PASSWORD}@${DB_HOST:-localhost}:${DB_PORT:-5432}/${DB_NAME:-aether}"
export REDIS_URL=redis://:${REDIS_PASSWORD}@${REDIS_HOST:-localhost}:${REDIS_PORT:-6379}/0
# Start uvicorn (hot-reload mode)
echo "🚀 启动本地开发服务器..."

View File

@@ -41,7 +41,7 @@ services:
app:
build:
context: .
dockerfile: Dockerfile.app
dockerfile: Dockerfile.app.local
image: aether-app:latest
container_name: aether-app
environment:

View File

@@ -66,6 +66,7 @@ export interface UserAffinity {
key_name: string | null
key_prefix: string | null // Provider Key masked for display (first 4 ... last 4)
rate_multiplier: number
global_model_id: string | null // the original global_model_id (used for deletion)
model_name: string | null // model name (e.g. claude-haiku-4-5-20250514)
model_display_name: string | null // model display name (e.g. Claude Haiku 4.5)
api_format: string | null // API format (claude/openai)
@@ -119,6 +120,18 @@ export const cacheApi = {
await api.delete(`/api/admin/monitoring/cache/users/${userIdentifier}`)
},
/**
* Clear a single cache affinity entry
*
* @param affinityKey API Key ID
* @param endpointId Endpoint ID
* @param modelId GlobalModel ID
* @param apiFormat API format (claude/openai)
*/
async clearSingleAffinity(affinityKey: string, endpointId: string, modelId: string, apiFormat: string): Promise<void> {
await api.delete(`/api/admin/monitoring/cache/affinity/${affinityKey}/${endpointId}/${modelId}/${apiFormat}`)
},
/**
* Clear all caches
*/

View File

@@ -92,6 +92,7 @@
<script setup lang="ts">
import { computed, useSlots, type Component } from 'vue'
import { useEscapeKey } from '@/composables/useEscapeKey'
// Props definition
const props = defineProps<{
@@ -157,4 +158,14 @@ const maxWidthClass = computed(() => {
const containerZIndex = computed(() => props.zIndex || 60)
const backdropZIndex = computed(() => props.zIndex || 60)
const contentZIndex = computed(() => (props.zIndex || 60) + 10)
// Add ESC key listener
useEscapeKey(() => {
if (isOpen.value) {
handleClose()
}
}, {
disableOnInput: true,
once: false
})
</script>

View File

@@ -0,0 +1,80 @@
import { onMounted, onUnmounted, ref } from 'vue'
/**
* ESC key listener composable (simplified version: uses a standalone listener directly)
* Used to close dialogs and other dismissible components with the ESC key
*
* @param callback - Callback invoked when the ESC key is pressed
* @param options - Configuration options
*/
export function useEscapeKey(
callback: () => void,
options: {
/** Whether to disable ESC while an input has focus; defaults to true */
disableOnInput?: boolean
/** Whether to listen only once; defaults to false */
once?: boolean
} = {}
) {
const { disableOnInput = true, once = false } = options
const isActive = ref(true)
function handleKeyDown(event: KeyboardEvent) {
// Only handle the ESC key
if (event.key !== 'Escape') return
// Check that the component is still active
if (!isActive.value) return
// If disableOnInput is set, check the currently focused element
if (disableOnInput) {
const activeElement = document.activeElement
const isInputElement = activeElement && (
activeElement.tagName === 'INPUT' ||
activeElement.tagName === 'TEXTAREA' ||
activeElement.tagName === 'SELECT' ||
activeElement.contentEditable === 'true' ||
activeElement.getAttribute('role') === 'textbox' ||
activeElement.getAttribute('role') === 'combobox'
)
// Ignore ESC while focus is inside an input element
if (isInputElement) return
}
// Invoke the callback
callback()
// Blur the active element to avoid residual focus styling
if (document.activeElement instanceof HTMLElement) {
document.activeElement.blur()
}
// If listening only once, remove the listener
if (once) {
removeEventListener()
}
}
function addEventListener() {
document.addEventListener('keydown', handleKeyDown)
}
function removeEventListener() {
document.removeEventListener('keydown', handleKeyDown)
}
onMounted(() => {
addEventListener()
})
onUnmounted(() => {
isActive.value = false
removeEventListener()
})
return {
addEventListener,
removeEventListener
}
}

View File

@@ -132,7 +132,7 @@
type="number"
min="1"
max="10000"
placeholder="100"
placeholder="留空不限制"
class="h-10"
@update:model-value="(v) => form.rate_limit = parseNumberInput(v, { min: 1, max: 10000 })"
/>
@@ -376,7 +376,7 @@ const form = ref<StandaloneKeyFormData>({
initial_balance_usd: 10,
expire_days: undefined,
never_expire: true,
rate_limit: 100,
rate_limit: undefined,
auto_delete_on_expiry: false,
allowed_providers: [],
allowed_api_formats: [],
@@ -389,7 +389,7 @@ function resetForm() {
initial_balance_usd: 10,
expire_days: undefined,
never_expire: true,
rate_limit: 100,
rate_limit: undefined,
auto_delete_on_expiry: false,
allowed_providers: [],
allowed_api_formats: [],
@@ -408,7 +408,7 @@ function loadKeyData() {
initial_balance_usd: props.apiKey.initial_balance_usd,
expire_days: props.apiKey.expire_days,
never_expire: props.apiKey.never_expire,
rate_limit: props.apiKey.rate_limit || 100,
rate_limit: props.apiKey.rate_limit,
auto_delete_on_expiry: props.apiKey.auto_delete_on_expiry,
allowed_providers: props.apiKey.allowed_providers || [],
allowed_api_formats: props.apiKey.allowed_api_formats || [],

View File

@@ -698,6 +698,7 @@ import {
Layers,
BarChart3
} from 'lucide-vue-next'
import { useEscapeKey } from '@/composables/useEscapeKey'
import { useToast } from '@/composables/useToast'
import Card from '@/components/ui/card.vue'
import Badge from '@/components/ui/badge.vue'
@@ -833,6 +834,16 @@ watch(() => props.open, (newOpen) => {
detailTab.value = 'basic'
}
})
// Add ESC key listener
useEscapeKey(() => {
if (props.open) {
handleClose()
}
}, {
disableOnInput: true,
once: false
})
</script>
<style scoped>

View File

@@ -655,6 +655,7 @@ import {
GripVertical,
Copy
} from 'lucide-vue-next'
import { useEscapeKey } from '@/composables/useEscapeKey'
import Button from '@/components/ui/button.vue'
import Badge from '@/components/ui/badge.vue'
import Card from '@/components/ui/card.vue'
@@ -1296,6 +1297,16 @@ async function loadEndpoints() {
showError(err.response?.data?.detail || '加载端点失败', '错误')
}
}
// Add ESC key listener
useEscapeKey(() => {
if (props.open) {
handleClose()
}
}, {
disableOnInput: true,
once: false
})
</script>
<style scoped>

View File

@@ -25,7 +25,7 @@
</h3>
<div class="flex items-center gap-1 text-sm font-mono text-muted-foreground bg-muted px-2 py-0.5 rounded">
<span>{{ detail?.model || '-' }}</span>
<template v-if="detail?.target_model">
<template v-if="detail?.target_model && detail.target_model !== detail.model">
<svg
xmlns="http://www.w3.org/2000/svg"
viewBox="0 0 20 20"
@@ -472,6 +472,7 @@
<script setup lang="ts">
import { ref, watch, computed } from 'vue'
import Button from '@/components/ui/button.vue'
import { useEscapeKey } from '@/composables/useEscapeKey'
import Card from '@/components/ui/card.vue'
import Badge from '@/components/ui/badge.vue'
import Separator from '@/components/ui/separator.vue'
@@ -897,6 +898,16 @@ const providerHeadersWithDiff = computed(() => {
return result
})
// Add ESC key listener
useEscapeKey(() => {
if (props.isOpen) {
handleClose()
}
}, {
disableOnInput: true,
once: false
})
</script>
<style scoped>

View File

@@ -136,11 +136,20 @@
<!-- Divider -->
<div class="hidden sm:block h-4 w-px bg-border" />
<!-- Refresh button -->
<RefreshButton
:loading="loading"
@click="$emit('refresh')"
/>
<!-- Auto-refresh button -->
<Button
variant="ghost"
size="icon"
class="h-8 w-8"
:class="autoRefresh ? 'text-primary' : ''"
:title="autoRefresh ? '点击关闭自动刷新' : '点击开启自动刷新每10秒刷新'"
@click="$emit('update:autoRefresh', !autoRefresh)"
>
<RefreshCcw
class="w-3.5 h-3.5"
:class="autoRefresh ? 'animate-spin' : ''"
/>
</Button>
</template>
<Table>
@@ -408,6 +417,7 @@ import { ref, computed, onUnmounted, watch } from 'vue'
import {
TableCard,
Badge,
Button,
Select,
SelectTrigger,
SelectValue,
@@ -420,8 +430,8 @@ import {
TableHead,
TableCell,
Pagination,
RefreshButton,
} from '@/components/ui'
import { RefreshCcw } from 'lucide-vue-next'
import { formatTokens, formatCurrency } from '@/utils/format'
import { formatDateTime } from '../composables'
import { useRowClick } from '@/composables/useRowClick'
@@ -453,6 +463,8 @@ const props = defineProps<{
pageSize: number
totalRecords: number
pageSizeOptions: number[]
// Auto refresh
autoRefresh: boolean
}>()
const emit = defineEmits<{
@@ -463,6 +475,7 @@ const emit = defineEmits<{
'update:filterStatus': [value: string]
'update:currentPage': [value: number]
'update:pageSize': [value: number]
'update:autoRefresh': [value: boolean]
'refresh': []
'showDetail': [id: string]
}>()

View File

@@ -142,32 +142,37 @@ async function resetAffinitySearch() {
await fetchAffinityList()
}
async function clearUserCache(identifier: string, displayName?: string) {
const target = identifier?.trim()
if (!target) {
showError('无法识别标识符')
async function clearSingleAffinity(item: UserAffinity) {
const affinityKey = item.affinity_key?.trim()
const endpointId = item.endpoint_id?.trim()
const modelId = item.global_model_id?.trim()
const apiFormat = item.api_format?.trim()
if (!affinityKey || !endpointId || !modelId || !apiFormat) {
showError('缓存记录信息不完整,无法删除')
return
}
const label = displayName || target
const label = item.user_api_key_name || affinityKey
const modelLabel = item.model_display_name || item.model_name || modelId
const confirmed = await showConfirm({
title: '确认清除',
message: `确定要清除 ${label} 的缓存吗?`,
message: `确定要清除 ${label} 在模型 ${modelLabel} 上的缓存亲和性吗?`,
confirmText: '确认清除',
variant: 'destructive'
})
if (!confirmed) return
clearingRowAffinityKey.value = target
clearingRowAffinityKey.value = affinityKey
try {
await cacheApi.clearUserCache(target)
await cacheApi.clearSingleAffinity(affinityKey, endpointId, modelId, apiFormat)
showSuccess('清除成功')
await fetchCacheStats()
await fetchAffinityList(tableKeyword.value.trim() || undefined)
} catch (error) {
showError('清除失败')
log.error('清除用户缓存失败', error)
log.error('清除单条缓存失败', error)
} finally {
clearingRowAffinityKey.value = null
}
@@ -618,7 +623,7 @@ onBeforeUnmount(() => {
class="h-7 w-7 text-muted-foreground/70 hover:text-destructive"
:disabled="clearingRowAffinityKey === item.affinity_key"
title="清除缓存"
@click="clearUserCache(item.affinity_key, item.user_api_key_name || item.affinity_key)"
@click="clearSingleAffinity(item)"
>
<Trash2 class="h-3.5 w-3.5" />
</Button>
@@ -668,7 +673,7 @@ onBeforeUnmount(() => {
variant="ghost"
class="h-7 w-7 text-muted-foreground/70 hover:text-destructive shrink-0"
:disabled="clearingRowAffinityKey === item.affinity_key"
@click="clearUserCache(item.affinity_key, item.user_api_key_name || item.affinity_key)"
@click="clearSingleAffinity(item)"
>
<Trash2 class="h-3.5 w-3.5" />
</Button>

View File

@@ -185,32 +185,13 @@
</div>
</CardSection>
<!-- API Key management settings -->
<!-- Standalone-balance Key expiry management -->
<CardSection
title="API Key 管理"
description="API Key 相关配置"
title="独立余额 Key 过期管理"
description="独立余额 Key 的过期处理策略(普通用户 Key 不会过期)"
>
<div class="grid grid-cols-1 md:grid-cols-2 gap-6">
<div>
<Label
for="api-key-expire"
class="block text-sm font-medium"
>
API密钥过期天数
</Label>
<Input
id="api-key-expire"
v-model.number="systemConfig.api_key_expire_days"
type="number"
placeholder="0"
class="mt-1"
/>
<p class="mt-1 text-xs text-muted-foreground">
0 表示永不过期
</p>
</div>
<div class="flex items-center h-full pt-6">
<div class="flex items-center h-full">
<div class="flex items-center space-x-2">
<Checkbox
id="auto-delete-expired-keys"
@@ -224,7 +205,7 @@
自动删除过期 Key
</Label>
<p class="text-xs text-muted-foreground">
关闭时仅禁用过期 Key
关闭时仅禁用过期 Key(不会物理删除)
</p>
</div>
</div>
@@ -448,6 +429,25 @@
避免单次操作过大影响性能
</p>
</div>
<div>
<Label
for="audit-log-retention-days"
class="block text-sm font-medium"
>
审计日志保留天数
</Label>
<Input
id="audit-log-retention-days"
v-model.number="systemConfig.audit_log_retention_days"
type="number"
placeholder="30"
class="mt-1"
/>
<p class="mt-1 text-xs text-muted-foreground">
超过后删除审计日志记录
</p>
</div>
</div>
<!-- Cleanup policy notes -->
@@ -460,9 +460,11 @@
<p>2. <strong>压缩日志阶段</strong>: body 字段被压缩存储,节省空间</p>
<p>3. <strong>统计阶段</strong>: 仅保留 tokens、成本等统计信息</p>
<p>4. <strong>归档删除</strong>: 超过保留期限后完全删除记录</p>
<p>5. <strong>审计日志</strong>: 独立清理,记录用户登录、操作等安全事件</p>
</div>
</div>
</CardSection>
</div>
<!-- Import config dialog -->
@@ -796,8 +798,7 @@ interface SystemConfig {
// User registration
enable_registration: boolean
require_email_verification: boolean
// API Key management
api_key_expire_days: number
// Standalone-balance Key expiry management
auto_delete_expired_keys: boolean
// Logging
request_log_level: string
@@ -811,6 +812,7 @@ interface SystemConfig {
header_retention_days: number
log_retention_days: number
cleanup_batch_size: number
audit_log_retention_days: number
}
const loading = ref(false)
@@ -845,8 +847,7 @@ const systemConfig = ref<SystemConfig>({
// User registration
enable_registration: false,
require_email_verification: false,
// API Key management
api_key_expire_days: 0,
// Standalone-balance Key expiry management
auto_delete_expired_keys: false,
// Logging
request_log_level: 'basic',
@@ -860,6 +861,7 @@ const systemConfig = ref<SystemConfig>({
header_retention_days: 90,
log_retention_days: 365,
cleanup_batch_size: 1000,
audit_log_retention_days: 30,
})
// Computed properties: conversion between KB and bytes
@@ -901,8 +903,7 @@ async function loadSystemConfig() {
// User registration
'enable_registration',
'require_email_verification',
// API Key management
'api_key_expire_days',
// Standalone-balance Key expiry management
'auto_delete_expired_keys',
// Logging
'request_log_level',
@@ -916,6 +917,7 @@ async function loadSystemConfig() {
'header_retention_days',
'log_retention_days',
'cleanup_batch_size',
'audit_log_retention_days',
]
for (const key of configs) {
@@ -960,12 +962,7 @@ async function saveSystemConfig() {
value: systemConfig.value.require_email_verification,
description: '是否需要邮箱验证'
},
// API Key management
{
key: 'api_key_expire_days',
value: systemConfig.value.api_key_expire_days,
description: 'API密钥过期天数'
},
// Standalone-balance Key expiry management
{
key: 'auto_delete_expired_keys',
value: systemConfig.value.auto_delete_expired_keys,
@@ -1023,6 +1020,11 @@ async function saveSystemConfig() {
value: systemConfig.value.cleanup_batch_size,
description: '每批次清理的记录数'
},
{
key: 'audit_log_retention_days',
value: systemConfig.value.audit_log_retention_days,
description: '审计日志保留天数'
},
]
const promises = configItems.map(item =>

View File

@@ -65,6 +65,7 @@
:page-size="pageSize"
:total-records="totalRecords"
:page-size-options="pageSizeOptions"
:auto-refresh="globalAutoRefresh"
@update:selected-period="handlePeriodChange"
@update:filter-user="handleFilterUserChange"
@update:filter-model="handleFilterModelChange"
@@ -72,6 +73,7 @@
@update:filter-status="handleFilterStatusChange"
@update:current-page="handlePageChange"
@update:page-size="handlePageSizeChange"
@update:auto-refresh="handleAutoRefreshChange"
@refresh="refreshData"
@export="exportData"
@show-detail="showRequestDetail"
@@ -214,7 +216,10 @@ const hasActiveRequests = computed(() => activeRequestIds.value.length > 0)
// Auto-refresh timers
let autoRefreshTimer: ReturnType<typeof setInterval> | null = null
const AUTO_REFRESH_INTERVAL = 1000 // refresh every 1s
let globalAutoRefreshTimer: ReturnType<typeof setInterval> | null = null
const AUTO_REFRESH_INTERVAL = 1000 // refresh every 1s (for active requests)
const GLOBAL_AUTO_REFRESH_INTERVAL = 10000 // refresh every 10s (global auto-refresh)
const globalAutoRefresh = ref(false) // global auto-refresh toggle
// Poll active request status (lightweight; only updates records whose status changed)
async function pollActiveRequests() {
@@ -278,9 +283,34 @@ watch(hasActiveRequests, (hasActive) => {
}
}, { immediate: true })
// Start global auto-refresh
function startGlobalAutoRefresh() {
if (globalAutoRefreshTimer) return
globalAutoRefreshTimer = setInterval(refreshData, GLOBAL_AUTO_REFRESH_INTERVAL)
}
// Stop global auto-refresh
function stopGlobalAutoRefresh() {
if (globalAutoRefreshTimer) {
clearInterval(globalAutoRefreshTimer)
globalAutoRefreshTimer = null
}
}
// Handle auto-refresh toggle changes
function handleAutoRefreshChange(value: boolean) {
globalAutoRefresh.value = value
if (value) {
startGlobalAutoRefresh()
} else {
stopGlobalAutoRefresh()
}
}
// Clean up timers when the component unmounts
onUnmounted(() => {
stopAutoRefresh()
stopGlobalAutoRefresh()
})
// Client-side pagination for the user page

View File

@@ -350,6 +350,7 @@ import {
Layers,
Image as ImageIcon
} from 'lucide-vue-next'
import { useEscapeKey } from '@/composables/useEscapeKey'
import { useToast } from '@/composables/useToast'
import Card from '@/components/ui/card.vue'
import Badge from '@/components/ui/badge.vue'
@@ -453,6 +454,16 @@ function getFirst1hCachePrice(tieredPricing: TieredPricingConfig | undefined | n
if (!tieredPricing?.tiers?.length) return '-'
return get1hCachePrice(tieredPricing.tiers[0])
}
// Add ESC key listener
useEscapeKey(() => {
if (props.open) {
handleClose()
}
}, {
disableOnInput: true,
once: false
})
</script>
<style scoped>

View File

@@ -223,7 +223,7 @@ class AdminCreateStandaloneKeyAdapter(AdminApiAdapter):
allowed_providers=self.key_data.allowed_providers,
allowed_api_formats=self.key_data.allowed_api_formats,
allowed_models=self.key_data.allowed_models,
rate_limit=self.key_data.rate_limit or 100,
rate_limit=self.key_data.rate_limit, # None means unlimited
expire_days=self.key_data.expire_days,
initial_balance_usd=self.key_data.initial_balance_usd,
is_standalone=True, # mark as a standalone Key

View File

@@ -186,6 +186,30 @@ async def clear_user_cache(
return await pipeline.run(adapter=adapter, http_request=request, db=db, mode=adapter.mode)
@router.delete("/affinity/{affinity_key}/{endpoint_id}/{model_id}/{api_format}")
async def clear_single_affinity(
affinity_key: str,
endpoint_id: str,
model_id: str,
api_format: str,
request: Request,
db: Session = Depends(get_db),
) -> Any:
"""
Clear a single cache affinity entry
Parameters:
- affinity_key: API Key ID
- endpoint_id: Endpoint ID
- model_id: Model ID (GlobalModel ID)
- api_format: API format (claude/openai)
"""
adapter = AdminClearSingleAffinityAdapter(
affinity_key=affinity_key, endpoint_id=endpoint_id, model_id=model_id, api_format=api_format
)
return await pipeline.run(adapter=adapter, http_request=request, db=db, mode=adapter.mode)
@router.delete("")
async def clear_all_cache(
request: Request,
@@ -655,6 +679,7 @@ class AdminListAffinitiesAdapter(AdminApiAdapter):
"key_name": key.name if key else None,
"key_prefix": provider_key_masked,
"rate_multiplier": key.rate_multiplier if key else 1.0,
"global_model_id": affinity.get("model_name"), # 原始的 global_model_id
"model_name": (
global_model_map.get(affinity.get("model_name")).name
if affinity.get("model_name") and global_model_map.get(affinity.get("model_name"))
@@ -817,6 +842,65 @@ class AdminClearUserCacheAdapter(AdminApiAdapter):
raise HTTPException(status_code=500, detail=f"清除失败: {exc}")
@dataclass
class AdminClearSingleAffinityAdapter(AdminApiAdapter):
affinity_key: str
endpoint_id: str
model_id: str
api_format: str
async def handle(self, context: ApiRequestContext) -> Dict[str, Any]: # type: ignore[override]
db = context.db
try:
redis_client = get_redis_client_sync()
affinity_mgr = await get_affinity_manager(redis_client)
# Fetch the specified affinity record directly (no need to scan all entries)
existing_affinity = await affinity_mgr.get_affinity(
self.affinity_key, self.api_format, self.model_id
)
if not existing_affinity:
raise HTTPException(status_code=404, detail="未找到指定的缓存亲和性记录")
# Verify that endpoint_id matches
if existing_affinity.endpoint_id != self.endpoint_id:
raise HTTPException(status_code=404, detail="未找到指定的缓存亲和性记录")
# Invalidate the single record
await affinity_mgr.invalidate_affinity(
self.affinity_key, self.api_format, self.model_id, endpoint_id=self.endpoint_id
)
# Gather info for logging
api_key = db.query(ApiKey).filter(ApiKey.id == self.affinity_key).first()
api_key_name = api_key.name if api_key else None
logger.info(
f"已清除单条缓存亲和性: affinity_key={self.affinity_key[:8]}..., "
f"endpoint_id={self.endpoint_id[:8]}..., model_id={self.model_id[:8]}..."
)
context.add_audit_metadata(
action="cache_clear_single",
affinity_key=self.affinity_key,
endpoint_id=self.endpoint_id,
model_id=self.model_id,
)
return {
"status": "ok",
"message": f"已清除缓存亲和性: {api_key_name or self.affinity_key[:8]}",
"affinity_key": self.affinity_key,
"endpoint_id": self.endpoint_id,
"model_id": self.model_id,
}
except HTTPException:
raise
except Exception as exc:
logger.exception(f"清除单条缓存亲和性失败: {exc}")
raise HTTPException(status_code=500, detail=f"清除失败: {exc}")
class AdminClearAllCacheAdapter(AdminApiAdapter):
async def handle(self, context: ApiRequestContext) -> Dict[str, Any]: # type: ignore[override]
try:

View File

@@ -4,6 +4,7 @@ Provider Query API endpoints
"""
import asyncio
import os
from typing import Optional
import httpx
@@ -45,7 +46,11 @@ async def _fetch_openai_models(
Returns:
tuple[list, Optional[str]]: (model list, error message)
"""
headers = {"Authorization": f"Bearer {api_key}"}
useragent = os.getenv("OPENAI_USER_AGENT") or "codex_cli_rs/0.73.0 (Mac OS 14.8.4; x86_64) Apple_Terminal/453"
headers = {
"Authorization": f"Bearer {api_key}",
"User-Agent": useragent,
}
if extra_headers:
# Prevent extra_headers from overriding Authorization
safe_headers = {k: v for k, v in extra_headers.items() if k.lower() != "authorization"}
@@ -91,10 +96,12 @@ async def _fetch_claude_models(
Returns:
tuple[list, Optional[str]]: (model list, error message)
"""
useragent = os.getenv("CLAUDE_USER_AGENT") or "claude-cli/2.0.62 (external, cli)"
headers = {
"x-api-key": api_key,
"Authorization": f"Bearer {api_key}",
"anthropic-version": "2023-06-01",
"User-Agent": useragent,
}
# Build the /v1/models URL
@@ -142,9 +149,12 @@ async def _fetch_gemini_models(
models_url = f"{base_url_clean}/models?key={api_key}"
else:
models_url = f"{base_url_clean}/v1beta/models?key={api_key}"
useragent = os.getenv("GEMINI_USER_AGENT") or "gemini-cli/0.1.0 (external, cli)"
headers = {
"User-Agent": useragent,
}
try:
response = await client.get(models_url)
response = await client.get(models_url, headers=headers)
logger.debug(f"Gemini models request to {models_url}: status={response.status_code}")
if response.status_code == 200:
data = response.json()

View File

@@ -9,6 +9,7 @@ from fastapi import APIRouter, Depends, Request
from sqlalchemy.orm import Session, joinedload
from src.api.base.admin_adapter import AdminApiAdapter
from src.api.base.models_service import invalidate_models_list_cache
from src.api.base.pipeline import ApiRequestPipeline
from src.core.exceptions import InvalidRequestException, NotFoundException
from src.core.logger import logger
@@ -419,4 +420,8 @@ class AdminBatchAssignModelsToProviderAdapter(AdminApiAdapter):
f"Batch assigned {len(success)} GlobalModels to provider {provider.name} by {context.user.username}"
)
# Invalidate the /v1/models list cache
if success:
await invalidate_models_list_cache()
return BatchAssignModelsToProviderResponse(success=success, errors=errors)

View File

@@ -140,7 +140,7 @@ class AnnouncementOptionalAuthAdapter(ApiAdapter):
if not authorization or not authorization.lower().startswith("bearer "):
return None
token = authorization.replace("Bearer ", "").strip()
token = authorization[7:].strip()
try:
payload = await AuthService.verify_token(token, token_type="access")
user_id = payload.get("user_id")

View File

@@ -55,6 +55,23 @@ async def _set_cached_models(api_formats: list[str], models: list["ModelInfo"])
logger.warning(f"[ModelsService] 缓存写入失败: {e}")
async def invalidate_models_list_cache() -> None:
"""
Invalidate all /v1/models list caches
Called on model create, update, and delete so the model list stays fresh
"""
# Clear the cached list for every format
all_formats = ["CLAUDE", "OPENAI", "GEMINI"]
for fmt in all_formats:
cache_key = f"{_CACHE_KEY_PREFIX}:{fmt}"
try:
await CacheService.delete(cache_key)
logger.debug(f"[ModelsService] 已清除缓存: {cache_key}")
except Exception as e:
logger.warning(f"[ModelsService] 清除缓存失败 {cache_key}: {e}")
@dataclass
class ModelInfo:
"""统一的模型信息结构"""

View File

@@ -177,7 +177,7 @@ class ApiRequestPipeline:
if not authorization or not authorization.lower().startswith("bearer "):
raise HTTPException(status_code=401, detail="缺少管理员凭证")
token = authorization.replace("Bearer ", "").strip()
token = authorization[7:].strip()
try:
payload = await self.auth_service.verify_token(token, token_type="access")
except HTTPException:
@@ -204,7 +204,7 @@ class ApiRequestPipeline:
if not authorization or not authorization.lower().startswith("bearer "):
raise HTTPException(status_code=401, detail="缺少用户凭证")
token = authorization.replace("Bearer ", "").strip()
token = authorization[7:].strip()
try:
payload = await self.auth_service.verify_token(token, token_type="access")
except HTTPException:

View File

@@ -28,7 +28,7 @@
from __future__ import annotations
import time
from typing import Any, Callable, Dict, Optional, Protocol, runtime_checkable
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Protocol, runtime_checkable
from fastapi import Request
from fastapi.responses import JSONResponse, StreamingResponse
@@ -43,6 +43,9 @@ from src.services.provider.format import normalize_api_format
from src.services.system.audit import audit_service
from src.services.usage.service import UsageService
if TYPE_CHECKING:
from src.api.handlers.base.stream_context import StreamContext
class MessageTelemetry:
@@ -399,6 +402,41 @@ class BaseMessageHandler:
# Create a background task; do not block the current stream
asyncio.create_task(_do_update())
def _update_usage_to_streaming_with_ctx(self, ctx: "StreamContext") -> None:
"""更新 Usage 状态为 streaming同时更新 provider 和 target_model
使用 asyncio 后台任务执行数据库更新,避免阻塞流式传输
Args:
ctx: 流式上下文,包含 provider_name 和 mapped_model
"""
import asyncio
from src.database.database import get_db
target_request_id = self.request_id
provider = ctx.provider_name
target_model = ctx.mapped_model
async def _do_update() -> None:
try:
db_gen = get_db()
db = next(db_gen)
try:
UsageService.update_usage_status(
db=db,
request_id=target_request_id,
status="streaming",
provider=provider,
target_model=target_model,
)
finally:
db.close()
except Exception as e:
logger.warning(f"[{target_request_id}] 更新 Usage 状态为 streaming 失败: {e}")
# Create a background task; do not block the current stream
asyncio.create_task(_do_update())
def _log_request_error(self, message: str, error: Exception) -> None:
"""记录请求错误日志,对业务异常不打印堆栈

View File

@@ -64,18 +64,6 @@ class ChatAdapterBase(ApiAdapter):
def __init__(self, allowed_api_formats: Optional[list[str]] = None):
self.allowed_api_formats = allowed_api_formats or [self.FORMAT_ID]
self.response_normalizer = None
# Optionally enable response normalization
self._init_response_normalizer()
def _init_response_normalizer(self):
"""初始化响应规范化器 - 子类可覆盖"""
try:
from src.services.provider.response_normalizer import ResponseNormalizer
self.response_normalizer = ResponseNormalizer()
except ImportError:
pass
async def handle(self, context: ApiRequestContext):
"""处理 Chat API 请求"""
@@ -228,8 +216,6 @@ class ChatAdapterBase(ApiAdapter):
user_agent=user_agent,
start_time=start_time,
allowed_api_formats=self.allowed_api_formats,
response_normalizer=self.response_normalizer,
enable_response_normalization=self.response_normalizer is not None,
adapter_detector=self.detect_capability_requirements,
)

View File

@@ -88,8 +88,6 @@ class ChatHandlerBase(BaseMessageHandler, ABC):
user_agent: str,
start_time: float,
allowed_api_formats: Optional[list] = None,
response_normalizer: Optional[Any] = None,
enable_response_normalization: bool = False,
adapter_detector: Optional[Callable[[Dict[str, str], Optional[Dict[str, Any]]], Dict[str, bool]]] = None,
):
allowed = allowed_api_formats or [self.FORMAT_ID]
@@ -106,8 +104,6 @@ class ChatHandlerBase(BaseMessageHandler, ABC):
)
self._parser: Optional[ResponseParser] = None
self._request_builder = PassthroughRequestBuilder()
self.response_normalizer = response_normalizer
self.enable_response_normalization = enable_response_normalization
@property
def parser(self) -> ResponseParser:
@@ -297,11 +293,15 @@ class ChatHandlerBase(BaseMessageHandler, ABC):
# 创建类型安全的流式上下文
ctx = StreamContext(model=model, api_format=api_format)
# 创建更新状态的回调闭包(可以访问 ctx)
def update_streaming_status() -> None:
self._update_usage_to_streaming_with_ctx(ctx)
# 创建流处理器
stream_processor = StreamProcessor(
request_id=self.request_id,
default_parser=self.parser,
on_streaming_start=self._update_usage_to_streaming,
on_streaming_start=update_streaming_status,
)
# 定义请求函数
@@ -639,6 +639,8 @@ class ChatHandlerBase(BaseMessageHandler, ABC):
logger.info(f" [{self.request_id}] 发送非流式请求: Provider={provider.name}, "
f"模型={model} -> {mapped_model or '无映射'}")
logger.debug(f" [{self.request_id}] 请求URL: {url}")
logger.debug(f" [{self.request_id}] 请求体stream字段: {provider_payload.get('stream', 'N/A')}")
# 创建 HTTP 客户端(支持代理配置)
from src.clients.http_client import HTTPClientPool
@@ -662,10 +664,32 @@ class ChatHandlerBase(BaseMessageHandler, ABC):
response_headers=response_headers,
)
elif resp.status_code >= 500:
raise ProviderNotAvailableException(f"提供商服务不可用: {provider.name}")
elif resp.status_code != 200:
# 记录响应体以便调试
error_body = ""
try:
error_body = resp.text[:1000]
logger.error(f" [{self.request_id}] 上游返回5xx错误: status={resp.status_code}, body={error_body[:500]}")
except Exception:
pass
raise ProviderNotAvailableException(
f"提供商返回错误: {provider.name}, 状态: {resp.status_code}"
f"提供商服务不可用: {provider.name}",
provider_name=str(provider.name),
upstream_status=resp.status_code,
upstream_response=error_body,
)
elif resp.status_code != 200:
# 记录非200响应以便调试
error_body = ""
try:
error_body = resp.text[:1000]
logger.warning(f" [{self.request_id}] 上游返回非200: status={resp.status_code}, body={error_body[:500]}")
except Exception:
pass
raise ProviderNotAvailableException(
f"提供商返回错误: {provider.name}, 状态: {resp.status_code}",
provider_name=str(provider.name),
upstream_status=resp.status_code,
upstream_response=error_body,
)
response_json = resp.json()

View File

@@ -532,7 +532,7 @@ class CliMessageHandlerBase(BaseMessageHandler):
async for chunk in stream_response.aiter_raw():
# 在第一次输出数据前更新状态为 streaming
if not streaming_status_updated:
self._update_usage_to_streaming(ctx.request_id)
self._update_usage_to_streaming_with_ctx(ctx)
streaming_status_updated = True
buffer += chunk
@@ -816,7 +816,7 @@ class CliMessageHandlerBase(BaseMessageHandler):
# 在第一次输出数据前更新状态为 streaming
if prefetched_chunks:
self._update_usage_to_streaming(ctx.request_id)
self._update_usage_to_streaming_with_ctx(ctx)
# 先处理预读的字节块
for chunk in prefetched_chunks:
@@ -1114,8 +1114,10 @@ class CliMessageHandlerBase(BaseMessageHandler):
async for chunk in stream_generator:
yield chunk
except asyncio.CancelledError:
ctx.status_code = 499
ctx.error_message = "Client disconnected"
# 如果响应已完成,不标记为失败
if not ctx.has_completion:
ctx.status_code = 499
ctx.error_message = "Client disconnected"
raise
except httpx.TimeoutException as e:
ctx.status_code = 504

View File

@@ -0,0 +1,274 @@
"""
流式内容提取器 - 策略模式实现
为不同 API 格式(OpenAI、Claude、Gemini)提供内容提取和 chunk 构造的抽象。
StreamSmoother 使用这些提取器来处理不同格式的 SSE 事件。
"""
import copy
import json
from abc import ABC, abstractmethod
from typing import Optional
class ContentExtractor(ABC):
"""
流式内容提取器抽象基类
定义从 SSE 事件中提取文本内容和构造新 chunk 的接口。
每种 API 格式(OpenAI、Claude、Gemini)需要实现自己的提取器。
"""
@abstractmethod
def extract_content(self, data: dict) -> Optional[str]:
"""
从 SSE 数据中提取可拆分的文本内容
Args:
data: 解析后的 JSON 数据
Returns:
提取的文本内容,如果无法提取则返回 None
"""
pass
@abstractmethod
def create_chunk(
self,
original_data: dict,
new_content: str,
event_type: str = "",
is_first: bool = False,
) -> bytes:
"""
使用新内容构造 SSE chunk
Args:
original_data: 原始 JSON 数据
new_content: 新的文本内容
event_type: SSE 事件类型(某些格式需要)
is_first: 是否是第一个 chunk(用于保留 role 等字段)
Returns:
编码后的 SSE 字节数据
"""
pass
class OpenAIContentExtractor(ContentExtractor):
"""
OpenAI 格式内容提取器
处理 OpenAI Chat Completions API 的流式响应格式:
- 数据结构: choices[0].delta.content
- 只在 delta 仅包含 role/content 时允许拆分,避免破坏 tool_calls 等结构
"""
def extract_content(self, data: dict) -> Optional[str]:
if not isinstance(data, dict):
return None
choices = data.get("choices")
if not isinstance(choices, list) or len(choices) != 1:
return None
first_choice = choices[0]
if not isinstance(first_choice, dict):
return None
delta = first_choice.get("delta")
if not isinstance(delta, dict):
return None
content = delta.get("content")
if not isinstance(content, str):
return None
# 只有 delta 仅包含 role/content 时才允许拆分
# 避免破坏 tool_calls、function_call 等复杂结构
allowed_keys = {"role", "content"}
if not all(key in allowed_keys for key in delta.keys()):
return None
return content
def create_chunk(
self,
original_data: dict,
new_content: str,
event_type: str = "",
is_first: bool = False,
) -> bytes:
new_data = original_data.copy()
if "choices" in new_data and new_data["choices"]:
new_choices = []
for choice in new_data["choices"]:
new_choice = choice.copy()
if "delta" in new_choice:
new_delta = {}
# 只有第一个 chunk 保留 role
if is_first and "role" in new_choice["delta"]:
new_delta["role"] = new_choice["delta"]["role"]
new_delta["content"] = new_content
new_choice["delta"] = new_delta
new_choices.append(new_choice)
new_data["choices"] = new_choices
return f"data: {json.dumps(new_data, ensure_ascii=False)}\n\n".encode("utf-8")
class ClaudeContentExtractor(ContentExtractor):
"""
Claude 格式内容提取器
处理 Claude Messages API 的流式响应格式:
- 事件类型: content_block_delta
- 数据结构: delta.type=text_delta, delta.text
"""
def extract_content(self, data: dict) -> Optional[str]:
if not isinstance(data, dict):
return None
# 检查事件类型
if data.get("type") != "content_block_delta":
return None
delta = data.get("delta", {})
if not isinstance(delta, dict):
return None
# 检查 delta 类型
if delta.get("type") != "text_delta":
return None
text = delta.get("text")
if not isinstance(text, str):
return None
return text
def create_chunk(
self,
original_data: dict,
new_content: str,
event_type: str = "",
is_first: bool = False,
) -> bytes:
new_data = original_data.copy()
if "delta" in new_data:
new_delta = new_data["delta"].copy()
new_delta["text"] = new_content
new_data["delta"] = new_delta
# Claude 格式需要 event: 前缀
event_name = event_type or "content_block_delta"
return f"event: {event_name}\ndata: {json.dumps(new_data, ensure_ascii=False)}\n\n".encode(
"utf-8"
)
class GeminiContentExtractor(ContentExtractor):
"""
Gemini 格式内容提取器
处理 Gemini API 的流式响应格式:
- 数据结构: candidates[0].content.parts[0].text
- 只有纯文本块才拆分
"""
def extract_content(self, data: dict) -> Optional[str]:
if not isinstance(data, dict):
return None
candidates = data.get("candidates")
if not isinstance(candidates, list) or len(candidates) != 1:
return None
first_candidate = candidates[0]
if not isinstance(first_candidate, dict):
return None
content = first_candidate.get("content", {})
if not isinstance(content, dict):
return None
parts = content.get("parts", [])
if not isinstance(parts, list) or len(parts) != 1:
return None
first_part = parts[0]
if not isinstance(first_part, dict):
return None
text = first_part.get("text")
# 只有纯文本块(只有 text 字段)才拆分
if not isinstance(text, str) or len(first_part) != 1:
return None
return text
def create_chunk(
self,
original_data: dict,
new_content: str,
event_type: str = "",
is_first: bool = False,
) -> bytes:
new_data = copy.deepcopy(original_data)
if "candidates" in new_data and new_data["candidates"]:
first_candidate = new_data["candidates"][0]
if "content" in first_candidate:
content = first_candidate["content"]
if "parts" in content and content["parts"]:
content["parts"][0]["text"] = new_content
return f"data: {json.dumps(new_data, ensure_ascii=False)}\n\n".encode("utf-8")
# 提取器注册表
_EXTRACTORS: dict[str, type[ContentExtractor]] = {
"openai": OpenAIContentExtractor,
"claude": ClaudeContentExtractor,
"gemini": GeminiContentExtractor,
}
def get_extractor(format_name: str) -> Optional[ContentExtractor]:
"""
根据格式名获取对应的内容提取器实例
Args:
format_name: 格式名称(openai, claude, gemini)
Returns:
对应的提取器实例,如果格式不支持则返回 None
"""
extractor_class = _EXTRACTORS.get(format_name.lower())
if extractor_class:
return extractor_class()
return None
def register_extractor(format_name: str, extractor_class: type[ContentExtractor]) -> None:
"""
注册新的内容提取器
Args:
format_name: 格式名称
extractor_class: 提取器类
"""
_EXTRACTORS[format_name.lower()] = extractor_class
def get_extractor_formats() -> list[str]:
"""
获取所有已注册的格式名称列表
Returns:
格式名称列表
"""
return list(_EXTRACTORS.keys())
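
The registry above is what lets the smoothing layer stay format-agnostic. A minimal usage sketch — the delta payload is a made-up OpenAI-style event for illustration, not data from the repo:

```python
from src.api.handlers.base.content_extractors import get_extractor

# Hypothetical OpenAI-style streaming delta, for illustration only.
data = {"choices": [{"delta": {"role": "assistant", "content": "hello world"}}]}

extractor = get_extractor("openai")
assert extractor is not None
text = extractor.extract_content(data)                           # -> "hello world"
first = extractor.create_chunk(data, text[:5], is_first=True)    # keeps "role"
rest = extractor.create_chunk(data, text[5:], is_first=False)    # content only
print(first.decode("utf-8"), rest.decode("utf-8"))
```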

View File

@@ -6,16 +6,22 @@
2. 响应流生成
3. 预读和嵌套错误检测
4. 客户端断开检测
5. 流式平滑输出
"""
import asyncio
import codecs
import json
import time
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Callable, Optional
import httpx
from src.api.handlers.base.content_extractors import (
ContentExtractor,
get_extractor,
get_extractor_formats,
)
from src.api.handlers.base.parsers import get_parser_for_format
from src.api.handlers.base.response_parser import ResponseParser
from src.api.handlers.base.stream_context import StreamContext
@@ -25,11 +31,20 @@ from src.models.database import Provider, ProviderEndpoint
from src.utils.sse_parser import SSEEventParser
@dataclass
class StreamSmoothingConfig:
"""流式平滑输出配置"""
enabled: bool = False
chunk_size: int = 20
delay_ms: int = 8
class StreamProcessor:
"""
流式响应处理器
负责处理 SSE 流的解析、错误检测、响应生成。
负责处理 SSE 流的解析、错误检测、响应生成和平滑输出。
从 ChatHandlerBase 中提取,使其职责更加单一。
"""
@@ -40,6 +55,7 @@ class StreamProcessor:
on_streaming_start: Optional[Callable[[], None]] = None,
*,
collect_text: bool = False,
smoothing_config: Optional[StreamSmoothingConfig] = None,
):
"""
初始化流处理器
@@ -48,11 +64,17 @@ class StreamProcessor:
request_id: 请求 ID(用于日志)
default_parser: 默认响应解析器
on_streaming_start: 流开始时的回调(用于更新状态)
collect_text: 是否收集文本内容
smoothing_config: 流式平滑输出配置
"""
self.request_id = request_id
self.default_parser = default_parser
self.on_streaming_start = on_streaming_start
self.collect_text = collect_text
self.smoothing_config = smoothing_config or StreamSmoothingConfig()
# 内容提取器缓存
self._extractors: dict[str, ContentExtractor] = {}
def get_parser_for_provider(self, ctx: StreamContext) -> ResponseParser:
"""
@@ -127,6 +149,13 @@ class StreamProcessor:
if event_type in ("response.completed", "message_stop"):
ctx.has_completion = True
# 检查 OpenAI 格式的 finish_reason
choices = data.get("choices", [])
if choices and isinstance(choices, list) and len(choices) > 0:
finish_reason = choices[0].get("finish_reason")
if finish_reason is not None:
ctx.has_completion = True
async def prefetch_and_check_error(
self,
byte_iterator: Any,
@@ -369,7 +398,7 @@ class StreamProcessor:
sse_parser: SSE 解析器
line: 原始行数据
"""
# SSEEventParser 以去掉换行符的单行文本作为输入;这里统一剔除 CR/LF
# SSEEventParser 以"去掉换行符"的单行文本作为输入;这里统一剔除 CR/LF
# 避免把空行误判成 "\n" 并导致事件边界解析错误。
normalized_line = line.rstrip("\r\n")
events = sse_parser.feed_line(normalized_line)
@@ -400,32 +429,201 @@ class StreamProcessor:
响应数据块
"""
try:
# 断连检查频率:每次 await 都会引入调度开销,过于频繁会让流式"发一段停一段"
# 这里按时间间隔节流,兼顾及时停止上游读取与吞吐平滑性。
next_disconnect_check_at = 0.0
disconnect_check_interval_s = 0.25
# 使用后台任务检查断连,完全不阻塞流式传输
disconnected = False
async for chunk in stream_generator:
now = time.monotonic()
if now >= next_disconnect_check_at:
next_disconnect_check_at = now + disconnect_check_interval_s
async def check_disconnect_background() -> None:
nonlocal disconnected
while not disconnected and not ctx.has_completion:
await asyncio.sleep(0.5)
if await is_disconnected():
logger.warning(f"ID:{self.request_id} | Client disconnected")
ctx.status_code = 499 # Client Closed Request
ctx.error_message = "client_disconnected"
disconnected = True
break
yield chunk
except asyncio.CancelledError:
ctx.status_code = 499
ctx.error_message = "client_disconnected"
# 启动后台检查任务
check_task = asyncio.create_task(check_disconnect_background())
try:
async for chunk in stream_generator:
if disconnected:
# 如果响应已完成,客户端断开不算失败
if ctx.has_completion:
logger.info(
f"ID:{self.request_id} | Client disconnected after completion"
)
else:
logger.warning(f"ID:{self.request_id} | Client disconnected")
ctx.status_code = 499
ctx.error_message = "client_disconnected"
break
yield chunk
finally:
check_task.cancel()
try:
await check_task
except asyncio.CancelledError:
pass
except asyncio.CancelledError:
# 如果响应已完成,不标记为失败
if not ctx.has_completion:
ctx.status_code = 499
ctx.error_message = "client_disconnected"
raise
except Exception as e:
ctx.status_code = 500
ctx.error_message = str(e)
raise
async def create_smoothed_stream(
self,
stream_generator: AsyncGenerator[bytes, None],
) -> AsyncGenerator[bytes, None]:
"""
创建平滑输出的流生成器
如果启用了平滑输出,将大 chunk 拆分成小块并添加微小延迟。
否则直接透传原始流。
Args:
stream_generator: 原始流生成器
Yields:
平滑处理后的响应数据块
"""
if not self.smoothing_config.enabled:
# 未启用平滑输出,直接透传
async for chunk in stream_generator:
yield chunk
return
# 启用平滑输出
buffer = b""
is_first_content = True
async for chunk in stream_generator:
buffer += chunk
# 按双换行分割 SSE 事件(标准 SSE 格式)
while b"\n\n" in buffer:
event_block, buffer = buffer.split(b"\n\n", 1)
event_str = event_block.decode("utf-8", errors="replace")
# 解析事件块
lines = event_str.strip().split("\n")
data_str = None
event_type = ""
for line in lines:
line = line.rstrip("\r")
if line.startswith("event: "):
event_type = line[7:].strip()
elif line.startswith("data: "):
data_str = line[6:]
# 没有 data 行,直接透传
if data_str is None:
yield event_block + b"\n\n"
continue
# [DONE] 直接透传
if data_str.strip() == "[DONE]":
yield event_block + b"\n\n"
continue
# 尝试解析 JSON
try:
data = json.loads(data_str)
except json.JSONDecodeError:
yield event_block + b"\n\n"
continue
# 检测格式并提取内容
content, extractor = self._detect_format_and_extract(data)
# 只有内容长度大于 1 才需要平滑处理
if content and len(content) > 1 and extractor:
# 获取配置的延迟
delay_seconds = self._calculate_delay()
# 拆分内容
content_chunks = self._split_content(content)
for i, sub_content in enumerate(content_chunks):
is_first = is_first_content and i == 0
# 使用提取器创建新 chunk
sse_chunk = extractor.create_chunk(
data,
sub_content,
event_type=event_type,
is_first=is_first,
)
yield sse_chunk
# 除了最后一个块,其他块之间加延迟
if i < len(content_chunks) - 1:
await asyncio.sleep(delay_seconds)
is_first_content = False
else:
# 不需要拆分,直接透传
yield event_block + b"\n\n"
if content:
is_first_content = False
# 处理剩余数据
if buffer:
yield buffer
def _get_extractor(self, format_name: str) -> Optional[ContentExtractor]:
"""获取或创建格式对应的提取器(带缓存)"""
if format_name not in self._extractors:
extractor = get_extractor(format_name)
if extractor:
self._extractors[format_name] = extractor
return self._extractors.get(format_name)
def _detect_format_and_extract(
self, data: dict
) -> tuple[Optional[str], Optional[ContentExtractor]]:
"""
检测数据格式并提取内容
依次尝试各格式的提取器,返回第一个成功提取内容的结果。
Returns:
(content, extractor): 提取的内容和对应的提取器
"""
for format_name in get_extractor_formats():
extractor = self._get_extractor(format_name)
if extractor:
content = extractor.extract_content(data)
if content is not None:
return content, extractor
return None, None
def _calculate_delay(self) -> float:
"""获取配置的延迟(秒)"""
return self.smoothing_config.delay_ms / 1000.0
def _split_content(self, content: str) -> list[str]:
"""
按块拆分文本
"""
chunk_size = self.smoothing_config.chunk_size
text_length = len(content)
if text_length <= chunk_size:
return [content]
# 按块拆分
chunks = []
for i in range(0, text_length, chunk_size):
chunks.append(content[i : i + chunk_size])
return chunks
async def _cleanup(
self,
response_ctx: Any,
@@ -440,3 +638,128 @@ class StreamProcessor:
await http_client.aclose()
except Exception:
pass
async def create_smoothed_stream(
stream_generator: AsyncGenerator[bytes, None],
chunk_size: int = 20,
delay_ms: int = 8,
) -> AsyncGenerator[bytes, None]:
"""
独立的平滑流生成函数
供 CLI handler 等场景使用,无需创建完整的 StreamProcessor 实例。
Args:
stream_generator: 原始流生成器
chunk_size: 每块字符数
delay_ms: 每块之间的延迟毫秒数
Yields:
平滑处理后的响应数据块
"""
processor = _LightweightSmoother(chunk_size=chunk_size, delay_ms=delay_ms)
async for chunk in processor.smooth(stream_generator):
yield chunk
class _LightweightSmoother:
"""
轻量级平滑处理器
只包含平滑输出所需的最小逻辑,不依赖 StreamProcessor 的其他功能。
"""
def __init__(self, chunk_size: int = 20, delay_ms: int = 8) -> None:
self.chunk_size = chunk_size
self.delay_ms = delay_ms
self._extractors: dict[str, ContentExtractor] = {}
def _get_extractor(self, format_name: str) -> Optional[ContentExtractor]:
if format_name not in self._extractors:
extractor = get_extractor(format_name)
if extractor:
self._extractors[format_name] = extractor
return self._extractors.get(format_name)
def _detect_format_and_extract(
self, data: dict
) -> tuple[Optional[str], Optional[ContentExtractor]]:
for format_name in get_extractor_formats():
extractor = self._get_extractor(format_name)
if extractor:
content = extractor.extract_content(data)
if content is not None:
return content, extractor
return None, None
def _calculate_delay(self) -> float:
return self.delay_ms / 1000.0
def _split_content(self, content: str) -> list[str]:
text_length = len(content)
if text_length <= self.chunk_size:
return [content]
return [content[i : i + self.chunk_size] for i in range(0, text_length, self.chunk_size)]
async def smooth(
self, stream_generator: AsyncGenerator[bytes, None]
) -> AsyncGenerator[bytes, None]:
buffer = b""
is_first_content = True
async for chunk in stream_generator:
buffer += chunk
while b"\n\n" in buffer:
event_block, buffer = buffer.split(b"\n\n", 1)
event_str = event_block.decode("utf-8", errors="replace")
lines = event_str.strip().split("\n")
data_str = None
event_type = ""
for line in lines:
line = line.rstrip("\r")
if line.startswith("event: "):
event_type = line[7:].strip()
elif line.startswith("data: "):
data_str = line[6:]
if data_str is None:
yield event_block + b"\n\n"
continue
if data_str.strip() == "[DONE]":
yield event_block + b"\n\n"
continue
try:
data = json.loads(data_str)
except json.JSONDecodeError:
yield event_block + b"\n\n"
continue
content, extractor = self._detect_format_and_extract(data)
if content and len(content) > 1 and extractor:
delay_seconds = self._calculate_delay()
content_chunks = self._split_content(content)
for i, sub_content in enumerate(content_chunks):
is_first = is_first_content and i == 0
sse_chunk = extractor.create_chunk(
data, sub_content, event_type=event_type, is_first=is_first
)
yield sse_chunk
if i < len(content_chunks) - 1:
await asyncio.sleep(delay_seconds)
is_first_content = False
else:
yield event_block + b"\n\n"
if content:
is_first_content = False
if buffer:
yield buffer
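
A sketch of how a CLI handler might wrap an upstream byte stream with the standalone helper. The import path and the upstream generator are assumptions — the diff view omits file names:

```python
import asyncio
from typing import AsyncGenerator

# Assumed import path; the diff does not show the module's file name.
from src.api.handlers.base.stream_processor import create_smoothed_stream

async def upstream() -> AsyncGenerator[bytes, None]:
    # Stand-in for an httpx byte iterator: one oversized SSE event, then [DONE].
    yield (b'data: {"choices":[{"delta":{"content":'
           b'"0123456789012345678901234567890123456789"}}]}\n\n')
    yield b"data: [DONE]\n\n"

async def main() -> None:
    # 40 chars split into two 20-char chunks, 8 ms apart; [DONE] passes through.
    async for chunk in create_smoothed_stream(upstream(), chunk_size=20, delay_ms=8):
        print(chunk.decode("utf-8"), end="")

asyncio.run(main())
```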

View File

@@ -131,10 +131,5 @@ class ClaudeChatHandler(ChatHandlerBase):
Returns:
规范化后的响应
"""
if self.response_normalizer and self.response_normalizer.should_normalize(response):
result: Dict[str, Any] = self.response_normalizer.normalize_claude_response(
response_data=response,
request_id=self.request_id,
)
return result
# 作为中转站,直接透传响应,不做标准化处理
return response

View File

@@ -148,17 +148,6 @@ class GeminiChatHandler(ChatHandlerBase):
Returns:
规范化后的响应
TODO: 如果需要,实现响应规范化逻辑
"""
# 可选:使用 response_normalizer 进行规范化
# if (
# self.response_normalizer
# and self.response_normalizer.should_normalize(response)
# ):
# return self.response_normalizer.normalize_gemini_response(
# response_data=response,
# request_id=self.request_id,
# strict=False,
# )
# 作为中转站,直接透传响应,不做标准化处理
return response

View File

@@ -128,10 +128,5 @@ class OpenAIChatHandler(ChatHandlerBase):
Returns:
规范化后的响应
"""
if self.response_normalizer and self.response_normalizer.should_normalize(response):
return self.response_normalizer.normalize_openai_response(
response_data=response,
request_id=self.request_id,
strict=False,
)
# 作为中转站,直接透传响应,不做标准化处理
return response

View File

@@ -120,7 +120,7 @@ class RedisClientManager:
if self._circuit_open_until and time.time() < self._circuit_open_until:
remaining = self._circuit_open_until - time.time()
logger.warning(
"Redis 客户端处于熔断状态,跳过初始化,剩余 %.1f 秒 (last_error: %s)",
"Redis 客户端处于熔断状态,跳过初始化,剩余 {:.1f} 秒 (last_error: {})",
remaining,
self._last_error,
)
@@ -200,7 +200,7 @@ class RedisClientManager:
if self._consecutive_failures >= self._circuit_threshold:
self._circuit_open_until = time.time() + self._circuit_reset_seconds
logger.warning(
"Redis 初始化连续失败 %s 次,开启熔断 %s 秒。"
"Redis 初始化连续失败 {} 次,开启熔断 {} 秒。"
"熔断期间以下功能将降级: 缓存亲和性、分布式并发控制、RPM限流。"
"可通过管理 API /api/admin/system/redis/reset-circuit 手动重置。",
self._consecutive_failures,
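
Condensed, the breaker described by these two hunks is a consecutive-failure counter with a timed open window; a minimal sketch with illustrative thresholds:

```python
import time

class MiniCircuit:
    """Sketch of the breaker logic above; threshold/reset values are illustrative."""

    def __init__(self, threshold: int = 3, reset_seconds: float = 60.0) -> None:
        self.threshold = threshold
        self.reset_seconds = reset_seconds
        self.failures = 0
        self.open_until = 0.0

    def is_open(self) -> bool:
        # While open, initialization attempts are skipped entirely.
        return time.time() < self.open_until

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.open_until = time.time() + self.reset_seconds

    def record_success(self) -> None:
        self.failures = 0
        self.open_until = 0.0
```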

View File

@@ -105,6 +105,13 @@ class Config:
self.llm_api_rate_limit = int(os.getenv("LLM_API_RATE_LIMIT", "100"))
self.public_api_rate_limit = int(os.getenv("PUBLIC_API_RATE_LIMIT", "60"))
# 可信代理配置
# TRUSTED_PROXY_COUNT: 信任的代理层数(默认 1,即信任最近一层代理)
# 设置为 0 表示不信任任何代理头,直接使用连接 IP
# 当服务部署在 Nginx/CloudFlare 等反向代理后面时,设置为对应的代理层数
# 如果服务直接暴露公网,应设置为 0 以防止 IP 伪造
self.trusted_proxy_count = int(os.getenv("TRUSTED_PROXY_COUNT", "1"))
# 异常处理配置
# 设置为 True 时,ProxyException 会传播到路由层以便记录 provider_request_headers
# 设置为 False 时,使用全局异常处理器统一处理

View File

@@ -10,8 +10,8 @@ class APIFormat(Enum):
"""API格式枚举 - 决定请求/响应的处理方式"""
CLAUDE = "CLAUDE" # Claude API 格式
OPENAI = "OPENAI" # OpenAI API 格式
CLAUDE_CLI = "CLAUDE_CLI" # Claude CLI API 格式(使用 authorization: Bearer
OPENAI = "OPENAI" # OpenAI API 格式
OPENAI_CLI = "OPENAI_CLI" # OpenAI CLI/Responses API 格式(用于 Claude Code 等客户端)
GEMINI = "GEMINI" # Google Gemini API 格式
GEMINI_CLI = "GEMINI_CLI" # Gemini CLI API 格式

View File

@@ -188,12 +188,16 @@ class ProviderNotAvailableException(ProviderException):
message: str,
provider_name: Optional[str] = None,
request_metadata: Optional[Any] = None,
upstream_status: Optional[int] = None,
upstream_response: Optional[str] = None,
):
super().__init__(
message=message,
provider_name=provider_name,
request_metadata=request_metadata,
)
self.upstream_status = upstream_status
self.upstream_response = upstream_response
class ProviderTimeoutException(ProviderException):
@@ -442,6 +446,36 @@ class EmbeddedErrorException(ProviderException):
self.error_status = error_status
class ProviderCompatibilityException(ProviderException):
"""Provider 兼容性错误异常 - 应该触发故障转移
用于处理因 Provider 不支持某些参数或功能导致的错误。
这类错误不是用户请求本身的问题,换一个 Provider 可能就能成功,应该触发故障转移。
常见场景:
- Unsupported parameter(不支持的参数)
- Unsupported model(不支持的模型)
- Unsupported feature(不支持的功能)
"""
def __init__(
self,
message: str,
provider_name: Optional[str] = None,
status_code: int = 400,
upstream_error: Optional[str] = None,
request_metadata: Optional[Any] = None,
):
self.upstream_error = upstream_error
super().__init__(
message=message,
provider_name=provider_name,
request_metadata=request_metadata,
)
# 覆盖状态码为 400,保持与上游一致
self.status_code = status_code
class UpstreamClientException(ProxyException):
"""上游返回的客户端错误异常 - HTTP 4xx 错误,不应该重试

View File

@@ -153,7 +153,7 @@ def _log_pool_capacity():
total_estimated = theoretical * workers
safe_limit = config.pg_max_connections - config.pg_reserved_connections
logger.info(
"数据库连接池配置: pool_size=%s, max_overflow=%s, workers=%s, total_estimated=%s, safe_limit=%s",
"数据库连接池配置: pool_size={}, max_overflow={}, workers={}, total_estimated={}, safe_limit={}",
config.db_pool_size,
config.db_max_overflow,
workers,
@@ -162,7 +162,7 @@ def _log_pool_capacity():
)
if total_estimated > safe_limit:
logger.warning(
"数据库连接池总需求可能超过 PostgreSQL 限制: %s > %s (pg_max_connections - reserved)"
"数据库连接池总需求可能超过 PostgreSQL 限制: {} > {} (pg_max_connections - reserved)"
"建议调整 DB_POOL_SIZE/DB_MAX_OVERFLOW 或减少 worker 数",
total_estimated,
safe_limit,
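
The capacity check is simple arithmetic; with illustrative numbers, and assuming `theoretical` is the per-process ceiling `pool_size + max_overflow`:

```python
pool_size, max_overflow, workers = 10, 5, 4        # illustrative values
pg_max_connections, reserved = 100, 10

theoretical = pool_size + max_overflow             # per-worker ceiling: 15
total_estimated = theoretical * workers            # 60 connections
safe_limit = pg_max_connections - reserved         # 90
assert total_estimated <= safe_limit               # below the warning threshold
```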
@@ -260,7 +260,8 @@ def get_db(request: Request = None) -> Generator[Session, None, None]: # type:
2. **管理后台 API**
- 路由层显式调用 db.commit()
- 每个操作独立提交,不依赖中间件
- 提交后设置 request.state.tx_committed_by_route = True
- 中间件看到此标志后跳过 commit,只负责 close
3. **后台任务/调度器**
- 使用独立 Session(通过 create_session() 或 next(get_db()))
@@ -271,6 +272,17 @@ def get_db(request: Request = None) -> Generator[Session, None, None]: # type:
- FastAPI 请求:通过 Depends(get_db) 注入,支持中间件管理的 session 复用
- 非请求上下文:直接调用 get_db(),退化为独立 session 模式
路由层提交事务示例
==================
```python
@router.post("/example")
async def example(request: Request, db: Session = Depends(get_db)):
# ... 业务逻辑 ...
db.commit()
request.state.tx_committed_by_route = True # 告知中间件已提交
return {"message": "success"}
```
注意事项
========
- 本函数不自动提交事务

View File

@@ -4,13 +4,10 @@
"""
from contextlib import asynccontextmanager
from pathlib import Path
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from src.api.admin import router as admin_router
from src.api.announcements import router as announcement_router
@@ -49,7 +46,7 @@ async def initialize_providers():
# 从数据库加载所有活跃的提供商
providers = (
db.query(Provider)
.filter(Provider.is_active == True)
.filter(Provider.is_active.is_(True))
.order_by(Provider.provider_priority.asc())
.all()
)
@@ -122,6 +119,7 @@ async def lifespan(app: FastAPI):
logger.info("初始化全局Redis客户端...")
from src.clients.redis_client import get_redis_client
redis_client = None
try:
redis_client = await get_redis_client(require_redis=config.require_redis)
if redis_client:
@@ -133,6 +131,7 @@ async def lifespan(app: FastAPI):
logger.exception("[ERROR] Redis连接失败应用启动中止")
raise
logger.warning(f"Redis连接失败但配置允许降级将继续使用内存模式: {e}")
redis_client = None
# 初始化并发管理器(内部会使用Redis)
logger.info("初始化并发管理器...")
@@ -297,33 +296,6 @@ app.include_router(dashboard_router) # 仪表盘端点
app.include_router(public_router) # 公开API端点(用户可查看提供商和模型)
app.include_router(monitoring_router) # 监控端点
# 静态文件服务(前端构建产物)
# 检查前端构建目录是否存在
frontend_dist = Path(__file__).parent.parent / "frontend" / "dist"
if frontend_dist.exists():
# 挂载静态资源目录
app.mount("/assets", StaticFiles(directory=str(frontend_dist / "assets")), name="assets")
# SPA catch-all路由 - 必须放在最后
@app.get("/{full_path:path}")
async def serve_spa(request: Request, full_path: str):
"""
处理所有未匹配的GET请求,返回index.html供前端路由处理
仅对非API路径生效
"""
# 如果是API路径不处理
if full_path.startswith("api/") or full_path.startswith("v1/"):
raise HTTPException(status_code=404, detail="Not Found")
# 返回index.html让前端路由处理
index_file = frontend_dist / "index.html"
if index_file.exists():
return FileResponse(str(index_file))
else:
raise HTTPException(status_code=404, detail="Frontend not built")
else:
logger.warning("前端构建目录不存在,前端路由将无法使用")
def main():

View File

@@ -1,16 +1,17 @@
"""
统一的插件中间件
统一的插件中间件(纯 ASGI 实现)
负责协调所有插件的调用
注意:使用纯 ASGI middleware 而非 BaseHTTPMiddleware
以避免 Starlette 已知的流式响应兼容性问题。
"""
import hashlib
import time
from typing import Any, Awaitable, Callable, Optional
from typing import Optional
from fastapi import HTTPException, Request
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response as StarletteResponse
from starlette.requests import Request
from starlette.types import ASGIApp, Message, Receive, Scope, Send
from src.config import config
from src.core.logger import logger
@@ -18,20 +19,25 @@ from src.plugins.manager import get_plugin_manager
from src.plugins.rate_limit.base import RateLimitResult
class PluginMiddleware(BaseHTTPMiddleware):
class PluginMiddleware:
"""
统一的插件调用中间件
统一的插件调用中间件(纯 ASGI 实现)
职责:
- 性能监控
- 限流控制 (可选)
- 数据库会话生命周期管理
注意: 认证由各路由通过 Depends() 显式声明,不在中间件层处理
为什么使用纯 ASGI 而非 BaseHTTPMiddleware:
- BaseHTTPMiddleware 会缓冲整个响应体,对流式响应不友好
- BaseHTTPMiddleware 与 StreamingResponse 存在已知兼容性问题
- 纯 ASGI 可以直接透传流式响应,无额外开销
"""
def __init__(self, app: Any) -> None:
super().__init__(app)
def __init__(self, app: ASGIApp) -> None:
self.app = app
self.plugin_manager = get_plugin_manager()
# 从配置读取速率限制值
@@ -61,152 +67,159 @@ class PluginMiddleware(BaseHTTPMiddleware):
"/v1/completions",
]
async def dispatch(
self, request: Request, call_next: Callable[[Request], Awaitable[StarletteResponse]]
) -> StarletteResponse:
"""处理请求并调用相应插件"""
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
"""ASGI 入口点"""
if scope["type"] != "http":
# 非 HTTP 请求(如 WebSocket),直接透传
await self.app(scope, receive, send)
return
# 构建 Request 对象以便复用现有逻辑
request = Request(scope, receive, send)
# 记录请求开始时间
start_time = time.time()
# 设置 request.state 属性
# 注意:Starlette 的 Request 对象总是有 state 属性(State 实例)
request.state.request_id = request.headers.get("x-request-id", "")
request.state.start_time = start_time
# 标记:若请求过程中通过 Depends(get_db) 创建了会话,则由本中间件统一管理其生命周期
request.state.db_managed_by_middleware = True
response = None
exception_to_raise = None
# 1. 限流检查(在调用下游之前)
rate_limit_result = await self._call_rate_limit_plugins(request)
if rate_limit_result and not rate_limit_result.allowed:
# 限流触发,返回429
await self._send_rate_limit_response(send, rate_limit_result)
return
# 2. 预处理插件调用
await self._call_pre_request_plugins(request)
# 用于捕获响应状态码
response_status_code: int = 0
async def send_wrapper(message: Message) -> None:
nonlocal response_status_code
if message["type"] == "http.response.start":
response_status_code = message.get("status", 0)
await send(message)
# 3. 调用下游应用
exception_occurred: Optional[Exception] = None
try:
# 1. 限流插件调用(可选功能)
rate_limit_result = await self._call_rate_limit_plugins(request)
if rate_limit_result and not rate_limit_result.allowed:
# 限流触发,返回429
headers = rate_limit_result.headers or {}
raise HTTPException(
status_code=429,
detail=rate_limit_result.message or "Rate limit exceeded",
headers=headers,
)
# 2. 预处理插件调用
await self._call_pre_request_plugins(request)
# 处理请求
response = await call_next(request)
# 3. 提交关键数据库事务(在返回响应前)
# 这确保了 Usage 记录、配额扣减等关键数据在响应返回前持久化
try:
db = getattr(request.state, "db", None)
if isinstance(db, Session):
db.commit()
except Exception as commit_error:
logger.error(f"关键事务提交失败: {commit_error}")
try:
if isinstance(db, Session):
db.rollback()
except Exception:
pass
await self._call_error_plugins(request, commit_error, start_time)
# 返回 500 错误,因为数据可能不一致
response = JSONResponse(
status_code=500,
content={
"type": "error",
"error": {
"type": "database_error",
"message": "数据保存失败,请重试",
},
},
)
# 跳过后处理插件,直接返回错误响应
return response
# 4. 后处理插件调用(监控等,非关键操作)
# 这些操作失败不应影响用户响应
await self._call_post_request_plugins(request, response, start_time)
# 注意:不在此处添加限流响应头,因为在BaseHTTPMiddleware中
# 响应返回后修改headers会导致Content-Length不匹配错误
# 限流响应头已在返回429错误时正确包含(见上面的HTTPException)
except RuntimeError as e:
if str(e) == "No response returned.":
db = getattr(request.state, "db", None)
if isinstance(db, Session):
try:
db.rollback()
except Exception:
pass
logger.error("Downstream handler completed without returning a response")
await self._call_error_plugins(request, e, start_time)
if isinstance(db, Session):
try:
db.commit()
except Exception:
pass
response = JSONResponse(
status_code=500,
content={
"type": "error",
"error": {
"type": "internal_error",
"message": "Internal server error: downstream handler returned no response.",
},
},
)
else:
exception_to_raise = e
await self.app(scope, receive, send_wrapper)
except Exception as e:
# 回滚数据库事务
db = getattr(request.state, "db", None)
if isinstance(db, Session):
try:
db.rollback()
except Exception:
pass
exception_occurred = e
# 错误处理插件调用
await self._call_error_plugins(request, e, start_time)
raise
finally:
# 4. 数据库会话清理(无论成功与否)
await self._cleanup_db_session(request, exception_occurred)
# 尝试提交错误日志
if isinstance(db, Session):
# 5. 后处理插件调用(仅在成功时)
if not exception_occurred and response_status_code > 0:
await self._call_post_request_plugins(request, response_status_code, start_time)
async def _send_rate_limit_response(
self, send: Send, result: RateLimitResult
) -> None:
"""发送 429 限流响应"""
import json
body = json.dumps({
"type": "error",
"error": {
"type": "rate_limit_error",
"message": result.message or "Rate limit exceeded",
},
}).encode("utf-8")
headers = [(b"content-type", b"application/json")]
if result.headers:
for key, value in result.headers.items():
headers.append((key.lower().encode(), str(value).encode()))
await send({
"type": "http.response.start",
"status": 429,
"headers": headers,
})
await send({
"type": "http.response.body",
"body": body,
})
async def _cleanup_db_session(
self, request: Request, exception: Optional[Exception]
) -> None:
"""清理数据库会话
事务策略:
- 如果 request.state.tx_committed_by_route 为 True,说明路由已自行提交,中间件只负责 close
- 否则由中间件统一 commit/rollback
这避免了双重提交的问题,同时保持向后兼容。
"""
from sqlalchemy.orm import Session
db = getattr(request.state, "db", None)
if not isinstance(db, Session):
return
# 检查是否由路由层已经提交
tx_committed_by_route = getattr(request.state, "tx_committed_by_route", False)
try:
if exception is not None:
# 发生异常,回滚事务(无论谁负责提交)
try:
db.rollback()
except Exception as rollback_error:
logger.debug(f"回滚事务时出错(可忽略): {rollback_error}")
elif not tx_committed_by_route:
# 正常完成且路由未自行提交,由中间件提交事务
try:
db.commit()
except:
pass
exception_to_raise = e
except Exception as commit_error:
logger.error(f"关键事务提交失败: {commit_error}")
try:
db.rollback()
except Exception:
pass
# 如果 tx_committed_by_route 为 True,跳过 commit(路由已提交)
finally:
db = getattr(request.state, "db", None)
if isinstance(db, Session):
try:
db.close()
except Exception as close_error:
# 连接池会处理连接的回收,这里的异常不应影响响应
logger.debug(f"关闭数据库连接时出错(可忽略): {close_error}")
# 在 finally 块之后处理异常和响应
if exception_to_raise:
raise exception_to_raise
return response
# 关闭会话,归还连接到连接池
try:
db.close()
except Exception as close_error:
logger.debug(f"关闭数据库连接时出错(可忽略): {close_error}")
def _get_client_ip(self, request: Request) -> str:
"""
获取客户端 IP 地址,支持代理头
注意:此方法信任 X-Forwarded-For 和 X-Real-IP 头,
仅当服务部署在可信代理(如 Nginx、CloudFlare)后面时才安全。
如果服务直接暴露公网,攻击者可伪造这些头绕过限流。
"""
# 从配置获取可信代理层数(默认为 1,即信任最近一层代理)
trusted_proxy_count = getattr(config, "trusted_proxy_count", 1)
# 优先从代理头获取真实 IP
forwarded_for = request.headers.get("x-forwarded-for")
if forwarded_for:
# X-Forwarded-For 可能包含多个 IP,取第一个
return forwarded_for.split(",")[0].strip()
# X-Forwarded-For 格式: "client, proxy1, proxy2"
# 从右往左数 trusted_proxy_count 个,取其左边的第一个
ips = [ip.strip() for ip in forwarded_for.split(",")]
if len(ips) > trusted_proxy_count:
return ips[-(trusted_proxy_count + 1)]
elif ips:
return ips[0]
real_ip = request.headers.get("x-real-ip")
if real_ip:
@@ -248,13 +261,11 @@ class PluginMiddleware(BaseHTTPMiddleware):
auth_header = request.headers.get("authorization", "")
api_key = request.headers.get("x-api-key", "")
if auth_header.startswith("Bearer "):
if auth_header.lower().startswith("bearer "):
api_key = auth_header[7:]
if api_key:
# 使用 API Key 的哈希作为限制 key,避免日志泄露完整 key
import hashlib
key_hash = hashlib.sha256(api_key.encode()).hexdigest()[:16]
key = f"llm_api_key:{key_hash}"
request.state.rate_limit_key_type = "api_key"
@@ -319,7 +330,10 @@ class PluginMiddleware(BaseHTTPMiddleware):
)
else:
# 限流触发,记录日志
logger.warning(f"速率限制触发: {getattr(request.state, 'rate_limit_key_type', 'unknown')}")
logger.warning(
"速率限制触发: {}",
getattr(request.state, "rate_limit_key_type", "unknown"),
)
return result
return None
except Exception as e:
@@ -332,7 +346,7 @@ class PluginMiddleware(BaseHTTPMiddleware):
pass
async def _call_post_request_plugins(
self, request: Request, response: StarletteResponse, start_time: float
self, request: Request, status_code: int, start_time: float
) -> None:
"""调用请求后的插件"""
@@ -345,8 +359,8 @@ class PluginMiddleware(BaseHTTPMiddleware):
monitor_labels = {
"method": request.method,
"endpoint": request.url.path,
"status": str(response.status_code),
"status_class": f"{response.status_code // 100}xx",
"status": str(status_code),
"status_class": f"{status_code // 100}xx",
}
# 记录请求计数
@@ -368,6 +382,7 @@ class PluginMiddleware(BaseHTTPMiddleware):
self, request: Request, error: Exception, start_time: float
) -> None:
"""调用错误处理插件"""
from fastapi import HTTPException
duration = time.time() - start_time
@@ -380,7 +395,7 @@ class PluginMiddleware(BaseHTTPMiddleware):
error=error,
context={
"endpoint": f"{request.method} {request.url.path}",
"request_id": request.state.request_id,
"request_id": getattr(request.state, "request_id", ""),
"duration": duration,
},
)
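
Stripped of the plugin calls and DB bookkeeping, the pure-ASGI pattern the rewrite relies on fits in a few lines; a minimal sketch:

```python
from starlette.types import ASGIApp, Message, Receive, Scope, Send

class StatusCapture:
    """Minimal pure-ASGI middleware: observes the status code without
    buffering the body, so StreamingResponse chunks pass through untouched."""

    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        status = 0

        async def send_wrapper(message: Message) -> None:
            nonlocal status
            if message["type"] == "http.response.start":
                status = message.get("status", 0)
            await send(message)

        await self.app(scope, receive, send_wrapper)
        # `status` now carries the response code for post-request hooks.
```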

View File

@@ -3,6 +3,7 @@ JWT认证插件
支持JWT Bearer token认证
"""
import hashlib
from typing import Optional
from fastapi import Request
@@ -46,8 +47,8 @@ class JwtAuthPlugin(AuthPlugin):
logger.debug("未找到JWT token")
return None
# 记录认证尝试的详细信息
logger.info(f"JWT认证尝试 - 路径: {request.url.path}, Token前20位: {token[:20]}...")
token_fingerprint = hashlib.sha256(token.encode()).hexdigest()[:12]
logger.info(f"JWT认证尝试 - 路径: {request.url.path}, token_fp={token_fingerprint}")
try:
# 验证JWT token

View File

@@ -63,14 +63,16 @@ class JWTBlacklistService:
if ttl_seconds <= 0:
# Token 已经过期,不需要加入黑名单
logger.debug(f"Token 已过期,无需加入黑名单: {token[:10]}...")
token_fp = JWTBlacklistService._get_token_hash(token)[:12]
logger.debug("Token 已过期,无需加入黑名单: token_fp={}", token_fp)
return True
# 存储到 Redis,设置 TTL 为 Token 过期时间
# 值存储为原因字符串
await redis_client.setex(redis_key, ttl_seconds, reason)
logger.info(f"Token 已加入黑名单: {token[:10]}... (原因: {reason}, TTL: {ttl_seconds}s)")
token_fp = JWTBlacklistService._get_token_hash(token)[:12]
logger.info("Token 已加入黑名单: token_fp={} (原因: {}, TTL: {}s)", token_fp, reason, ttl_seconds)
return True
except Exception as e:
@@ -109,7 +111,8 @@ class JWTBlacklistService:
if exists:
# 获取黑名单原因(可选)
reason = await redis_client.get(redis_key)
logger.warning(f"检测到黑名单 Token: {token[:10]}... (原因: {reason})")
token_fp = JWTBlacklistService._get_token_hash(token)[:12]
logger.warning("检测到黑名单 Token: token_fp={} (原因: {})", token_fp, reason)
return True
return False
@@ -148,9 +151,11 @@ class JWTBlacklistService:
deleted = await redis_client.delete(redis_key)
if deleted:
logger.info(f"Token 已从黑名单移除: {token[:10]}...")
token_fp = JWTBlacklistService._get_token_hash(token)[:12]
logger.info("Token 已从黑名单移除: token_fp={}", token_fp)
else:
logger.debug(f"Token 不在黑名单中: {token[:10]}...")
token_fp = JWTBlacklistService._get_token_hash(token)[:12]
logger.debug("Token 不在黑名单中: token_fp={}", token_fp)
return bool(deleted)
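
The fingerprint idiom repeated across these hunks reduces to one helper; a minimal sketch (the helper name is illustrative — the diffs inline the hash call):

```python
import hashlib

def token_fp(token: str, length: int = 12) -> str:
    # SHA-256 prefix: lets log lines be correlated without leaking token material.
    return hashlib.sha256(token.encode()).hexdigest()[:length]

# e.g. logger.info("Token 已加入黑名单: token_fp={}", token_fp(token))
```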

View File

@@ -3,6 +3,7 @@
"""
import os
import hashlib
import secrets
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
@@ -169,7 +170,8 @@ class AuthService:
key_record.last_used_at = datetime.now(timezone.utc)
db.commit() # 立即提交事务,释放数据库锁,避免阻塞后续请求
logger.debug(f"API认证成功: 用户 {user.email} (Key: {api_key[:10]}...)")
api_key_fp = hashlib.sha256(api_key.encode()).hexdigest()[:12]
logger.debug("API认证成功: 用户 {} (api_key_fp={})", user.email, api_key_fp)
return user, key_record
@staticmethod

View File

@@ -13,6 +13,7 @@ from src.core.exceptions import InvalidRequestException, NotFoundException
from src.core.logger import logger
from src.models.api import ModelCreate, ModelResponse, ModelUpdate
from src.models.database import Model, Provider
from src.api.base.models_service import invalidate_models_list_cache
from src.services.cache.invalidation import get_cache_invalidation_service
from src.services.cache.model_cache import ModelCacheService
@@ -75,6 +76,10 @@ class ModelService:
)
logger.info(f"创建模型成功: provider={provider.name}, model={model.provider_model_name}, global_model_id={model.global_model_id}")
# 清除 /v1/models 列表缓存
asyncio.create_task(invalidate_models_list_cache())
return model
except IntegrityError as e:
@@ -197,6 +202,9 @@ class ModelService:
cache_service = get_cache_invalidation_service()
cache_service.on_model_changed(model.provider_id, model.global_model_id)
# 清除 /v1/models 列表缓存
asyncio.create_task(invalidate_models_list_cache())
logger.info(f"更新模型成功: id={model_id}, 最终 supports_vision: {model.supports_vision}, supports_function_calling: {model.supports_function_calling}, supports_extended_thinking: {model.supports_extended_thinking}")
return model
except IntegrityError as e:
@@ -261,6 +269,9 @@ class ModelService:
cache_service = get_cache_invalidation_service()
cache_service.on_model_changed(cache_info["provider_id"], cache_info["global_model_id"])
# 清除 /v1/models 列表缓存
asyncio.create_task(invalidate_models_list_cache())
logger.info(f"删除模型成功: id={model_id}, provider_model_name={cache_info['provider_model_name']}, "
f"global_model_id={cache_info['global_model_id'][:8] if cache_info['global_model_id'] else 'None'}...")
except Exception as e:
@@ -295,6 +306,9 @@ class ModelService:
cache_service = get_cache_invalidation_service()
cache_service.on_model_changed(model.provider_id, model.global_model_id)
# 清除 /v1/models 列表缓存
asyncio.create_task(invalidate_models_list_cache())
status = "可用" if is_available else "不可用"
logger.info(f"更新模型可用状态: id={model_id}, status={status}")
return model
@@ -358,6 +372,9 @@ class ModelService:
for model in created_models:
db.refresh(model)
logger.info(f"批量创建 {len(created_models)} 个模型成功")
# 清除 /v1/models 列表缓存
asyncio.create_task(invalidate_models_list_cache())
except IntegrityError as e:
db.rollback()
logger.error(f"批量创建模型失败: {str(e)}")

View File

@@ -15,6 +15,7 @@ from src.core.enums import APIFormat
from src.core.exceptions import (
ConcurrencyLimitError,
ProviderAuthException,
ProviderCompatibilityException,
ProviderException,
ProviderNotAvailableException,
ProviderRateLimitException,
@@ -81,7 +82,9 @@ class ErrorClassifier:
"context_length_exceeded", # 上下文长度超限
"content_length_limit", # 请求内容长度超限 (Claude API)
"content_length_exceeds", # 内容长度超限变体 (AWS CodeWhisperer)
"max_tokens", # token 数超限
# 注意:移除了 "max_tokens",因为 max_tokens 相关错误可能是 Provider 兼容性问题
# 如 "Unsupported parameter: 'max_tokens' is not supported with this model"
# 这类错误应由 COMPATIBILITY_ERROR_PATTERNS 处理
"invalid_prompt", # 无效的提示词
"content too long", # 内容过长
"input is too long", # 输入过长 (AWS)
@@ -136,6 +139,19 @@ class ErrorClassifier:
"CONTENT_POLICY_VIOLATION",
)
# Provider 兼容性错误模式 - 这类错误应该触发故障转移
# 因为换一个 Provider 可能就能成功
COMPATIBILITY_ERROR_PATTERNS: Tuple[str, ...] = (
"unsupported parameter", # 不支持的参数
"unsupported model", # 不支持的模型
"unsupported feature", # 不支持的功能
"not supported with this model", # 此模型不支持
"model does not support", # 模型不支持
"parameter is not supported", # 参数不支持
"feature is not supported", # 功能不支持
"not available for this model", # 此模型不可用
)
def _parse_error_response(self, error_text: Optional[str]) -> Dict[str, Any]:
"""
解析错误响应为结构化数据
@@ -261,6 +277,25 @@ class ErrorClassifier:
search_text = f"{parsed['message']} {parsed['raw']}".lower()
return any(pattern.lower() in search_text for pattern in self.CLIENT_ERROR_PATTERNS)
def _is_compatibility_error(self, error_text: Optional[str]) -> bool:
"""
检测错误响应是否为 Provider 兼容性错误(应触发故障转移)
这类错误是因为 Provider 不支持某些参数或功能导致的,
换一个 Provider 可能就能成功。
Args:
error_text: 错误响应文本
Returns:
是否为兼容性错误
"""
if not error_text:
return False
search_text = error_text.lower()
return any(pattern.lower() in search_text for pattern in self.COMPATIBILITY_ERROR_PATTERNS)
def _extract_error_message(self, error_text: Optional[str]) -> Optional[str]:
"""
从错误响应中提取错误消息
@@ -425,6 +460,16 @@ class ErrorClassifier:
),
)
# 400 错误:先检查是否为 Provider 兼容性错误(应触发故障转移)
if status == 400 and self._is_compatibility_error(error_response_text):
logger.info(f"检测到 Provider 兼容性错误,将触发故障转移: {extracted_message}")
return ProviderCompatibilityException(
message=extracted_message or "Provider 不支持此请求",
provider_name=provider_name,
status_code=400,
upstream_error=error_response_text,
)
# 400 错误:检查是否为客户端请求错误(不应重试)
if status == 400 and self._is_client_error(error_response_text):
logger.info(f"检测到客户端请求错误,不进行重试: {extracted_message}")

View File

@@ -427,6 +427,9 @@ class FallbackOrchestrator:
)
# str(cause) 可能为空(如 httpx 超时异常),使用 repr() 作为备用
error_msg = str(cause) or repr(cause)
# 如果是 ProviderNotAvailableException,附加上游响应
if hasattr(cause, "upstream_response") and cause.upstream_response:
error_msg = f"{error_msg} | 上游响应: {cause.upstream_response[:500]}"
RequestCandidateService.mark_candidate_failed(
db=self.db,
candidate_id=candidate_record_id,
@@ -439,6 +442,9 @@ class FallbackOrchestrator:
# 未知错误:记录失败并抛出
error_msg = str(cause) or repr(cause)
# 如果是 ProviderNotAvailableException,附加上游响应
if hasattr(cause, "upstream_response") and cause.upstream_response:
error_msg = f"{error_msg} | 上游响应: {cause.upstream_response[:500]}"
RequestCandidateService.mark_candidate_failed(
db=self.db,
candidate_id=candidate_record_id,

View File

@@ -1,61 +0,0 @@
"""响应标准化服务,用于 STANDARD 模式下的响应格式验证和补全"""
from typing import Any, Dict, Optional
from src.core.logger import logger
from src.models.claude import ClaudeResponse
class ResponseNormalizer:
"""响应标准化器 - 用于标准模式下验证和补全响应字段"""
@staticmethod
def normalize_claude_response(
response_data: Dict[str, Any], request_id: Optional[str] = None
) -> Dict[str, Any]:
"""
标准化 Claude API 响应
Args:
response_data: 原始响应数据
request_id: 请求ID(用于日志)
Returns:
标准化后的响应数据(失败时返回原始数据)
"""
if "error" in response_data:
logger.debug(f"[ResponseNormalizer] 检测到错误响应,跳过标准化 | ID:{request_id}")
return response_data
try:
validated = ClaudeResponse.model_validate(response_data)
normalized = validated.model_dump(mode="json", exclude_none=False)
logger.debug(f"[ResponseNormalizer] 响应标准化成功 | ID:{request_id}")
return normalized
except Exception as e:
logger.debug(f"[ResponseNormalizer] 响应验证失败,透传原始数据 | ID:{request_id}")
return response_data
@staticmethod
def should_normalize(response_data: Dict[str, Any]) -> bool:
"""
判断是否需要进行标准化
Args:
response_data: 响应数据
Returns:
是否需要标准化
"""
# 错误响应不需要标准化
if "error" in response_data:
return False
# 已经包含新字段的响应不需要再次标准化
if "context_management" in response_data and "container" in response_data:
return False
return True

View File

@@ -289,11 +289,17 @@ class RequestResult:
status_code = 500
error_type = "internal_error"
# 构建错误消息,包含上游响应信息
error_message = str(exception)
if isinstance(exception, ProviderNotAvailableException):
if exception.upstream_response:
error_message = f"{error_message} | 上游响应: {exception.upstream_response[:500]}"
return cls(
status=RequestStatus.FAILED,
metadata=metadata,
status_code=status_code,
error_message=str(exception),
error_message=error_message,
error_type=error_type,
response_time_ms=response_time_ms,
is_stream=is_stream,

View File

@@ -21,7 +21,7 @@ from sqlalchemy.orm import Session
from src.core.logger import logger
from src.database import create_session
from src.models.database import Usage
from src.models.database import AuditLog, Usage
from src.services.system.config import SystemConfigService
from src.services.system.scheduler import get_scheduler
from src.services.system.stats_aggregator import StatsAggregatorService
@@ -91,6 +91,15 @@ class CleanupScheduler:
name="Pending状态清理",
)
# 审计日志清理 - 凌晨 4 点执行
scheduler.add_cron_job(
self._scheduled_audit_cleanup,
hour=4,
minute=0,
job_id="audit_cleanup",
name="审计日志清理",
)
# 启动时执行一次初始化任务
asyncio.create_task(self._run_startup_tasks())
@@ -145,6 +154,10 @@ class CleanupScheduler:
"""Pending 清理任务(定时调用)"""
await self._perform_pending_cleanup()
async def _scheduled_audit_cleanup(self):
"""审计日志清理任务(定时调用)"""
await self._perform_audit_cleanup()
# ========== 实际任务实现 ==========
async def _perform_stats_aggregation(self, backfill: bool = False):
@@ -330,6 +343,70 @@ class CleanupScheduler:
finally:
db.close()
async def _perform_audit_cleanup(self):
"""执行审计日志清理任务"""
db = create_session()
try:
# 检查是否启用自动清理
if not SystemConfigService.get_config(db, "enable_auto_cleanup", True):
logger.info("自动清理已禁用,跳过审计日志清理")
return
# 获取审计日志保留天数(默认 30 天,最少 7 天)
audit_retention_days = max(
SystemConfigService.get_config(db, "audit_log_retention_days", 30),
7, # 最少保留 7 天,防止误配置删除所有审计日志
)
batch_size = SystemConfigService.get_config(db, "cleanup_batch_size", 1000)
cutoff_time = datetime.now(timezone.utc) - timedelta(days=audit_retention_days)
logger.info(f"开始清理 {audit_retention_days} 天前的审计日志...")
total_deleted = 0
while True:
# 先查询要删除的记录 ID(分批)
records_to_delete = (
db.query(AuditLog.id)
.filter(AuditLog.created_at < cutoff_time)
.limit(batch_size)
.all()
)
if not records_to_delete:
break
record_ids = [r.id for r in records_to_delete]
# 执行删除
result = db.execute(
delete(AuditLog)
.where(AuditLog.id.in_(record_ids))
.execution_options(synchronize_session=False)
)
rows_deleted = result.rowcount
db.commit()
total_deleted += rows_deleted
logger.debug(f"已删除 {rows_deleted} 条审计日志,累计 {total_deleted}")
await asyncio.sleep(0.1)
if total_deleted > 0:
logger.info(f"审计日志清理完成,共删除 {total_deleted} 条记录")
else:
logger.info("无需清理的审计日志")
except Exception as e:
logger.exception(f"审计日志清理失败: {e}")
try:
db.rollback()
except Exception:
pass
finally:
db.close()
async def _perform_cleanup(self):
"""执行清理任务"""
db = create_session()

View File

@@ -12,7 +12,6 @@ from src.core.logger import logger
from src.models.database import Provider, SystemConfig
class LogLevel(str, Enum):
"""日志记录级别"""
@@ -94,6 +93,35 @@ class SystemConfigService:
return default
@classmethod
def get_configs(cls, db: Session, keys: List[str]) -> Dict[str, Any]:
"""
批量获取系统配置值
Args:
db: 数据库会话
keys: 配置键列表
Returns:
配置键值字典
"""
result = {}
# 一次查询获取所有配置
configs = db.query(SystemConfig).filter(SystemConfig.key.in_(keys)).all()
config_map = {c.key: c.value for c in configs}
# 填充结果,不存在的使用默认值
for key in keys:
if key in config_map:
result[key] = config_map[key]
elif key in cls.DEFAULT_CONFIGS:
result[key] = cls.DEFAULT_CONFIGS[key]["value"]
else:
result[key] = None
return result
@staticmethod
def set_config(db: Session, key: str, value: Any, description: str = None) -> SystemConfig:
"""设置系统配置值"""
@@ -111,6 +139,7 @@ class SystemConfigService:
db.commit()
db.refresh(config)
return config
@staticmethod
@@ -153,8 +182,8 @@ class SystemConfigService:
for config in configs
]
@staticmethod
def delete_config(db: Session, key: str) -> bool:
@classmethod
def delete_config(cls, db: Session, key: str) -> bool:
"""删除系统配置"""
config = db.query(SystemConfig).filter(SystemConfig.key == key).first()
if config:
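
A short usage sketch of the batch getter defined earlier in this file, with the same keys the cleanup scheduler otherwise reads one `get_config` call at a time (keys missing from both the table and DEFAULT_CONFIGS come back as None, hence the fallback):

```python
values = SystemConfigService.get_configs(
    db, ["enable_auto_cleanup", "audit_log_retention_days", "cleanup_batch_size"]
)
if values["enable_auto_cleanup"]:
    # Same floor the scheduler applies: never retain fewer than 7 days.
    retention_days = max(values["audit_log_retention_days"] or 30, 7)
```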

View File

@@ -1217,15 +1217,19 @@ class UsageService:
request_id: str,
status: str,
error_message: Optional[str] = None,
provider: Optional[str] = None,
target_model: Optional[str] = None,
) -> Optional[Usage]:
"""
快速更新使用记录状态(不更新其他字段)
快速更新使用记录状态
Args:
db: 数据库会话
request_id: 请求ID
status: 新状态 (pending, streaming, completed, failed)
error_message: 错误消息(仅在 failed 状态时使用)
provider: 提供商名称(可选,streaming 状态时更新)
target_model: 映射后的目标模型名(可选)
Returns:
更新后的 Usage 记录,如果未找到则返回 None
@@ -1239,6 +1243,10 @@ class UsageService:
usage.status = status
if error_message:
usage.error_message = error_message
if provider:
usage.provider = provider
if target_model:
usage.target_model = target_model
db.commit()

View File

@@ -457,7 +457,7 @@ class StreamUsageTracker:
logger.debug(f"ID:{self.request_id} | 开始跟踪流式响应 | 估算输入tokens:{self.input_tokens}")
# 更新状态为 streaming
# 更新状态为 streaming,同时更新 provider
if self.request_id:
try:
from src.services.usage.service import UsageService
@@ -465,6 +465,7 @@ class StreamUsageTracker:
db=self.db,
request_id=self.request_id,
status="streaming",
provider=self.provider,
)
except Exception as e:
logger.warning(f"更新使用记录状态为 streaming 失败: {e}")

View File

@@ -210,7 +210,15 @@ class ApiKeyService:
@staticmethod
def check_rate_limit(db: Session, api_key: ApiKey, window_minutes: int = 1) -> tuple[bool, int]:
"""检查速率限制"""
"""检查速率限制
Returns:
(is_allowed, remaining): 是否允许请求,剩余可用次数
当 rate_limit 为 None 时表示不限制,返回 (True, -1)
"""
# 如果 rate_limit 为 None,表示不限制
if api_key.rate_limit is None:
return True, -1 # -1 表示无限制
# 计算时间窗口
window_start = datetime.now(timezone.utc) - timedelta(minutes=window_minutes)
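
A sketch of how a caller might treat the -1 sentinel; the response header name here is illustrative, not an API the repo defines:

```python
allowed, remaining = ApiKeyService.check_rate_limit(db, api_key)
if not allowed:
    raise HTTPException(status_code=429, detail="Rate limit exceeded")
if remaining >= 0:
    # -1 means "no limit configured": emit the header only for limited keys.
    response.headers["X-RateLimit-Remaining"] = str(remaining)
```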

View File

@@ -3,6 +3,7 @@
提供统一的用户认证和授权功能
"""
import hashlib
from typing import Optional
from fastapi import Depends, Header, HTTPException, status
@@ -44,10 +45,17 @@ async def get_current_user(
payload = await AuthService.verify_token(token, token_type="access")
except HTTPException as token_error:
# 保持原始的HTTP状态码(如401 Unauthorized),不要转换为403
logger.error(f"Token验证失败: {token_error.status_code}: {token_error.detail}, Token前10位: {token[:10]}...")
token_fp = hashlib.sha256(token.encode()).hexdigest()[:12]
logger.error(
"Token验证失败: {}: {}, token_fp={}",
token_error.status_code,
token_error.detail,
token_fp,
)
raise # 重新抛出原始异常,保持状态码
except Exception as token_error:
logger.error(f"Token验证失败: {token_error}, Token前10位: {token[:10]}...")
token_fp = hashlib.sha256(token.encode()).hexdigest()[:12]
logger.error("Token验证失败: {}, token_fp={}", token_error, token_fp)
raise ForbiddenException("无效的Token")
user_id = payload.get("user_id")
@@ -63,7 +71,8 @@ async def get_current_user(
raise ForbiddenException("无效的认证凭据")
# 仅在DEBUG模式下记录详细信息
logger.debug(f"尝试获取用户: user_id={user_id}, token前10位: {token[:10]}...")
token_fp = hashlib.sha256(token.encode()).hexdigest()[:12]
logger.debug("尝试获取用户: user_id={}, token_fp={}", user_id, token_fp)
# 确保user_id是字符串格式(UUID)
if not isinstance(user_id, str):

View File

@@ -7,29 +7,47 @@ from typing import Optional
from fastapi import Request
from src.config import config
def get_client_ip(request: Request) -> str:
"""
获取客户端真实IP地址
按优先级检查:
1. X-Forwarded-For 头(支持代理链)
1. X-Forwarded-For 头(支持代理链,根据可信代理数量提取)
2. X-Real-IP 头(Nginx 代理)
3. 直接客户端IP
安全说明:
- 此函数根据 TRUSTED_PROXY_COUNT 配置来决定信任的代理层数
- 当 TRUSTED_PROXY_COUNT=0 时,不信任任何代理头,直接使用连接 IP
- 当服务直接暴露公网时,应设置 TRUSTED_PROXY_COUNT=0 以防止 IP 伪造
Args:
request: FastAPI Request 对象
Returns:
str: 客户端IP地址,如果无法获取则返回 "unknown"
"""
trusted_proxy_count = config.trusted_proxy_count
# 如果不信任任何代理,直接返回连接 IP
if trusted_proxy_count == 0:
if request.client and request.client.host:
return request.client.host
return "unknown"
# 优先检查 X-Forwarded-For 头(可能包含代理链)
forwarded_for = request.headers.get("X-Forwarded-For")
if forwarded_for:
# X-Forwarded-For 格式: "client, proxy1, proxy2",取第一个(真实客户端)
client_ip = forwarded_for.split(",")[0].strip()
if client_ip:
return client_ip
# X-Forwarded-For 格式: "client, proxy1, proxy2"
# 从右往左数 trusted_proxy_count 个,取其左边的第一个
ips = [ip.strip() for ip in forwarded_for.split(",") if ip.strip()]
if len(ips) > trusted_proxy_count:
return ips[-(trusted_proxy_count + 1)]
elif ips:
return ips[0]
# 检查 X-Real-IP 头(通常由 Nginx 设置)
real_ip = request.headers.get("X-Real-IP")
@@ -91,20 +109,32 @@ def get_request_metadata(request: Request) -> dict:
}
def extract_ip_from_headers(headers: dict) -> str:
def extract_ip_from_headers(headers: dict, trusted_proxy_count: Optional[int] = None) -> str:
"""
从HTTP头字典中提取IP地址(用于中间件等场景)
Args:
headers: HTTP头字典
trusted_proxy_count: 可信代理层数(None 时使用配置值)
Returns:
str: 客户端IP地址
"""
if trusted_proxy_count is None:
trusted_proxy_count = config.trusted_proxy_count
# 如果不信任任何代理,返回 unknown(调用方需要用其他方式获取连接 IP)
if trusted_proxy_count == 0:
return "unknown"
# 检查 X-Forwarded-For
forwarded_for = headers.get("x-forwarded-for", "")
if forwarded_for:
return forwarded_for.split(",")[0].strip()
ips = [ip.strip() for ip in forwarded_for.split(",") if ip.strip()]
if len(ips) > trusted_proxy_count:
return ips[-(trusted_proxy_count + 1)]
elif ips:
return ips[0]
# 检查 X-Real-IP
real_ip = headers.get("x-real-ip", "")

View File

@@ -361,3 +361,61 @@ class TestPipelineAdminAuth:
assert result == mock_user
assert mock_request.state.user_id == "admin-123"
@pytest.mark.asyncio
async def test_authenticate_admin_lowercase_bearer(self, pipeline: ApiRequestPipeline) -> None:
"""测试 bearer (小写) 前缀也能正确解析"""
mock_user = MagicMock()
mock_user.id = "admin-123"
mock_user.is_active = True
mock_request = MagicMock()
mock_request.headers = {"authorization": "bearer valid-token"}
mock_request.state = MagicMock()
mock_db = MagicMock()
mock_db.query.return_value.filter.return_value.first.return_value = mock_user
with patch.object(
pipeline.auth_service,
"verify_token",
new_callable=AsyncMock,
return_value={"user_id": "admin-123"},
) as mock_verify:
result = await pipeline._authenticate_admin(mock_request, mock_db)
mock_verify.assert_awaited_once_with("valid-token", token_type="access")
assert result == mock_user
class TestPipelineUserAuth:
"""测试普通用户 JWT 认证"""
@pytest.fixture
def pipeline(self) -> ApiRequestPipeline:
return ApiRequestPipeline()
@pytest.mark.asyncio
async def test_authenticate_user_lowercase_bearer(self, pipeline: ApiRequestPipeline) -> None:
"""测试 bearer (小写) 前缀也能正确解析"""
mock_user = MagicMock()
mock_user.id = "user-123"
mock_user.is_active = True
mock_request = MagicMock()
mock_request.headers = {"authorization": "bearer valid-token"}
mock_request.state = MagicMock()
mock_db = MagicMock()
mock_db.query.return_value.filter.return_value.first.return_value = mock_user
with patch.object(
pipeline.auth_service,
"verify_token",
new_callable=AsyncMock,
return_value={"user_id": "user-123"},
) as mock_verify:
result = await pipeline._authenticate_user(mock_request, mock_db)
mock_verify.assert_awaited_once_with("valid-token", token_type="access")
assert result == mock_user
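
The behaviour these tests pin down reduces to one small helper; a minimal sketch of case-insensitive Bearer parsing:

```python
from typing import Optional

def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    # Case-insensitive scheme match, then drop the 7-char "bearer " prefix.
    if not authorization or not authorization.lower().startswith("bearer "):
        return None
    return authorization[7:].strip()

assert extract_bearer_token("bearer valid-token") == "valid-token"
assert extract_bearer_token("Bearer valid-token") == "valid-token"
assert extract_bearer_token("Token valid-token") is None
```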