refactor: simplify Langfuse integration with AI SDK 6 (#375)

- Remove manual token attribute setting (AI SDK 6 telemetry auto-reports)
- Use totalTokens directly instead of inputTokens + outputTokens calculation
- Fix sessionId bug in log-save/log-feedback (prevents wrong trace attachment)
- Hash IP addresses for privacy instead of storing raw IPs
- Fix isLangfuseEnabled() to check both keys for consistency
Author: Dayuan Jiang
Date: 2025-12-23 16:26:45 +09:00
Committed by: GitHub
Parent: 9aec7eda79
Commit: 5ec05eb100

5 changed files with 42 additions and 60 deletions

View File

@@ -162,9 +162,13 @@ async function handleChatRequest(req: Request): Promise<Response> {
   const { messages, xml, previousXml, sessionId } = await req.json()

-  // Get user IP for Langfuse tracking
+  // Get user IP for Langfuse tracking (hashed for privacy)
   const forwardedFor = req.headers.get("x-forwarded-for")
-  const userId = forwardedFor?.split(",")[0]?.trim() || "anonymous"
+  const rawIp = forwardedFor?.split(",")[0]?.trim() || "anonymous"
+  const userId =
+    rawIp === "anonymous"
+      ? rawIp
+      : `user-${Buffer.from(rawIp).toString("base64url").slice(0, 8)}`

   // Validate sessionId for Langfuse (must be string, max 200 chars)
   const validSessionId =
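A note on the scheme above: the "hash" is a truncated base64url encoding rather than a cryptographic hash, so it pseudonymizes the address instead of irreversibly anonymizing it (the eight kept characters encode the first six bytes of the raw string). A minimal standalone sketch of the same logic; the helper name `pseudonymizeIp` is hypothetical, the route inlines this:

```ts
// Hypothetical standalone version of the inlined logic above.
function pseudonymizeIp(rawIp: string): string {
  if (rawIp === "anonymous") return rawIp
  // base64url-encode the raw IP and keep the first 8 characters.
  // Note: this is a truncated encoding, not a cryptographic hash, so it
  // shortens and obscures the address rather than irreversibly hashing it.
  return `user-${Buffer.from(rawIp).toString("base64url").slice(0, 8)}`
}

pseudonymizeIp("203.0.113.7") // => "user-MjAzLjAu"
pseudonymizeIp("anonymous")   // => "anonymous"
```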
@@ -506,12 +510,9 @@ ${userInputText}
         userId,
       }),
     }),
-    onFinish: ({ text, usage }) => {
-      // Pass usage to Langfuse (Bedrock streaming doesn't auto-report tokens to telemetry)
-      setTraceOutput(text, {
-        promptTokens: usage?.inputTokens,
-        completionTokens: usage?.outputTokens,
-      })
+    onFinish: ({ text }) => {
+      // AI SDK 6 telemetry auto-reports token usage on its spans
+      setTraceOutput(text)
     },
     tools: {
       // Client-side tool that will be executed on the client
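The simplification above relies on AI SDK telemetry being enabled on the streaming call itself (the context lines ending in `userId` suggest a telemetry metadata object sits just above this hunk). A hedged sketch of that wiring, assuming AI SDK 6 keeps the `experimental_telemetry` shape from earlier versions; the model id and `functionId` are illustrative, and `messages`, `validSessionId`, `userId`, and `setTraceOutput` are assumed from this route:

```ts
import { streamText } from "ai"
import { bedrock } from "@ai-sdk/amazon-bedrock"

// Sketch under the stated assumptions: with telemetry enabled, the SDK
// emits OpenTelemetry spans that already carry token usage, so onFinish
// only needs to record the output text for the Langfuse wrapper.
const result = streamText({
  model: bedrock("anthropic.claude-3-5-sonnet-20240620-v1:0"), // illustrative model id
  messages,                                                    // assumed from the request
  experimental_telemetry: {
    isEnabled: true,
    functionId: "chat",                              // illustrative identifier
    metadata: { sessionId: validSessionId, userId }, // fields from this route
  },
  onFinish: ({ text }) => {
    setTraceOutput(text) // closes the wrapper span; see the lib diff below
  },
})
```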
@@ -681,20 +682,9 @@ Call this tool to get shape names and usage syntax for a specific library.`,
       messageMetadata: ({ part }) => {
         if (part.type === "finish") {
           const usage = (part as any).totalUsage
-          if (!usage) {
-            console.warn(
-              "[messageMetadata] No usage data in finish part",
-            )
-            return undefined
-          }
-          // Total input = non-cached + cached (these are separate counts)
-          // Note: cacheWriteInputTokens is not available on finish part
-          const totalInputTokens =
-            (usage.inputTokens ?? 0) +
-            (usage.inputTokenDetails?.cacheReadTokens ?? 0)
+          // AI SDK 6 provides totalTokens directly
           return {
-            inputTokens: totalInputTokens,
-            outputTokens: usage.outputTokens ?? 0,
+            totalTokens: usage?.totalTokens ?? 0,
             finishReason: (part as any).finishReason,
           }
         }
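The payoff of the switch: the old derivation could under-count, since the finish part exposes no `cacheWriteInputTokens` (per the deleted comment), while `totalTokens` comes straight from the provider. An illustration with invented numbers:

```ts
// Invented example values; field names follow the diff above.
const usage = {
  inputTokens: 1000,                           // non-cached input
  inputTokenDetails: { cacheReadTokens: 400 }, // cached input, read side
  outputTokens: 300,
  totalTokens: 1900, // provider total, including counts the finish part
                     // does not itemize (e.g. cache writes)
}

// Old derivation: 1000 + 400 (cache reads) + 300 = 1700 -- misses 200 tokens
const oldTotal =
  (usage.inputTokens ?? 0) +
  (usage.inputTokenDetails?.cacheReadTokens ?? 0) +
  (usage.outputTokens ?? 0)

// New: trust the SDK's own total
const newTotal = usage.totalTokens ?? 0

console.log(oldTotal, newTotal) // 1700 1900
```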

View File

@@ -27,9 +27,18 @@ export async function POST(req: Request) {
     const { messageId, feedback, sessionId } = data

-    // Get user IP for tracking
+    // Skip logging if no sessionId - prevents attaching to wrong user's trace
+    if (!sessionId) {
+      return Response.json({ success: true, logged: false })
+    }
+
+    // Get user IP for tracking (hashed for privacy)
     const forwardedFor = req.headers.get("x-forwarded-for")
-    const userId = forwardedFor?.split(",")[0]?.trim() || "anonymous"
+    const rawIp = forwardedFor?.split(",")[0]?.trim() || "anonymous"
+    const userId =
+      rawIp === "anonymous"
+        ? rawIp
+        : `user-${Buffer.from(rawIp).toString("base64url").slice(0, 8)}`

     try {
       // Find the most recent chat trace for this session to attach the score to

View File

@@ -27,6 +27,11 @@ export async function POST(req: Request) {
     const { filename, format, sessionId } = data

+    // Skip logging if no sessionId - prevents attaching to wrong user's trace
+    if (!sessionId) {
+      return Response.json({ success: true, logged: false })
+    }
+
     try {
       const timestamp = new Date().toISOString()
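Both routes now start with the same guard, for the same reason: without a sessionId the fallback was "find the most recent trace", which can belong to a different user. A minimal sketch of the shared shape; the helper name `skipWithoutSession` is hypothetical, the commit inlines the check in each route:

```ts
// Hypothetical extraction of the guard repeated in both routes.
function skipWithoutSession(sessionId: unknown): Response | null {
  if (!sessionId) {
    // Report success-but-not-logged rather than an error: logging is
    // best-effort, and guessing a trace risks cross-user attachment.
    return Response.json({ success: true, logged: false })
  }
  return null
}

export async function POST(req: Request) {
  const { sessionId } = await req.json()
  const skipped = skipWithoutSession(sessionId)
  if (skipped) return skipped
  // ...look up this session's trace and attach the log entry...
  return Response.json({ success: true, logged: true })
}
```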

View File

@@ -632,21 +632,15 @@ Continue from EXACTLY where you stopped.`,
          // DEBUG: Log finish reason to diagnose truncation
          console.log("[onFinish] finishReason:", metadata?.finishReason)
          console.log("[onFinish] metadata:", metadata)
-          if (metadata) {
-            // Use Number.isFinite to guard against NaN (typeof NaN === 'number' is true)
-            const inputTokens = Number.isFinite(metadata.inputTokens)
-              ? (metadata.inputTokens as number)
+          // AI SDK 6 provides totalTokens directly
+          const totalTokens =
+            metadata && Number.isFinite(metadata.totalTokens)
+              ? (metadata.totalTokens as number)
               : 0
-            const outputTokens = Number.isFinite(metadata.outputTokens)
-              ? (metadata.outputTokens as number)
-              : 0
-            const actualTokens = inputTokens + outputTokens
-            if (actualTokens > 0) {
-              quotaManager.incrementTokenCount(actualTokens)
-              quotaManager.incrementTPMCount(actualTokens)
-            }
+          if (totalTokens > 0) {
+            quotaManager.incrementTokenCount(totalTokens)
+            quotaManager.incrementTPMCount(totalTokens)
           }
        },
        sendAutomaticallyWhen: ({ messages }) => {
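One detail that survives the rewrite is the `Number.isFinite` guard. The metadata crosses the wire as loosely typed JSON, and `typeof NaN === "number"`, so a plain typeof check would let NaN into the quota counters. A short demonstration:

```ts
const bad = Number("oops")            // NaN, yet typeof bad === "number"
console.log(typeof bad === "number")  // true  -- a typeof guard lets NaN through
console.log(Number.isFinite(bad))     // false -- the actual guard rejects it
console.log(Number.isFinite(1500))    // true  -- real totals pass
```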

View File

@@ -21,9 +21,11 @@ export function getLangfuseClient(): LangfuseClient | null {
   return langfuseClient
 }

-// Check if Langfuse is configured
+// Check if Langfuse is configured (both keys required)
 export function isLangfuseEnabled(): boolean {
-  return !!process.env.LANGFUSE_PUBLIC_KEY
+  return !!(
+    process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY
+  )
 }

 // Update trace with input data at the start of request
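The behavioral difference is easiest to see with only one key present: with just the public key set, the old check reported Langfuse as enabled even though the client could never authenticate. Env values invented for illustration; `isLangfuseEnabled` is the function above:

```ts
// Invented env state for illustration.
process.env.LANGFUSE_PUBLIC_KEY = "pk-lf-example"
delete process.env.LANGFUSE_SECRET_KEY

isLangfuseEnabled()
// old check: true  (public key alone was enough)
// new check: false (secret key is required too)
```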
@@ -43,34 +45,16 @@ export function setTraceInput(params: {
 }

 // Update trace with output and end the span
-export function setTraceOutput(
-  output: string,
-  usage?: { promptTokens?: number; completionTokens?: number },
-) {
+// Note: AI SDK 6 telemetry automatically reports token usage on its spans,
+// so we only need to set the output text and close our wrapper span
+export function setTraceOutput(output: string) {
   if (!isLangfuseEnabled()) return

   updateActiveTrace({ output })

+  // End the observe() wrapper span (AI SDK creates its own child spans with usage)
   const activeSpan = api.trace.getActiveSpan()
   if (activeSpan) {
-    // Manually set usage attributes since AI SDK Bedrock streaming doesn't provide them
-    if (usage?.promptTokens) {
-      activeSpan.setAttribute("ai.usage.promptTokens", usage.promptTokens)
-      activeSpan.setAttribute(
-        "gen_ai.usage.input_tokens",
-        usage.promptTokens,
-      )
-    }
-    if (usage?.completionTokens) {
-      activeSpan.setAttribute(
-        "ai.usage.completionTokens",
-        usage.completionTokens,
-      )
-      activeSpan.setAttribute(
-        "gen_ai.usage.output_tokens",
-        usage.completionTokens,
-      )
-    }
     activeSpan.end()
   }
 }
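To summarize the span layering this leaves behind: the route's observe() wrapper owns the trace, the AI SDK's child spans carry the usage attributes the deleted code used to set by hand, and setTraceOutput only records the output and closes the wrapper. A condensed sketch of the resulting function; the import shape follows `@opentelemetry/api` as used in this file, `isLangfuseEnabled` and `updateActiveTrace` are assumed from this module, and the optional-chaining form is an equivalent rewrite of the if check:

```ts
import * as api from "@opentelemetry/api"

// Condensed, behavior-equivalent form of setTraceOutput after this commit.
// No usage attributes are written here anymore; AI SDK 6 telemetry puts
// token usage on its own child spans.
export function setTraceOutputSketch(output: string) {
  if (!isLangfuseEnabled()) return // helper defined earlier in this file
  updateActiveTrace({ output })    // Langfuse helper used by this module
  api.trace.getActiveSpan()?.end() // close only the observe() wrapper span
}
```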