perf: 优化 HTTP 客户端连接池复用

- 新增 get_proxy_client() 方法,相同代理配置复用同一客户端
- 添加 LRU 淘汰策略,代理客户端上限 50 个防止内存泄漏
- 新增 get_default_client_async() 异步线程安全版本
- 使用模块级锁避免类属性初始化竞态条件
- 优化 ConcurrencyManager 使用 Redis MGET 批量获取减少往返
- 添加 get_pool_stats() 连接池统计信息接口
This commit is contained in:
fawney19
2026-01-08 13:34:59 +08:00
parent bf09e740e9
commit ea35efe440
4 changed files with 332 additions and 114 deletions

View File

@@ -691,17 +691,23 @@ class ChatHandlerBase(BaseMessageHandler, ABC):
f" [{self.request_id}] 请求体stream字段: {provider_payload.get('stream', 'N/A')}" f" [{self.request_id}] 请求体stream字段: {provider_payload.get('stream', 'N/A')}"
) )
# 创建 HTTP 客户端(支持代理配置) # 获取复用的 HTTP 客户端(支持代理配置)
# endpoint.timeout 作为整体请求超时 # 注意:使用 get_proxy_client 复用连接池,不再每次创建新客户端
from src.clients.http_client import HTTPClientPool from src.clients.http_client import HTTPClientPool
request_timeout = float(endpoint.timeout or 300) request_timeout = float(endpoint.timeout or 300)
http_client = HTTPClientPool.create_client_with_proxy( http_client = await HTTPClientPool.get_proxy_client(
proxy_config=endpoint.proxy, proxy_config=endpoint.proxy,
)
# 注意:不使用 async with因为复用的客户端不应该被关闭
# 超时通过 timeout 参数控制
resp = await http_client.post(
url,
json=provider_payload,
headers=provider_hdrs,
timeout=httpx.Timeout(request_timeout), timeout=httpx.Timeout(request_timeout),
) )
async with http_client:
resp = await http_client.post(url, json=provider_payload, headers=provider_hdrs)
status_code = resp.status_code status_code = resp.status_code
response_headers = dict(resp.headers) response_headers = dict(resp.headers)

View File

@@ -1534,17 +1534,23 @@ class CliMessageHandlerBase(BaseMessageHandler):
f"原始模型={model}, 映射后={mapped_model or '无映射'}, URL模型={url_model}" f"原始模型={model}, 映射后={mapped_model or '无映射'}, URL模型={url_model}"
) )
# 创建 HTTP 客户端(支持代理配置) # 获取复用的 HTTP 客户端(支持代理配置)
# endpoint.timeout 作为整体请求超时 # 注意:使用 get_proxy_client 复用连接池,不再每次创建新客户端
from src.clients.http_client import HTTPClientPool from src.clients.http_client import HTTPClientPool
request_timeout = float(endpoint.timeout or 300) request_timeout = float(endpoint.timeout or 300)
http_client = HTTPClientPool.create_client_with_proxy( http_client = await HTTPClientPool.get_proxy_client(
proxy_config=endpoint.proxy, proxy_config=endpoint.proxy,
)
# 注意:不使用 async with因为复用的客户端不应该被关闭
# 超时通过 timeout 参数控制
resp = await http_client.post(
url,
json=provider_payload,
headers=provider_headers,
timeout=httpx.Timeout(request_timeout), timeout=httpx.Timeout(request_timeout),
) )
async with http_client:
resp = await http_client.post(url, json=provider_payload, headers=provider_headers)
status_code = resp.status_code status_code = resp.status_code
response_headers = dict(resp.headers) response_headers = dict(resp.headers)

View File

@@ -1,10 +1,18 @@
""" """
全局HTTP客户端池管理 全局HTTP客户端池管理
避免每次请求都创建新的AsyncClient,提高性能 避免每次请求都创建新的AsyncClient,提高性能
性能优化说明:
1. 默认客户端:无代理场景,全局复用单一客户端
2. 代理客户端缓存:相同代理配置复用同一客户端,避免重复创建
3. 连接池复用Keep-alive 连接减少 TCP 握手开销
""" """
import asyncio
import hashlib
import time
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Any, Dict, Optional from typing import Any, Dict, Optional, Tuple
from urllib.parse import quote, urlparse from urllib.parse import quote, urlparse
import httpx import httpx
@@ -12,6 +20,32 @@ import httpx
from src.config import config from src.config import config
from src.core.logger import logger from src.core.logger import logger
# 模块级锁,避免类属性延迟初始化的竞态条件
_proxy_clients_lock = asyncio.Lock()
_default_client_lock = asyncio.Lock()
def _compute_proxy_cache_key(proxy_config: Optional[Dict[str, Any]]) -> str:
"""
计算代理配置的缓存键
Args:
proxy_config: 代理配置字典
Returns:
缓存键字符串,无代理时返回 "__no_proxy__"
"""
if not proxy_config:
return "__no_proxy__"
# 构建代理 URL 作为缓存键的基础
proxy_url = build_proxy_url(proxy_config)
if not proxy_url:
return "__no_proxy__"
# 使用 MD5 哈希来避免过长的键名
return f"proxy:{hashlib.md5(proxy_url.encode()).hexdigest()[:16]}"
def build_proxy_url(proxy_config: Dict[str, Any]) -> Optional[str]: def build_proxy_url(proxy_config: Dict[str, Any]) -> Optional[str]:
""" """
@@ -61,11 +95,20 @@ class HTTPClientPool:
全局HTTP客户端池单例 全局HTTP客户端池单例
管理可重用的httpx.AsyncClient实例,避免频繁创建/销毁连接 管理可重用的httpx.AsyncClient实例,避免频繁创建/销毁连接
性能优化:
1. 默认客户端:无代理场景复用
2. 代理客户端缓存:相同代理配置复用同一客户端
3. LRU 淘汰:代理客户端超过上限时淘汰最久未使用的
""" """
_instance: Optional["HTTPClientPool"] = None _instance: Optional["HTTPClientPool"] = None
_default_client: Optional[httpx.AsyncClient] = None _default_client: Optional[httpx.AsyncClient] = None
_clients: Dict[str, httpx.AsyncClient] = {} _clients: Dict[str, httpx.AsyncClient] = {}
# 代理客户端缓存:{cache_key: (client, last_used_time)}
_proxy_clients: Dict[str, Tuple[httpx.AsyncClient, float]] = {}
# 代理客户端缓存上限(避免内存泄漏)
_max_proxy_clients: int = 50
def __new__(cls): def __new__(cls):
if cls._instance is None: if cls._instance is None:
@@ -73,12 +116,50 @@ class HTTPClientPool:
return cls._instance return cls._instance
@classmethod
async def get_default_client_async(cls) -> httpx.AsyncClient:
    """Return the process-wide default HTTP client, creating it on first use.

    Async-safe variant of get_default_client(): initialization runs under a
    module-level lock so concurrent first callers cannot build duplicates.

    Returns:
        The shared httpx.AsyncClient (no proxy configured).
    """
    # Fast path: already initialized, skip the lock entirely.
    if cls._default_client is not None:
        return cls._default_client
    async with _default_client_lock:
        # Double-checked locking: another coroutine may have finished
        # initialization while we were waiting for the lock.
        if cls._default_client is None:
            cls._default_client = httpx.AsyncClient(
                http2=False,  # HTTP/2 disabled for broader compatibility
                verify=True,  # keep TLS certificate verification enabled
                timeout=httpx.Timeout(
                    connect=config.http_connect_timeout,
                    read=config.http_read_timeout,
                    write=config.http_write_timeout,
                    pool=config.http_pool_timeout,
                ),
                limits=httpx.Limits(
                    max_connections=config.http_max_connections,
                    max_keepalive_connections=config.http_keepalive_connections,
                    keepalive_expiry=config.http_keepalive_expiry,
                ),
                follow_redirects=True,
            )
            logger.info(
                f"全局HTTP客户端池已初始化: "
                f"max_connections={config.http_max_connections}, "
                f"keepalive={config.http_keepalive_connections}, "
                f"keepalive_expiry={config.http_keepalive_expiry}s"
            )
        return cls._default_client
@classmethod
def get_default_client(cls) -> httpx.AsyncClient:
"""
获取默认的HTTP客户端同步版本向后兼容
⚠️ 注意:此方法在高并发首次调用时可能存在竞态条件,
推荐使用 get_default_client_async() 异步版本。
"""
if cls._default_client is None: if cls._default_client is None:
cls._default_client = httpx.AsyncClient( cls._default_client = httpx.AsyncClient(
http2=False, # 暂时禁用HTTP/2以提高兼容性 http2=False, # 暂时禁用HTTP/2以提高兼容性
@@ -135,6 +216,101 @@ class HTTPClientPool:
return cls._clients[name] return cls._clients[name]
@classmethod
def _get_proxy_clients_lock(cls) -> asyncio.Lock:
    """Expose the lock guarding the proxy-client cache.

    The lock lives at module scope (created at import time), which sidesteps
    the race of lazily creating a lock as a class attribute.
    """
    return _proxy_clients_lock
@classmethod
async def _evict_lru_proxy_client(cls) -> None:
    """Close and drop the least-recently-used proxy client when the cache is full.

    No-op while the cache is below ``_max_proxy_clients``. Callers are
    expected to hold the proxy-clients lock.
    """
    if len(cls._proxy_clients) < cls._max_proxy_clients:
        return  # still under capacity — nothing to evict
    # The entry with the smallest last-used timestamp is the LRU victim.
    lru_key, (victim, _) = min(cls._proxy_clients.items(), key=lambda kv: kv[1][1])
    del cls._proxy_clients[lru_key]
    try:
        # NOTE(review): closing here will break any request still in flight
        # on this client — confirm eviction pressure is low enough in practice.
        await victim.aclose()
        logger.debug(f"淘汰代理客户端: {lru_key}")
    except Exception as e:
        logger.warning(f"关闭代理客户端失败: {e}")
@classmethod
async def get_proxy_client(
    cls,
    proxy_config: Optional[Dict[str, Any]] = None,
) -> httpx.AsyncClient:
    """Return a cached, reusable HTTP client for the given proxy configuration.

    Identical proxy configs share one client, so the underlying connection
    pool (keep-alive connections) is reused across requests. The client uses
    the pool's default timeout; pass ``timeout=`` per request to override.

    Args:
        proxy_config: Proxy configuration dict (url, username, password),
            or None for no proxy.

    Returns:
        A shared httpx.AsyncClient — callers must NOT close it.
    """
    cache_key = _compute_proxy_cache_key(proxy_config)
    if cache_key == "__no_proxy__":
        # No usable proxy: serve from the shared default client instead.
        return await cls.get_default_client_async()

    async with cls._get_proxy_clients_lock():
        cached = cls._proxy_clients.get(cache_key)
        if cached is not None:
            client = cached[0]
            if not client.is_closed:
                # Touch the entry so LRU eviction treats it as fresh.
                cls._proxy_clients[cache_key] = (client, time.time())
                return client
            # Stale (closed) entry — drop it and rebuild below.
            del cls._proxy_clients[cache_key]
            logger.debug(f"代理客户端已关闭,将重新创建: {cache_key}")

        # Make room first if the cache is at capacity.
        await cls._evict_lru_proxy_client()

        new_client_kwargs: Dict[str, Any] = {
            "http2": False,
            "verify": True,
            "follow_redirects": True,
            "limits": httpx.Limits(
                max_connections=config.http_max_connections,
                max_keepalive_connections=config.http_keepalive_connections,
                keepalive_expiry=config.http_keepalive_expiry,
            ),
            "timeout": httpx.Timeout(
                connect=config.http_connect_timeout,
                read=config.http_read_timeout,
                write=config.http_write_timeout,
                pool=config.http_pool_timeout,
            ),
        }
        proxy_url = build_proxy_url(proxy_config) if proxy_config else None
        if proxy_url:
            new_client_kwargs["proxy"] = proxy_url

        client = httpx.AsyncClient(**new_client_kwargs)
        cls._proxy_clients[cache_key] = (client, time.time())
        logger.debug(
            f"创建代理客户端(缓存): {proxy_config.get('url', 'unknown') if proxy_config else 'none'}, "
            f"缓存数量: {len(cls._proxy_clients)}"
        )
        return client
@classmethod @classmethod
async def close_all(cls): async def close_all(cls):
"""关闭所有HTTP客户端""" """关闭所有HTTP客户端"""
@@ -148,6 +324,16 @@ class HTTPClientPool:
logger.debug(f"命名HTTP客户端已关闭: {name}") logger.debug(f"命名HTTP客户端已关闭: {name}")
cls._clients.clear() cls._clients.clear()
# 关闭代理客户端缓存
for cache_key, (client, _) in cls._proxy_clients.items():
try:
await client.aclose()
logger.debug(f"代理客户端已关闭: {cache_key}")
except Exception as e:
logger.warning(f"关闭代理客户端失败: {e}")
cls._proxy_clients.clear()
logger.info("所有HTTP客户端已关闭") logger.info("所有HTTP客户端已关闭")
@classmethod @classmethod
@@ -190,13 +376,15 @@ class HTTPClientPool:
""" """
创建带代理配置的HTTP客户端 创建带代理配置的HTTP客户端
⚠️ 性能警告:此方法每次都创建新客户端,推荐使用 get_proxy_client() 复用连接。
Args: Args:
proxy_config: 代理配置字典,包含 url, username, password proxy_config: 代理配置字典,包含 url, username, password
timeout: 超时配置 timeout: 超时配置
**kwargs: 其他 httpx.AsyncClient 配置参数 **kwargs: 其他 httpx.AsyncClient 配置参数
Returns: Returns:
配置好的 httpx.AsyncClient 实例 配置好的 httpx.AsyncClient 实例(调用者需要负责关闭)
""" """
client_config: Dict[str, Any] = { client_config: Dict[str, Any] = {
"http2": False, "http2": False,
@@ -218,11 +406,21 @@ class HTTPClientPool:
proxy_url = build_proxy_url(proxy_config) if proxy_config else None proxy_url = build_proxy_url(proxy_config) if proxy_config else None
if proxy_url: if proxy_url:
client_config["proxy"] = proxy_url client_config["proxy"] = proxy_url
logger.debug(f"创建带代理的HTTP客户端: {proxy_config.get('url', 'unknown')}") logger.debug(f"创建带代理的HTTP客户端(一次性): {proxy_config.get('url', 'unknown')}")
client_config.update(kwargs) client_config.update(kwargs)
return httpx.AsyncClient(**client_config) return httpx.AsyncClient(**client_config)
@classmethod
def get_pool_stats(cls) -> Dict[str, Any]:
    """Report connection-pool usage counters for monitoring/diagnostics."""
    stats: Dict[str, Any] = {}
    stats["default_client_active"] = cls._default_client is not None
    stats["named_clients_count"] = len(cls._clients)
    stats["proxy_clients_count"] = len(cls._proxy_clients)
    stats["max_proxy_clients"] = cls._max_proxy_clients
    return stats
# 便捷访问函数 # 便捷访问函数
def get_http_client() -> httpx.AsyncClient: def get_http_client() -> httpx.AsyncClient:

View File

@@ -85,6 +85,8 @@ class ConcurrencyManager:
""" """
获取当前并发数 获取当前并发数
性能优化:使用 MGET 批量获取,减少 Redis 往返次数
Args: Args:
endpoint_id: Endpoint ID可选 endpoint_id: Endpoint ID可选
key_id: ProviderAPIKey ID可选 key_id: ProviderAPIKey ID可选
@@ -104,15 +106,21 @@ class ConcurrencyManager:
key_count = 0 key_count = 0
try: try:
# 使用 MGET 批量获取,减少 Redis 往返2 次 GET -> 1 次 MGET
keys_to_fetch = []
if endpoint_id: if endpoint_id:
endpoint_key = self._get_endpoint_key(endpoint_id) keys_to_fetch.append(self._get_endpoint_key(endpoint_id))
result = await self._redis.get(endpoint_key)
endpoint_count = int(result) if result else 0
if key_id: if key_id:
key_key = self._get_key_key(key_id) keys_to_fetch.append(self._get_key_key(key_id))
result = await self._redis.get(key_key)
key_count = int(result) if result else 0 if keys_to_fetch:
results = await self._redis.mget(keys_to_fetch)
idx = 0
if endpoint_id:
endpoint_count = int(results[idx]) if results[idx] else 0
idx += 1
if key_id:
key_count = int(results[idx]) if results[idx] else 0
except Exception as e: except Exception as e:
logger.error(f"获取并发数失败: {e}") logger.error(f"获取并发数失败: {e}")