mirror of
https://github.com/fawney19/Aether.git
synced 2026-01-07 18:22:28 +08:00
refactor: 改进上游错误消息的提取和传递
- 新增 extract_error_message 工具函数,统一错误消息提取逻辑 - 在 HTTPStatusError 异常上附加 upstream_response 属性,保留原始错误 - 优先使用上游响应内容作为错误消息,而非异常字符串表示 - 移除错误消息的长度限制(500/1000 字符) - 修复边界条件检查,使用 startswith 匹配 "Unable to read" 前缀 - 简化 result.py 中的条件判断逻辑
This commit is contained in:
@@ -47,7 +47,6 @@ if TYPE_CHECKING:
|
|||||||
from src.api.handlers.base.stream_context import StreamContext
|
from src.api.handlers.base.stream_context import StreamContext
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class MessageTelemetry:
|
class MessageTelemetry:
|
||||||
"""
|
"""
|
||||||
负责记录 Usage/Audit,避免处理器里重复代码。
|
负责记录 Usage/Audit,避免处理器里重复代码。
|
||||||
|
|||||||
@@ -36,6 +36,7 @@ from src.api.handlers.base.stream_processor import StreamProcessor
|
|||||||
from src.api.handlers.base.stream_telemetry import StreamTelemetryRecorder
|
from src.api.handlers.base.stream_telemetry import StreamTelemetryRecorder
|
||||||
from src.api.handlers.base.utils import build_sse_headers
|
from src.api.handlers.base.utils import build_sse_headers
|
||||||
from src.config.settings import config
|
from src.config.settings import config
|
||||||
|
from src.core.error_utils import extract_error_message
|
||||||
from src.core.exceptions import (
|
from src.core.exceptions import (
|
||||||
EmbeddedErrorException,
|
EmbeddedErrorException,
|
||||||
ProviderAuthException,
|
ProviderAuthException,
|
||||||
@@ -500,6 +501,8 @@ class ChatHandlerBase(BaseMessageHandler, ABC):
|
|||||||
error_text = await self._extract_error_text(e)
|
error_text = await self._extract_error_text(e)
|
||||||
logger.error(f"Provider 返回错误: {e.response.status_code}\n Response: {error_text}")
|
logger.error(f"Provider 返回错误: {e.response.status_code}\n Response: {error_text}")
|
||||||
await http_client.aclose()
|
await http_client.aclose()
|
||||||
|
# 将上游错误信息附加到异常,以便故障转移时能够返回给客户端
|
||||||
|
e.upstream_response = error_text # type: ignore[attr-defined]
|
||||||
raise
|
raise
|
||||||
|
|
||||||
except EmbeddedErrorException:
|
except EmbeddedErrorException:
|
||||||
@@ -549,7 +552,7 @@ class ChatHandlerBase(BaseMessageHandler, ABC):
|
|||||||
model=ctx.model,
|
model=ctx.model,
|
||||||
response_time_ms=response_time_ms,
|
response_time_ms=response_time_ms,
|
||||||
status_code=status_code,
|
status_code=status_code,
|
||||||
error_message=str(error),
|
error_message=extract_error_message(error),
|
||||||
request_headers=original_headers,
|
request_headers=original_headers,
|
||||||
request_body=actual_request_body,
|
request_body=actual_request_body,
|
||||||
is_stream=True,
|
is_stream=True,
|
||||||
@@ -785,7 +788,7 @@ class ChatHandlerBase(BaseMessageHandler, ABC):
|
|||||||
model=model,
|
model=model,
|
||||||
response_time_ms=response_time_ms,
|
response_time_ms=response_time_ms,
|
||||||
status_code=status_code,
|
status_code=status_code,
|
||||||
error_message=str(e),
|
error_message=extract_error_message(e),
|
||||||
request_headers=original_headers,
|
request_headers=original_headers,
|
||||||
request_body=actual_request_body,
|
request_body=actual_request_body,
|
||||||
is_stream=False,
|
is_stream=False,
|
||||||
@@ -802,10 +805,10 @@ class ChatHandlerBase(BaseMessageHandler, ABC):
|
|||||||
try:
|
try:
|
||||||
if hasattr(e.response, "is_stream_consumed") and not e.response.is_stream_consumed:
|
if hasattr(e.response, "is_stream_consumed") and not e.response.is_stream_consumed:
|
||||||
error_bytes = await e.response.aread()
|
error_bytes = await e.response.aread()
|
||||||
return error_bytes.decode("utf-8", errors="replace")[:500]
|
return error_bytes.decode("utf-8", errors="replace")
|
||||||
else:
|
else:
|
||||||
return (
|
return (
|
||||||
e.response.text[:500] if hasattr(e.response, "_content") else "Unable to read"
|
e.response.text if hasattr(e.response, "_content") else "Unable to read"
|
||||||
)
|
)
|
||||||
except Exception as decode_error:
|
except Exception as decode_error:
|
||||||
return f"Unable to read error: {decode_error}"
|
return f"Unable to read error: {decode_error}"
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ from src.api.handlers.base.parsers import get_parser_for_format
|
|||||||
from src.api.handlers.base.request_builder import PassthroughRequestBuilder
|
from src.api.handlers.base.request_builder import PassthroughRequestBuilder
|
||||||
from src.api.handlers.base.stream_context import StreamContext
|
from src.api.handlers.base.stream_context import StreamContext
|
||||||
from src.api.handlers.base.utils import build_sse_headers
|
from src.api.handlers.base.utils import build_sse_headers
|
||||||
|
from src.core.error_utils import extract_error_message
|
||||||
|
|
||||||
# 直接从具体模块导入,避免循环依赖
|
# 直接从具体模块导入,避免循环依赖
|
||||||
from src.api.handlers.base.response_parser import (
|
from src.api.handlers.base.response_parser import (
|
||||||
@@ -488,6 +489,8 @@ class CliMessageHandlerBase(BaseMessageHandler):
|
|||||||
error_text = await self._extract_error_text(e)
|
error_text = await self._extract_error_text(e)
|
||||||
logger.error(f"Provider 返回错误状态: {e.response.status_code}\n Response: {error_text}")
|
logger.error(f"Provider 返回错误状态: {e.response.status_code}\n Response: {error_text}")
|
||||||
await http_client.aclose()
|
await http_client.aclose()
|
||||||
|
# 将上游错误信息附加到异常,以便故障转移时能够返回给客户端
|
||||||
|
e.upstream_response = error_text # type: ignore[attr-defined]
|
||||||
raise
|
raise
|
||||||
|
|
||||||
except EmbeddedErrorException:
|
except EmbeddedErrorException:
|
||||||
@@ -1359,7 +1362,7 @@ class CliMessageHandlerBase(BaseMessageHandler):
|
|||||||
model=ctx.model,
|
model=ctx.model,
|
||||||
response_time_ms=response_time_ms,
|
response_time_ms=response_time_ms,
|
||||||
status_code=status_code,
|
status_code=status_code,
|
||||||
error_message=str(error),
|
error_message=extract_error_message(error),
|
||||||
request_headers=original_headers,
|
request_headers=original_headers,
|
||||||
request_body=actual_request_body,
|
request_body=actual_request_body,
|
||||||
is_stream=True,
|
is_stream=True,
|
||||||
@@ -1627,7 +1630,7 @@ class CliMessageHandlerBase(BaseMessageHandler):
|
|||||||
model=model,
|
model=model,
|
||||||
response_time_ms=response_time_ms,
|
response_time_ms=response_time_ms,
|
||||||
status_code=status_code,
|
status_code=status_code,
|
||||||
error_message=str(e),
|
error_message=extract_error_message(e),
|
||||||
request_headers=original_headers,
|
request_headers=original_headers,
|
||||||
request_body=actual_request_body,
|
request_body=actual_request_body,
|
||||||
is_stream=False,
|
is_stream=False,
|
||||||
@@ -1647,14 +1650,14 @@ class CliMessageHandlerBase(BaseMessageHandler):
|
|||||||
|
|
||||||
for encoding in ["utf-8", "gbk", "latin1"]:
|
for encoding in ["utf-8", "gbk", "latin1"]:
|
||||||
try:
|
try:
|
||||||
return error_bytes.decode(encoding)[:500]
|
return error_bytes.decode(encoding)
|
||||||
except (UnicodeDecodeError, LookupError):
|
except (UnicodeDecodeError, LookupError):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
return error_bytes.decode("utf-8", errors="replace")[:500]
|
return error_bytes.decode("utf-8", errors="replace")
|
||||||
else:
|
else:
|
||||||
return (
|
return (
|
||||||
e.response.text[:500]
|
e.response.text
|
||||||
if hasattr(e.response, "_content")
|
if hasattr(e.response, "_content")
|
||||||
else "Unable to read response"
|
else "Unable to read response"
|
||||||
)
|
)
|
||||||
|
|||||||
28
src/core/error_utils.py
Normal file
28
src/core/error_utils.py
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
"""
|
||||||
|
错误消息处理工具函数
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
def extract_error_message(error: Exception, status_code: Optional[int] = None) -> str:
|
||||||
|
"""
|
||||||
|
从异常中提取错误消息,优先使用上游响应内容
|
||||||
|
|
||||||
|
Args:
|
||||||
|
error: 异常对象
|
||||||
|
status_code: 可选的 HTTP 状态码,用于构建更详细的错误消息
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
错误消息字符串
|
||||||
|
"""
|
||||||
|
# 优先使用 upstream_response 属性(包含上游 Provider 的原始错误)
|
||||||
|
upstream_response = getattr(error, "upstream_response", None)
|
||||||
|
if upstream_response and isinstance(upstream_response, str) and upstream_response.strip():
|
||||||
|
return str(upstream_response)
|
||||||
|
|
||||||
|
# 回退到异常的字符串表示(str 可能为空,如 httpx 超时异常)
|
||||||
|
error_str = str(error) or repr(error)
|
||||||
|
if status_code is not None:
|
||||||
|
return f"HTTP {status_code}: {error_str}"
|
||||||
|
return error_str
|
||||||
@@ -237,7 +237,7 @@ class ErrorClassifier:
|
|||||||
result["reason"] = str(data.get("reason", data.get("code", "")))
|
result["reason"] = str(data.get("reason", data.get("code", "")))
|
||||||
|
|
||||||
except (json.JSONDecodeError, TypeError, KeyError):
|
except (json.JSONDecodeError, TypeError, KeyError):
|
||||||
result["message"] = error_text[:500] if len(error_text) > 500 else error_text
|
result["message"] = error_text
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
@@ -323,8 +323,8 @@ class ErrorClassifier:
|
|||||||
if parts:
|
if parts:
|
||||||
return ": ".join(parts) if len(parts) > 1 else parts[0]
|
return ": ".join(parts) if len(parts) > 1 else parts[0]
|
||||||
|
|
||||||
# 无法解析,返回原始文本(截断)
|
# 无法解析,返回原始文本
|
||||||
return parsed["raw"][:500] if len(parsed["raw"]) > 500 else parsed["raw"]
|
return parsed["raw"]
|
||||||
|
|
||||||
def classify(
|
def classify(
|
||||||
self,
|
self,
|
||||||
@@ -484,11 +484,15 @@ class ErrorClassifier:
|
|||||||
return ProviderNotAvailableException(
|
return ProviderNotAvailableException(
|
||||||
message=detailed_message,
|
message=detailed_message,
|
||||||
provider_name=provider_name,
|
provider_name=provider_name,
|
||||||
|
upstream_status=status,
|
||||||
|
upstream_response=error_response_text,
|
||||||
)
|
)
|
||||||
|
|
||||||
return ProviderNotAvailableException(
|
return ProviderNotAvailableException(
|
||||||
message=detailed_message,
|
message=detailed_message,
|
||||||
provider_name=provider_name,
|
provider_name=provider_name,
|
||||||
|
upstream_status=status,
|
||||||
|
upstream_response=error_response_text,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def handle_http_error(
|
async def handle_http_error(
|
||||||
@@ -532,12 +536,14 @@ class ErrorClassifier:
|
|||||||
provider_name = str(provider.name)
|
provider_name = str(provider.name)
|
||||||
|
|
||||||
# 尝试读取错误响应内容
|
# 尝试读取错误响应内容
|
||||||
error_response_text = None
|
# 优先使用 handler 附加的 upstream_response 属性(流式请求中 response.text 可能为空)
|
||||||
try:
|
error_response_text = getattr(http_error, "upstream_response", None)
|
||||||
if http_error.response and hasattr(http_error.response, "text"):
|
if not error_response_text:
|
||||||
error_response_text = http_error.response.text[:1000] # 限制长度
|
try:
|
||||||
except Exception:
|
if http_error.response and hasattr(http_error.response, "text"):
|
||||||
pass
|
error_response_text = http_error.response.text
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
logger.warning(f" [{request_id}] HTTP错误 (attempt={attempt}/{max_attempts}): "
|
logger.warning(f" [{request_id}] HTTP错误 (attempt={attempt}/{max_attempts}): "
|
||||||
f"{http_error.response.status_code if http_error.response else 'unknown'}")
|
f"{http_error.response.status_code if http_error.response else 'unknown'}")
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ from redis import Redis
|
|||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
from src.core.enums import APIFormat
|
from src.core.enums import APIFormat
|
||||||
|
from src.core.error_utils import extract_error_message
|
||||||
from src.core.exceptions import (
|
from src.core.exceptions import (
|
||||||
ConcurrencyLimitError,
|
ConcurrencyLimitError,
|
||||||
ProviderNotAvailableException,
|
ProviderNotAvailableException,
|
||||||
@@ -401,7 +402,7 @@ class FallbackOrchestrator:
|
|||||||
db=self.db,
|
db=self.db,
|
||||||
candidate_id=candidate_record_id,
|
candidate_id=candidate_record_id,
|
||||||
error_type="HTTPStatusError",
|
error_type="HTTPStatusError",
|
||||||
error_message=f"HTTP {status_code}: {str(cause)}",
|
error_message=extract_error_message(cause, status_code),
|
||||||
status_code=status_code,
|
status_code=status_code,
|
||||||
latency_ms=elapsed_ms,
|
latency_ms=elapsed_ms,
|
||||||
concurrent_requests=captured_key_concurrent,
|
concurrent_requests=captured_key_concurrent,
|
||||||
@@ -425,31 +426,22 @@ class FallbackOrchestrator:
|
|||||||
attempt=attempt,
|
attempt=attempt,
|
||||||
max_attempts=max_attempts,
|
max_attempts=max_attempts,
|
||||||
)
|
)
|
||||||
# str(cause) 可能为空(如 httpx 超时异常),使用 repr() 作为备用
|
|
||||||
error_msg = str(cause) or repr(cause)
|
|
||||||
# 如果是 ProviderNotAvailableException,附加上游响应
|
|
||||||
if hasattr(cause, "upstream_response") and cause.upstream_response:
|
|
||||||
error_msg = f"{error_msg} | 上游响应: {cause.upstream_response[:500]}"
|
|
||||||
RequestCandidateService.mark_candidate_failed(
|
RequestCandidateService.mark_candidate_failed(
|
||||||
db=self.db,
|
db=self.db,
|
||||||
candidate_id=candidate_record_id,
|
candidate_id=candidate_record_id,
|
||||||
error_type=type(cause).__name__,
|
error_type=type(cause).__name__,
|
||||||
error_message=error_msg,
|
error_message=extract_error_message(cause),
|
||||||
latency_ms=elapsed_ms,
|
latency_ms=elapsed_ms,
|
||||||
concurrent_requests=captured_key_concurrent,
|
concurrent_requests=captured_key_concurrent,
|
||||||
)
|
)
|
||||||
return "continue" if has_retry_left else "break"
|
return "continue" if has_retry_left else "break"
|
||||||
|
|
||||||
# 未知错误:记录失败并抛出
|
# 未知错误:记录失败并抛出
|
||||||
error_msg = str(cause) or repr(cause)
|
|
||||||
# 如果是 ProviderNotAvailableException,附加上游响应
|
|
||||||
if hasattr(cause, "upstream_response") and cause.upstream_response:
|
|
||||||
error_msg = f"{error_msg} | 上游响应: {cause.upstream_response[:500]}"
|
|
||||||
RequestCandidateService.mark_candidate_failed(
|
RequestCandidateService.mark_candidate_failed(
|
||||||
db=self.db,
|
db=self.db,
|
||||||
candidate_id=candidate_record_id,
|
candidate_id=candidate_record_id,
|
||||||
error_type=type(cause).__name__,
|
error_type=type(cause).__name__,
|
||||||
error_message=error_msg,
|
error_message=extract_error_message(cause),
|
||||||
latency_ms=elapsed_ms,
|
latency_ms=elapsed_ms,
|
||||||
concurrent_requests=captured_key_concurrent,
|
concurrent_requests=captured_key_concurrent,
|
||||||
)
|
)
|
||||||
@@ -706,15 +698,25 @@ class FallbackOrchestrator:
|
|||||||
# 从 httpx.HTTPStatusError 提取
|
# 从 httpx.HTTPStatusError 提取
|
||||||
if isinstance(last_error, httpx.HTTPStatusError):
|
if isinstance(last_error, httpx.HTTPStatusError):
|
||||||
upstream_status = last_error.response.status_code
|
upstream_status = last_error.response.status_code
|
||||||
try:
|
# 优先使用我们附加的 upstream_response 属性(流已读取时 response.text 可能为空)
|
||||||
upstream_response = last_error.response.text
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
# 从异常属性提取
|
|
||||||
elif hasattr(last_error, "upstream_status"):
|
|
||||||
upstream_status = getattr(last_error, "upstream_status", None)
|
|
||||||
if hasattr(last_error, "upstream_response"):
|
|
||||||
upstream_response = getattr(last_error, "upstream_response", None)
|
upstream_response = getattr(last_error, "upstream_response", None)
|
||||||
|
if not upstream_response:
|
||||||
|
try:
|
||||||
|
upstream_response = last_error.response.text
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
# 从其他异常属性提取(如 ProviderNotAvailableException)
|
||||||
|
else:
|
||||||
|
upstream_status = getattr(last_error, "upstream_status", None)
|
||||||
|
upstream_response = getattr(last_error, "upstream_response", None)
|
||||||
|
|
||||||
|
# 如果响应为空或无效,使用异常的字符串表示
|
||||||
|
if (
|
||||||
|
not upstream_response
|
||||||
|
or not upstream_response.strip()
|
||||||
|
or upstream_response.startswith("Unable to read")
|
||||||
|
):
|
||||||
|
upstream_response = str(last_error)
|
||||||
|
|
||||||
raise ProviderNotAvailableException(
|
raise ProviderNotAvailableException(
|
||||||
f"所有Provider均不可用,已尝试{max_attempts}个组合",
|
f"所有Provider均不可用,已尝试{max_attempts}个组合",
|
||||||
|
|||||||
@@ -289,11 +289,11 @@ class RequestResult:
|
|||||||
status_code = 500
|
status_code = 500
|
||||||
error_type = "internal_error"
|
error_type = "internal_error"
|
||||||
|
|
||||||
# 构建错误消息,包含上游响应信息
|
# 构建错误消息:优先使用上游响应作为主要错误信息
|
||||||
error_message = str(exception)
|
if isinstance(exception, ProviderNotAvailableException) and exception.upstream_response:
|
||||||
if isinstance(exception, ProviderNotAvailableException):
|
error_message = exception.upstream_response
|
||||||
if exception.upstream_response:
|
else:
|
||||||
error_message = f"{error_message} | 上游响应: {exception.upstream_response[:500]}"
|
error_message = str(exception)
|
||||||
|
|
||||||
return cls(
|
return cls(
|
||||||
status=RequestStatus.FAILED,
|
status=RequestStatus.FAILED,
|
||||||
|
|||||||
Reference in New Issue
Block a user