System Design: Production-Ready AI Agent
This question simulates a real system design interview at an AI lab.
Task
Design and implement a production-ready AI agent with:
- Pluggable backends: LLM, tools, memory, observability.
- Full agent loop: Load memory → LLM → parse → tool call → observe → repeat.
- Budget control: Max steps, tokens, cost (USD), time.
- Reliability: Retry logic, timeout handling, graceful degradation.
- Memory: Load/save per run, compression on overflow.
- Observability: Every run and tool call recorded.
- Guardrails: Output validation at each step.
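The shape of the loop in the second bullet can be sketched end to end with hypothetical in-memory stubs. The names here (fake_llm, fake_tool, run_loop) are illustrative only and are not part of the required interface:

```python
import asyncio

async def fake_llm(messages):
    # Stub: request a tool on the first turn, finish once a tool result exists.
    if not any(m["role"] == "tool" for m in messages):
        return {"tool": "weather", "args": {"city": "Paris"}}
    return {"final": "The weather in Paris is sunny."}

async def fake_tool(name, args):
    # Stub tool execution; a real backend would dispatch on `name`.
    return f"{args['city']}: sunny, 21C"

async def run_loop(user_input, max_steps=5):
    messages = [{"role": "user", "content": user_input}]
    for step in range(max_steps):
        reply = await fake_llm(messages)
        if "final" in reply:                   # model chose to finish
            return reply["final"], step + 1
        result = await fake_tool(reply["tool"], reply["args"])
        messages.append({"role": "tool", "content": result})
    return None, max_steps                     # step budget exhausted

output, steps = asyncio.run(run_loop("What is the weather in Paris?"))
```

The real agent layers budgets, retries, and guardrails on top of this skeleton, but the LLM → parse → tool → observe cycle is the core.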
Non-Functional Requirements
- P95 latency < 30s for 10-step tasks.
- Cost tracking accurate to $0.001.
- Zero data loss on agent crash (persist state after each step).
- Thread-safe for concurrent runs.
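For the zero-data-loss requirement, one common pattern is to serialize the run state after each step to a temp file and atomically rename it, so a crash never leaves a half-written file. This is a sketch under that assumption; RunState is a cut-down stand-in for the real AgentRun:

```python
import json
import os
import tempfile
from dataclasses import dataclass, asdict

@dataclass
class RunState:
    run_id: str
    steps: int = 0
    total_cost_usd: float = 0.0

def persist_state(state: RunState, directory: str) -> str:
    # Write to a temp file first, then atomically replace the target path.
    path = os.path.join(directory, f"{state.run_id}.json")
    fd, tmp = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump(asdict(state), f)
    os.replace(tmp, path)  # atomic rename on POSIX and Windows
    return path

with tempfile.TemporaryDirectory() as d:
    p = persist_state(RunState("run_001", steps=3, total_cost_usd=0.002), d)
    with open(p) as f:
        restored = json.load(f)
```

A production system would likely persist to a database instead, but the write-then-rename discipline is the same idea: recovery always sees a complete snapshot.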
Constraints
- All I/O operations must be async.
- Tool calls must be idempotent or explicitly marked non-idempotent.
- Memory must compress before exceeding compression_threshold tokens.
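A minimal compression sketch, assuming a naive whitespace word count as the token estimate and that older messages can be folded into a single summary placeholder (a real implementation would call the LLM to summarize):

```python
def estimate_tokens(messages):
    # Crude stand-in for a real tokenizer: count whitespace-separated words.
    return sum(len(m.split()) for m in messages)

def compress_if_needed(messages, threshold):
    if estimate_tokens(messages) < threshold:
        return messages
    keep = messages[-2:]                  # keep the most recent context verbatim
    dropped = messages[:-2]
    summary = f"[summary of {len(dropped)} earlier messages]"
    return [summary] + keep

msgs = ["a b c d", "e f g h", "i j k l", "m n o p"]
compressed = compress_if_needed(msgs, threshold=10)
```

The key design point is that compression runs proactively, inside the loop, rather than waiting for a context-length error from the LLM.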
Examples
Example 1:
Input:
run = await agent.run('What is the weather in Paris?', run_id='run_001')
Output:
AgentRun(run_id='run_001', output='The weather in Paris is ...', steps=2, tool_calls=[...], total_cost_usd=0.002, status='complete')
Explanation: Agent calls the weather tool, gets the result, and returns a grounded answer with full telemetry.
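The total_cost_usd in the example could come from per-step token accounting like the following. The per-1K prices here are made up for illustration; real prices depend on the model and provider:

```python
# Hypothetical prices in USD per 1,000 tokens.
PRICE_PER_1K_USD = {"prompt": 0.0005, "completion": 0.0015}

def step_cost_usd(prompt_tokens: int, completion_tokens: int) -> float:
    return (prompt_tokens * PRICE_PER_1K_USD["prompt"]
            + completion_tokens * PRICE_PER_1K_USD["completion"]) / 1000

# Accumulate at full precision across steps; round only when reporting,
# so per-step rounding error cannot accumulate.
total = step_cost_usd(1500, 400) + step_cost_usd(500, 200)
reported = round(total, 3)
```

Rounding once at report time is what makes the $0.001 accuracy requirement achievable over many steps.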
Starter Code
from typing import Any, Dict, List, Optional, Callable
from abc import ABC, abstractmethod
import asyncio
import time
import logging
import json
from dataclasses import dataclass, field
from enum import Enum

# ---- Core Types ----

class MessageRole(Enum):
    SYSTEM = 'system'
    USER = 'user'
    ASSISTANT = 'assistant'
    TOOL = 'tool'

@dataclass
class Message:
    role: MessageRole
    content: str
    metadata: Dict = field(default_factory=dict)

@dataclass
class ToolCall:
    tool_name: str
    args: Dict
    call_id: str
    result: Optional[Any] = None
    error: Optional[str] = None
    latency_ms: float = 0.0

@dataclass
class AgentRun:
    run_id: str
    input: str
    output: Optional[str] = None
    steps: int = 0
    tool_calls: List[ToolCall] = field(default_factory=list)
    total_tokens: int = 0
    total_cost_usd: float = 0.0
    latency_ms: float = 0.0
    status: str = 'running'  # running|complete|failed|timeout
    error: Optional[str] = None

# ---- Interfaces ----

class LLMBackend(ABC):
    @abstractmethod
    async def complete(self, messages: List[Message], tools: List[Dict]) -> Message:
        pass

    @abstractmethod
    def count_tokens(self, messages: List[Message]) -> int:
        pass

class ToolBackend(ABC):
    @abstractmethod
    async def execute(self, call: ToolCall) -> ToolCall:
        pass

class MemoryBackend(ABC):
    @abstractmethod
    async def load(self, run_id: str) -> List[Message]:
        pass

    @abstractmethod
    async def save(self, run_id: str, messages: List[Message]) -> None:
        pass

class ObservabilityBackend(ABC):
    @abstractmethod
    def record_run(self, run: AgentRun) -> None:
        pass

    @abstractmethod
    def record_tool_call(self, call: ToolCall, run_id: str) -> None:
        pass

# ---- Production Agent ----

class ProductionAgent:
    def __init__(
        self,
        llm: LLMBackend,
        tools: List[Dict],
        tool_backend: ToolBackend,
        memory: MemoryBackend,
        observability: ObservabilityBackend,
        config: Dict
    ):
        # TODO: Initialize with all backends and config
        # Config keys: max_steps, max_tokens, max_cost_usd, timeout_s,
        # retry_attempts, compression_threshold
        pass

    async def run(self, user_input: str, run_id: Optional[str] = None) -> AgentRun:
        # TODO: Full production agent loop
        # 1. Load memory
        # 2. Build messages with system prompt
        # 3. Agent loop: LLM → parse → tool call or finish
        # 4. Apply guardrails at each step
        # 5. Compress memory if needed
        # 6. Track all metrics
        # 7. Save memory
        # 8. Record to observability
        # 9. Return AgentRun
        pass

    async def _execute_tool(self, call: ToolCall) -> ToolCall:
        # TODO: Execute with retry, timeout, idempotency
        pass

    async def _check_budgets(self, run: AgentRun) -> Optional[str]:
        # TODO: Check step/token/cost/time budgets
        # Return violation string or None
        pass

    def _build_system_prompt(self, tools: List[Dict]) -> str:
        # TODO: Describe available tools and the expected output format
        pass

    async def _compress_if_needed(self, messages: List[Message]) -> List[Message]:
        # TODO: Summarize or truncate before exceeding compression_threshold
        pass
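As a hint for _execute_tool, here is a free-standing sketch of retry-with-timeout, assuming exponential backoff and the constraint that only idempotent tools may be retried (execute_with_retry and flaky are illustrative names):

```python
import asyncio

async def execute_with_retry(fn, *, attempts=3, timeout_s=2.0, idempotent=True):
    # Non-idempotent tools get exactly one attempt: a retry after a timeout
    # could execute a side effect twice.
    last_err = None
    for attempt in range(attempts if idempotent else 1):
        try:
            return await asyncio.wait_for(fn(), timeout=timeout_s)
        except (asyncio.TimeoutError, RuntimeError) as e:
            last_err = e
            await asyncio.sleep(0.01 * (2 ** attempt))  # exponential backoff
    raise last_err

calls = {"n": 0}

async def flaky():
    # Fails twice with a transient error, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = asyncio.run(execute_with_retry(flaky))
```

In the real _execute_tool, the exception types caught, the backoff schedule, and attempts/timeout_s would come from the agent config rather than being hard-coded.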