Design a Production-Ready AI Agent System

Hard
Agents

System Design: Production-Ready AI Agent

This question simulates a real system design interview at an AI lab.

Task

Design and implement a production-ready AI agent with:

  1. Pluggable backends: LLM, tools, memory, observability.
  2. Full agent loop: Load memory → LLM → parse → tool call → observe → repeat.
  3. Budget control: Max steps, tokens, cost (USD), time.
  4. Reliability: Retry logic, timeout handling, graceful degradation.
  5. Memory: Load/save per run, compression on overflow.
  6. Observability: Every run and tool call recorded.
  7. Guardrails: Output validation at each step.

Non-Functional Requirements

  • P95 latency < 30s for 10-step tasks.
  • Cost tracking accurate to $0.001.
  • Zero data loss on agent crash (persist state after each step).
  • Thread-safe for concurrent runs.

Constraints

  • All I/O operations must be async.
  • Tool calls must be idempotent or explicitly marked non-idempotent.
  • Memory must compress before exceeding compression_threshold tokens.

Examples

Example 1:
Input: run = await agent.run('What is the weather in Paris?', run_id='run_001')
Output: AgentRun(run_id='run_001', output='The weather in Paris is ...', steps=2, tool_calls=[...], total_cost_usd=0.002, status='complete')
Explanation: Agent calls weather tool, gets result, returns grounded answer with full telemetry.

Starter Code

from typing import Any, Dict, List, Optional, Callable
from abc import ABC, abstractmethod
import asyncio
import time
import logging
import json
from dataclasses import dataclass, field
from enum import Enum

# ---- Core Types ----
class MessageRole(Enum):
    SYSTEM = 'system'
    USER = 'user'
    ASSISTANT = 'assistant'
    TOOL = 'tool'

@dataclass
class Message:
    role: MessageRole
    content: str
    metadata: Dict = field(default_factory=dict)

@dataclass
class ToolCall:
    tool_name: str
    args: Dict
    call_id: str
    result: Optional[Any] = None
    error: Optional[str] = None
    latency_ms: float = 0.0

@dataclass
class AgentRun:
    run_id: str
    input: str
    output: Optional[str] = None
    steps: int = 0
    tool_calls: List[ToolCall] = field(default_factory=list)
    total_tokens: int = 0
    total_cost_usd: float = 0.0
    latency_ms: float = 0.0
    status: str = 'running'  # running|complete|failed|timeout
    error: Optional[str] = None

# ---- Interfaces ----
class LLMBackend(ABC):
    @abstractmethod
    async def complete(self, messages: List[Message], tools: List[Dict]) -> Message:
        pass

    @abstractmethod
    def count_tokens(self, messages: List[Message]) -> int:
        pass

class ToolBackend(ABC):
    @abstractmethod
    async def execute(self, call: ToolCall) -> ToolCall:
        pass

class MemoryBackend(ABC):
    @abstractmethod
    async def load(self, run_id: str) -> List[Message]:
        pass

    @abstractmethod
    async def save(self, run_id: str, messages: List[Message]) -> None:
        pass

class ObservabilityBackend(ABC):
    @abstractmethod
    def record_run(self, run: AgentRun) -> None:
        pass

    @abstractmethod
    def record_tool_call(self, call: ToolCall, run_id: str) -> None:
        pass

# ---- Production Agent ----
class ProductionAgent:
    def __init__(
        self,
        llm: LLMBackend,
        tools: List[Dict],
        tool_backend: ToolBackend,
        memory: MemoryBackend,
        observability: ObservabilityBackend,
        config: Dict
    ):
        # TODO: Initialize with all backends and config
        # Config keys: max_steps, max_tokens, max_cost_usd, timeout_s,
        #              retry_attempts, compression_threshold
        pass

    async def run(self, user_input: str, run_id: str = None) -> AgentRun:
        # TODO: Full production agent loop
        # 1. Load memory
        # 2. Build messages with system prompt
        # 3. Agent loop: LLM → parse → tool call or finish
        # 4. Apply guardrails at each step
        # 5. Compress memory if needed
        # 6. Track all metrics
        # 7. Save memory
        # 8. Record to observability
        # 9. Return AgentRun
        pass

    async def _execute_tool(self, call: ToolCall) -> ToolCall:
        # TODO: Execute with retry, timeout, idempotency
        pass

    async def _check_budgets(self, run: AgentRun) -> Optional[str]:
        # TODO: Check step/token/cost/time budgets
        # Return violation string or None
        pass

    def _build_system_prompt(self, tools: List[Dict]) -> str:
        pass

    async def _compress_if_needed(self, messages: List[Message]) -> List[Message]:
        pass
Lines: 1Characters: 0
Ready
The AI Interview - Master AI/ML Interviews