Agent Observability and Distributed Tracing
Production agents are black boxes without proper observability. This question covers building a full tracing system.
Task
Implement a distributed tracing system for AI agents:
- Hierarchical spans (trace → agent run → LLM call → tool call).
- Context manager interface for ergonomic span creation.
- Event logging within spans.
- OpenTelemetry-compatible export format.
- Metric computation from traces: latency percentiles, error rates.
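The first three items can be sketched with a stack of open spans, where the innermost open span becomes the parent of the next one. This is only an illustrative sketch: MiniTracer and its dict-based span records are hypothetical, not the AgentTracer from the starter code, and a single shared stack is not thread-safe or async-safe.

```python
import time
import uuid
from contextlib import contextmanager


class MiniTracer:
    """Hypothetical minimal tracer; single-threaded only."""

    def __init__(self):
        self.trace_id = uuid.uuid4().hex
        self.spans = []    # every span record, in start order
        self._stack = []   # currently open spans, innermost last

    @contextmanager
    def span(self, operation, **attributes):
        # The innermost open span (if any) becomes the parent.
        parent = self._stack[-1]["span_id"] if self._stack else None
        record = {
            "span_id": uuid.uuid4().hex[:16],
            "trace_id": self.trace_id,
            "parent_span_id": parent,
            "operation": operation,
            "start_time": time.time(),
            "end_time": None,
            "status": "running",
            "attributes": dict(attributes),
            "events": [],
            "error": None,
        }
        self.spans.append(record)
        self._stack.append(record)
        try:
            yield record
            record["status"] = "ok"
        except Exception as exc:
            record["status"] = "error"
            record["error"] = str(exc)
            raise
        finally:
            # Runs on both success and failure, so spans always close.
            record["end_time"] = time.time()
            self._stack.pop()

    def add_event(self, name, **attributes):
        """Attach a timestamped event to the innermost open span."""
        if self._stack:
            self._stack[-1]["events"].append(
                {"name": name, "timestamp": time.time(), "attributes": attributes}
            )


tracer = MiniTracer()
with tracer.span("agent.run"):
    with tracer.span("llm.call", model="example-model"):
        tracer.add_event("first_token")
    try:
        with tracer.span("tool.call", tool="broken"):
            raise RuntimeError("tool failed")
    except RuntimeError:
        pass  # the agent recovers; the tool span is still recorded as an error
```

Because the error is caught inside agent.run, the root span ends with status ok while the tool span keeps status error and the exception message.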
Non-Functional Requirements
- Spans must be thread-safe and async-safe.
- Zero overhead when sampling is disabled.
- Export format compatible with Jaeger/Zipkin/Grafana.
- Support for baggage (trace-level metadata propagation).
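One possible way to approach the first, second, and fourth requirements is to keep the current span and the baggage in contextvars, so each thread and each asyncio task sees its own value, and to return a shared no-op context manager when tracing is off. The names below (ContextTracer, set_baggage, the record layout) are assumptions made for this sketch, and "zero overhead" here really means one attribute check and no allocations per span.

```python
import asyncio
import contextvars
import threading
import uuid
from contextlib import contextmanager, nullcontext

# Context-local state: each asyncio task and each thread sees its own value.
_current_span_id = contextvars.ContextVar("current_span_id", default=None)
_baggage = contextvars.ContextVar("baggage", default=None)

_NOOP_SPAN = nullcontext()  # shared, reusable no-op context manager


def set_baggage(key, value):
    """Copy-on-write update so other contexts never see a mutated dict."""
    merged = dict(_baggage.get() or {})
    merged[key] = value
    _baggage.set(merged)


class ContextTracer:
    def __init__(self, enabled=True):
        self.enabled = enabled
        self.records = []
        self._lock = threading.Lock()  # guards the shared records list

    def span(self, operation):
        if not self.enabled:
            return _NOOP_SPAN  # no IDs or records are created
        return self._span(operation)

    @contextmanager
    def _span(self, operation):
        span_id = uuid.uuid4().hex[:16]
        parent_id = _current_span_id.get()
        token = _current_span_id.set(span_id)
        try:
            yield span_id
        finally:
            _current_span_id.reset(token)  # restore the parent as current
            record = {
                "span_id": span_id,
                "parent_span_id": parent_id,
                "operation": operation,
                "baggage": dict(_baggage.get() or {}),
            }
            with self._lock:
                self.records.append(record)


async def worker(tracer, name):
    with tracer.span(name):
        await asyncio.sleep(0)  # let the other task run mid-span
        with tracer.span(name + ".child"):
            pass


async def main(tracer):
    set_baggage("user_id", "u-123")
    with tracer.span("agent.run"):
        # gather() wraps each coroutine in a task that copies the current context.
        await asyncio.gather(worker(tracer, "a"), worker(tracer, "b"))


tracer = ContextTracer()
asyncio.run(main(tracer))
by_op = {r["operation"]: r for r in tracer.records}

disabled = ContextTracer(enabled=False)
asyncio.run(main(disabled))
```

Even though the two workers interleave, each child span resolves its own parent, and the baggage set before agent.run is visible in every record. Code that starts its own threads would still have to pass the context along explicitly.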
Constraints
- Trace ID: UUID, shared across all spans in a request.
- Span hierarchy via parent_span_id.
- P50/P95/P99 computed via percentile over all leaf spans.
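The percentile constraint can be illustrated as follows. The problem does not say which percentile definition to use or how the error rate is scoped, so this sketch assumes the nearest-rank method and an error rate taken over all spans in the trace; the dict-based span records and the function names are hypothetical.

```python
import math


def leaf_spans(spans):
    """Spans that no other span names as its parent."""
    parent_ids = {s["parent_span_id"] for s in spans if s["parent_span_id"]}
    return [s for s in spans if s["span_id"] not in parent_ids]


def percentile(values, pct):
    """Nearest-rank percentile (an assumed definition); 0.0 for empty input."""
    if not values:
        return 0.0
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


def compute_metrics(spans):
    durations = [s["duration_ms"] for s in leaf_spans(spans)]
    errors = sum(1 for s in spans if s["status"] == "error")
    return {
        "p50_ms": percentile(durations, 50),
        "p95_ms": percentile(durations, 95),
        "p99_ms": percentile(durations, 99),
        # Assumption: error rate over all spans, not only leaves.
        "error_rate": errors / len(spans) if spans else 0.0,
    }


spans = [
    {"span_id": "root", "parent_span_id": None, "duration_ms": 500.0, "status": "ok"},
    {"span_id": "llm", "parent_span_id": "root", "duration_ms": 300.0, "status": "ok"},
    {"span_id": "tool-a", "parent_span_id": "root", "duration_ms": 100.0, "status": "error"},
    {"span_id": "tool-b", "parent_span_id": "root", "duration_ms": 50.0, "status": "ok"},
]
metrics = compute_metrics(spans)
# → {'p50_ms': 100.0, 'p95_ms': 300.0, 'p99_ms': 300.0, 'error_rate': 0.25}
```

The root span is excluded from the percentiles because it is a parent; including it would double-count time already measured by its children.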
Examples
Example 1:
Input:
tracer = AgentTracer('my-agent')
tid = tracer.start_trace()
with tracer.span('agent.run') as s:
    with tracer.span('llm.call', parent_span_id=s['span_id']):
        pass
Output:
Trace with 2 nested spans, durations, statuses.
Explanation: Context managers create and close spans automatically.
Starter Code
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from datetime import datetime
import uuid
import json
import time
from contextlib import contextmanager


@dataclass
class Span:
    span_id: str
    trace_id: str
    parent_span_id: Optional[str]
    operation: str
    start_time: float
    end_time: Optional[float] = None
    status: str = 'running'  # running|ok|error
    attributes: Dict = field(default_factory=dict)
    events: List[Dict] = field(default_factory=list)
    error: Optional[str] = None

    @property
    def duration_ms(self) -> float:
        # Compare against None so an end_time of 0.0 still counts as finished.
        if self.end_time is not None:
            return (self.end_time - self.start_time) * 1000
        return 0.0


class AgentTracer:
    def __init__(self, service_name: str, exporter=None):
        self.service_name = service_name
        self.exporter = exporter  # e.g., OTLP exporter
        self.active_spans: Dict[str, Span] = {}
        self.completed_spans: List[Span] = []
        self._current_trace_id: Optional[str] = None

    def start_trace(self, trace_id: Optional[str] = None) -> str:
        pass

    @contextmanager
    def span(self, operation: str, parent_span_id: Optional[str] = None, **attributes):
        # TODO: Context manager for span lifecycle
        pass

    def add_event(self, span_id: str, name: str, attributes: Optional[Dict] = None) -> None:
        pass

    def set_error(self, span_id: str, error: str) -> None:
        pass

    def get_trace(self, trace_id: str) -> List[Span]:
        pass

    def export_opentelemetry(self, trace_id: str) -> Dict:
        # TODO: Export in OpenTelemetry format
        pass

    def compute_metrics(self, trace_id: str) -> Dict:
        # TODO: Return p50, p95, p99 latencies, error rate, cost
        pass
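For the export_opentelemetry stub, one plausible target is a payload modeled on the OTLP/JSON trace layout (resourceSpans, then scopeSpans, then spans). The sketch below is an approximation written from memory of that layout, not a validated exporter: the field names should be checked against the OTLP specification, the scope name "agent-tracer" and the dict-based span records are made up for illustration, and span IDs are assumed to already be hex strings of the right length.

```python
import json


def to_otlp_value(value):
    """Wrap a Python value in an OTLP-style AnyValue object."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return {"boolValue": value}
    if isinstance(value, int):
        return {"intValue": str(value)}  # 64-bit ints are sent as strings
    if isinstance(value, float):
        return {"doubleValue": value}
    return {"stringValue": str(value)}


def to_otlp_attributes(attributes):
    return [{"key": k, "value": to_otlp_value(v)} for k, v in attributes.items()]


def to_unix_nano(seconds):
    """Epoch seconds (float) to a nanosecond string."""
    return str(int(round(seconds * 1e9)))


STATUS_CODES = {"ok": 1, "error": 2}  # anything else maps to 0 (unset)


def export_otlp_json(service_name, spans):
    otlp_spans = []
    for s in spans:
        otlp_spans.append({
            "traceId": s["trace_id"],
            "spanId": s["span_id"],
            "parentSpanId": s["parent_span_id"] or "",
            "name": s["operation"],
            "startTimeUnixNano": to_unix_nano(s["start_time"]),
            "endTimeUnixNano": to_unix_nano(s["end_time"]),
            "attributes": to_otlp_attributes(s["attributes"]),
            "events": [
                {
                    "timeUnixNano": to_unix_nano(e["timestamp"]),
                    "name": e["name"],
                    "attributes": to_otlp_attributes(e["attributes"]),
                }
                for e in s["events"]
            ],
            "status": {
                "code": STATUS_CODES.get(s["status"], 0),
                "message": s.get("error") or "",
            },
        })
    return {
        "resourceSpans": [{
            "resource": {"attributes": to_otlp_attributes({"service.name": service_name})},
            "scopeSpans": [{"scope": {"name": "agent-tracer"}, "spans": otlp_spans}],
        }]
    }


sample_spans = [
    {"trace_id": "0" * 31 + "1", "span_id": "0" * 15 + "a", "parent_span_id": None,
     "operation": "agent.run", "start_time": 1700000000.0, "end_time": 1700000000.5,
     "status": "ok", "attributes": {}, "events": [], "error": None},
    {"trace_id": "0" * 31 + "1", "span_id": "0" * 15 + "b", "parent_span_id": "0" * 15 + "a",
     "operation": "llm.call", "start_time": 1700000000.1, "end_time": 1700000000.4,
     "status": "error", "attributes": {"tokens": 42},
     "events": [{"name": "retry", "timestamp": 1700000000.2, "attributes": {}}],
     "error": "timeout"},
]
payload = export_otlp_json("my-agent", sample_spans)
encoded = json.dumps(payload)  # the result is plain JSON-serializable data
```

Keeping the conversion in pure functions over plain records makes it easy to unit-test the export separately from span collection.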