From 9d9613a8d162de6e06b46b01c23988f850538218 Mon Sep 17 00:00:00 2001 From: Dayuan Jiang <34411969+DayuanJiang@users.noreply.github.com> Date: Thu, 4 Dec 2025 11:24:26 +0900 Subject: [PATCH] feat: add trace-level input/output to Langfuse observability (#69) * feat: add trace-level input/output to Langfuse observability - Add @langfuse/client and @langfuse/tracing dependencies - Wrap POST handler with observe() for proper tracing - Use updateActiveTrace() to set trace input, output, sessionId, userId - Filter Next.js HTTP spans in shouldExportSpan so AI SDK spans become root traces - Enable recordInputs/recordOutputs in experimental_telemetry * refactor: extract Langfuse logic to separate lib/langfuse.ts module --- app/api/chat/route.ts | 86 +++++++++++++++++++++++++------------------ instrumentation.ts | 13 +++++++ lib/langfuse.ts | 63 +++++++++++++++++++++++++++++++ package-lock.json | 47 +++++++++++++++++++++-- package.json | 2 + 5 files changed, 172 insertions(+), 39 deletions(-) create mode 100644 lib/langfuse.ts diff --git a/app/api/chat/route.ts b/app/api/chat/route.ts index 2afa072..6506856 100644 --- a/app/api/chat/route.ts +++ b/app/api/chat/route.ts @@ -1,6 +1,7 @@ import { streamText, convertToModelMessages, createUIMessageStream, createUIMessageStreamResponse } from 'ai'; import { getAIModel } from '@/lib/ai-providers'; import { findCachedResponse } from '@/lib/cached-responses'; +import { setTraceInput, setTraceOutput, getTelemetryConfig, wrapWithObserve } from '@/lib/langfuse'; import { z } from "zod"; export const maxDuration = 300; @@ -28,22 +29,33 @@ function createCachedStreamResponse(xml: string): Response { return createUIMessageStreamResponse({ stream }); } -export async function POST(req: Request) { - try { - const { messages, xml, sessionId } = await req.json(); +// Inner handler function +async function handleChatRequest(req: Request): Promise { + const { messages, xml, sessionId } = await req.json(); - // Get user IP for Langfuse tracking - const forwardedFor = req.headers.get('x-forwarded-for'); - const userId = forwardedFor?.split(',')[0]?.trim() || 'anonymous'; + // Get user IP for Langfuse tracking + const forwardedFor = req.headers.get('x-forwarded-for'); + const userId = forwardedFor?.split(',')[0]?.trim() || 'anonymous'; - // Validate sessionId for Langfuse (must be string, max 200 chars) - const validSessionId = sessionId && typeof sessionId === 'string' && sessionId.length <= 200 - ? sessionId - : undefined; + // Validate sessionId for Langfuse (must be string, max 200 chars) + const validSessionId = sessionId && typeof sessionId === 'string' && sessionId.length <= 200 + ? sessionId + : undefined; - // === CACHE CHECK START === - const isFirstMessage = messages.length === 1; - const isEmptyDiagram = !xml || xml.trim() === '' || isMinimalDiagram(xml); + // Extract user input text for Langfuse trace + const currentMessage = messages[messages.length - 1]; + const userInputText = currentMessage?.parts?.find((p: any) => p.type === 'text')?.text || ''; + + // Update Langfuse trace with input, session, and user + setTraceInput({ + input: userInputText, + sessionId: validSessionId, + userId: userId, + }); + + // === CACHE CHECK START === + const isFirstMessage = messages.length === 1; + const isEmptyDiagram = !xml || xml.trim() === '' || isMinimalDiagram(xml); if (isFirstMessage && isEmptyDiagram) { const lastMessage = messages[0]; @@ -266,23 +278,15 @@ ${lastMessageText} messages: [systemMessageWithCache, ...enhancedMessages], ...(providerOptions && { providerOptions }), ...(headers && { headers }), - // Only enable telemetry if Langfuse is configured - ...(process.env.LANGFUSE_PUBLIC_KEY && { - experimental_telemetry: { - isEnabled: true, - metadata: { - sessionId: validSessionId, - userId: userId, - }, - }, + // Langfuse telemetry config (returns undefined if not configured) + ...(getTelemetryConfig({ sessionId: validSessionId, userId }) && { + experimental_telemetry: getTelemetryConfig({ sessionId: validSessionId, userId }), }), - onFinish: ({ usage, providerMetadata }) => { - console.log('[Cache] Usage:', JSON.stringify({ - inputTokens: usage?.inputTokens, - outputTokens: usage?.outputTokens, - cachedInputTokens: usage?.cachedInputTokens, - }, null, 2)); - console.log('[Cache] Provider metadata:', JSON.stringify(providerMetadata, null, 2)); + onFinish: ({ text, usage, providerMetadata }) => { + console.log('[Cache] Full providerMetadata:', JSON.stringify(providerMetadata, null, 2)); + console.log('[Cache] Usage:', JSON.stringify(usage, null, 2)); + // Update Langfuse trace with output + setTraceOutput(text); }, tools: { // Client-side tool that will be executed on the client @@ -366,14 +370,24 @@ IMPORTANT: Keep edits concise: return errorString; } - return result.toUIMessageStreamResponse({ - onError: errorHandler, - }); + return result.toUIMessageStreamResponse({ + onError: errorHandler, + }); +} + +// Wrap handler with error handling +async function safeHandler(req: Request): Promise { + try { + return await handleChatRequest(req); } catch (error) { console.error('Error in chat route:', error); - return Response.json( - { error: 'Internal server error' }, - { status: 500 } - ); + return Response.json({ error: 'Internal server error' }, { status: 500 }); } } + +// Wrap with Langfuse observe (if configured) +const observedHandler = wrapWithObserve(safeHandler); + +export async function POST(req: Request) { + return observedHandler(req); +} diff --git a/instrumentation.ts b/instrumentation.ts index be21a71..fc0f703 100644 --- a/instrumentation.ts +++ b/instrumentation.ts @@ -12,11 +12,24 @@ export function register() { publicKey: process.env.LANGFUSE_PUBLIC_KEY, secretKey: process.env.LANGFUSE_SECRET_KEY, baseUrl: process.env.LANGFUSE_BASEURL, + // Filter out Next.js HTTP request spans so AI SDK spans become root traces + shouldExportSpan: ({ otelSpan }) => { + const spanName = otelSpan.name; + // Skip Next.js HTTP infrastructure spans + if (spanName.startsWith('POST /') || + spanName.startsWith('GET /') || + spanName.includes('BaseServer') || + spanName.includes('handleRequest')) { + return false; + } + return true; + }, }); const tracerProvider = new NodeTracerProvider({ spanProcessors: [langfuseSpanProcessor], }); + // Register globally so AI SDK's telemetry also uses this processor tracerProvider.register(); } diff --git a/lib/langfuse.ts b/lib/langfuse.ts new file mode 100644 index 0000000..56b1c54 --- /dev/null +++ b/lib/langfuse.ts @@ -0,0 +1,63 @@ +import { observe, updateActiveTrace } from '@langfuse/tracing'; +import * as api from '@opentelemetry/api'; + +// Check if Langfuse is configured +export function isLangfuseEnabled(): boolean { + return !!process.env.LANGFUSE_PUBLIC_KEY; +} + +// Update trace with input data at the start of request +export function setTraceInput(params: { + input: string; + sessionId?: string; + userId?: string; +}) { + if (!isLangfuseEnabled()) return; + + updateActiveTrace({ + name: 'chat', + input: params.input, + sessionId: params.sessionId, + userId: params.userId, + }); +} + +// Update trace with output and end the span +export function setTraceOutput(output: string) { + if (!isLangfuseEnabled()) return; + + updateActiveTrace({ output }); + const activeSpan = api.trace.getActiveSpan(); + if (activeSpan) { + activeSpan.end(); + } +} + +// Get telemetry config for streamText +export function getTelemetryConfig(params: { + sessionId?: string; + userId?: string; +}) { + if (!isLangfuseEnabled()) return undefined; + + return { + isEnabled: true, + recordInputs: true, + recordOutputs: true, + metadata: { + sessionId: params.sessionId, + userId: params.userId, + }, + }; +} + +// Wrap a handler with Langfuse observe +export function wrapWithObserve( + handler: (req: Request) => Promise +): (req: Request) => Promise { + if (!isLangfuseEnabled()) { + return handler; + } + + return observe(handler, { name: 'chat', endOnExit: false }); +} diff --git a/package-lock.json b/package-lock.json index 85dff6b..f12bedd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7,6 +7,7 @@ "": { "name": "next-ai-draw-io", "version": "0.2.0", + "license": "Apache-2.0", "dependencies": { "@ai-sdk/amazon-bedrock": "^3.0.62", "@ai-sdk/anthropic": "^2.0.44", @@ -15,7 +16,9 @@ "@ai-sdk/google": "^2.0.0", "@ai-sdk/openai": "^2.0.19", "@ai-sdk/react": "^2.0.22", + "@langfuse/client": "^4.4.9", "@langfuse/otel": "^4.4.4", + "@langfuse/tracing": "^4.4.9", "@next/third-parties": "^16.0.6", "@openrouter/ai-sdk-provider": "^1.2.3", "@opentelemetry/sdk-trace-node": "^2.2.0", @@ -1594,10 +1597,24 @@ "@jridgewell/sourcemap-codec": "^1.4.14" } }, + "node_modules/@langfuse/client": { + "version": "4.4.9", + "resolved": "https://registry.npmjs.org/@langfuse/client/-/client-4.4.9.tgz", + "integrity": "sha512-Y7bU70tMx/lYOU/A7NGvXXVZoL3AiFigbf7EwS5PVFc0xd34eRUmvwdLwEtuK7CnYTyxIZTzVVP2KEaicWCYZg==", + "license": "MIT", + "dependencies": { + "@langfuse/core": "^4.4.9", + "@langfuse/tracing": "^4.4.9", + "mustache": "^4.2.0" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.9.0" + } + }, "node_modules/@langfuse/core": { - "version": "4.4.4", - "resolved": "https://registry.npmjs.org/@langfuse/core/-/core-4.4.4.tgz", - "integrity": "sha512-hmtMNAOIsvDwT/xld0CJPXrIsakETbelSmAOGEY07faKtKdJy/BGjxexBbfAWLPgAC6wqC2fK2ByaYCGgC7MBw==", + "version": "4.4.9", + "resolved": "https://registry.npmjs.org/@langfuse/core/-/core-4.4.9.tgz", + "integrity": "sha512-9Hz/eH6dkOP8E/FLt1fsAQR8RE/TF8Ag/39GmY8JjN1o/Tl/MFJfK2QvqRGrkjDkIkMJGOSD+iQmV2pYm4upDA==", "license": "MIT", "peerDependencies": { "@opentelemetry/api": "^1.9.0" @@ -1621,6 +1638,21 @@ "@opentelemetry/sdk-trace-base": "^2.0.1" } }, + "node_modules/@langfuse/tracing": { + "version": "4.4.9", + "resolved": "https://registry.npmjs.org/@langfuse/tracing/-/tracing-4.4.9.tgz", + "integrity": "sha512-if+G/v9NsyTKj40KKX96bRSdMXwyDbVL4GJQvmwQ9SxvGYF+d99pGFB7L6QOeCd1KBHMdmDe733ncmvCnSHJ9w==", + "license": "MIT", + "dependencies": { + "@langfuse/core": "^4.4.9" + }, + "engines": { + "node": ">=20" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.9.0" + } + }, "node_modules/@napi-rs/wasm-runtime": { "version": "0.2.12", "resolved": "https://registry.npmjs.org/@napi-rs/wasm-runtime/-/wasm-runtime-0.2.12.tgz", @@ -7690,6 +7722,15 @@ "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", "license": "MIT" }, + "node_modules/mustache": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/mustache/-/mustache-4.2.0.tgz", + "integrity": "sha512-71ippSywq5Yb7/tVYyGbkBggbU8H3u5Rz56fH60jGFgr8uHwxs+aSKeqmluIVzM0m0kB7xQjKS6qPfd0b2ZoqQ==", + "license": "MIT", + "bin": { + "mustache": "bin/mustache" + } + }, "node_modules/nanoid": { "version": "3.3.11", "resolved": "https://registry.npmjs.org/nanoid/-/nanoid-3.3.11.tgz", diff --git a/package.json b/package.json index 96eb32f..4b7bd47 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,9 @@ "@ai-sdk/google": "^2.0.0", "@ai-sdk/openai": "^2.0.19", "@ai-sdk/react": "^2.0.22", + "@langfuse/client": "^4.4.9", "@langfuse/otel": "^4.4.4", + "@langfuse/tracing": "^4.4.9", "@next/third-parties": "^16.0.6", "@openrouter/ai-sdk-provider": "^1.2.3", "@opentelemetry/sdk-trace-node": "^2.2.0",