diff --git a/examples/filters/langfuse_filter_pipeline.py b/examples/filters/langfuse_filter_pipeline.py index 2d73e5de..0891572f 100644 --- a/examples/filters/langfuse_filter_pipeline.py +++ b/examples/filters/langfuse_filter_pipeline.py @@ -1,11 +1,11 @@ """ title: Langfuse Filter Pipeline author: open-webui -date: 2025-06-16 -version: 1.7.1 +date: 2025-07-27 +version: 1.7.2 license: MIT description: A filter pipeline that uses Langfuse. -requirements: langfuse<3.0.0 +requirements: langfuse """ from typing import List, Optional @@ -16,7 +16,6 @@ from utils.pipelines.main import get_last_assistant_message from pydantic import BaseModel from langfuse import Langfuse -from langfuse.api.resources.commons.errors.unauthorized_error import UnauthorizedError def get_last_assistant_message_obj(messages: List[dict]) -> dict: @@ -56,7 +55,8 @@ def __init__(self): ) self.langfuse = None - self.chat_traces = {} + self.chat_spans = {} + self.chat_generations = {} self.suppressed_logs = set() # Dictionary to store model names for each chat self.model_names = {} @@ -64,6 +64,7 @@ def __init__(self): # Only these tasks will be treated as LLM "generations": self.GENERATION_TASKS = {"llm_response"} + def log(self, message: str, suppress_repeats: bool = False): if self.valves.debug: if suppress_repeats: @@ -78,6 +79,23 @@ async def on_startup(self): async def on_shutdown(self): self.log(f"on_shutdown triggered for {__name__}") + + for chat_id, generation in list(self.chat_generations.items()): + try: + if generation: + generation.end() + self.log(f"Ended generation for chat_id: {chat_id}") + except Exception as e: + print(f"Error ending generation for {chat_id}: {e}") + + for chat_id, span in list(self.chat_spans.items()): + try: + if span: + span.end() + self.log(f"Ended span for chat_id: {chat_id}") + except Exception as e: + print(f"Error ending span for {chat_id}: {e}") + if self.langfuse: self.langfuse.flush() @@ -95,10 +113,6 @@ def set_langfuse(self): ) self.langfuse.auth_check() self.log("Langfuse client initialized successfully.") - except UnauthorizedError: - print( - "Langfuse credentials incorrect. Please re-enter your Langfuse credentials in the pipeline settings." - ) except Exception as e: print( f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings." @@ -163,30 +177,26 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: # Build tags tags_list = self._build_tags(task_name) - if chat_id not in self.chat_traces: - self.log(f"Creating new trace for chat_id: {chat_id}") - - trace_payload = { - "name": f"chat:{chat_id}", - "input": body, - "user_id": user_email, - "metadata": metadata, - "session_id": chat_id, - } + if chat_id not in self.chat_spans: + self.log(f"Creating new span for chat_id: {chat_id}") + span_metadata = metadata.copy() if tags_list: - trace_payload["tags"] = tags_list + span_metadata["tags"] = tags_list - if self.valves.debug: - print(f"[DEBUG] Langfuse trace request: {json.dumps(trace_payload, indent=2)}") - - trace = self.langfuse.trace(**trace_payload) - self.chat_traces[chat_id] = trace + span = self.langfuse.start_span( + name=f"chat:{chat_id}", + input=body, + metadata=span_metadata + ) + self.chat_spans[chat_id] = span + + # Update trace with user info if available + if user_email: + span.update_trace(user_id=user_email, session_id=chat_id) else: - trace = self.chat_traces[chat_id] - self.log(f"Reusing existing trace for chat_id: {chat_id}") - if tags_list: - trace.update(tags=tags_list) + span = self.chat_spans[chat_id] + self.log(f"Reusing existing span for chat_id: {chat_id}") # Update metadata with type metadata["type"] = task_name @@ -205,33 +215,34 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: metadata["model_id"] = model_id metadata["model_name"] = model_name - generation_payload = { - "name": f"{task_name}:{str(uuid.uuid4())}", - "model": model_value, - "input": body["messages"], - "metadata": metadata, - } + generation_metadata = metadata.copy() if tags_list: - generation_payload["tags"] = tags_list + generation_metadata["tags"] = tags_list if self.valves.debug: - print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}") + print(f"[DEBUG] Langfuse generation request: model={model_value}, metadata={generation_metadata}") - trace.generation(**generation_payload) + generation = self.langfuse.start_generation( + name=f"{task_name}:{str(uuid.uuid4())}", + model=model_value, + input=body["messages"], + metadata=generation_metadata + ) + self.chat_generations[chat_id] = generation else: # Otherwise, log it as an event - event_payload = { - "name": f"{task_name}:{str(uuid.uuid4())}", - "metadata": metadata, - "input": body["messages"], - } + event_metadata = metadata.copy() if tags_list: - event_payload["tags"] = tags_list + event_metadata["tags"] = tags_list if self.valves.debug: - print(f"[DEBUG] Langfuse event request: {json.dumps(event_payload, indent=2)}") + print(f"[DEBUG] Langfuse event request: {task_name}, metadata={event_metadata}") - trace.event(**event_payload) + event = self.langfuse.create_event( + name=f"{task_name}:{str(uuid.uuid4())}", + input=body["messages"], + metadata=event_metadata + ) return body @@ -252,13 +263,6 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: # Build tags tags_list = self._build_tags(task_name) - if chat_id not in self.chat_traces: - self.log(f"[WARNING] No matching trace found for chat_id: {chat_id}, attempting to re-register.") - # Re-run inlet to register if somehow missing - return await self.inlet(body, user) - - trace = self.chat_traces[chat_id] - assistant_message = get_last_assistant_message(body["messages"]) assistant_message_obj = get_last_assistant_message_obj(body["messages"]) @@ -276,13 +280,18 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: } self.log(f"Usage data extracted: {usage}") - # Update the trace output with the last assistant message - trace.update(output=assistant_message) + # Update the span output with the last assistant message + if chat_id in self.chat_spans: + span = self.chat_spans[chat_id] + span.update(output=assistant_message) + span.update_trace(output=assistant_message) metadata["type"] = task_name metadata["interface"] = "open-webui" - if task_name in self.GENERATION_TASKS: + if task_name in self.GENERATION_TASKS and chat_id in self.chat_generations: + generation = self.chat_generations[chat_id] + # Determine which model value to use based on the use_model_name valve model_id = self.model_names.get(chat_id, {}).get("id", body.get("model")) model_name = self.model_names.get(chat_id, {}).get("name", "unknown") @@ -294,40 +303,40 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: metadata["model_id"] = model_id metadata["model_name"] = model_name - # If it's an LLM generation - generation_payload = { - "name": f"{task_name}:{str(uuid.uuid4())}", - "model": model_value, # <-- Use model name or ID based on valve setting - "input": body["messages"], - "metadata": metadata, - "usage": usage, - } + generation_metadata = metadata.copy() if tags_list: - generation_payload["tags"] = tags_list + generation_metadata["tags"] = tags_list if self.valves.debug: - print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}") + print(f"[DEBUG] Langfuse generation end: model={model_value}, usage={usage}") - trace.generation().end(**generation_payload) + generation.update( + output=assistant_message, + metadata=generation_metadata, + usage_details=usage + ) + generation.end() + + del self.chat_generations[chat_id] self.log(f"Generation ended for chat_id: {chat_id}") else: - # Otherwise log as an event - event_payload = { - "name": f"{task_name}:{str(uuid.uuid4())}", - "metadata": metadata, - "input": body["messages"], - } - if usage: - # If you want usage on event as well - event_payload["metadata"]["usage"] = usage - + # Handle non-generation tasks as events + event_metadata = metadata.copy() if tags_list: - event_payload["tags"] = tags_list + event_metadata["tags"] = tags_list + if usage: + event_metadata["usage"] = usage if self.valves.debug: - print(f"[DEBUG] Langfuse event end request: {json.dumps(event_payload, indent=2)}") - - trace.event(**event_payload) + print(f"[DEBUG] Langfuse event end: {task_name}, usage={usage}") + + # Log as event for non-generation tasks + event = self.langfuse.create_event( + name=f"{task_name}:{str(uuid.uuid4())}", + input=body["messages"], + output=assistant_message, + metadata=event_metadata + ) self.log(f"Event logged for chat_id: {chat_id}") return body