Design a Distributed Multi-Agent System with Fault Tolerance

Hard
Agents

System Design: Distributed Multi-Agent System

This is a full system design question modeled on a senior-engineer interview at an AI company.

Task

Design a distributed agent cluster that:

  1. Routes tasks to healthy nodes with required capabilities.
  2. Monitors node health via heartbeats.
  3. Automatically recovers failed nodes and reassigns their in-flight tasks.
  4. Checkpoints long-running tasks for crash recovery.
  5. Maintains a dead letter queue for permanently failed tasks.
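Requirement 1 can be sketched as a pure selection function. This is a minimal sketch, not a reference solution: Node is a simplified, hypothetical stand-in for the AgentNode record in the starter code, and the least-loaded policy is the one named in the starter code's _select_node comment. Skipping nodes that are already at max_tasks and tie-breaking on node_id are assumptions added for illustration.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional


class NodeStatus(Enum):
    HEALTHY = "healthy"
    FAILED = "failed"


@dataclass
class Node:  # simplified, hypothetical stand-in for AgentNode
    node_id: str
    capabilities: List[str]
    status: NodeStatus = NodeStatus.HEALTHY
    current_tasks: int = 0
    max_tasks: int = 10


def select_node(nodes: Dict[str, Node], capability: str) -> Optional[Node]:
    """Return the least-loaded healthy node that offers the capability, or None."""
    candidates = [
        n for n in nodes.values()
        if n.status is NodeStatus.HEALTHY
        and capability in n.capabilities
        and n.current_tasks < n.max_tasks  # assumption: full nodes are skipped
    ]
    # Tie-break on node_id so the choice is deterministic (assumption).
    return min(candidates, key=lambda n: (n.current_tasks, n.node_id), default=None)
```

Returning None rather than raising lets the dispatcher leave the task queued until a suitable node becomes available.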

Non-Functional Requirements

  • Support 1000+ concurrent tasks.
  • Node failure detection within 3× heartbeat interval.
  • Task reassignment within 5s of node failure detection.
  • Zero task loss (checkpoint + DLQ).
  • Effectively exactly-once execution: delivery is at-least-once, so tasks must be idempotent (or deduplicated by task_id) for retries to be safe.
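The failure-detection bound is simple arithmetic: with the starter defaults (heartbeat_interval = 5.0, failure_threshold = 3) a node is declared failed once its last heartbeat is 3 × 5.0 s = 15.0 s old. A minimal sketch of that check follows. The function name and the injected now parameter are hypothetical, chosen so the logic can be tested without sleeping.

```python
from typing import Dict, List


def find_failed_nodes(
    last_heartbeat: Dict[str, float],
    now: float,
    heartbeat_interval: float = 5.0,
    failure_threshold: int = 3,
) -> List[str]:
    """Return the IDs of nodes whose last heartbeat is at least one timeout old."""
    # Missing `failure_threshold` consecutive heartbeats means this much silence.
    timeout = failure_threshold * heartbeat_interval  # 3 * 5.0 s = 15.0 s by default
    return sorted(
        node_id for node_id, seen in last_heartbeat.items() if now - seen >= timeout
    )
```

The monitor only notices a failure when it runs, so detection can lag the timeout by up to one polling period; polling more often than the heartbeat interval keeps that lag small. Passing time.monotonic() as now, and recording heartbeats with the same clock, avoids false positives when the wall clock is adjusted.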

Constraints

  • Priority queue: lower number = higher priority.
  • A node is 'failed' after missing failure_threshold heartbeats.
  • Tasks exceeding max_attempts go to the dead letter queue.
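The queue and retry constraints can be sketched together. This is one possible approach under stated assumptions: Task is a simplified, hypothetical stand-in for DistributedTask, enqueue and retry_or_bury are illustrative helper names, and "exceeding max_attempts" is read as "the task has failed max_attempts times". The sequence counter is an added tie-breaker so that equal-priority tasks leave the queue in submission order.

```python
import asyncio
import itertools
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Task:  # simplified, hypothetical stand-in for DistributedTask
    task_id: str
    priority: int = 5
    attempts: int = 0
    max_attempts: int = 3


_seq = itertools.count()  # unique, increasing tie-breaker


async def enqueue(queue: asyncio.PriorityQueue, task: Task) -> None:
    # Tuples compare element by element: priority first, then insertion order.
    # The Task itself is never compared because the sequence number is unique.
    await queue.put((task.priority, next(_seq), task))


async def retry_or_bury(
    queue: asyncio.PriorityQueue, task: Task, dead_letters: List[Task]
) -> bool:
    """Record a failed attempt; requeue the task or move it to the DLQ."""
    task.attempts += 1
    if task.attempts >= task.max_attempts:
        dead_letters.append(task)
        return False
    await enqueue(queue, task)
    return True


async def demo() -> Tuple[List[str], List[str]]:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    dead: List[Task] = []
    for t in (Task("low", 9), Task("urgent", 1), Task("a", 5), Task("b", 5)):
        await enqueue(queue, t)
    order = []
    while not queue.empty():
        _, _, task = await queue.get()
        order.append(task.task_id)
    flaky = Task("flaky", max_attempts=2)
    await retry_or_bury(queue, flaky, dead)  # first failure: requeued
    await retry_or_bury(queue, flaky, dead)  # second failure: dead-lettered
    return order, [t.task_id for t in dead]
```

Running asyncio.run(demo()) drains the queue lowest number first, with the two priority-5 tasks in submission order, and leaves only the flaky task in the dead letter list.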

Examples

Example 1:
Input:
  await cluster.register_node(AgentNode('n1', 'host1', 8000, ['code', 'search']))
  await cluster.submit_task(DistributedTask('t1', {'prompt': 'write sort'}, 'code'))
Output: 't1' (task assigned to n1, executed, result stored)
Explanation: n1 has 'code' capability and is healthy; task assigned and executed.
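Because register_node and submit_task are declared async in the starter code, the example calls have to be awaited inside an event loop. The sketch below drives Example 1 end to end. ToyCluster is a hypothetical in-memory stand-in, not the intended solution: it has no queue, health checks, or retries, it "executes" a task by echoing its payload, and raising LookupError when no node matches is an assumption.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


@dataclass
class AgentNode:  # trimmed to the fields the example uses
    node_id: str
    host: str
    port: int
    capabilities: List[str]


@dataclass
class DistributedTask:  # trimmed to the fields the example uses
    task_id: str
    payload: Dict
    required_capability: str


class ToyCluster:
    """Hypothetical stand-in that runs each task as soon as it is submitted."""

    def __init__(self) -> None:
        self.nodes: Dict[str, AgentNode] = {}
        self.completed_tasks: Dict[str, Any] = {}

    async def register_node(self, node: AgentNode) -> None:
        self.nodes[node.node_id] = node

    async def submit_task(self, task: DistributedTask) -> str:
        node = next(
            (n for n in self.nodes.values()
             if task.required_capability in n.capabilities),
            None,
        )
        if node is None:
            raise LookupError(f"no node offers {task.required_capability!r}")
        # Placeholder for real execution on the chosen node.
        self.completed_tasks[task.task_id] = {"node": node.node_id, "echo": task.payload}
        return task.task_id


async def main() -> Tuple[str, Dict[str, Any]]:
    cluster = ToyCluster()
    await cluster.register_node(AgentNode("n1", "host1", 8000, ["code", "search"]))
    task_id = await cluster.submit_task(
        DistributedTask("t1", {"prompt": "write sort"}, "code")
    )
    return task_id, cluster.completed_tasks
```

Calling asyncio.run(main()) returns the task ID 't1' together with the stored result, which records that n1 handled the task.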

Starter Code

import asyncio
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
import time

class NodeStatus(Enum):
    HEALTHY = 'healthy'
    DEGRADED = 'degraded'
    FAILED = 'failed'
    RECOVERING = 'recovering'

@dataclass
class AgentNode:
    node_id: str
    host: str
    port: int
    capabilities: List[str]
    status: NodeStatus = NodeStatus.HEALTHY
    last_heartbeat: float = field(default_factory=time.time)
    current_tasks: int = 0
    max_tasks: int = 10
    failure_count: int = 0

@dataclass
class DistributedTask:
    task_id: str
    payload: Dict
    required_capability: str
    priority: int = 5
    attempts: int = 0
    max_attempts: int = 3
    checkpointed_state: Optional[Dict] = None

class DistributedAgentCluster:
    def __init__(self, heartbeat_interval: float = 5.0, failure_threshold: int = 3):
        self.nodes: Dict[str, AgentNode] = {}
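        # Entries must be orderable, e.g. (priority, sequence, task) tuples;
        # DistributedTask defines no ordering, so it cannot be queued bare.
        self.task_queue: asyncio.PriorityQueue = asyncio.PriorityQueue()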
        self.heartbeat_interval = heartbeat_interval
        self.failure_threshold = failure_threshold
        self.completed_tasks: Dict[str, Any] = {}
        self.dead_letter_queue: List[DistributedTask] = []

    async def register_node(self, node: AgentNode) -> None:
        # TODO: Add node to self.nodes and record its initial heartbeat
        pass

    async def submit_task(self, task: DistributedTask) -> str:
        # TODO: Add to priority queue, return task_id
        pass

    async def dispatch_loop(self) -> None:
        # TODO: Continuously dispatch tasks to healthy nodes
        pass

    async def heartbeat_monitor(self) -> None:
        # TODO: Monitor node health, mark failed nodes
        pass

    async def recover_node(self, node_id: str) -> bool:
        # TODO: Attempt node recovery, reassign its tasks
        pass

    async def rebalance(self) -> None:
        # TODO: Redistribute tasks from overloaded/failed nodes
        pass

    def _select_node(self, capability: str) -> Optional[AgentNode]:
        # TODO: Least-loaded healthy node with required capability
        pass

    async def checkpoint_task(self, task: DistributedTask, state: Dict) -> None:
        # TODO: Save task state for fault recovery
        pass
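Checkpointing and resume can be sketched independently of the cluster class. The following is a minimal sketch under stated assumptions: CheckpointStore is a hypothetical in-memory store (a real one would write to durable storage), process is an illustrative worker that upper-cases a list of strings, and the crash_after parameter exists only to simulate a node failure.

```python
import copy
from typing import Dict, List, Optional


class CheckpointStore:
    """Hypothetical in-memory store keyed by task_id."""

    def __init__(self) -> None:
        self._states: Dict[str, Dict] = {}

    def save(self, task_id: str, state: Dict) -> None:
        # Deep-copy so later mutation by the worker cannot alter the snapshot.
        self._states[task_id] = copy.deepcopy(state)

    def load(self, task_id: str) -> Optional[Dict]:
        state = self._states.get(task_id)
        return copy.deepcopy(state) if state is not None else None


def process(
    task_id: str,
    items: List[str],
    store: CheckpointStore,
    crash_after: Optional[int] = None,
) -> List[str]:
    """Process items in order, checkpointing after each one.

    crash_after simulates a node failure after that many items in this run.
    """
    # Resume from the last checkpoint if one exists, else start fresh.
    state = store.load(task_id) or {"next_index": 0, "results": []}
    done_this_run = 0
    while state["next_index"] < len(items):
        if crash_after is not None and done_this_run == crash_after:
            raise RuntimeError("simulated node crash")
        state["results"].append(items[state["next_index"]].upper())
        state["next_index"] += 1
        store.save(task_id, state)
        done_this_run += 1
    return state["results"]
```

A second call with the same task_id resumes at next_index instead of starting over, so work completed before the crash is not repeated. Checkpointing after every item trades throughput for a smaller replay window; a real system would likely checkpoint less often.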