diff --git a/frontend/src/views/admin/SystemSettings.vue b/frontend/src/views/admin/SystemSettings.vue
index bda9ae2..60dfb33 100644
--- a/frontend/src/views/admin/SystemSettings.vue
+++ b/frontend/src/views/admin/SystemSettings.vue
@@ -470,68 +470,20 @@
title="流式输出"
description="配置流式响应的输出效果"
>
-
-
-
-
-
-
-
- 将上游返回的大块内容拆分成小块,模拟打字效果
-
-
-
-
-
+
+
-
-
- 每次输出的字符数量(1-100)
-
-
-
-
-
-
-
- 每块之间的延迟毫秒数(1-500)
+
+ 自动根据文本长度调整输出速度:短文本逐字符输出(打字感更强),长文本按块输出(避免卡顿)
@@ -886,8 +838,6 @@ interface SystemConfig {
audit_log_retention_days: number
// 流式输出
stream_smoothing_enabled: boolean
- stream_smoothing_chunk_size: number
- stream_smoothing_delay_ms: number
}
const loading = ref(false)
@@ -939,8 +889,6 @@ const systemConfig = ref
({
audit_log_retention_days: 30,
// 流式输出
stream_smoothing_enabled: false,
- stream_smoothing_chunk_size: 5,
- stream_smoothing_delay_ms: 15,
})
// 计算属性:KB 和 字节 之间的转换
@@ -999,8 +947,6 @@ async function loadSystemConfig() {
'audit_log_retention_days',
// 流式输出
'stream_smoothing_enabled',
- 'stream_smoothing_chunk_size',
- 'stream_smoothing_delay_ms',
]
for (const key of configs) {
@@ -1114,16 +1060,6 @@ async function saveSystemConfig() {
value: systemConfig.value.stream_smoothing_enabled,
description: '是否启用流式平滑输出'
},
- {
- key: 'stream_smoothing_chunk_size',
- value: systemConfig.value.stream_smoothing_chunk_size,
- description: '流式平滑输出每个小块的字符数'
- },
- {
- key: 'stream_smoothing_delay_ms',
- value: systemConfig.value.stream_smoothing_delay_ms,
- description: '流式平滑输出每个小块之间的延迟毫秒数'
- },
]
const promises = configItems.map(item =>
diff --git a/src/api/handlers/base/chat_handler_base.py b/src/api/handlers/base/chat_handler_base.py
index 38c4b39..417aeec 100644
--- a/src/api/handlers/base/chat_handler_base.py
+++ b/src/api/handlers/base/chat_handler_base.py
@@ -298,16 +298,11 @@ class ChatHandlerBase(BaseMessageHandler, ABC):
def update_streaming_status() -> None:
self._update_usage_to_streaming_with_ctx(ctx)
- # 从数据库批量读取流式平滑输出配置(单次查询)
- smoothing_cfg = SystemConfigService.get_configs(
- self.db,
- ["stream_smoothing_enabled", "stream_smoothing_chunk_size", "stream_smoothing_delay_ms"],
- )
- smoothing_config = StreamSmoothingConfig(
- enabled=bool(smoothing_cfg.get("stream_smoothing_enabled", False)),
- chunk_size=int(smoothing_cfg.get("stream_smoothing_chunk_size") or 5),
- delay_ms=int(smoothing_cfg.get("stream_smoothing_delay_ms") or 15),
+ # 读取流式平滑输出开关
+ smoothing_enabled = bool(
+ SystemConfigService.get_config(self.db, "stream_smoothing_enabled", False)
)
+ smoothing_config = StreamSmoothingConfig(enabled=smoothing_enabled)
# 创建流处理器
stream_processor = StreamProcessor(
diff --git a/src/api/handlers/base/cli_handler_base.py b/src/api/handlers/base/cli_handler_base.py
index 0d8ca16..e4b863e 100644
--- a/src/api/handlers/base/cli_handler_base.py
+++ b/src/api/handlers/base/cli_handler_base.py
@@ -34,7 +34,9 @@ from src.api.handlers.base.base_handler import (
from src.api.handlers.base.parsers import get_parser_for_format
from src.api.handlers.base.request_builder import PassthroughRequestBuilder
from src.api.handlers.base.stream_context import StreamContext
+from src.api.handlers.base.stream_processor import create_smoothed_stream
from src.api.handlers.base.utils import build_sse_headers
+from src.services.system.config import SystemConfigService
# 直接从具体模块导入,避免循环依赖
from src.api.handlers.base.response_parser import (
@@ -352,8 +354,17 @@ class CliMessageHandlerBase(BaseMessageHandler):
# 创建监控流
monitored_stream = self._create_monitored_stream(ctx, stream_generator)
+ # 创建平滑输出流(如果启用)
+ smoothing_enabled = bool(
+ SystemConfigService.get_config(self.db, "stream_smoothing_enabled", False)
+ )
+ if smoothing_enabled:
+ final_stream = create_smoothed_stream(monitored_stream)
+ else:
+ final_stream = monitored_stream
+
return StreamingResponse(
- monitored_stream,
+ final_stream,
media_type="text/event-stream",
headers=build_sse_headers(),
background=background_tasks,
diff --git a/src/api/handlers/base/content_extractors.py b/src/api/handlers/base/content_extractors.py
new file mode 100644
index 0000000..99dc78c
--- /dev/null
+++ b/src/api/handlers/base/content_extractors.py
@@ -0,0 +1,274 @@
+"""
+流式内容提取器 - 策略模式实现
+
+为不同 API 格式(OpenAI、Claude、Gemini)提供内容提取和 chunk 构造的抽象。
+StreamSmoother 使用这些提取器来处理不同格式的 SSE 事件。
+"""
+
+import copy
+import json
+from abc import ABC, abstractmethod
+from typing import Optional
+
+
+class ContentExtractor(ABC):
+ """
+ 流式内容提取器抽象基类
+
+ 定义从 SSE 事件中提取文本内容和构造新 chunk 的接口。
+ 每种 API 格式(OpenAI、Claude、Gemini)需要实现自己的提取器。
+ """
+
+ @abstractmethod
+ def extract_content(self, data: dict) -> Optional[str]:
+ """
+ 从 SSE 数据中提取可拆分的文本内容
+
+ Args:
+ data: 解析后的 JSON 数据
+
+ Returns:
+ 提取的文本内容,如果无法提取则返回 None
+ """
+ pass
+
+ @abstractmethod
+ def create_chunk(
+ self,
+ original_data: dict,
+ new_content: str,
+ event_type: str = "",
+ is_first: bool = False,
+ ) -> bytes:
+ """
+ 使用新内容构造 SSE chunk
+
+ Args:
+ original_data: 原始 JSON 数据
+ new_content: 新的文本内容
+ event_type: SSE 事件类型(某些格式需要)
+ is_first: 是否是第一个 chunk(用于保留 role 等字段)
+
+ Returns:
+ 编码后的 SSE 字节数据
+ """
+ pass
+
+
+class OpenAIContentExtractor(ContentExtractor):
+ """
+ OpenAI 格式内容提取器
+
+ 处理 OpenAI Chat Completions API 的流式响应格式:
+ - 数据结构: choices[0].delta.content
+ - 只在 delta 仅包含 role/content 时允许拆分,避免破坏 tool_calls 等结构
+ """
+
+ def extract_content(self, data: dict) -> Optional[str]:
+ if not isinstance(data, dict):
+ return None
+
+ choices = data.get("choices")
+ if not isinstance(choices, list) or len(choices) != 1:
+ return None
+
+ first_choice = choices[0]
+ if not isinstance(first_choice, dict):
+ return None
+
+ delta = first_choice.get("delta")
+ if not isinstance(delta, dict):
+ return None
+
+ content = delta.get("content")
+ if not isinstance(content, str):
+ return None
+
+ # 只有 delta 仅包含 role/content 时才允许拆分
+ # 避免破坏 tool_calls、function_call 等复杂结构
+ allowed_keys = {"role", "content"}
+ if not all(key in allowed_keys for key in delta.keys()):
+ return None
+
+ return content
+
+ def create_chunk(
+ self,
+ original_data: dict,
+ new_content: str,
+ event_type: str = "",
+ is_first: bool = False,
+ ) -> bytes:
+ new_data = original_data.copy()
+
+ if "choices" in new_data and new_data["choices"]:
+ new_choices = []
+ for choice in new_data["choices"]:
+ new_choice = choice.copy()
+ if "delta" in new_choice:
+ new_delta = {}
+ # 只有第一个 chunk 保留 role
+ if is_first and "role" in new_choice["delta"]:
+ new_delta["role"] = new_choice["delta"]["role"]
+ new_delta["content"] = new_content
+ new_choice["delta"] = new_delta
+ new_choices.append(new_choice)
+ new_data["choices"] = new_choices
+
+ return f"data: {json.dumps(new_data, ensure_ascii=False)}\n\n".encode("utf-8")
+
+
+class ClaudeContentExtractor(ContentExtractor):
+ """
+ Claude 格式内容提取器
+
+ 处理 Claude Messages API 的流式响应格式:
+ - 事件类型: content_block_delta
+ - 数据结构: delta.type=text_delta, delta.text
+ """
+
+ def extract_content(self, data: dict) -> Optional[str]:
+ if not isinstance(data, dict):
+ return None
+
+ # 检查事件类型
+ if data.get("type") != "content_block_delta":
+ return None
+
+ delta = data.get("delta", {})
+ if not isinstance(delta, dict):
+ return None
+
+ # 检查 delta 类型
+ if delta.get("type") != "text_delta":
+ return None
+
+ text = delta.get("text")
+ if not isinstance(text, str):
+ return None
+
+ return text
+
+ def create_chunk(
+ self,
+ original_data: dict,
+ new_content: str,
+ event_type: str = "",
+ is_first: bool = False,
+ ) -> bytes:
+ new_data = original_data.copy()
+
+ if "delta" in new_data:
+ new_delta = new_data["delta"].copy()
+ new_delta["text"] = new_content
+ new_data["delta"] = new_delta
+
+ # Claude 格式需要 event: 前缀
+ event_name = event_type or "content_block_delta"
+ return f"event: {event_name}\ndata: {json.dumps(new_data, ensure_ascii=False)}\n\n".encode(
+ "utf-8"
+ )
+
+
+class GeminiContentExtractor(ContentExtractor):
+ """
+ Gemini 格式内容提取器
+
+ 处理 Gemini API 的流式响应格式:
+ - 数据结构: candidates[0].content.parts[0].text
+ - 只有纯文本块才拆分
+ """
+
+ def extract_content(self, data: dict) -> Optional[str]:
+ if not isinstance(data, dict):
+ return None
+
+ candidates = data.get("candidates")
+ if not isinstance(candidates, list) or len(candidates) != 1:
+ return None
+
+ first_candidate = candidates[0]
+ if not isinstance(first_candidate, dict):
+ return None
+
+ content = first_candidate.get("content", {})
+ if not isinstance(content, dict):
+ return None
+
+ parts = content.get("parts", [])
+ if not isinstance(parts, list) or len(parts) != 1:
+ return None
+
+ first_part = parts[0]
+ if not isinstance(first_part, dict):
+ return None
+
+ text = first_part.get("text")
+ # 只有纯文本块(只有 text 字段)才拆分
+ if not isinstance(text, str) or len(first_part) != 1:
+ return None
+
+ return text
+
+ def create_chunk(
+ self,
+ original_data: dict,
+ new_content: str,
+ event_type: str = "",
+ is_first: bool = False,
+ ) -> bytes:
+ new_data = copy.deepcopy(original_data)
+
+ if "candidates" in new_data and new_data["candidates"]:
+ first_candidate = new_data["candidates"][0]
+ if "content" in first_candidate:
+ content = first_candidate["content"]
+ if "parts" in content and content["parts"]:
+ content["parts"][0]["text"] = new_content
+
+ return f"data: {json.dumps(new_data, ensure_ascii=False)}\n\n".encode("utf-8")
+
+
+# 提取器注册表
+_EXTRACTORS: dict[str, type[ContentExtractor]] = {
+ "openai": OpenAIContentExtractor,
+ "claude": ClaudeContentExtractor,
+ "gemini": GeminiContentExtractor,
+}
+
+
+def get_extractor(format_name: str) -> Optional[ContentExtractor]:
+ """
+ 根据格式名获取对应的内容提取器实例
+
+ Args:
+ format_name: 格式名称(openai, claude, gemini)
+
+ Returns:
+ 对应的提取器实例,如果格式不支持则返回 None
+ """
+ extractor_class = _EXTRACTORS.get(format_name.lower())
+ if extractor_class:
+ return extractor_class()
+ return None
+
+
+def register_extractor(format_name: str, extractor_class: type[ContentExtractor]) -> None:
+ """
+ 注册新的内容提取器
+
+ Args:
+ format_name: 格式名称
+ extractor_class: 提取器类
+ """
+ _EXTRACTORS[format_name.lower()] = extractor_class
+
+
+def get_extractor_formats() -> list[str]:
+ """
+ 获取所有已注册的格式名称列表
+
+ Returns:
+ 格式名称列表
+ """
+ return list(_EXTRACTORS.keys())
diff --git a/src/api/handlers/base/stream_processor.py b/src/api/handlers/base/stream_processor.py
index 9306a00..9ed2373 100644
--- a/src/api/handlers/base/stream_processor.py
+++ b/src/api/handlers/base/stream_processor.py
@@ -6,20 +6,26 @@
2. 响应流生成
3. 预读和嵌套错误检测
4. 客户端断开检测
+5. 流式平滑输出
"""
import asyncio
import codecs
import json
+import math
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Callable, Optional
import httpx
+from src.api.handlers.base.content_extractors import (
+ ContentExtractor,
+ get_extractor,
+ get_extractor_formats,
+)
from src.api.handlers.base.parsers import get_parser_for_format
from src.api.handlers.base.response_parser import ResponseParser
from src.api.handlers.base.stream_context import StreamContext
-from src.api.handlers.base.stream_smoother import StreamSmoother
from src.core.exceptions import EmbeddedErrorException
from src.core.logger import logger
from src.models.database import Provider, ProviderEndpoint
@@ -31,18 +37,23 @@ class StreamSmoothingConfig:
"""流式平滑输出配置"""
enabled: bool = False
- chunk_size: int = 5
- delay_ms: int = 15
class StreamProcessor:
"""
流式响应处理器
- 负责处理 SSE 流的解析、错误检测和响应生成。
+ 负责处理 SSE 流的解析、错误检测、响应生成和平滑输出。
从 ChatHandlerBase 中提取,使其职责更加单一。
"""
+ # 平滑输出参数
+ CHUNK_SIZE = 5 # 长文本每块字符数
+ MIN_DELAY_MS = 15 # 长文本延迟(毫秒)
+ MAX_DELAY_MS = 24 # 短文本延迟(毫秒)
+ SHORT_TEXT_THRESHOLD = 10 # 短文本阈值(逐字符输出)
+ LONG_TEXT_THRESHOLD = 50 # 长文本阈值(按块输出)
+
def __init__(
self,
request_id: str,
@@ -68,6 +79,9 @@ class StreamProcessor:
self.collect_text = collect_text
self.smoothing_config = smoothing_config or StreamSmoothingConfig()
+ # 内容提取器缓存
+ self._extractors: dict[str, ContentExtractor] = {}
+
def get_parser_for_provider(self, ctx: StreamContext) -> ResponseParser:
"""
获取 Provider 格式的解析器
@@ -390,7 +404,7 @@ class StreamProcessor:
sse_parser: SSE 解析器
line: 原始行数据
"""
- # SSEEventParser 以“去掉换行符”的单行文本作为输入;这里统一剔除 CR/LF,
+ # SSEEventParser 以"去掉换行符"的单行文本作为输入;这里统一剔除 CR/LF,
# 避免把空行误判成 "\n" 并导致事件边界解析错误。
normalized_line = line.rstrip("\r\n")
events = sse_parser.feed_line(normalized_line)
@@ -489,12 +503,153 @@ class StreamProcessor:
return
# 启用平滑输出
- smoother = StreamSmoother(
- chunk_size=self.smoothing_config.chunk_size,
- delay_ms=self.smoothing_config.delay_ms,
+ buffer = b""
+ is_first_content = True
+
+ async for chunk in stream_generator:
+ buffer += chunk
+
+ # 按双换行分割 SSE 事件(标准 SSE 格式)
+ while b"\n\n" in buffer:
+ event_block, buffer = buffer.split(b"\n\n", 1)
+ event_str = event_block.decode("utf-8", errors="replace")
+
+ # 解析事件块
+ lines = event_str.strip().split("\n")
+ data_str = None
+ event_type = ""
+
+ for line in lines:
+ line = line.rstrip("\r")
+ if line.startswith("event: "):
+ event_type = line[7:].strip()
+ elif line.startswith("data: "):
+ data_str = line[6:]
+
+ # 没有 data 行,直接透传
+ if data_str is None:
+ yield event_block + b"\n\n"
+ continue
+
+ # [DONE] 直接透传
+ if data_str.strip() == "[DONE]":
+ yield event_block + b"\n\n"
+ continue
+
+ # 尝试解析 JSON
+ try:
+ data = json.loads(data_str)
+ except json.JSONDecodeError:
+ yield event_block + b"\n\n"
+ continue
+
+ # 检测格式并提取内容
+ content, extractor = self._detect_format_and_extract(data)
+
+ # 只有内容长度大于 1 才需要平滑处理
+ if content and len(content) > 1 and extractor:
+ # 计算动态延迟
+ delay_seconds = self._calculate_delay(len(content))
+
+ # 智能拆分
+ content_chunks = self._split_content(content)
+
+ for i, sub_content in enumerate(content_chunks):
+ is_first = is_first_content and i == 0
+
+ # 使用提取器创建新 chunk
+ sse_chunk = extractor.create_chunk(
+ data,
+ sub_content,
+ event_type=event_type,
+ is_first=is_first,
+ )
+
+ yield sse_chunk
+
+ # 除了最后一个块,其他块之间加延迟
+ if i < len(content_chunks) - 1:
+ await asyncio.sleep(delay_seconds)
+
+ is_first_content = False
+ else:
+ # 不需要拆分,直接透传
+ yield event_block + b"\n\n"
+ if content:
+ is_first_content = False
+
+ # 处理剩余数据
+ if buffer:
+ yield buffer
+
+ def _get_extractor(self, format_name: str) -> Optional[ContentExtractor]:
+ """获取或创建格式对应的提取器(带缓存)"""
+ if format_name not in self._extractors:
+ extractor = get_extractor(format_name)
+ if extractor:
+ self._extractors[format_name] = extractor
+ return self._extractors.get(format_name)
+
+ def _detect_format_and_extract(
+ self, data: dict
+ ) -> tuple[Optional[str], Optional[ContentExtractor]]:
+ """
+ 检测数据格式并提取内容
+
+ 依次尝试各格式的提取器,返回第一个成功提取内容的结果。
+
+ Returns:
+ (content, extractor): 提取的内容和对应的提取器
+ """
+ for format_name in get_extractor_formats():
+ extractor = self._get_extractor(format_name)
+ if extractor:
+ content = extractor.extract_content(data)
+ if content is not None:
+ return content, extractor
+
+ return None, None
+
+ def _calculate_delay(self, text_length: int) -> float:
+ """
+ 根据文本长度计算动态延迟(秒)
+
+ 短文本使用较大延迟(打字感更强),长文本使用较小延迟(避免卡顿)。
+ 中间长度使用对数插值平滑过渡。
+ """
+ if text_length <= self.SHORT_TEXT_THRESHOLD:
+ return self.MAX_DELAY_MS / 1000.0
+ if text_length >= self.LONG_TEXT_THRESHOLD:
+ return self.MIN_DELAY_MS / 1000.0
+
+ # 对数插值:平滑过渡
+ ratio = math.log(text_length / self.SHORT_TEXT_THRESHOLD) / math.log(
+ self.LONG_TEXT_THRESHOLD / self.SHORT_TEXT_THRESHOLD
)
- async for chunk in smoother.smooth_stream(stream_generator):
- yield chunk
+ delay_ms = self.MAX_DELAY_MS - ratio * (self.MAX_DELAY_MS - self.MIN_DELAY_MS)
+ return delay_ms / 1000.0
+
+ def _split_content(self, content: str) -> list[str]:
+ """
+ 根据文本长度智能拆分
+
+ 短文本:逐字符拆分(打字效果更真实)
+ 长文本:按 CHUNK_SIZE 拆分(避免过多延迟)
+ """
+ text_length = len(content)
+
+ if text_length <= self.CHUNK_SIZE:
+ return [content]
+
+ # 长文本按块拆分
+ if text_length >= self.LONG_TEXT_THRESHOLD:
+ chunks = []
+ for i in range(0, text_length, self.CHUNK_SIZE):
+ chunks.append(content[i : i + self.CHUNK_SIZE])
+ return chunks
+
+ # 短/中文本逐字符拆分
+ return list(content)
async def _cleanup(
self,
@@ -510,3 +665,137 @@ class StreamProcessor:
await http_client.aclose()
except Exception:
pass
+
+
+async def create_smoothed_stream(
+ stream_generator: AsyncGenerator[bytes, None],
+) -> AsyncGenerator[bytes, None]:
+ """
+ 独立的平滑流生成函数
+
+ 供 CLI handler 等场景使用,无需创建完整的 StreamProcessor 实例。
+
+ Args:
+ stream_generator: 原始流生成器
+
+ Yields:
+ 平滑处理后的响应数据块
+ """
+ processor = _LightweightSmoother()
+ async for chunk in processor.smooth(stream_generator):
+ yield chunk
+
+
+class _LightweightSmoother:
+ """
+ 轻量级平滑处理器
+
+ 只包含平滑输出所需的最小逻辑,不依赖 StreamProcessor 的其他功能。
+ """
+
+ CHUNK_SIZE = 5
+ MIN_DELAY_MS = 15
+ MAX_DELAY_MS = 24
+ SHORT_TEXT_THRESHOLD = 10
+ LONG_TEXT_THRESHOLD = 50
+
+ def __init__(self) -> None:
+ self._extractors: dict[str, ContentExtractor] = {}
+
+ def _get_extractor(self, format_name: str) -> Optional[ContentExtractor]:
+ if format_name not in self._extractors:
+ extractor = get_extractor(format_name)
+ if extractor:
+ self._extractors[format_name] = extractor
+ return self._extractors.get(format_name)
+
+ def _detect_format_and_extract(
+ self, data: dict
+ ) -> tuple[Optional[str], Optional[ContentExtractor]]:
+ for format_name in get_extractor_formats():
+ extractor = self._get_extractor(format_name)
+ if extractor:
+ content = extractor.extract_content(data)
+ if content is not None:
+ return content, extractor
+ return None, None
+
+ def _calculate_delay(self, text_length: int) -> float:
+ if text_length <= self.SHORT_TEXT_THRESHOLD:
+ return self.MAX_DELAY_MS / 1000.0
+ if text_length >= self.LONG_TEXT_THRESHOLD:
+ return self.MIN_DELAY_MS / 1000.0
+ ratio = math.log(text_length / self.SHORT_TEXT_THRESHOLD) / math.log(
+ self.LONG_TEXT_THRESHOLD / self.SHORT_TEXT_THRESHOLD
+ )
+ return (self.MAX_DELAY_MS - ratio * (self.MAX_DELAY_MS - self.MIN_DELAY_MS)) / 1000.0
+
+ def _split_content(self, content: str) -> list[str]:
+ text_length = len(content)
+ if text_length <= self.CHUNK_SIZE:
+ return [content]
+ if text_length >= self.LONG_TEXT_THRESHOLD:
+ return [content[i : i + self.CHUNK_SIZE] for i in range(0, text_length, self.CHUNK_SIZE)]
+ return list(content)
+
+ async def smooth(
+ self, stream_generator: AsyncGenerator[bytes, None]
+ ) -> AsyncGenerator[bytes, None]:
+ buffer = b""
+ is_first_content = True
+
+ async for chunk in stream_generator:
+ buffer += chunk
+
+ while b"\n\n" in buffer:
+ event_block, buffer = buffer.split(b"\n\n", 1)
+ event_str = event_block.decode("utf-8", errors="replace")
+
+ lines = event_str.strip().split("\n")
+ data_str = None
+ event_type = ""
+
+ for line in lines:
+ line = line.rstrip("\r")
+ if line.startswith("event: "):
+ event_type = line[7:].strip()
+ elif line.startswith("data: "):
+ data_str = line[6:]
+
+ if data_str is None:
+ yield event_block + b"\n\n"
+ continue
+
+ if data_str.strip() == "[DONE]":
+ yield event_block + b"\n\n"
+ continue
+
+ try:
+ data = json.loads(data_str)
+ except json.JSONDecodeError:
+ yield event_block + b"\n\n"
+ continue
+
+ content, extractor = self._detect_format_and_extract(data)
+
+ if content and len(content) > 1 and extractor:
+ delay_seconds = self._calculate_delay(len(content))
+ content_chunks = self._split_content(content)
+
+ for i, sub_content in enumerate(content_chunks):
+ is_first = is_first_content and i == 0
+ sse_chunk = extractor.create_chunk(
+ data, sub_content, event_type=event_type, is_first=is_first
+ )
+ yield sse_chunk
+ if i < len(content_chunks) - 1:
+ await asyncio.sleep(delay_seconds)
+
+ is_first_content = False
+ else:
+ yield event_block + b"\n\n"
+ if content:
+ is_first_content = False
+
+ if buffer:
+ yield buffer
diff --git a/src/api/handlers/base/stream_smoother.py b/src/api/handlers/base/stream_smoother.py
deleted file mode 100644
index 2f875e6..0000000
--- a/src/api/handlers/base/stream_smoother.py
+++ /dev/null
@@ -1,257 +0,0 @@
-"""
-流式平滑输出处理器
-
-将上游返回的大 chunk 拆分成小块,模拟更流畅的打字效果。
-支持 OpenAI、Claude、Gemini 格式的 SSE 事件。
-"""
-
-import asyncio
-import copy
-import json
-from typing import AsyncGenerator, Optional, Tuple
-
-
-class StreamSmoother:
- """
- 流式平滑输出处理器
-
- 将 SSE 事件中的大段 content 拆分成小块输出,
- 每块之间加入微小延迟,模拟打字效果。
- """
-
- def __init__(
- self,
- chunk_size: int = 5,
- delay_ms: int = 15,
- ):
- """
- 初始化平滑处理器
-
- Args:
- chunk_size: 每个小块的字符数
- delay_ms: 每个小块之间的延迟毫秒数
- """
- self.chunk_size = chunk_size
- self.delay_ms = delay_ms
- self.delay_seconds = self.delay_ms / 1000.0
-
- def _split_content(self, content: str) -> list[str]:
- """
- 将内容按字符数拆分
-
- 对于中文等多字节字符,按字符(而非字节)拆分。
- """
- if len(content) <= self.chunk_size:
- return [content]
-
- chunks = []
- for i in range(0, len(content), self.chunk_size):
- chunks.append(content[i : i + self.chunk_size])
- return chunks
-
- def _extract_content(self, data: dict) -> Tuple[Optional[str], str]:
- """
- 从 SSE 数据中提取可拆分的 content
-
- Returns:
- (content, format): content 为提取的文本,format 为检测到的格式
- format: "openai" | "claude" | "gemini" | "unknown"
- """
- if not isinstance(data, dict):
- return None, "unknown"
-
- # OpenAI 格式: choices[0].delta.content
- # 只在 delta 仅包含 role/content 时允许拆分,避免破坏 tool_calls 等结构
- choices = data.get("choices")
- if isinstance(choices, list) and len(choices) == 1:
- first_choice = choices[0]
- if isinstance(first_choice, dict):
- delta = first_choice.get("delta")
- if isinstance(delta, dict):
- content = delta.get("content")
- if isinstance(content, str):
- allowed_keys = {"role", "content"}
- if all(key in allowed_keys for key in delta.keys()):
- return content, "openai"
-
- # Claude 格式: type=content_block_delta, delta.type=text_delta, delta.text
- if data.get("type") == "content_block_delta":
- delta = data.get("delta", {})
- if isinstance(delta, dict) and delta.get("type") == "text_delta":
- text = delta.get("text")
- if isinstance(text, str):
- return text, "claude"
-
- # Gemini 格式: candidates[0].content.parts[0].text
- candidates = data.get("candidates")
- if isinstance(candidates, list) and len(candidates) == 1:
- first_candidate = candidates[0]
- if isinstance(first_candidate, dict):
- content = first_candidate.get("content", {})
- if isinstance(content, dict):
- parts = content.get("parts", [])
- if isinstance(parts, list) and len(parts) == 1:
- first_part = parts[0]
- if isinstance(first_part, dict):
- text = first_part.get("text")
- # 只有纯文本块才拆分
- if isinstance(text, str) and len(first_part) == 1:
- return text, "gemini"
-
- return None, "unknown"
-
- def _create_openai_chunk(
- self,
- original_data: dict,
- new_content: str,
- is_first: bool = False,
- ) -> bytes:
- """创建 OpenAI 格式的 SSE chunk"""
- new_data = original_data.copy()
-
- if "choices" in new_data and new_data["choices"]:
- new_choices = []
- for choice in new_data["choices"]:
- new_choice = choice.copy()
- if "delta" in new_choice:
- new_delta = {}
- # 只有第一个 chunk 保留 role
- if is_first and "role" in new_choice["delta"]:
- new_delta["role"] = new_choice["delta"]["role"]
- new_delta["content"] = new_content
- new_choice["delta"] = new_delta
- new_choices.append(new_choice)
- new_data["choices"] = new_choices
-
- return f"data: {json.dumps(new_data, ensure_ascii=False)}\n\n".encode("utf-8")
-
- def _create_claude_chunk(
- self,
- original_data: dict,
- new_content: str,
- event_type: str,
- ) -> bytes:
- """创建 Claude 格式的 SSE chunk"""
- new_data = original_data.copy()
-
- if "delta" in new_data:
- new_delta = new_data["delta"].copy()
- new_delta["text"] = new_content
- new_data["delta"] = new_delta
-
- # Claude 格式需要 event: 前缀
- return f"event: {event_type}\ndata: {json.dumps(new_data, ensure_ascii=False)}\n\n".encode(
- "utf-8"
- )
-
- def _create_gemini_chunk(
- self,
- original_data: dict,
- new_content: str,
- ) -> bytes:
- """创建 Gemini 格式的 SSE chunk"""
- new_data = copy.deepcopy(original_data)
-
- if "candidates" in new_data and new_data["candidates"]:
- first_candidate = new_data["candidates"][0]
- if "content" in first_candidate:
- content = first_candidate["content"]
- if "parts" in content and content["parts"]:
- content["parts"][0]["text"] = new_content
-
- return f"data: {json.dumps(new_data, ensure_ascii=False)}\n\n".encode("utf-8")
-
- async def smooth_stream(
- self,
- byte_iterator: AsyncGenerator[bytes, None],
- ) -> AsyncGenerator[bytes, None]:
- """
- 对字节流进行平滑处理
-
- 解析 SSE 事件,拆分大 content,添加延迟后输出。
-
- Args:
- byte_iterator: 原始字节流
-
- Yields:
- 平滑处理后的字节块
- """
- buffer = b""
- is_first_content = True
-
- async for chunk in byte_iterator:
- buffer += chunk
-
- # 按双换行分割 SSE 事件(标准 SSE 格式)
- while b"\n\n" in buffer:
- event_block, buffer = buffer.split(b"\n\n", 1)
- event_str = event_block.decode("utf-8", errors="replace")
-
- # 解析事件块
- lines = event_str.strip().split("\n")
- data_str = None
- event_type = ""
-
- for line in lines:
- line = line.rstrip("\r")
- if line.startswith("event: "):
- event_type = line[7:].strip()
- elif line.startswith("data: "):
- data_str = line[6:]
-
- # 没有 data 行,直接透传
- if data_str is None:
- yield event_block + b"\n\n"
- continue
-
- # [DONE] 直接透传
- if data_str.strip() == "[DONE]":
- yield event_block + b"\n\n"
- continue
-
- # 尝试解析 JSON
- try:
- data = json.loads(data_str)
- except json.JSONDecodeError:
- yield event_block + b"\n\n"
- continue
-
- # 提取 content 和格式
- content, fmt = self._extract_content(data)
-
- if content and len(content) > self.chunk_size:
- # 需要拆分
- content_chunks = self._split_content(content)
-
- for i, sub_content in enumerate(content_chunks):
- is_first = is_first_content and i == 0
-
- if fmt == "openai":
- sse_chunk = self._create_openai_chunk(data, sub_content, is_first)
- elif fmt == "claude":
- sse_chunk = self._create_claude_chunk(
- data, sub_content, event_type or "content_block_delta"
- )
- elif fmt == "gemini":
- sse_chunk = self._create_gemini_chunk(data, sub_content)
- else:
- # 未知格式,透传原始事件
- yield event_block + b"\n\n"
- break
-
- yield sse_chunk
-
- # 除了最后一个块,其他块之间加延迟
- if i < len(content_chunks) - 1:
- await asyncio.sleep(self.delay_seconds)
- else:
- is_first_content = False
- else:
- # 不需要拆分,直接透传
- yield event_block + b"\n\n"
- if content:
- is_first_content = False
-
- # 处理剩余数据
- if buffer:
- yield buffer
diff --git a/src/services/system/config.py b/src/services/system/config.py
index 59c7b2b..3ff9e32 100644
--- a/src/services/system/config.py
+++ b/src/services/system/config.py
@@ -81,15 +81,7 @@ class SystemConfigService:
# 流式平滑输出配置
"stream_smoothing_enabled": {
"value": False,
- "description": "是否启用流式平滑输出,将大 chunk 拆分成小块模拟打字效果",
- },
- "stream_smoothing_chunk_size": {
- "value": 5,
- "description": "流式平滑输出每个小块的字符数",
- },
- "stream_smoothing_delay_ms": {
- "value": 15,
- "description": "流式平滑输出每个小块之间的延迟毫秒数",
+ "description": "是否启用流式平滑输出,自动根据文本长度调整输出速度",
},
}