diff --git a/src/api/handlers/base/base_handler.py b/src/api/handlers/base/base_handler.py index 87de1b3..c0cae63 100644 --- a/src/api/handlers/base/base_handler.py +++ b/src/api/handlers/base/base_handler.py @@ -47,7 +47,6 @@ if TYPE_CHECKING: from src.api.handlers.base.stream_context import StreamContext - class MessageTelemetry: """ 负责记录 Usage/Audit,避免处理器里重复代码。 diff --git a/src/api/handlers/base/chat_handler_base.py b/src/api/handlers/base/chat_handler_base.py index 76225ef..46fd472 100644 --- a/src/api/handlers/base/chat_handler_base.py +++ b/src/api/handlers/base/chat_handler_base.py @@ -36,6 +36,7 @@ from src.api.handlers.base.stream_processor import StreamProcessor from src.api.handlers.base.stream_telemetry import StreamTelemetryRecorder from src.api.handlers.base.utils import build_sse_headers from src.config.settings import config +from src.core.error_utils import extract_error_message from src.core.exceptions import ( EmbeddedErrorException, ProviderAuthException, @@ -500,6 +501,8 @@ class ChatHandlerBase(BaseMessageHandler, ABC): error_text = await self._extract_error_text(e) logger.error(f"Provider 返回错误: {e.response.status_code}\n Response: {error_text}") await http_client.aclose() + # 将上游错误信息附加到异常,以便故障转移时能够返回给客户端 + e.upstream_response = error_text # type: ignore[attr-defined] raise except EmbeddedErrorException: @@ -549,7 +552,7 @@ class ChatHandlerBase(BaseMessageHandler, ABC): model=ctx.model, response_time_ms=response_time_ms, status_code=status_code, - error_message=str(error), + error_message=extract_error_message(error), request_headers=original_headers, request_body=actual_request_body, is_stream=True, @@ -785,7 +788,7 @@ class ChatHandlerBase(BaseMessageHandler, ABC): model=model, response_time_ms=response_time_ms, status_code=status_code, - error_message=str(e), + error_message=extract_error_message(e), request_headers=original_headers, request_body=actual_request_body, is_stream=False, @@ -802,10 +805,10 @@ class ChatHandlerBase(BaseMessageHandler, ABC): try: if hasattr(e.response, "is_stream_consumed") and not e.response.is_stream_consumed: error_bytes = await e.response.aread() - return error_bytes.decode("utf-8", errors="replace")[:500] + return error_bytes.decode("utf-8", errors="replace") else: return ( - e.response.text[:500] if hasattr(e.response, "_content") else "Unable to read" + e.response.text if hasattr(e.response, "_content") else "Unable to read" ) except Exception as decode_error: return f"Unable to read error: {decode_error}" diff --git a/src/api/handlers/base/cli_handler_base.py b/src/api/handlers/base/cli_handler_base.py index 5f0f081..a8d2778 100644 --- a/src/api/handlers/base/cli_handler_base.py +++ b/src/api/handlers/base/cli_handler_base.py @@ -35,6 +35,7 @@ from src.api.handlers.base.parsers import get_parser_for_format from src.api.handlers.base.request_builder import PassthroughRequestBuilder from src.api.handlers.base.stream_context import StreamContext from src.api.handlers.base.utils import build_sse_headers +from src.core.error_utils import extract_error_message # 直接从具体模块导入,避免循环依赖 from src.api.handlers.base.response_parser import ( @@ -488,6 +489,8 @@ class CliMessageHandlerBase(BaseMessageHandler): error_text = await self._extract_error_text(e) logger.error(f"Provider 返回错误状态: {e.response.status_code}\n Response: {error_text}") await http_client.aclose() + # 将上游错误信息附加到异常,以便故障转移时能够返回给客户端 + e.upstream_response = error_text # type: ignore[attr-defined] raise except EmbeddedErrorException: @@ -1359,7 +1362,7 @@ class CliMessageHandlerBase(BaseMessageHandler): model=ctx.model, response_time_ms=response_time_ms, status_code=status_code, - error_message=str(error), + error_message=extract_error_message(error), request_headers=original_headers, request_body=actual_request_body, is_stream=True, @@ -1627,7 +1630,7 @@ class CliMessageHandlerBase(BaseMessageHandler): model=model, response_time_ms=response_time_ms, status_code=status_code, - error_message=str(e), + error_message=extract_error_message(e), request_headers=original_headers, request_body=actual_request_body, is_stream=False, @@ -1647,14 +1650,14 @@ class CliMessageHandlerBase(BaseMessageHandler): for encoding in ["utf-8", "gbk", "latin1"]: try: - return error_bytes.decode(encoding)[:500] + return error_bytes.decode(encoding) except (UnicodeDecodeError, LookupError): continue - return error_bytes.decode("utf-8", errors="replace")[:500] + return error_bytes.decode("utf-8", errors="replace") else: return ( - e.response.text[:500] + e.response.text if hasattr(e.response, "_content") else "Unable to read response" ) diff --git a/src/core/error_utils.py b/src/core/error_utils.py new file mode 100644 index 0000000..9b91391 --- /dev/null +++ b/src/core/error_utils.py @@ -0,0 +1,28 @@ +""" +错误消息处理工具函数 +""" + +from typing import Optional + + +def extract_error_message(error: Exception, status_code: Optional[int] = None) -> str: + """ + 从异常中提取错误消息,优先使用上游响应内容 + + Args: + error: 异常对象 + status_code: 可选的 HTTP 状态码,用于构建更详细的错误消息 + + Returns: + 错误消息字符串 + """ + # 优先使用 upstream_response 属性(包含上游 Provider 的原始错误) + upstream_response = getattr(error, "upstream_response", None) + if upstream_response and isinstance(upstream_response, str) and upstream_response.strip(): + return str(upstream_response) + + # 回退到异常的字符串表示(str 可能为空,如 httpx 超时异常) + error_str = str(error) or repr(error) + if status_code is not None: + return f"HTTP {status_code}: {error_str}" + return error_str diff --git a/src/services/orchestration/error_classifier.py b/src/services/orchestration/error_classifier.py index a5ea137..4ee8107 100644 --- a/src/services/orchestration/error_classifier.py +++ b/src/services/orchestration/error_classifier.py @@ -237,7 +237,7 @@ class ErrorClassifier: result["reason"] = str(data.get("reason", data.get("code", ""))) except (json.JSONDecodeError, TypeError, KeyError): - result["message"] = error_text[:500] if len(error_text) > 500 else error_text + result["message"] = error_text return result @@ -323,8 +323,8 @@ class ErrorClassifier: if parts: return ": ".join(parts) if len(parts) > 1 else parts[0] - # 无法解析,返回原始文本(截断) - return parsed["raw"][:500] if len(parsed["raw"]) > 500 else parsed["raw"] + # 无法解析,返回原始文本 + return parsed["raw"] def classify( self, @@ -484,11 +484,15 @@ class ErrorClassifier: return ProviderNotAvailableException( message=detailed_message, provider_name=provider_name, + upstream_status=status, + upstream_response=error_response_text, ) return ProviderNotAvailableException( message=detailed_message, provider_name=provider_name, + upstream_status=status, + upstream_response=error_response_text, ) async def handle_http_error( @@ -532,12 +536,14 @@ class ErrorClassifier: provider_name = str(provider.name) # 尝试读取错误响应内容 - error_response_text = None - try: - if http_error.response and hasattr(http_error.response, "text"): - error_response_text = http_error.response.text[:1000] # 限制长度 - except Exception: - pass + # 优先使用 handler 附加的 upstream_response 属性(流式请求中 response.text 可能为空) + error_response_text = getattr(http_error, "upstream_response", None) + if not error_response_text: + try: + if http_error.response and hasattr(http_error.response, "text"): + error_response_text = http_error.response.text + except Exception: + pass logger.warning(f" [{request_id}] HTTP错误 (attempt={attempt}/{max_attempts}): " f"{http_error.response.status_code if http_error.response else 'unknown'}") diff --git a/src/services/orchestration/fallback_orchestrator.py b/src/services/orchestration/fallback_orchestrator.py index cb13f4d..f3c0658 100644 --- a/src/services/orchestration/fallback_orchestrator.py +++ b/src/services/orchestration/fallback_orchestrator.py @@ -30,6 +30,7 @@ from redis import Redis from sqlalchemy.orm import Session from src.core.enums import APIFormat +from src.core.error_utils import extract_error_message from src.core.exceptions import ( ConcurrencyLimitError, ProviderNotAvailableException, @@ -401,7 +402,7 @@ class FallbackOrchestrator: db=self.db, candidate_id=candidate_record_id, error_type="HTTPStatusError", - error_message=f"HTTP {status_code}: {str(cause)}", + error_message=extract_error_message(cause, status_code), status_code=status_code, latency_ms=elapsed_ms, concurrent_requests=captured_key_concurrent, @@ -425,31 +426,22 @@ class FallbackOrchestrator: attempt=attempt, max_attempts=max_attempts, ) - # str(cause) 可能为空(如 httpx 超时异常),使用 repr() 作为备用 - error_msg = str(cause) or repr(cause) - # 如果是 ProviderNotAvailableException,附加上游响应 - if hasattr(cause, "upstream_response") and cause.upstream_response: - error_msg = f"{error_msg} | 上游响应: {cause.upstream_response[:500]}" RequestCandidateService.mark_candidate_failed( db=self.db, candidate_id=candidate_record_id, error_type=type(cause).__name__, - error_message=error_msg, + error_message=extract_error_message(cause), latency_ms=elapsed_ms, concurrent_requests=captured_key_concurrent, ) return "continue" if has_retry_left else "break" # 未知错误:记录失败并抛出 - error_msg = str(cause) or repr(cause) - # 如果是 ProviderNotAvailableException,附加上游响应 - if hasattr(cause, "upstream_response") and cause.upstream_response: - error_msg = f"{error_msg} | 上游响应: {cause.upstream_response[:500]}" RequestCandidateService.mark_candidate_failed( db=self.db, candidate_id=candidate_record_id, error_type=type(cause).__name__, - error_message=error_msg, + error_message=extract_error_message(cause), latency_ms=elapsed_ms, concurrent_requests=captured_key_concurrent, ) @@ -706,15 +698,25 @@ class FallbackOrchestrator: # 从 httpx.HTTPStatusError 提取 if isinstance(last_error, httpx.HTTPStatusError): upstream_status = last_error.response.status_code - try: - upstream_response = last_error.response.text - except Exception: - pass - # 从异常属性提取 - elif hasattr(last_error, "upstream_status"): - upstream_status = getattr(last_error, "upstream_status", None) - if hasattr(last_error, "upstream_response"): + # 优先使用我们附加的 upstream_response 属性(流已读取时 response.text 可能为空) upstream_response = getattr(last_error, "upstream_response", None) + if not upstream_response: + try: + upstream_response = last_error.response.text + except Exception: + pass + # 从其他异常属性提取(如 ProviderNotAvailableException) + else: + upstream_status = getattr(last_error, "upstream_status", None) + upstream_response = getattr(last_error, "upstream_response", None) + + # 如果响应为空或无效,使用异常的字符串表示 + if ( + not upstream_response + or not upstream_response.strip() + or upstream_response.startswith("Unable to read") + ): + upstream_response = str(last_error) raise ProviderNotAvailableException( f"所有Provider均不可用,已尝试{max_attempts}个组合", diff --git a/src/services/request/result.py b/src/services/request/result.py index 1f91994..192f31c 100644 --- a/src/services/request/result.py +++ b/src/services/request/result.py @@ -289,11 +289,11 @@ class RequestResult: status_code = 500 error_type = "internal_error" - # 构建错误消息,包含上游响应信息 - error_message = str(exception) - if isinstance(exception, ProviderNotAvailableException): - if exception.upstream_response: - error_message = f"{error_message} | 上游响应: {exception.upstream_response[:500]}" + # 构建错误消息:优先使用上游响应作为主要错误信息 + if isinstance(exception, ProviderNotAvailableException) and exception.upstream_response: + error_message = exception.upstream_response + else: + error_message = str(exception) return cls( status=RequestStatus.FAILED,