Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 90 additions & 81 deletions examples/filters/langfuse_filter_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""
title: Langfuse Filter Pipeline
author: open-webui
date: 2025-06-16
version: 1.7.1
date: 2025-07-27
version: 1.7.2
license: MIT
description: A filter pipeline that uses Langfuse.
requirements: langfuse<3.0.0
requirements: langfuse
"""

from typing import List, Optional
Expand All @@ -16,7 +16,6 @@
from utils.pipelines.main import get_last_assistant_message
from pydantic import BaseModel
from langfuse import Langfuse
from langfuse.api.resources.commons.errors.unauthorized_error import UnauthorizedError


def get_last_assistant_message_obj(messages: List[dict]) -> dict:
Expand Down Expand Up @@ -56,14 +55,16 @@ def __init__(self):
)

self.langfuse = None
self.chat_traces = {}
self.chat_spans = {}
self.chat_generations = {}
self.suppressed_logs = set()
# Dictionary to store model names for each chat
self.model_names = {}

# Only these tasks will be treated as LLM "generations":
self.GENERATION_TASKS = {"llm_response"}


def log(self, message: str, suppress_repeats: bool = False):
if self.valves.debug:
if suppress_repeats:
Expand All @@ -78,6 +79,23 @@ async def on_startup(self):

async def on_shutdown(self):
    """End every open Langfuse observation, then flush the client.

    Generations stored in ``self.chat_generations`` are ended first, followed
    by the spans stored in ``self.chat_spans``. A failure while ending one
    observation is reported and does not prevent the remaining ones from
    being ended. Each registry is emptied once it has been drained so that
    already-ended handles are not kept around (and not ended a second time
    if shutdown runs again on the same instance).

    Finally, if a Langfuse client is configured, ``flush()`` is called on it.
    """
    self.log(f"on_shutdown triggered for {__name__}")

    # Order matters: generations are closed before the spans.
    open_observations = (
        ("generation", self.chat_generations),
        ("span", self.chat_spans),
    )
    for label, registry in open_observations:
        # Iterate over a snapshot so the registry can be cleared afterwards.
        for chat_id, handle in list(registry.items()):
            try:
                if handle:
                    handle.end()
                    self.log(f"Ended {label} for chat_id: {chat_id}")
            except Exception as e:
                # Best effort: keep shutting down the remaining observations.
                print(f"Error ending {label} for {chat_id}: {e}")
        # Drop the finished (or failed) handles; nothing may reuse them.
        registry.clear()

    if self.langfuse:
        self.langfuse.flush()

Expand All @@ -95,10 +113,6 @@ def set_langfuse(self):
)
self.langfuse.auth_check()
self.log("Langfuse client initialized successfully.")
except UnauthorizedError:
print(
"Langfuse credentials incorrect. Please re-enter your Langfuse credentials in the pipeline settings."
)
except Exception as e:
print(
f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings."
Expand Down Expand Up @@ -163,30 +177,26 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
# Build tags
tags_list = self._build_tags(task_name)

if chat_id not in self.chat_traces:
self.log(f"Creating new trace for chat_id: {chat_id}")

trace_payload = {
"name": f"chat:{chat_id}",
"input": body,
"user_id": user_email,
"metadata": metadata,
"session_id": chat_id,
}
if chat_id not in self.chat_spans:
self.log(f"Creating new span for chat_id: {chat_id}")

span_metadata = metadata.copy()
if tags_list:
trace_payload["tags"] = tags_list
span_metadata["tags"] = tags_list

if self.valves.debug:
print(f"[DEBUG] Langfuse trace request: {json.dumps(trace_payload, indent=2)}")

trace = self.langfuse.trace(**trace_payload)
self.chat_traces[chat_id] = trace
span = self.langfuse.start_span(
name=f"chat:{chat_id}",
input=body,
metadata=span_metadata
)
self.chat_spans[chat_id] = span

# Update trace with user info if available
if user_email:
span.update_trace(user_id=user_email, session_id=chat_id)
else:
trace = self.chat_traces[chat_id]
self.log(f"Reusing existing trace for chat_id: {chat_id}")
if tags_list:
trace.update(tags=tags_list)
span = self.chat_spans[chat_id]
self.log(f"Reusing existing span for chat_id: {chat_id}")

# Update metadata with type
metadata["type"] = task_name
Expand All @@ -205,33 +215,34 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
metadata["model_id"] = model_id
metadata["model_name"] = model_name

generation_payload = {
"name": f"{task_name}:{str(uuid.uuid4())}",
"model": model_value,
"input": body["messages"],
"metadata": metadata,
}
generation_metadata = metadata.copy()
if tags_list:
generation_payload["tags"] = tags_list
generation_metadata["tags"] = tags_list

if self.valves.debug:
print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}")
print(f"[DEBUG] Langfuse generation request: model={model_value}, metadata={generation_metadata}")

trace.generation(**generation_payload)
generation = self.langfuse.start_generation(
name=f"{task_name}:{str(uuid.uuid4())}",
model=model_value,
input=body["messages"],
metadata=generation_metadata
)
self.chat_generations[chat_id] = generation
else:
# Otherwise, log it as an event
event_payload = {
"name": f"{task_name}:{str(uuid.uuid4())}",
"metadata": metadata,
"input": body["messages"],
}
event_metadata = metadata.copy()
if tags_list:
event_payload["tags"] = tags_list
event_metadata["tags"] = tags_list

if self.valves.debug:
print(f"[DEBUG] Langfuse event request: {json.dumps(event_payload, indent=2)}")
print(f"[DEBUG] Langfuse event request: {task_name}, metadata={event_metadata}")

trace.event(**event_payload)
event = self.langfuse.create_event(
name=f"{task_name}:{str(uuid.uuid4())}",
input=body["messages"],
metadata=event_metadata
)

return body

Expand All @@ -252,13 +263,6 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
# Build tags
tags_list = self._build_tags(task_name)

if chat_id not in self.chat_traces:
self.log(f"[WARNING] No matching trace found for chat_id: {chat_id}, attempting to re-register.")
# Re-run inlet to register if somehow missing
return await self.inlet(body, user)

trace = self.chat_traces[chat_id]

assistant_message = get_last_assistant_message(body["messages"])
assistant_message_obj = get_last_assistant_message_obj(body["messages"])

Expand All @@ -276,13 +280,18 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
}
self.log(f"Usage data extracted: {usage}")

# Update the trace output with the last assistant message
trace.update(output=assistant_message)
# Update the span output with the last assistant message
if chat_id in self.chat_spans:
span = self.chat_spans[chat_id]
span.update(output=assistant_message)
span.update_trace(output=assistant_message)

metadata["type"] = task_name
metadata["interface"] = "open-webui"

if task_name in self.GENERATION_TASKS:
if task_name in self.GENERATION_TASKS and chat_id in self.chat_generations:
generation = self.chat_generations[chat_id]

# Determine which model value to use based on the use_model_name valve
model_id = self.model_names.get(chat_id, {}).get("id", body.get("model"))
model_name = self.model_names.get(chat_id, {}).get("name", "unknown")
Expand All @@ -294,40 +303,40 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
metadata["model_id"] = model_id
metadata["model_name"] = model_name

# If it's an LLM generation
generation_payload = {
"name": f"{task_name}:{str(uuid.uuid4())}",
"model": model_value, # <-- Use model name or ID based on valve setting
"input": body["messages"],
"metadata": metadata,
"usage": usage,
}
generation_metadata = metadata.copy()
if tags_list:
generation_payload["tags"] = tags_list
generation_metadata["tags"] = tags_list

if self.valves.debug:
print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}")
print(f"[DEBUG] Langfuse generation end: model={model_value}, usage={usage}")

trace.generation().end(**generation_payload)
generation.update(
output=assistant_message,
metadata=generation_metadata,
usage_details=usage
)
generation.end()

del self.chat_generations[chat_id]
self.log(f"Generation ended for chat_id: {chat_id}")
else:
# Otherwise log as an event
event_payload = {
"name": f"{task_name}:{str(uuid.uuid4())}",
"metadata": metadata,
"input": body["messages"],
}
if usage:
# If you want usage on event as well
event_payload["metadata"]["usage"] = usage

# Handle non-generation tasks as events
event_metadata = metadata.copy()
if tags_list:
event_payload["tags"] = tags_list
event_metadata["tags"] = tags_list
if usage:
event_metadata["usage"] = usage

if self.valves.debug:
print(f"[DEBUG] Langfuse event end request: {json.dumps(event_payload, indent=2)}")

trace.event(**event_payload)
print(f"[DEBUG] Langfuse event end: {task_name}, usage={usage}")

# Log as event for non-generation tasks
event = self.langfuse.create_event(
name=f"{task_name}:{str(uuid.uuid4())}",
input=body["messages"],
output=assistant_message,
metadata=event_metadata
)
self.log(f"Event logged for chat_id: {chat_id}")

return body