refactor: use background task for client disconnection monitoring

- Replace time-based throttling of disconnect checks with a background task that polls is_disconnected() every 0.5 s
- Remove time.monotonic() and related throttling logic
- Stop awaiting is_disconnected() inside the chunk loop, so connection checks no longer block stream transmission
- Cancel and await the background task in a try/finally block so it is always cleaned up when streaming ends
- Improve throughput and responsiveness of stream processing
This commit is contained in:
fawney19
2025-12-19 01:59:56 +08:00
parent cd06169b2f
commit 7e792dabfc

View File

@@ -11,7 +11,6 @@
import asyncio import asyncio
import codecs import codecs
import json import json
import time
from typing import Any, AsyncGenerator, Callable, Optional from typing import Any, AsyncGenerator, Callable, Optional
import httpx import httpx
@@ -407,27 +406,40 @@ class StreamProcessor:
响应数据块 响应数据块
""" """
try: try:
# 断连检查频率:每次 await 都会引入调度开销,过于频繁会让流式"发一段停一段" # 使用后台任务检查断连,完全不阻塞流式传输
# 这里按时间间隔节流,兼顾及时停止上游读取与吞吐平滑性。 disconnected = False
next_disconnect_check_at = 0.0
disconnect_check_interval_s = 0.25
async for chunk in stream_generator: async def check_disconnect_background() -> None:
now = time.monotonic() nonlocal disconnected
if now >= next_disconnect_check_at: while not disconnected and not ctx.has_completion:
next_disconnect_check_at = now + disconnect_check_interval_s await asyncio.sleep(0.5)
if await is_disconnected(): if await is_disconnected():
# 如果响应已完成(收到 finish_reason客户端断开不算失败 disconnected = True
break
# 启动后台检查任务
check_task = asyncio.create_task(check_disconnect_background())
try:
async for chunk in stream_generator:
if disconnected:
# 如果响应已完成,客户端断开不算失败
if ctx.has_completion: if ctx.has_completion:
logger.info( logger.info(
f"ID:{self.request_id} | Client disconnected after completion" f"ID:{self.request_id} | Client disconnected after completion"
) )
else: else:
logger.warning(f"ID:{self.request_id} | Client disconnected") logger.warning(f"ID:{self.request_id} | Client disconnected")
ctx.status_code = 499 # Client Closed Request ctx.status_code = 499
ctx.error_message = "client_disconnected" ctx.error_message = "client_disconnected"
break break
yield chunk yield chunk
finally:
check_task.cancel()
try:
await check_task
except asyncio.CancelledError:
pass
except asyncio.CancelledError: except asyncio.CancelledError:
# 如果响应已完成,不标记为失败 # 如果响应已完成,不标记为失败
if not ctx.has_completion: if not ctx.has_completion: