From 7e792dabfcebb4decec947abf610ed158fdb500b Mon Sep 17 00:00:00 2001 From: fawney19 Date: Fri, 19 Dec 2025 01:59:56 +0800 Subject: [PATCH] refactor: use background task for client disconnection monitoring - Replace time-based throttling with background task for disconnect checks - Remove time.monotonic() and related throttling logic - Prevent blocking of stream transmission during connection checks - Properly clean up background task with try/finally block - Improve throughput and responsiveness of stream processing --- src/api/handlers/base/stream_processor.py | 36 +++++++++++++++-------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/src/api/handlers/base/stream_processor.py b/src/api/handlers/base/stream_processor.py index a47c2c7..3342ee4 100644 --- a/src/api/handlers/base/stream_processor.py +++ b/src/api/handlers/base/stream_processor.py @@ -11,7 +11,6 @@ import asyncio import codecs import json -import time from typing import Any, AsyncGenerator, Callable, Optional import httpx @@ -407,27 +406,40 @@ class StreamProcessor: 响应数据块 """ try: - # 断连检查频率:每次 await 都会引入调度开销,过于频繁会让流式"发一段停一段" - # 这里按时间间隔节流,兼顾及时停止上游读取与吞吐平滑性。 - next_disconnect_check_at = 0.0 - disconnect_check_interval_s = 0.25 + # 使用后台任务检查断连,完全不阻塞流式传输 + disconnected = False - async for chunk in stream_generator: - now = time.monotonic() - if now >= next_disconnect_check_at: - next_disconnect_check_at = now + disconnect_check_interval_s + async def check_disconnect_background() -> None: + nonlocal disconnected + while not disconnected and not ctx.has_completion: + await asyncio.sleep(0.5) if await is_disconnected(): - # 如果响应已完成(收到 finish_reason),客户端断开不算失败 + disconnected = True + break + + # 启动后台检查任务 + check_task = asyncio.create_task(check_disconnect_background()) + + try: + async for chunk in stream_generator: + if disconnected: + # 如果响应已完成,客户端断开不算失败 if ctx.has_completion: logger.info( f"ID:{self.request_id} | Client disconnected after completion" ) else: logger.warning(f"ID:{self.request_id} | Client disconnected") - ctx.status_code = 499 # Client Closed Request + ctx.status_code = 499 ctx.error_message = "client_disconnected" break - yield chunk + yield chunk + finally: + check_task.cancel() + try: + await check_task + except asyncio.CancelledError: + pass except asyncio.CancelledError: # 如果响应已完成,不标记为失败 if not ctx.has_completion: