Implement Agent Observability and Distributed Tracing

Hard
Agents

Agent Observability and Distributed Tracing

Production agents are black boxes without proper observability. This question covers building a full tracing system.

Task

Implement a distributed tracing system for AI agents:

  1. Hierarchical spans (trace → agent run → LLM call → tool call).
  2. Context manager interface for ergonomic span creation.
  3. Event logging within spans.
  4. OpenTelemetry-compatible export format.
  5. Metric computation from traces: latency percentiles, error rates.
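For item 4, the exported trace can mirror the OTLP JSON layout (`resourceSpans` → `scopeSpans` → `spans`). The sketch below follows OTLP naming conventions, but the exact schema to target depends on your collector; the helper name `to_otlp_like` is illustrative, not part of the task API.

```python
import uuid

def to_otlp_like(service_name, spans):
    """Render finished spans in an OTLP-style JSON layout (sketch)."""
    return {
        "resourceSpans": [{
            "resource": {
                "attributes": [
                    {"key": "service.name", "value": {"stringValue": service_name}}
                ]
            },
            "scopeSpans": [{
                "spans": [{
                    "traceId": s["trace_id"],
                    "spanId": s["span_id"],
                    "parentSpanId": s.get("parent_span_id") or "",
                    "name": s["operation"],
                    "startTimeUnixNano": int(s["start_time"] * 1e9),
                    "endTimeUnixNano": int(s["end_time"] * 1e9),
                    # OTLP status codes: 0 = unset, 1 = ok, 2 = error
                    "status": {"code": 2 if s.get("error") else 1},
                } for s in spans]
            }],
        }]
    }

trace_id = uuid.uuid4().hex
doc = to_otlp_like("my-agent", [{
    "trace_id": trace_id, "span_id": uuid.uuid4().hex,
    "parent_span_id": None, "operation": "agent.run",
    "start_time": 1000.0, "end_time": 1001.5,
}])
```

A shape like this is what Jaeger/Grafana-style collectors expect from an OTLP/HTTP JSON exporter.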

Non-Functional Requirements

  • Spans must be thread-safe and async-safe.
  • Near-zero overhead when tracing is disabled (sampled out).
  • Export format compatible with Jaeger/Zipkin/Grafana.
  • Support for baggage (trace-level metadata propagation).
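Thread- and async-safety for "current span" tracking (and baggage propagation) is commonly handled with `contextvars`, since each thread and each asyncio task sees its own value of a `ContextVar`. A minimal sketch, with illustrative names (`current_span_id`, `span_context`) that are not part of the starter API:

```python
import contextvars
import uuid
from contextlib import contextmanager

# Each thread/async task gets its own view of these, so nested spans and
# baggage propagate correctly without passing parent_span_id by hand.
current_span_id = contextvars.ContextVar("current_span_id", default=None)
baggage = contextvars.ContextVar("baggage", default={})

@contextmanager
def span_context():
    span_id = uuid.uuid4().hex
    token = current_span_id.set(span_id)   # whatever was set before is the parent
    try:
        yield span_id
    finally:
        current_span_id.reset(token)       # restore the parent on exit

with span_context() as outer:
    with span_context() as inner:
        assert current_span_id.get() == inner
    assert current_span_id.get() == outer
```

The same `ContextVar` pattern is how the official OpenTelemetry Python SDK tracks the active span.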

Constraints

  • Trace ID: UUID, shared across all spans in a request.
  • Span hierarchy via parent_span_id.
  • P50/P95/P99 latencies computed over the durations of all leaf spans.
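The percentile in the last constraint can be computed without NumPy using the nearest-rank method; the helper name `percentile` below is illustrative:

```python
def percentile(values, p):
    """Nearest-rank percentile: smallest value >= p% of the data."""
    if not values:
        return 0.0
    ordered = sorted(values)
    # ceil(p/100 * n), clamped into [1, n], then converted to a 0-based index
    k = max(1, min(len(ordered), -(-p * len(ordered) // 100)))
    return ordered[int(k) - 1]

durations = [12.0, 15.0, 20.0, 22.0, 30.0, 45.0, 61.0, 80.0, 120.0, 400.0]
p50 = percentile(durations, 50)   # 30.0
p95 = percentile(durations, 95)   # 400.0
p99 = percentile(durations, 99)   # 400.0
```

Note that with only 10 samples, P95 and P99 both land on the maximum; interpolating variants (as in `numpy.percentile`) give smoother values but are not required here.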

Examples

Example 1:
Input:
tracer = AgentTracer('my-agent')
tid = tracer.start_trace()
with tracer.span('agent.run') as s:
    with tracer.span('llm.call', parent_span_id=s.span_id):
        pass
Output: Trace with 2 nested spans, durations, statuses.
Explanation: Context managers create and close spans automatically.

Starter Code

from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from datetime import datetime
import uuid
import json
import time
from contextlib import contextmanager

@dataclass
class Span:
    span_id: str
    trace_id: str
    parent_span_id: Optional[str]
    operation: str
    start_time: float
    end_time: Optional[float] = None
    status: str = 'running'  # running|ok|error
    attributes: Dict = field(default_factory=dict)
    events: List[Dict] = field(default_factory=list)
    error: Optional[str] = None

    @property
    def duration_ms(self) -> float:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return 0.0

class AgentTracer:
    def __init__(self, service_name: str, exporter=None):
        self.service_name = service_name
        self.exporter = exporter  # e.g., OTLP exporter
        self.active_spans: Dict[str, Span] = {}
        self.completed_spans: List[Span] = []
        self._current_trace_id: Optional[str] = None

    def start_trace(self, trace_id: Optional[str] = None) -> str:
        # TODO: Start (or adopt) a trace and return its trace_id
        pass

    @contextmanager
    def span(self, operation: str, parent_span_id: Optional[str] = None, **attributes):
        # TODO: Create the span, yield it, then close it (set end_time and
        # status, recording the error if the body raises)
        yield  # placeholder: a @contextmanager body must be a generator

    def add_event(self, span_id: str, name: str, attributes: Optional[Dict] = None) -> None:
        pass

    def set_error(self, span_id: str, error: str) -> None:
        pass

    def get_trace(self, trace_id: str) -> List[Span]:
        pass

    def export_opentelemetry(self, trace_id: str) -> Dict:
        # TODO: Export in OpenTelemetry format
        pass

    def compute_metrics(self, trace_id: str) -> Dict:
        # TODO: Return p50/p95/p99 latencies and the error rate for the trace
        pass
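One possible shape for the span lifecycle, sketched against a stripped-down version of the starter types (the class name `MiniTracer` is illustrative, not the expected answer): create the span on entry, yield it, mark it `error` if the body raises, and stamp `end_time` in `finally` so the span always closes.

```python
import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Dict, List, Optional

@dataclass
class Span:
    span_id: str
    trace_id: str
    parent_span_id: Optional[str]
    operation: str
    start_time: float
    end_time: Optional[float] = None
    status: str = "running"  # running|ok|error
    attributes: Dict = field(default_factory=dict)
    error: Optional[str] = None

class MiniTracer:
    def __init__(self):
        self.completed: List[Span] = []
        self.trace_id = uuid.uuid4().hex  # one trace ID shared by all spans

    @contextmanager
    def span(self, operation: str, parent_span_id: Optional[str] = None, **attributes):
        s = Span(
            span_id=uuid.uuid4().hex,
            trace_id=self.trace_id,
            parent_span_id=parent_span_id,
            operation=operation,
            start_time=time.time(),
            attributes=dict(attributes),
        )
        try:
            yield s
            s.status = "ok"
        except Exception as exc:
            s.status = "error"      # record the failure, then re-raise
            s.error = str(exc)
            raise
        finally:
            s.end_time = time.time()
            self.completed.append(s)

tracer = MiniTracer()
with tracer.span("agent.run") as parent:
    with tracer.span("llm.call", parent_span_id=parent.span_id, model="gpt-4"):
        pass
```

Because spans are appended on exit, the inner `llm.call` span lands in `completed` before its parent; a full solution would also track active spans and propagate the parent automatically (e.g. via `contextvars`) rather than requiring an explicit `parent_span_id`.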