feat: restore Langfuse observability integration (#103)

- Add lib/langfuse.ts with client, trace input/output, telemetry config
- Add instrumentation.ts for OpenTelemetry setup with Langfuse span processor
- Add /api/log-save endpoint for logging diagram saves
- Add /api/log-feedback endpoint for thumbs up/down feedback
- Update chat route with sessionId tracking and telemetry
- Add feedback buttons (thumbs up/down) to chat messages
- Add sessionId tracking throughout the app
- Update env.example with Langfuse configuration
- Add @langfuse/client, @langfuse/otel, @langfuse/tracing, @opentelemetry/sdk-trace-node
This commit is contained in:
Dayuan Jiang
2025-12-05 21:15:02 +09:00
committed by GitHub
parent 4cd78dc561
commit ed29e32ba3
12 changed files with 807 additions and 10 deletions

View File

@@ -1,6 +1,7 @@
import { streamText, convertToModelMessages, createUIMessageStream, createUIMessageStreamResponse } from 'ai';
import { getAIModel } from '@/lib/ai-providers';
import { findCachedResponse } from '@/lib/cached-responses';
import { setTraceInput, setTraceOutput, getTelemetryConfig, wrapWithObserve } from '@/lib/langfuse';
import { getSystemPrompt } from '@/lib/system-prompts';
import { z } from "zod";
@@ -61,7 +62,27 @@ function createCachedStreamResponse(xml: string): Response {
// Inner handler function
async function handleChatRequest(req: Request): Promise<Response> {
const { messages, xml } = await req.json();
const { messages, xml, sessionId } = await req.json();
// Get user IP for Langfuse tracking
const forwardedFor = req.headers.get('x-forwarded-for');
const userId = forwardedFor?.split(',')[0]?.trim() || 'anonymous';
// Validate sessionId for Langfuse (must be string, max 200 chars)
const validSessionId = sessionId && typeof sessionId === 'string' && sessionId.length <= 200
? sessionId
: undefined;
// Extract user input text for Langfuse trace
const currentMessage = messages[messages.length - 1];
const userInputText = currentMessage?.parts?.find((p: any) => p.type === 'text')?.text || '';
// Update Langfuse trace with input, session, and user
setTraceInput({
input: userInputText,
sessionId: validSessionId,
userId: userId,
});
// === FILE VALIDATION START ===
const fileValidation = validateFileParts(messages);
@@ -191,9 +212,19 @@ ${lastMessageText}
messages: allMessages,
...(providerOptions && { providerOptions }),
...(headers && { headers }),
onFinish: ({ usage, providerMetadata }) => {
console.log('[Cache] providerMetadata:', JSON.stringify(providerMetadata, null, 2));
// Langfuse telemetry config (returns undefined if not configured)
...(getTelemetryConfig({ sessionId: validSessionId, userId }) && {
experimental_telemetry: getTelemetryConfig({ sessionId: validSessionId, userId }),
}),
onFinish: ({ text, usage, providerMetadata }) => {
console.log('[Cache] Full providerMetadata:', JSON.stringify(providerMetadata, null, 2));
console.log('[Cache] Usage:', JSON.stringify(usage, null, 2));
// Pass usage to Langfuse (Bedrock streaming doesn't auto-report tokens to telemetry)
// AI SDK uses inputTokens/outputTokens, Langfuse expects promptTokens/completionTokens
setTraceOutput(text, {
promptTokens: usage?.inputTokens,
completionTokens: usage?.outputTokens,
});
},
tools: {
// Client-side tool that will be executed on the client
@@ -260,7 +291,8 @@ IMPORTANT: Keep edits concise:
return result.toUIMessageStreamResponse();
}
export async function POST(req: Request) {
// Wrap handler with error handling
async function safeHandler(req: Request): Promise<Response> {
try {
return await handleChatRequest(req);
} catch (error) {
@@ -268,3 +300,10 @@ export async function POST(req: Request) {
return Response.json({ error: 'Internal server error' }, { status: 500 });
}
}
// Wrap with Langfuse observe (if configured)
const observedHandler = wrapWithObserve(safeHandler);
export async function POST(req: Request) {
return observedHandler(req);
}

View File

@@ -0,0 +1,103 @@
import { getLangfuseClient } from '@/lib/langfuse';
import { randomUUID } from 'crypto';
import { z } from 'zod';
const feedbackSchema = z.object({
messageId: z.string().min(1).max(200),
feedback: z.enum(['good', 'bad']),
sessionId: z.string().min(1).max(200).optional(),
});
export async function POST(req: Request) {
const langfuse = getLangfuseClient();
if (!langfuse) {
return Response.json({ success: true, logged: false });
}
// Validate input
let data;
try {
data = feedbackSchema.parse(await req.json());
} catch {
return Response.json({ success: false, error: 'Invalid input' }, { status: 400 });
}
const { messageId, feedback, sessionId } = data;
// Get user IP for tracking
const forwardedFor = req.headers.get('x-forwarded-for');
const userId = forwardedFor?.split(',')[0]?.trim() || 'anonymous';
try {
// Find the most recent chat trace for this session to attach the score to
const tracesResponse = await langfuse.api.trace.list({
sessionId,
limit: 1,
});
const traces = tracesResponse.data || [];
const latestTrace = traces[0];
if (!latestTrace) {
// No trace found for this session - create a standalone feedback trace
const traceId = randomUUID();
const timestamp = new Date().toISOString();
await langfuse.api.ingestion.batch({
batch: [
{
type: 'trace-create',
id: randomUUID(),
timestamp,
body: {
id: traceId,
name: 'user-feedback',
sessionId,
userId,
input: { messageId, feedback },
metadata: { source: 'feedback-button', note: 'standalone - no chat trace found' },
timestamp,
},
},
{
type: 'score-create',
id: randomUUID(),
timestamp,
body: {
id: randomUUID(),
traceId,
name: 'user-feedback',
value: feedback === 'good' ? 1 : 0,
comment: `User gave ${feedback} feedback`,
},
},
],
});
} else {
// Attach score to the existing chat trace
const timestamp = new Date().toISOString();
await langfuse.api.ingestion.batch({
batch: [
{
type: 'score-create',
id: randomUUID(),
timestamp,
body: {
id: randomUUID(),
traceId: latestTrace.id,
name: 'user-feedback',
value: feedback === 'good' ? 1 : 0,
comment: `User gave ${feedback} feedback`,
},
},
],
});
}
return Response.json({ success: true, logged: true });
} catch (error) {
console.error('Langfuse feedback error:', error);
return Response.json({ success: false, error: 'Failed to log feedback' }, { status: 500 });
}
}

65
app/api/log-save/route.ts Normal file
View File

@@ -0,0 +1,65 @@
import { getLangfuseClient } from '@/lib/langfuse';
import { randomUUID } from 'crypto';
import { z } from 'zod';
const saveSchema = z.object({
filename: z.string().min(1).max(255),
format: z.enum(['drawio', 'png', 'svg']),
sessionId: z.string().min(1).max(200).optional(),
});
export async function POST(req: Request) {
const langfuse = getLangfuseClient();
if (!langfuse) {
return Response.json({ success: true, logged: false });
}
// Validate input
let data;
try {
data = saveSchema.parse(await req.json());
} catch {
return Response.json({ success: false, error: 'Invalid input' }, { status: 400 });
}
const { filename, format, sessionId } = data;
try {
const timestamp = new Date().toISOString();
// Find the most recent chat trace for this session to attach the save flag
const tracesResponse = await langfuse.api.trace.list({
sessionId,
limit: 1,
});
const traces = tracesResponse.data || [];
const latestTrace = traces[0];
if (latestTrace) {
// Add a score to the existing trace to flag that user saved
await langfuse.api.ingestion.batch({
batch: [
{
type: 'score-create',
id: randomUUID(),
timestamp,
body: {
id: randomUUID(),
traceId: latestTrace.id,
name: 'diagram-saved',
value: 1,
comment: `User saved diagram as ${filename}.${format}`,
},
},
],
});
}
// If no trace found, skip logging (user hasn't chatted yet)
return Response.json({ success: true, logged: !!latestTrace });
} catch (error) {
console.error('Langfuse save error:', error);
return Response.json({ success: false, error: 'Failed to log save' }, { status: 500 });
}
}