Implement a production observability pipeline:
Distributed Tracing:
1. start_trace(operation_name, context): Begin a trace
   - Generate trace_id and span_id
   - Record start timestamp
2. end_trace(trace_id, result): Complete the trace
   - Calculate duration
   - Store the full trace
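The two tracing methods above can be sketched as follows. This is a minimal in-memory version under stated assumptions: the `TraceStore` name, the `active`/`traces` split, and the record fields are illustrative choices, not part of the required API.

```python
import time
import uuid

class TraceStore:
    """Minimal tracing sketch: start_trace opens a span, end_trace closes it."""

    def __init__(self):
        self.active = {}   # trace_id -> open trace record
        self.traces = []   # completed traces

    def start_trace(self, operation_name, context=None):
        trace_id = uuid.uuid4().hex
        self.active[trace_id] = {
            'trace_id': trace_id,
            'span_id': uuid.uuid4().hex,   # one root span per trace, for simplicity
            'operation': operation_name,
            'context': context or {},
            'start': time.time(),
        }
        return trace_id

    def end_trace(self, trace_id, result=None):
        trace = self.active.pop(trace_id)   # raises KeyError for unknown ids
        trace['duration'] = time.time() - trace['start']
        trace['result'] = result
        self.traces.append(trace)
        return trace
```

A real implementation would also support child spans and context propagation; this sketch only covers the start/end lifecycle the exercise asks for.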
Metrics:
3. record_metric(name, value, tags): Record time-series data
   - Tags: {'agent_id': 'A1', 'tool': 'search'}
   - Aggregate by tags
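Tagged metrics with per-tag aggregation can be sketched like this. The `MetricStore` name and the sum-based `aggregate` helper are assumptions for illustration; the exercise only fixes the `record_metric(name, value, tags)` signature.

```python
from collections import defaultdict

class MetricStore:
    """Time-series metrics keyed by name; each data point keeps its tags."""

    def __init__(self):
        self.metrics = defaultdict(list)

    def record_metric(self, name, value, tags=None):
        self.metrics[name].append({'value': value, 'tags': tags or {}})

    def aggregate(self, name, tag_key):
        """Sum recorded values grouped by one tag, e.g. per agent_id."""
        totals = defaultdict(float)
        for point in self.metrics[name]:
            totals[point['tags'].get(tag_key)] += point['value']
        return dict(totals)
```

Points without the requested tag group under the key `None`, which keeps the aggregation total-preserving.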
Alerting:
4. add_alert_rule(name, condition_fn, severity): Define an alert rule
   - condition_fn(metrics) -> bool
5. check_alerts(): Evaluate all rules
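The rule/check pair above can be sketched as predicates over a metrics snapshot. One assumption made here: `check_alerts` takes the snapshot as a parameter, whereas the starter code's version would read it from the pipeline's own metrics store.

```python
class AlertEngine:
    """Alert rules are named predicates evaluated against a metrics snapshot."""

    def __init__(self):
        self.rules = []

    def add_alert_rule(self, name, condition_fn, severity='warning'):
        self.rules.append({'name': name,
                           'condition': condition_fn,
                           'severity': severity})

    def check_alerts(self, metrics):
        """Return every rule whose condition holds for the given snapshot."""
        return [{'name': r['name'], 'severity': r['severity']}
                for r in self.rules
                if r['condition'](metrics)]
```

Keeping `condition_fn` opaque means rules can express arbitrary thresholds (error rates, latencies, queue depths) without the engine knowing their shape.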
Export:
6. export_to_sink(sink_type, config): Send data to an external system
   - Format appropriately for the sink
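"Format appropriately for the sink" can be sketched as a per-sink serializer. This sketch formats a metrics dict either as Prometheus-style text exposition lines (`name{label="value"} value`) or as a generic JSON payload; the `format_for_sink` helper and the two-sink dispatch are illustrative assumptions, and a real exporter would also handle transport and authentication via `config`.

```python
import json

def format_for_sink(sink_type, metrics):
    """Serialize a {name: [{'value': v, 'tags': {...}}, ...]} dict per sink type."""
    if sink_type == 'prometheus':
        # Prometheus text exposition: one "name{labels} value" line per point.
        lines = []
        for name, points in metrics.items():
            for p in points:
                labels = ','.join(f'{k}="{v}"'
                                  for k, v in sorted(p['tags'].items()))
                lines.append(f'{name}{{{labels}}} {p["value"]}')
        return '\n'.join(lines)
    # Default: a JSON body, as many HTTP-based sinks accept.
    return json.dumps(metrics)
```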
Examples
Example 1:
Input:
pipe = ObservabilityPipeline(); trace_id = pipe.start_trace('agent_run'); isinstance(trace_id, str)
Output:
True
Explanation: Trace started, ID generated
Starter Code
class ObservabilityPipeline:
    """
    Complete observability pipeline for production agent systems.
    """

    def __init__(self):
        self.traces = []
        self.metrics = {}
        self.alerts = []
        self.sinks = []  # External destinations

    def start_trace(self, operation_name, context=None):
        """Start a new distributed trace."""
        # Your implementation here
        pass

    def end_trace(self, trace_id, result=None):
        """End trace and calculate duration."""
        # Your implementation here
        pass

    def record_metric(self, name, value, tags=None):
        """Record a time-series metric with tags."""
        # Your implementation here
        pass

    def add_alert_rule(self, name, condition_fn, severity='warning'):
        """Add an alert rule based on metrics."""
        # Your implementation here
        pass

    def check_alerts(self):
        """Evaluate all alert rules."""
        # Your implementation here
        pass

    def export_to_sink(self, sink_type, config):
        """Export data to an external system (Prometheus, Datadog, etc.)."""
        # Your implementation here
        pass