From 070121717ddd000652f838248be6ba2e1c6083f5 Mon Sep 17 00:00:00 2001 From: fawney19 Date: Fri, 19 Dec 2025 09:46:22 +0800 Subject: [PATCH] refactor: consolidate stream smoothing into StreamProcessor with intelligent timing - Move StreamSmoother functionality directly into StreamProcessor for better integration - Create ContentExtractor strategy pattern for format-agnostic content extraction - Implement intelligent dynamic delay calculation based on text length - Support three text length tiers: short (char-by-char), medium (chunked), long (chunked) - Remove manual chunk_size and delay_ms configuration - now auto-calculated - Simplify admin UI to single toggle switch with auto timing adjustment - Extract format detection logic to reusable content_extractors module - Improve code maintainability with cleaner architecture --- frontend/src/views/admin/SystemSettings.vue | 84 +----- src/api/handlers/base/chat_handler_base.py | 13 +- src/api/handlers/base/cli_handler_base.py | 13 +- src/api/handlers/base/content_extractors.py | 274 +++++++++++++++++ src/api/handlers/base/stream_processor.py | 309 +++++++++++++++++++- src/api/handlers/base/stream_smoother.py | 257 ---------------- src/services/system/config.py | 10 +- 7 files changed, 600 insertions(+), 360 deletions(-) create mode 100644 src/api/handlers/base/content_extractors.py delete mode 100644 src/api/handlers/base/stream_smoother.py diff --git a/frontend/src/views/admin/SystemSettings.vue b/frontend/src/views/admin/SystemSettings.vue index bda9ae2..60dfb33 100644 --- a/frontend/src/views/admin/SystemSettings.vue +++ b/frontend/src/views/admin/SystemSettings.vue @@ -470,68 +470,20 @@ title="流式输出" description="配置流式响应的输出效果" > -
-
-
- -
- -

- 将上游返回的大块内容拆分成小块,模拟打字效果 -

-
-
-
- +
+
- -

- 每次输出的字符数量(1-100) -

-
- -
- - -

- 每块之间的延迟毫秒数(1-500) +

+ 自动根据文本长度调整输出速度:短文本逐字符输出(打字感更强),长文本按块输出(避免卡顿)

@@ -886,8 +838,6 @@ interface SystemConfig { audit_log_retention_days: number // 流式输出 stream_smoothing_enabled: boolean - stream_smoothing_chunk_size: number - stream_smoothing_delay_ms: number } const loading = ref(false) @@ -939,8 +889,6 @@ const systemConfig = ref({ audit_log_retention_days: 30, // 流式输出 stream_smoothing_enabled: false, - stream_smoothing_chunk_size: 5, - stream_smoothing_delay_ms: 15, }) // 计算属性:KB 和 字节 之间的转换 @@ -999,8 +947,6 @@ async function loadSystemConfig() { 'audit_log_retention_days', // 流式输出 'stream_smoothing_enabled', - 'stream_smoothing_chunk_size', - 'stream_smoothing_delay_ms', ] for (const key of configs) { @@ -1114,16 +1060,6 @@ async function saveSystemConfig() { value: systemConfig.value.stream_smoothing_enabled, description: '是否启用流式平滑输出' }, - { - key: 'stream_smoothing_chunk_size', - value: systemConfig.value.stream_smoothing_chunk_size, - description: '流式平滑输出每个小块的字符数' - }, - { - key: 'stream_smoothing_delay_ms', - value: systemConfig.value.stream_smoothing_delay_ms, - description: '流式平滑输出每个小块之间的延迟毫秒数' - }, ] const promises = configItems.map(item => diff --git a/src/api/handlers/base/chat_handler_base.py b/src/api/handlers/base/chat_handler_base.py index 38c4b39..417aeec 100644 --- a/src/api/handlers/base/chat_handler_base.py +++ b/src/api/handlers/base/chat_handler_base.py @@ -298,16 +298,11 @@ class ChatHandlerBase(BaseMessageHandler, ABC): def update_streaming_status() -> None: self._update_usage_to_streaming_with_ctx(ctx) - # 从数据库批量读取流式平滑输出配置(单次查询) - smoothing_cfg = SystemConfigService.get_configs( - self.db, - ["stream_smoothing_enabled", "stream_smoothing_chunk_size", "stream_smoothing_delay_ms"], - ) - smoothing_config = StreamSmoothingConfig( - enabled=bool(smoothing_cfg.get("stream_smoothing_enabled", False)), - chunk_size=int(smoothing_cfg.get("stream_smoothing_chunk_size") or 5), - delay_ms=int(smoothing_cfg.get("stream_smoothing_delay_ms") or 15), + # 读取流式平滑输出开关 + smoothing_enabled = bool( + SystemConfigService.get_config(self.db, "stream_smoothing_enabled", False) ) + smoothing_config = StreamSmoothingConfig(enabled=smoothing_enabled) # 创建流处理器 stream_processor = StreamProcessor( diff --git a/src/api/handlers/base/cli_handler_base.py b/src/api/handlers/base/cli_handler_base.py index 0d8ca16..e4b863e 100644 --- a/src/api/handlers/base/cli_handler_base.py +++ b/src/api/handlers/base/cli_handler_base.py @@ -34,7 +34,9 @@ from src.api.handlers.base.base_handler import ( from src.api.handlers.base.parsers import get_parser_for_format from src.api.handlers.base.request_builder import PassthroughRequestBuilder from src.api.handlers.base.stream_context import StreamContext +from src.api.handlers.base.stream_processor import create_smoothed_stream from src.api.handlers.base.utils import build_sse_headers +from src.services.system.config import SystemConfigService # 直接从具体模块导入,避免循环依赖 from src.api.handlers.base.response_parser import ( @@ -352,8 +354,17 @@ class CliMessageHandlerBase(BaseMessageHandler): # 创建监控流 monitored_stream = self._create_monitored_stream(ctx, stream_generator) + # 创建平滑输出流(如果启用) + smoothing_enabled = bool( + SystemConfigService.get_config(self.db, "stream_smoothing_enabled", False) + ) + if smoothing_enabled: + final_stream = create_smoothed_stream(monitored_stream) + else: + final_stream = monitored_stream + return StreamingResponse( - monitored_stream, + final_stream, media_type="text/event-stream", headers=build_sse_headers(), background=background_tasks, diff --git a/src/api/handlers/base/content_extractors.py b/src/api/handlers/base/content_extractors.py new file mode 100644 index 0000000..99dc78c --- /dev/null +++ b/src/api/handlers/base/content_extractors.py @@ -0,0 +1,274 @@ +""" +流式内容提取器 - 策略模式实现 + +为不同 API 格式(OpenAI、Claude、Gemini)提供内容提取和 chunk 构造的抽象。 +StreamSmoother 使用这些提取器来处理不同格式的 SSE 事件。 +""" + +import copy +import json +from abc import ABC, abstractmethod +from typing import Optional + + +class ContentExtractor(ABC): + """ + 流式内容提取器抽象基类 + + 定义从 SSE 事件中提取文本内容和构造新 chunk 的接口。 + 每种 API 格式(OpenAI、Claude、Gemini)需要实现自己的提取器。 + """ + + @abstractmethod + def extract_content(self, data: dict) -> Optional[str]: + """ + 从 SSE 数据中提取可拆分的文本内容 + + Args: + data: 解析后的 JSON 数据 + + Returns: + 提取的文本内容,如果无法提取则返回 None + """ + pass + + @abstractmethod + def create_chunk( + self, + original_data: dict, + new_content: str, + event_type: str = "", + is_first: bool = False, + ) -> bytes: + """ + 使用新内容构造 SSE chunk + + Args: + original_data: 原始 JSON 数据 + new_content: 新的文本内容 + event_type: SSE 事件类型(某些格式需要) + is_first: 是否是第一个 chunk(用于保留 role 等字段) + + Returns: + 编码后的 SSE 字节数据 + """ + pass + + +class OpenAIContentExtractor(ContentExtractor): + """ + OpenAI 格式内容提取器 + + 处理 OpenAI Chat Completions API 的流式响应格式: + - 数据结构: choices[0].delta.content + - 只在 delta 仅包含 role/content 时允许拆分,避免破坏 tool_calls 等结构 + """ + + def extract_content(self, data: dict) -> Optional[str]: + if not isinstance(data, dict): + return None + + choices = data.get("choices") + if not isinstance(choices, list) or len(choices) != 1: + return None + + first_choice = choices[0] + if not isinstance(first_choice, dict): + return None + + delta = first_choice.get("delta") + if not isinstance(delta, dict): + return None + + content = delta.get("content") + if not isinstance(content, str): + return None + + # 只有 delta 仅包含 role/content 时才允许拆分 + # 避免破坏 tool_calls、function_call 等复杂结构 + allowed_keys = {"role", "content"} + if not all(key in allowed_keys for key in delta.keys()): + return None + + return content + + def create_chunk( + self, + original_data: dict, + new_content: str, + event_type: str = "", + is_first: bool = False, + ) -> bytes: + new_data = original_data.copy() + + if "choices" in new_data and new_data["choices"]: + new_choices = [] + for choice in new_data["choices"]: + new_choice = choice.copy() + if "delta" in new_choice: + new_delta = {} + # 只有第一个 chunk 保留 role + if is_first and "role" in new_choice["delta"]: + new_delta["role"] = new_choice["delta"]["role"] + new_delta["content"] = new_content + new_choice["delta"] = new_delta + new_choices.append(new_choice) + new_data["choices"] = new_choices + + return f"data: {json.dumps(new_data, ensure_ascii=False)}\n\n".encode("utf-8") + + +class ClaudeContentExtractor(ContentExtractor): + """ + Claude 格式内容提取器 + + 处理 Claude Messages API 的流式响应格式: + - 事件类型: content_block_delta + - 数据结构: delta.type=text_delta, delta.text + """ + + def extract_content(self, data: dict) -> Optional[str]: + if not isinstance(data, dict): + return None + + # 检查事件类型 + if data.get("type") != "content_block_delta": + return None + + delta = data.get("delta", {}) + if not isinstance(delta, dict): + return None + + # 检查 delta 类型 + if delta.get("type") != "text_delta": + return None + + text = delta.get("text") + if not isinstance(text, str): + return None + + return text + + def create_chunk( + self, + original_data: dict, + new_content: str, + event_type: str = "", + is_first: bool = False, + ) -> bytes: + new_data = original_data.copy() + + if "delta" in new_data: + new_delta = new_data["delta"].copy() + new_delta["text"] = new_content + new_data["delta"] = new_delta + + # Claude 格式需要 event: 前缀 + event_name = event_type or "content_block_delta" + return f"event: {event_name}\ndata: {json.dumps(new_data, ensure_ascii=False)}\n\n".encode( + "utf-8" + ) + + +class GeminiContentExtractor(ContentExtractor): + """ + Gemini 格式内容提取器 + + 处理 Gemini API 的流式响应格式: + - 数据结构: candidates[0].content.parts[0].text + - 只有纯文本块才拆分 + """ + + def extract_content(self, data: dict) -> Optional[str]: + if not isinstance(data, dict): + return None + + candidates = data.get("candidates") + if not isinstance(candidates, list) or len(candidates) != 1: + return None + + first_candidate = candidates[0] + if not isinstance(first_candidate, dict): + return None + + content = first_candidate.get("content", {}) + if not isinstance(content, dict): + return None + + parts = content.get("parts", []) + if not isinstance(parts, list) or len(parts) != 1: + return None + + first_part = parts[0] + if not isinstance(first_part, dict): + return None + + text = first_part.get("text") + # 只有纯文本块(只有 text 字段)才拆分 + if not isinstance(text, str) or len(first_part) != 1: + return None + + return text + + def create_chunk( + self, + original_data: dict, + new_content: str, + event_type: str = "", + is_first: bool = False, + ) -> bytes: + new_data = copy.deepcopy(original_data) + + if "candidates" in new_data and new_data["candidates"]: + first_candidate = new_data["candidates"][0] + if "content" in first_candidate: + content = first_candidate["content"] + if "parts" in content and content["parts"]: + content["parts"][0]["text"] = new_content + + return f"data: {json.dumps(new_data, ensure_ascii=False)}\n\n".encode("utf-8") + + +# 提取器注册表 +_EXTRACTORS: dict[str, type[ContentExtractor]] = { + "openai": OpenAIContentExtractor, + "claude": ClaudeContentExtractor, + "gemini": GeminiContentExtractor, +} + + +def get_extractor(format_name: str) -> Optional[ContentExtractor]: + """ + 根据格式名获取对应的内容提取器实例 + + Args: + format_name: 格式名称(openai, claude, gemini) + + Returns: + 对应的提取器实例,如果格式不支持则返回 None + """ + extractor_class = _EXTRACTORS.get(format_name.lower()) + if extractor_class: + return extractor_class() + return None + + +def register_extractor(format_name: str, extractor_class: type[ContentExtractor]) -> None: + """ + 注册新的内容提取器 + + Args: + format_name: 格式名称 + extractor_class: 提取器类 + """ + _EXTRACTORS[format_name.lower()] = extractor_class + + +def get_extractor_formats() -> list[str]: + """ + 获取所有已注册的格式名称列表 + + Returns: + 格式名称列表 + """ + return list(_EXTRACTORS.keys()) diff --git a/src/api/handlers/base/stream_processor.py b/src/api/handlers/base/stream_processor.py index 9306a00..9ed2373 100644 --- a/src/api/handlers/base/stream_processor.py +++ b/src/api/handlers/base/stream_processor.py @@ -6,20 +6,26 @@ 2. 响应流生成 3. 预读和嵌套错误检测 4. 客户端断开检测 +5. 流式平滑输出 """ import asyncio import codecs import json +import math from dataclasses import dataclass from typing import Any, AsyncGenerator, Callable, Optional import httpx +from src.api.handlers.base.content_extractors import ( + ContentExtractor, + get_extractor, + get_extractor_formats, +) from src.api.handlers.base.parsers import get_parser_for_format from src.api.handlers.base.response_parser import ResponseParser from src.api.handlers.base.stream_context import StreamContext -from src.api.handlers.base.stream_smoother import StreamSmoother from src.core.exceptions import EmbeddedErrorException from src.core.logger import logger from src.models.database import Provider, ProviderEndpoint @@ -31,18 +37,23 @@ class StreamSmoothingConfig: """流式平滑输出配置""" enabled: bool = False - chunk_size: int = 5 - delay_ms: int = 15 class StreamProcessor: """ 流式响应处理器 - 负责处理 SSE 流的解析、错误检测和响应生成。 + 负责处理 SSE 流的解析、错误检测、响应生成和平滑输出。 从 ChatHandlerBase 中提取,使其职责更加单一。 """ + # 平滑输出参数 + CHUNK_SIZE = 5 # 长文本每块字符数 + MIN_DELAY_MS = 15 # 长文本延迟(毫秒) + MAX_DELAY_MS = 24 # 短文本延迟(毫秒) + SHORT_TEXT_THRESHOLD = 10 # 短文本阈值(逐字符输出) + LONG_TEXT_THRESHOLD = 50 # 长文本阈值(按块输出) + def __init__( self, request_id: str, @@ -68,6 +79,9 @@ class StreamProcessor: self.collect_text = collect_text self.smoothing_config = smoothing_config or StreamSmoothingConfig() + # 内容提取器缓存 + self._extractors: dict[str, ContentExtractor] = {} + def get_parser_for_provider(self, ctx: StreamContext) -> ResponseParser: """ 获取 Provider 格式的解析器 @@ -390,7 +404,7 @@ class StreamProcessor: sse_parser: SSE 解析器 line: 原始行数据 """ - # SSEEventParser 以“去掉换行符”的单行文本作为输入;这里统一剔除 CR/LF, + # SSEEventParser 以"去掉换行符"的单行文本作为输入;这里统一剔除 CR/LF, # 避免把空行误判成 "\n" 并导致事件边界解析错误。 normalized_line = line.rstrip("\r\n") events = sse_parser.feed_line(normalized_line) @@ -489,12 +503,153 @@ class StreamProcessor: return # 启用平滑输出 - smoother = StreamSmoother( - chunk_size=self.smoothing_config.chunk_size, - delay_ms=self.smoothing_config.delay_ms, + buffer = b"" + is_first_content = True + + async for chunk in stream_generator: + buffer += chunk + + # 按双换行分割 SSE 事件(标准 SSE 格式) + while b"\n\n" in buffer: + event_block, buffer = buffer.split(b"\n\n", 1) + event_str = event_block.decode("utf-8", errors="replace") + + # 解析事件块 + lines = event_str.strip().split("\n") + data_str = None + event_type = "" + + for line in lines: + line = line.rstrip("\r") + if line.startswith("event: "): + event_type = line[7:].strip() + elif line.startswith("data: "): + data_str = line[6:] + + # 没有 data 行,直接透传 + if data_str is None: + yield event_block + b"\n\n" + continue + + # [DONE] 直接透传 + if data_str.strip() == "[DONE]": + yield event_block + b"\n\n" + continue + + # 尝试解析 JSON + try: + data = json.loads(data_str) + except json.JSONDecodeError: + yield event_block + b"\n\n" + continue + + # 检测格式并提取内容 + content, extractor = self._detect_format_and_extract(data) + + # 只有内容长度大于 1 才需要平滑处理 + if content and len(content) > 1 and extractor: + # 计算动态延迟 + delay_seconds = self._calculate_delay(len(content)) + + # 智能拆分 + content_chunks = self._split_content(content) + + for i, sub_content in enumerate(content_chunks): + is_first = is_first_content and i == 0 + + # 使用提取器创建新 chunk + sse_chunk = extractor.create_chunk( + data, + sub_content, + event_type=event_type, + is_first=is_first, + ) + + yield sse_chunk + + # 除了最后一个块,其他块之间加延迟 + if i < len(content_chunks) - 1: + await asyncio.sleep(delay_seconds) + + is_first_content = False + else: + # 不需要拆分,直接透传 + yield event_block + b"\n\n" + if content: + is_first_content = False + + # 处理剩余数据 + if buffer: + yield buffer + + def _get_extractor(self, format_name: str) -> Optional[ContentExtractor]: + """获取或创建格式对应的提取器(带缓存)""" + if format_name not in self._extractors: + extractor = get_extractor(format_name) + if extractor: + self._extractors[format_name] = extractor + return self._extractors.get(format_name) + + def _detect_format_and_extract( + self, data: dict + ) -> tuple[Optional[str], Optional[ContentExtractor]]: + """ + 检测数据格式并提取内容 + + 依次尝试各格式的提取器,返回第一个成功提取内容的结果。 + + Returns: + (content, extractor): 提取的内容和对应的提取器 + """ + for format_name in get_extractor_formats(): + extractor = self._get_extractor(format_name) + if extractor: + content = extractor.extract_content(data) + if content is not None: + return content, extractor + + return None, None + + def _calculate_delay(self, text_length: int) -> float: + """ + 根据文本长度计算动态延迟(秒) + + 短文本使用较大延迟(打字感更强),长文本使用较小延迟(避免卡顿)。 + 中间长度使用对数插值平滑过渡。 + """ + if text_length <= self.SHORT_TEXT_THRESHOLD: + return self.MAX_DELAY_MS / 1000.0 + if text_length >= self.LONG_TEXT_THRESHOLD: + return self.MIN_DELAY_MS / 1000.0 + + # 对数插值:平滑过渡 + ratio = math.log(text_length / self.SHORT_TEXT_THRESHOLD) / math.log( + self.LONG_TEXT_THRESHOLD / self.SHORT_TEXT_THRESHOLD ) - async for chunk in smoother.smooth_stream(stream_generator): - yield chunk + delay_ms = self.MAX_DELAY_MS - ratio * (self.MAX_DELAY_MS - self.MIN_DELAY_MS) + return delay_ms / 1000.0 + + def _split_content(self, content: str) -> list[str]: + """ + 根据文本长度智能拆分 + + 短文本:逐字符拆分(打字效果更真实) + 长文本:按 CHUNK_SIZE 拆分(避免过多延迟) + """ + text_length = len(content) + + if text_length <= self.CHUNK_SIZE: + return [content] + + # 长文本按块拆分 + if text_length >= self.LONG_TEXT_THRESHOLD: + chunks = [] + for i in range(0, text_length, self.CHUNK_SIZE): + chunks.append(content[i : i + self.CHUNK_SIZE]) + return chunks + + # 短/中文本逐字符拆分 + return list(content) async def _cleanup( self, @@ -510,3 +665,137 @@ class StreamProcessor: await http_client.aclose() except Exception: pass + + +async def create_smoothed_stream( + stream_generator: AsyncGenerator[bytes, None], +) -> AsyncGenerator[bytes, None]: + """ + 独立的平滑流生成函数 + + 供 CLI handler 等场景使用,无需创建完整的 StreamProcessor 实例。 + + Args: + stream_generator: 原始流生成器 + + Yields: + 平滑处理后的响应数据块 + """ + processor = _LightweightSmoother() + async for chunk in processor.smooth(stream_generator): + yield chunk + + +class _LightweightSmoother: + """ + 轻量级平滑处理器 + + 只包含平滑输出所需的最小逻辑,不依赖 StreamProcessor 的其他功能。 + """ + + CHUNK_SIZE = 5 + MIN_DELAY_MS = 15 + MAX_DELAY_MS = 24 + SHORT_TEXT_THRESHOLD = 10 + LONG_TEXT_THRESHOLD = 50 + + def __init__(self) -> None: + self._extractors: dict[str, ContentExtractor] = {} + + def _get_extractor(self, format_name: str) -> Optional[ContentExtractor]: + if format_name not in self._extractors: + extractor = get_extractor(format_name) + if extractor: + self._extractors[format_name] = extractor + return self._extractors.get(format_name) + + def _detect_format_and_extract( + self, data: dict + ) -> tuple[Optional[str], Optional[ContentExtractor]]: + for format_name in get_extractor_formats(): + extractor = self._get_extractor(format_name) + if extractor: + content = extractor.extract_content(data) + if content is not None: + return content, extractor + return None, None + + def _calculate_delay(self, text_length: int) -> float: + if text_length <= self.SHORT_TEXT_THRESHOLD: + return self.MAX_DELAY_MS / 1000.0 + if text_length >= self.LONG_TEXT_THRESHOLD: + return self.MIN_DELAY_MS / 1000.0 + ratio = math.log(text_length / self.SHORT_TEXT_THRESHOLD) / math.log( + self.LONG_TEXT_THRESHOLD / self.SHORT_TEXT_THRESHOLD + ) + return (self.MAX_DELAY_MS - ratio * (self.MAX_DELAY_MS - self.MIN_DELAY_MS)) / 1000.0 + + def _split_content(self, content: str) -> list[str]: + text_length = len(content) + if text_length <= self.CHUNK_SIZE: + return [content] + if text_length >= self.LONG_TEXT_THRESHOLD: + return [content[i : i + self.CHUNK_SIZE] for i in range(0, text_length, self.CHUNK_SIZE)] + return list(content) + + async def smooth( + self, stream_generator: AsyncGenerator[bytes, None] + ) -> AsyncGenerator[bytes, None]: + buffer = b"" + is_first_content = True + + async for chunk in stream_generator: + buffer += chunk + + while b"\n\n" in buffer: + event_block, buffer = buffer.split(b"\n\n", 1) + event_str = event_block.decode("utf-8", errors="replace") + + lines = event_str.strip().split("\n") + data_str = None + event_type = "" + + for line in lines: + line = line.rstrip("\r") + if line.startswith("event: "): + event_type = line[7:].strip() + elif line.startswith("data: "): + data_str = line[6:] + + if data_str is None: + yield event_block + b"\n\n" + continue + + if data_str.strip() == "[DONE]": + yield event_block + b"\n\n" + continue + + try: + data = json.loads(data_str) + except json.JSONDecodeError: + yield event_block + b"\n\n" + continue + + content, extractor = self._detect_format_and_extract(data) + + if content and len(content) > 1 and extractor: + delay_seconds = self._calculate_delay(len(content)) + content_chunks = self._split_content(content) + + for i, sub_content in enumerate(content_chunks): + is_first = is_first_content and i == 0 + sse_chunk = extractor.create_chunk( + data, sub_content, event_type=event_type, is_first=is_first + ) + yield sse_chunk + if i < len(content_chunks) - 1: + await asyncio.sleep(delay_seconds) + + is_first_content = False + else: + yield event_block + b"\n\n" + if content: + is_first_content = False + + if buffer: + yield buffer diff --git a/src/api/handlers/base/stream_smoother.py b/src/api/handlers/base/stream_smoother.py deleted file mode 100644 index 2f875e6..0000000 --- a/src/api/handlers/base/stream_smoother.py +++ /dev/null @@ -1,257 +0,0 @@ -""" -流式平滑输出处理器 - -将上游返回的大 chunk 拆分成小块,模拟更流畅的打字效果。 -支持 OpenAI、Claude、Gemini 格式的 SSE 事件。 -""" - -import asyncio -import copy -import json -from typing import AsyncGenerator, Optional, Tuple - - -class StreamSmoother: - """ - 流式平滑输出处理器 - - 将 SSE 事件中的大段 content 拆分成小块输出, - 每块之间加入微小延迟,模拟打字效果。 - """ - - def __init__( - self, - chunk_size: int = 5, - delay_ms: int = 15, - ): - """ - 初始化平滑处理器 - - Args: - chunk_size: 每个小块的字符数 - delay_ms: 每个小块之间的延迟毫秒数 - """ - self.chunk_size = chunk_size - self.delay_ms = delay_ms - self.delay_seconds = self.delay_ms / 1000.0 - - def _split_content(self, content: str) -> list[str]: - """ - 将内容按字符数拆分 - - 对于中文等多字节字符,按字符(而非字节)拆分。 - """ - if len(content) <= self.chunk_size: - return [content] - - chunks = [] - for i in range(0, len(content), self.chunk_size): - chunks.append(content[i : i + self.chunk_size]) - return chunks - - def _extract_content(self, data: dict) -> Tuple[Optional[str], str]: - """ - 从 SSE 数据中提取可拆分的 content - - Returns: - (content, format): content 为提取的文本,format 为检测到的格式 - format: "openai" | "claude" | "gemini" | "unknown" - """ - if not isinstance(data, dict): - return None, "unknown" - - # OpenAI 格式: choices[0].delta.content - # 只在 delta 仅包含 role/content 时允许拆分,避免破坏 tool_calls 等结构 - choices = data.get("choices") - if isinstance(choices, list) and len(choices) == 1: - first_choice = choices[0] - if isinstance(first_choice, dict): - delta = first_choice.get("delta") - if isinstance(delta, dict): - content = delta.get("content") - if isinstance(content, str): - allowed_keys = {"role", "content"} - if all(key in allowed_keys for key in delta.keys()): - return content, "openai" - - # Claude 格式: type=content_block_delta, delta.type=text_delta, delta.text - if data.get("type") == "content_block_delta": - delta = data.get("delta", {}) - if isinstance(delta, dict) and delta.get("type") == "text_delta": - text = delta.get("text") - if isinstance(text, str): - return text, "claude" - - # Gemini 格式: candidates[0].content.parts[0].text - candidates = data.get("candidates") - if isinstance(candidates, list) and len(candidates) == 1: - first_candidate = candidates[0] - if isinstance(first_candidate, dict): - content = first_candidate.get("content", {}) - if isinstance(content, dict): - parts = content.get("parts", []) - if isinstance(parts, list) and len(parts) == 1: - first_part = parts[0] - if isinstance(first_part, dict): - text = first_part.get("text") - # 只有纯文本块才拆分 - if isinstance(text, str) and len(first_part) == 1: - return text, "gemini" - - return None, "unknown" - - def _create_openai_chunk( - self, - original_data: dict, - new_content: str, - is_first: bool = False, - ) -> bytes: - """创建 OpenAI 格式的 SSE chunk""" - new_data = original_data.copy() - - if "choices" in new_data and new_data["choices"]: - new_choices = [] - for choice in new_data["choices"]: - new_choice = choice.copy() - if "delta" in new_choice: - new_delta = {} - # 只有第一个 chunk 保留 role - if is_first and "role" in new_choice["delta"]: - new_delta["role"] = new_choice["delta"]["role"] - new_delta["content"] = new_content - new_choice["delta"] = new_delta - new_choices.append(new_choice) - new_data["choices"] = new_choices - - return f"data: {json.dumps(new_data, ensure_ascii=False)}\n\n".encode("utf-8") - - def _create_claude_chunk( - self, - original_data: dict, - new_content: str, - event_type: str, - ) -> bytes: - """创建 Claude 格式的 SSE chunk""" - new_data = original_data.copy() - - if "delta" in new_data: - new_delta = new_data["delta"].copy() - new_delta["text"] = new_content - new_data["delta"] = new_delta - - # Claude 格式需要 event: 前缀 - return f"event: {event_type}\ndata: {json.dumps(new_data, ensure_ascii=False)}\n\n".encode( - "utf-8" - ) - - def _create_gemini_chunk( - self, - original_data: dict, - new_content: str, - ) -> bytes: - """创建 Gemini 格式的 SSE chunk""" - new_data = copy.deepcopy(original_data) - - if "candidates" in new_data and new_data["candidates"]: - first_candidate = new_data["candidates"][0] - if "content" in first_candidate: - content = first_candidate["content"] - if "parts" in content and content["parts"]: - content["parts"][0]["text"] = new_content - - return f"data: {json.dumps(new_data, ensure_ascii=False)}\n\n".encode("utf-8") - - async def smooth_stream( - self, - byte_iterator: AsyncGenerator[bytes, None], - ) -> AsyncGenerator[bytes, None]: - """ - 对字节流进行平滑处理 - - 解析 SSE 事件,拆分大 content,添加延迟后输出。 - - Args: - byte_iterator: 原始字节流 - - Yields: - 平滑处理后的字节块 - """ - buffer = b"" - is_first_content = True - - async for chunk in byte_iterator: - buffer += chunk - - # 按双换行分割 SSE 事件(标准 SSE 格式) - while b"\n\n" in buffer: - event_block, buffer = buffer.split(b"\n\n", 1) - event_str = event_block.decode("utf-8", errors="replace") - - # 解析事件块 - lines = event_str.strip().split("\n") - data_str = None - event_type = "" - - for line in lines: - line = line.rstrip("\r") - if line.startswith("event: "): - event_type = line[7:].strip() - elif line.startswith("data: "): - data_str = line[6:] - - # 没有 data 行,直接透传 - if data_str is None: - yield event_block + b"\n\n" - continue - - # [DONE] 直接透传 - if data_str.strip() == "[DONE]": - yield event_block + b"\n\n" - continue - - # 尝试解析 JSON - try: - data = json.loads(data_str) - except json.JSONDecodeError: - yield event_block + b"\n\n" - continue - - # 提取 content 和格式 - content, fmt = self._extract_content(data) - - if content and len(content) > self.chunk_size: - # 需要拆分 - content_chunks = self._split_content(content) - - for i, sub_content in enumerate(content_chunks): - is_first = is_first_content and i == 0 - - if fmt == "openai": - sse_chunk = self._create_openai_chunk(data, sub_content, is_first) - elif fmt == "claude": - sse_chunk = self._create_claude_chunk( - data, sub_content, event_type or "content_block_delta" - ) - elif fmt == "gemini": - sse_chunk = self._create_gemini_chunk(data, sub_content) - else: - # 未知格式,透传原始事件 - yield event_block + b"\n\n" - break - - yield sse_chunk - - # 除了最后一个块,其他块之间加延迟 - if i < len(content_chunks) - 1: - await asyncio.sleep(self.delay_seconds) - else: - is_first_content = False - else: - # 不需要拆分,直接透传 - yield event_block + b"\n\n" - if content: - is_first_content = False - - # 处理剩余数据 - if buffer: - yield buffer diff --git a/src/services/system/config.py b/src/services/system/config.py index 59c7b2b..3ff9e32 100644 --- a/src/services/system/config.py +++ b/src/services/system/config.py @@ -81,15 +81,7 @@ class SystemConfigService: # 流式平滑输出配置 "stream_smoothing_enabled": { "value": False, - "description": "是否启用流式平滑输出,将大 chunk 拆分成小块模拟打字效果", - }, - "stream_smoothing_chunk_size": { - "value": 5, - "description": "流式平滑输出每个小块的字符数", - }, - "stream_smoothing_delay_ms": { - "value": 15, - "description": "流式平滑输出每个小块之间的延迟毫秒数", + "description": "是否启用流式平滑输出,自动根据文本长度调整输出速度", }, }