Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath"
version = "2.5.18"
version = "2.5.19"
description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools."
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
105 changes: 105 additions & 0 deletions src/uipath/_cli/_evals/_live_tracking_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import logging
from concurrent.futures import ThreadPoolExecutor

from opentelemetry import context as context_api
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor

from uipath.tracing import LlmOpsHttpExporter, SpanStatus

logger = logging.getLogger(__name__)


class LiveTrackingSpanProcessor(SpanProcessor):
    """Span processor for live span tracking using the upsert_span API.

    Sends real-time span updates for evaluation-related spans:
    - On span start: upsert with RUNNING status
    - On span end: upsert without a status override, leaving the final
      status to the exporter

    All upsert calls are submitted to a thread pool so they never block
    evaluation execution. The pool caps the maximum number of concurrent
    threads to avoid resource exhaustion.
    """

    # Values of the "span_type" attribute that mark a span as eval-related.
    # Built once at class level instead of on every span callback.
    _EVAL_SPAN_TYPES = frozenset(
        {
            "eval",
            "evaluator",
            "evaluation",
            "eval_set_run",
            "evalOutput",
        }
    )

    def __init__(
        self,
        exporter: LlmOpsHttpExporter,
        max_workers: int = 10,
    ):
        """Create the processor.

        Args:
            exporter: Exporter whose ``upsert_span`` method is called for
                every tracked span.
            max_workers: Maximum number of background upsert threads.
        """
        self.exporter = exporter
        self.span_status = SpanStatus
        self.executor = ThreadPoolExecutor(
            max_workers=max_workers, thread_name_prefix="span-upsert"
        )

    def _upsert_span_async(
        self, span: Span | ReadableSpan, status_override: int | None = None
    ) -> None:
        """Run upsert_span in a background thread without blocking.

        Submits the upsert task to the thread pool and returns immediately.
        Failures (both while scheduling and while upserting) are logged at
        debug level and never raised, since live tracking is best-effort.

        Args:
            span: The span to upsert.
            status_override: Status to report instead of the span's own
                status, or None to pass no override to the exporter.
        """

        def _upsert() -> None:
            try:
                # Compare against None so that an explicit falsy status
                # (e.g. 0) is still forwarded to the exporter.
                if status_override is not None:
                    self.exporter.upsert_span(span, status_override=status_override)
                else:
                    self.exporter.upsert_span(span)
            except Exception as e:
                logger.debug("Failed to upsert span: %s", e)

        try:
            # Submit to thread pool and return immediately (non-blocking)
            self.executor.submit(_upsert)
        except RuntimeError as e:
            # submit() raises RuntimeError once the executor has been shut
            # down; a span callback must not propagate that to the caller.
            logger.debug("Failed to schedule span upsert: %s", e)

    def on_start(
        self, span: Span, parent_context: context_api.Context | None = None
    ) -> None:
        """Called when span starts - upsert with RUNNING status (non-blocking)."""
        # Only track evaluation-related spans
        if self._is_eval_span(span):
            self._upsert_span_async(span, status_override=self.span_status.RUNNING)

    def on_end(self, span: ReadableSpan) -> None:
        """Called when span ends - upsert with final status (non-blocking)."""
        # Only track evaluation-related spans
        if self._is_eval_span(span):
            self._upsert_span_async(span)

    def _is_eval_span(self, span: Span | ReadableSpan) -> bool:
        """Check if span is evaluation-related.

        A span qualifies when its "span_type" attribute is one of the known
        eval span types, or when it carries an "execution.id" attribute.
        Spans without attributes never qualify.
        """
        attributes = span.attributes
        if not attributes:
            return False

        if attributes.get("span_type") in self._EVAL_SPAN_TYPES:
            return True

        # Also track spans with execution.id (eval executions)
        return "execution.id" in attributes

    def shutdown(self) -> None:
        """Shutdown the processor and wait for pending tasks to complete."""
        try:
            self.executor.shutdown(wait=True)
        except Exception as e:
            logger.debug("Executor shutdown failed: %s", e)

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Force flush - no-op for live tracking; always returns True."""
        return True
11 changes: 0 additions & 11 deletions src/uipath/_cli/_evals/_progress_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,17 +422,6 @@ async def handle_update_eval_run(self, payload: EvalRunUpdatedEvent) -> None:
try:
eval_run_id = self.eval_run_ids.get(payload.execution_id)

# Use evalRunId as the trace_id for agent execution spans
# This makes all agent spans children of the eval run trace
if eval_run_id:
self.spans_exporter.trace_id = eval_run_id
else:
# Fallback to evalSetRunId if eval_run_id not available yet
if self.eval_set_execution_id:
self.spans_exporter.trace_id = self.eval_set_run_ids.get(
self.eval_set_execution_id
)

self.spans_exporter.export(payload.spans)

for eval_result in payload.eval_results:
Expand Down
106 changes: 1 addition & 105 deletions src/uipath/_cli/_evals/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging
import uuid
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from pathlib import Path
from time import time
Expand All @@ -19,7 +18,7 @@

import coverage
from opentelemetry import context as context_api
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
from opentelemetry.sdk.trace import ReadableSpan, Span
from opentelemetry.sdk.trace.export import (
SpanExporter,
SpanExportResult,
Expand Down Expand Up @@ -52,7 +51,6 @@
from uipath._cli._evals.mocks.input_mocker import (
generate_llm_input,
)
from uipath.tracing import LlmOpsHttpExporter, SpanStatus

from ..._events._event_bus import EventBus
from ..._events._events import (
Expand Down Expand Up @@ -162,102 +160,6 @@ def on_start(
self.collector.add_span(span, exec_id)


class LiveTrackingSpanProcessor(SpanProcessor):
    """Span processor for live span tracking using the upsert_span API.

    Sends real-time span updates for evaluation-related spans:
    - On span start: upsert with RUNNING status
    - On span end: upsert without a status override, leaving the final
      status to the exporter

    All upsert calls are submitted to a thread pool so they never block
    evaluation execution. The pool caps the maximum number of concurrent
    threads to avoid resource exhaustion.
    """

    # Values of the "span_type" attribute that mark a span as eval-related.
    # Built once at class level instead of on every span callback.
    _EVAL_SPAN_TYPES = frozenset(
        {
            "eval",
            "evaluator",
            "evaluation",
            "eval_set_run",
            "evalOutput",
        }
    )

    def __init__(
        self,
        exporter: LlmOpsHttpExporter,
        max_workers: int = 10,
    ):
        """Create the processor.

        Args:
            exporter: Exporter whose ``upsert_span`` method is called for
                every tracked span.
            max_workers: Maximum number of background upsert threads.
        """
        self.exporter = exporter
        self.span_status = SpanStatus
        self.executor = ThreadPoolExecutor(
            max_workers=max_workers, thread_name_prefix="span-upsert"
        )

    def _upsert_span_async(
        self, span: Span | ReadableSpan, status_override: int | None = None
    ) -> None:
        """Run upsert_span in a background thread without blocking.

        Submits the upsert task to the thread pool and returns immediately.
        Failures (both while scheduling and while upserting) are logged at
        debug level and never raised, since live tracking is best-effort.

        Args:
            span: The span to upsert.
            status_override: Status to report instead of the span's own
                status, or None to pass no override to the exporter.
        """

        def _upsert() -> None:
            try:
                # Compare against None so that an explicit falsy status
                # (e.g. 0) is still forwarded to the exporter.
                if status_override is not None:
                    self.exporter.upsert_span(span, status_override=status_override)
                else:
                    self.exporter.upsert_span(span)
            except Exception as e:
                logger.debug("Failed to upsert span: %s", e)

        try:
            # Submit to thread pool and return immediately (non-blocking)
            self.executor.submit(_upsert)
        except RuntimeError as e:
            # submit() raises RuntimeError once the executor has been shut
            # down; a span callback must not propagate that to the caller.
            logger.debug("Failed to schedule span upsert: %s", e)

    def on_start(
        self, span: Span, parent_context: context_api.Context | None = None
    ) -> None:
        """Called when span starts - upsert with RUNNING status (non-blocking)."""
        # Only track evaluation-related spans
        if self._is_eval_span(span):
            self._upsert_span_async(span, status_override=self.span_status.RUNNING)

    def on_end(self, span: ReadableSpan) -> None:
        """Called when span ends - upsert with final status (non-blocking)."""
        # Only track evaluation-related spans
        if self._is_eval_span(span):
            self._upsert_span_async(span)

    def _is_eval_span(self, span: Span | ReadableSpan) -> bool:
        """Check if span is evaluation-related.

        A span qualifies when its "span_type" attribute is one of the known
        eval span types, or when it carries an "execution.id" attribute.
        Spans without attributes never qualify.
        """
        attributes = span.attributes
        if not attributes:
            return False

        if attributes.get("span_type") in self._EVAL_SPAN_TYPES:
            return True

        # Also track spans with execution.id (eval executions)
        return "execution.id" in attributes

    def shutdown(self) -> None:
        """Shutdown the processor and wait for pending tasks to complete."""
        try:
            self.executor.shutdown(wait=True)
        except Exception as e:
            logger.debug("Executor shutdown failed: %s", e)

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Force flush - no-op for live tracking; always returns True."""
        return True


class ExecutionLogsExporter:
"""Custom exporter that stores multiple execution log handlers."""

Expand Down Expand Up @@ -323,12 +225,6 @@ def __init__(
self.trace_manager.tracer_span_processors.append(span_processor)
self.trace_manager.tracer_provider.add_span_processor(span_processor)

# Live tracking processor for real-time span updates
live_tracking_exporter = LlmOpsHttpExporter()
live_tracking_processor = LiveTrackingSpanProcessor(live_tracking_exporter)
self.trace_manager.tracer_span_processors.append(live_tracking_processor)
self.trace_manager.tracer_provider.add_span_processor(live_tracking_processor)

self.logs_exporter: ExecutionLogsExporter = ExecutionLogsExporter()
# Use job_id if available (for single runtime runs), otherwise generate UUID
self.execution_id = context.job_id or str(uuid.uuid4())
Expand Down
45 changes: 42 additions & 3 deletions src/uipath/_cli/cli_eval.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import ast
import asyncio
import logging
import os
from typing import Any

Expand All @@ -9,6 +10,7 @@

from uipath._cli._evals._console_progress_reporter import ConsoleProgressReporter
from uipath._cli._evals._evaluate import evaluate
from uipath._cli._evals._live_tracking_processor import LiveTrackingSpanProcessor
from uipath._cli._evals._progress_reporter import StudioWebProgressReporter
from uipath._cli._evals._runtime import (
UiPathEvalContext,
Expand All @@ -28,6 +30,7 @@
from ._utils._console import ConsoleLogger
from ._utils._eval_set import EvalHelpers

logger = logging.getLogger(__name__)
console = ConsoleLogger()


Expand Down Expand Up @@ -203,8 +206,18 @@ def eval(
async def execute_eval():
event_bus = EventBus()

# Only create studio web exporter when reporting to Studio Web
studio_web_tracking_exporter = None
if should_register_progress_reporter:
progress_reporter = StudioWebProgressReporter(LlmOpsHttpExporter())
studio_web_tracking_exporter = LlmOpsHttpExporter()
if eval_context.eval_set_run_id:
studio_web_tracking_exporter.trace_id = (
eval_context.eval_set_run_id
)

progress_reporter = StudioWebProgressReporter(
studio_web_tracking_exporter
)
await progress_reporter.subscribe_to_eval_runtime_events(event_bus)

console_reporter = ConsoleProgressReporter()
Expand All @@ -224,8 +237,31 @@ async def execute_eval():
# Set job_id in eval context for single runtime runs
eval_context.job_id = ctx.job_id

# Create job exporter for live tracking
job_exporter = None
if ctx.job_id:
trace_manager.add_span_exporter(LlmOpsHttpExporter())
job_exporter = LlmOpsHttpExporter()
trace_manager.add_span_exporter(job_exporter)
# Add live tracking processor for real-time span updates
job_tracking_processor = LiveTrackingSpanProcessor(job_exporter)
trace_manager.tracer_span_processors.append(
job_tracking_processor
)
trace_manager.tracer_provider.add_span_processor(
job_tracking_processor
)

# Add studio web tracking processor if reporting to Studio Web
if studio_web_tracking_exporter:
studio_web_tracking_processor = LiveTrackingSpanProcessor(
studio_web_tracking_exporter
)
trace_manager.tracer_span_processors.append(
studio_web_tracking_processor
)
trace_manager.tracer_provider.add_span_processor(
studio_web_tracking_processor
)

if trace_file:
trace_manager.add_span_exporter(
Expand All @@ -252,7 +288,10 @@ async def execute_eval():
else:
# Fall back to execution without overwrites
ctx.result = await evaluate(
runtime_factory, trace_manager, eval_context, event_bus
runtime_factory,
trace_manager,
eval_context,
event_bus,
)
finally:
if runtime_factory:
Expand Down
Loading