feat: add trace-level input/output to Langfuse observability (#69)

* feat: add trace-level input/output to Langfuse observability

- Add @langfuse/client and @langfuse/tracing dependencies
- Wrap POST handler with observe() for proper tracing
- Use updateActiveTrace() to set trace input, output, sessionId, userId
- Filter Next.js HTTP spans in shouldExportSpan so AI SDK spans become root traces
- Enable recordInputs/recordOutputs in experimental_telemetry

* refactor: extract Langfuse logic to separate lib/langfuse.ts module
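
The helpers imported in the diff below come from the new `lib/langfuse.ts`, which is among the other changed files and not shown on this page. As orientation only, here is a minimal sketch of how `wrapWithObserve`, `setTraceInput`, and `setTraceOutput` could be built on `@langfuse/tracing`'s `observe()` and `updateActiveTrace()`; the guard on `LANGFUSE_PUBLIC_KEY` and the trace name `'chat-api'` are assumptions, not the committed code.

```ts
// lib/langfuse.ts -- illustrative sketch, not the file committed in this PR
import { observe, updateActiveTrace } from '@langfuse/tracing';

// Assumption: "configured" means the public key env var is present.
const isLangfuseConfigured = (): boolean => Boolean(process.env.LANGFUSE_PUBLIC_KEY);

// Wrap a route handler in observe() so the AI SDK spans it produces attach to
// a single Langfuse trace; pass the handler through untouched when Langfuse
// is not configured.
export function wrapWithObserve(
  handler: (req: Request) => Promise<Response>,
): (req: Request) => Promise<Response> {
  if (!isLangfuseConfigured()) return handler;
  return observe(handler, { name: 'chat-api' }); // trace name is a placeholder
}

// Record the user's prompt plus session/user identifiers on the active trace.
export function setTraceInput(params: {
  input: string;
  sessionId?: string;
  userId?: string;
}): void {
  if (!isLangfuseConfigured()) return;
  updateActiveTrace({
    input: params.input,
    sessionId: params.sessionId,
    userId: params.userId,
  });
}

// Record the model's final text as the trace-level output.
export function setTraceOutput(output: string): void {
  if (!isLangfuseConfigured()) return;
  updateActiveTrace({ output });
}
```
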
Dayuan Jiang authored on 2025-12-04 11:24:26 +09:00 (committed by GitHub)
parent bed04c82f8 · commit 9d9613a8d1
5 changed files with 172 additions and 39 deletions


@@ -1,6 +1,7 @@
 import { streamText, convertToModelMessages, createUIMessageStream, createUIMessageStreamResponse } from 'ai';
 import { getAIModel } from '@/lib/ai-providers';
 import { findCachedResponse } from '@/lib/cached-responses';
+import { setTraceInput, setTraceOutput, getTelemetryConfig, wrapWithObserve } from '@/lib/langfuse';
 import { z } from "zod";
 
 export const maxDuration = 300;
@@ -28,22 +29,33 @@ function createCachedStreamResponse(xml: string): Response {
   return createUIMessageStreamResponse({ stream });
 }
 
-export async function POST(req: Request) {
-  try {
-    const { messages, xml, sessionId } = await req.json();
+// Inner handler function
+async function handleChatRequest(req: Request): Promise<Response> {
+  const { messages, xml, sessionId } = await req.json();
 
-    // Get user IP for Langfuse tracking
-    const forwardedFor = req.headers.get('x-forwarded-for');
-    const userId = forwardedFor?.split(',')[0]?.trim() || 'anonymous';
+  // Get user IP for Langfuse tracking
+  const forwardedFor = req.headers.get('x-forwarded-for');
+  const userId = forwardedFor?.split(',')[0]?.trim() || 'anonymous';
 
-    // Validate sessionId for Langfuse (must be string, max 200 chars)
-    const validSessionId = sessionId && typeof sessionId === 'string' && sessionId.length <= 200
-      ? sessionId
-      : undefined;
+  // Validate sessionId for Langfuse (must be string, max 200 chars)
+  const validSessionId = sessionId && typeof sessionId === 'string' && sessionId.length <= 200
+    ? sessionId
+    : undefined;
 
-    // === CACHE CHECK START ===
-    const isFirstMessage = messages.length === 1;
-    const isEmptyDiagram = !xml || xml.trim() === '' || isMinimalDiagram(xml);
+  // Extract user input text for Langfuse trace
+  const currentMessage = messages[messages.length - 1];
+  const userInputText = currentMessage?.parts?.find((p: any) => p.type === 'text')?.text || '';
+
+  // Update Langfuse trace with input, session, and user
+  setTraceInput({
+    input: userInputText,
+    sessionId: validSessionId,
+    userId: userId,
+  });
+
+  // === CACHE CHECK START ===
+  const isFirstMessage = messages.length === 1;
+  const isEmptyDiagram = !xml || xml.trim() === '' || isMinimalDiagram(xml);
 
   if (isFirstMessage && isEmptyDiagram) {
     const lastMessage = messages[0];
@@ -266,23 +278,15 @@ ${lastMessageText}
       messages: [systemMessageWithCache, ...enhancedMessages],
       ...(providerOptions && { providerOptions }),
       ...(headers && { headers }),
-      // Only enable telemetry if Langfuse is configured
-      ...(process.env.LANGFUSE_PUBLIC_KEY && {
-        experimental_telemetry: {
-          isEnabled: true,
-          metadata: {
-            sessionId: validSessionId,
-            userId: userId,
-          },
-        },
+      // Langfuse telemetry config (returns undefined if not configured)
+      ...(getTelemetryConfig({ sessionId: validSessionId, userId }) && {
+        experimental_telemetry: getTelemetryConfig({ sessionId: validSessionId, userId }),
       }),
-      onFinish: ({ usage, providerMetadata }) => {
-        console.log('[Cache] Usage:', JSON.stringify({
-          inputTokens: usage?.inputTokens,
-          outputTokens: usage?.outputTokens,
-          cachedInputTokens: usage?.cachedInputTokens,
-        }, null, 2));
-        console.log('[Cache] Provider metadata:', JSON.stringify(providerMetadata, null, 2));
+      onFinish: ({ text, usage, providerMetadata }) => {
+        console.log('[Cache] Full providerMetadata:', JSON.stringify(providerMetadata, null, 2));
+        console.log('[Cache] Usage:', JSON.stringify(usage, null, 2));
+        // Update Langfuse trace with output
+        setTraceOutput(text);
       },
       tools: {
         // Client-side tool that will be executed on the client
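
`getTelemetryConfig`, used in the hunk above, is also part of the extracted `lib/langfuse.ts`. The commit message says it enables `recordInputs`/`recordOutputs`; a hedged sketch of such a helper, returning `undefined` when Langfuse is not configured so the `experimental_telemetry` key is simply omitted (field names follow the AI SDK's telemetry settings; the committed shape may differ):

```ts
// lib/langfuse.ts (continued sketch) -- not the committed implementation
export function getTelemetryConfig(params: { sessionId?: string; userId?: string }) {
  if (!process.env.LANGFUSE_PUBLIC_KEY) return undefined;
  return {
    isEnabled: true,
    recordInputs: true,   // record prompts on the generation spans
    recordOutputs: true,  // record completions on the generation spans
    metadata: {
      ...(params.sessionId && { sessionId: params.sessionId }),
      ...(params.userId && { userId: params.userId }),
    },
  };
}
```

Note that the hunk above calls the helper twice (once in the guard, once for the value); caching the result in a local variable would avoid building the config twice.
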
@@ -366,14 +370,24 @@ IMPORTANT: Keep edits concise:
     return errorString;
   }
 
-    return result.toUIMessageStreamResponse({
-      onError: errorHandler,
-    });
+  return result.toUIMessageStreamResponse({
+    onError: errorHandler,
+  });
+}
+
+// Wrap handler with error handling
+async function safeHandler(req: Request): Promise<Response> {
+  try {
+    return await handleChatRequest(req);
   } catch (error) {
     console.error('Error in chat route:', error);
-    return Response.json(
-      { error: 'Internal server error' },
-      { status: 500 }
-    );
+    return Response.json({ error: 'Internal server error' }, { status: 500 });
   }
 }
+
+// Wrap with Langfuse observe (if configured)
+const observedHandler = wrapWithObserve(safeHandler);
+
+export async function POST(req: Request) {
+  return observedHandler(req);
+}
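
The `shouldExportSpan` filter mentioned in the commit message is not part of this file; it belongs to the OpenTelemetry setup (typically `instrumentation.ts` in a Next.js app). Below is a sketch of that kind of filter with `@langfuse/otel`, assuming Next.js infrastructure spans are identified by their instrumentation scope name; the provider registration details are illustrative, not the committed code.

```ts
// instrumentation.ts -- illustrative sketch, not the file changed in this PR
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { LangfuseSpanProcessor, type ShouldExportSpan } from '@langfuse/otel';

// Drop Next.js HTTP/infrastructure spans so the AI SDK span becomes the root
// of each Langfuse trace instead of being nested under a framework span.
const shouldExportSpan: ShouldExportSpan = (span) =>
  span.otelSpan.instrumentationScope.name !== 'next.js';

export function register() {
  const tracerProvider = new NodeTracerProvider({
    spanProcessors: [new LangfuseSpanProcessor({ shouldExportSpan })],
  });
  tracerProvider.register();
}
```
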