System Design: Distributed Multi-Agent System
This is a full system design question mimicking a senior engineer interview at an AI company.
Task
Design a distributed agent cluster that:
- Routes tasks to healthy nodes with required capabilities.
- Monitors node health via heartbeats.
- Automatically recovers failed nodes and reassigns tasks.
- Checkpoints long-running tasks for crash recovery.
- Maintains a dead letter queue for permanently failed tasks.
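The first requirement, routing to a healthy node that has the required capability, comes down to a filter followed by a least-loaded pick. The starter code's `_select_node` TODO asks for the same thing. Below is a minimal sketch. `Node` is a hypothetical trimmed-down stand-in for `AgentNode`. Measuring load as `current_tasks / max_tasks` is an assumption; raw `current_tasks` would also satisfy "least-loaded" when every node has the same `max_tasks`.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional


class NodeStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    FAILED = "failed"
    RECOVERING = "recovering"


@dataclass
class Node:
    # Hypothetical trimmed-down stand-in for the starter code's AgentNode.
    node_id: str
    capabilities: List[str]
    status: NodeStatus = NodeStatus.HEALTHY
    current_tasks: int = 0
    max_tasks: int = 10


def select_node(nodes: Dict[str, Node], capability: str) -> Optional[Node]:
    """Return the least-loaded healthy node with `capability` and spare capacity."""
    candidates = [
        n for n in nodes.values()
        if n.status is NodeStatus.HEALTHY
        and capability in n.capabilities
        and n.current_tasks < n.max_tasks
    ]
    if not candidates:
        return None  # caller should leave the task queued and retry later
    # Load ratio lets nodes with different max_tasks compare fairly;
    # node_id breaks ties so the choice is deterministic.
    return min(candidates, key=lambda n: (n.current_tasks / n.max_tasks, n.node_id))


nodes = {
    "n1": Node("n1", ["code", "search"], current_tasks=3),
    "n2": Node("n2", ["code"], current_tasks=1),
    "n3": Node("n3", ["code"], status=NodeStatus.FAILED),
}
print(select_node(nodes, "code").node_id)    # n2: lowest load among healthy 'code' nodes
print(select_node(nodes, "search").node_id)  # n1: the only node with 'search'
print(select_node(nodes, "vision"))          # None: no node has 'vision'
```

Returning `None` instead of raising leaves the "no capable node right now" case to the dispatcher. That case is normal while a failed node is still recovering.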
Non-Functional Requirements
- Support 1000+ concurrent tasks.
- Node failure detection within 3× the heartbeat interval.
- Task reassignment within 5s of node failure detection.
- Zero task loss (checkpoint + DLQ).
- Exactly-once task execution for idempotent tasks.
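With the starter code's defaults (`heartbeat_interval=5.0`, `failure_threshold=3`), the detection budget is 3 × 5 s = 15 s of silence after the last heartbeat. The reassignment budget adds up to 5 s more. A task on a crashed node could therefore stall for roughly 20 s, plus whatever delay the monitor's own polling period adds. Polling at a fraction of the heartbeat interval keeps that extra delay small. The check itself can be sketched as below. `NodeHealth`, `missed_heartbeats`, and `detect_failures` are hypothetical names, and timestamps are passed in explicitly so the logic is testable.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class NodeHealth:
    node_id: str
    # Monotonic clock: system clock adjustments cannot fake or hide missed heartbeats.
    last_heartbeat: float = field(default_factory=time.monotonic)
    failed: bool = False


def missed_heartbeats(node: NodeHealth, now: float, interval: float) -> int:
    """Whole heartbeat intervals that have elapsed since the node last reported."""
    return int((now - node.last_heartbeat) // interval)


def detect_failures(nodes: Dict[str, NodeHealth], now: float,
                    interval: float, threshold: int) -> List[str]:
    """Mark nodes that have missed `threshold` heartbeats; return only newly failed ids."""
    newly_failed = []
    for node in nodes.values():
        if not node.failed and missed_heartbeats(node, now, interval) >= threshold:
            node.failed = True
            newly_failed.append(node.node_id)
    return newly_failed


# Starter-code defaults: heartbeat_interval=5.0, failure_threshold=3.
nodes = {"n1": NodeHealth("n1", last_heartbeat=100.0),
         "n2": NodeHealth("n2", last_heartbeat=112.0)}
print(detect_failures(nodes, now=115.0, interval=5.0, threshold=3))  # ['n1']
print(detect_failures(nodes, now=115.0, interval=5.0, threshold=3))  # [] (already failed)
```

Returning only the newly failed ids gives the monitor a natural trigger for recovery and reassignment. A node that is already failed is not re-reported on every tick.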
Constraints
- Priority queue: lower number = higher priority.
- A node is 'failed' after missing failure_threshold heartbeats.
- Tasks exceeding max_attempts go to the dead letter queue.
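Two details of these constraints are easy to get wrong with `asyncio.PriorityQueue`. First, it pops the smallest item first, which matches "lower number = higher priority". Second, it compares whole items. A plain `@dataclass` such as `DistributedTask` defines no ordering, so enqueueing the task directly, or as a `(priority, task)` pair when two priorities tie, raises `TypeError`. The sketch below avoids this by enqueueing `(priority, sequence, task)` tuples, so ties fall back to submission order. It uses a hypothetical trimmed `Task` class and `TaskQueue` wrapper. It also reads "exceeding `max_attempts`" as follows: once a task has failed `max_attempts` times, it goes to the dead letter queue instead of being retried.

```python
import asyncio
import itertools
from dataclasses import dataclass
from typing import List


@dataclass
class Task:
    # Hypothetical trimmed-down stand-in for the starter code's DistributedTask.
    task_id: str
    priority: int = 5
    attempts: int = 0
    max_attempts: int = 3


class TaskQueue:
    def __init__(self) -> None:
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self._seq = itertools.count()  # FIFO tie-breaker; Task itself is not orderable
        self.dead_letter_queue: List[Task] = []

    def submit(self, task: Task) -> str:
        # Lower priority number sorts first, so it is dequeued first.
        self._queue.put_nowait((task.priority, next(self._seq), task))
        return task.task_id

    async def next_task(self) -> Task:
        _, _, task = await self._queue.get()
        return task

    def record_failure(self, task: Task) -> None:
        """Count a failed attempt; requeue, or dead-letter once max_attempts is used up."""
        task.attempts += 1
        if task.attempts >= task.max_attempts:
            self.dead_letter_queue.append(task)
        else:
            self.submit(task)


async def demo() -> List[str]:
    q = TaskQueue()
    q.submit(Task("low", priority=9))
    q.submit(Task("urgent-a", priority=1))
    q.submit(Task("urgent-b", priority=1))
    return [(await q.next_task()).task_id for _ in range(3)]


print(asyncio.run(demo()))  # ['urgent-a', 'urgent-b', 'low']
```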
Examples
Example 1:
Input:
await cluster.register_node(AgentNode('n1', 'host1', 8000, ['code', 'search']))
await cluster.submit_task(DistributedTask('t1', {'prompt': 'write sort'}, 'code'))
Output:
't1' (task assigned to n1, executed, result stored in cluster.completed_tasks)
Explanation: n1 has the 'code' capability and is healthy, so the task is assigned to it and executed.
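To make Example 1 concrete, here is a toy single-process trace of the same calls. Everything beyond the names used in the example is an assumption. `MiniCluster`, `dispatch_once`, and `_execute` are hypothetical. `_execute` fakes the network call to `host:port`, and health checks are left out entirely. Because `register_node` and `submit_task` are coroutines in the starter code, the calls run inside an async function.

```python
import asyncio
import itertools
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class AgentNode:
    node_id: str
    host: str
    port: int
    capabilities: List[str]
    current_tasks: int = 0
    max_tasks: int = 10


@dataclass
class DistributedTask:
    task_id: str
    payload: Dict
    required_capability: str
    priority: int = 5


class MiniCluster:
    """Toy in-process cluster: enough to trace Example 1, nothing more."""

    def __init__(self) -> None:
        self.nodes: Dict[str, AgentNode] = {}
        self.task_queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.completed_tasks: Dict[str, Any] = {}
        self._seq = itertools.count()  # tie-breaker so tasks are never compared

    async def register_node(self, node: AgentNode) -> None:
        self.nodes[node.node_id] = node

    async def submit_task(self, task: DistributedTask) -> str:
        await self.task_queue.put((task.priority, next(self._seq), task))
        return task.task_id

    def _select_node(self, capability: str) -> Optional[AgentNode]:
        fits = [n for n in self.nodes.values()
                if capability in n.capabilities and n.current_tasks < n.max_tasks]
        return min(fits, key=lambda n: n.current_tasks, default=None)

    async def _execute(self, node: AgentNode, task: DistributedTask) -> Any:
        # Hypothetical stand-in for an RPC to node.host:node.port.
        await asyncio.sleep(0)
        return {"node": node.node_id, "echo": task.payload}

    async def dispatch_once(self) -> None:
        _, _, task = await self.task_queue.get()
        node = self._select_node(task.required_capability)
        if node is None:
            # No capable node yet: put the task back for a later pass.
            await self.task_queue.put((task.priority, next(self._seq), task))
            return
        node.current_tasks += 1
        try:
            self.completed_tasks[task.task_id] = await self._execute(node, task)
        finally:
            node.current_tasks -= 1  # release the slot even if execution fails


async def example_1() -> Dict[str, Any]:
    cluster = MiniCluster()
    await cluster.register_node(AgentNode("n1", "host1", 8000, ["code", "search"]))
    task_id = await cluster.submit_task(
        DistributedTask("t1", {"prompt": "write sort"}, "code"))
    await cluster.dispatch_once()
    return {"task_id": task_id, "result": cluster.completed_tasks[task_id]}


print(asyncio.run(example_1()))
# {'task_id': 't1', 'result': {'node': 'n1', 'echo': {'prompt': 'write sort'}}}
```

A real `dispatch_loop` would call something like `dispatch_once` repeatedly. It would also need a back-off on the "no capable node" path so it does not spin.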
Starter Code
import asyncio
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
import time


class NodeStatus(Enum):
    HEALTHY = 'healthy'
    DEGRADED = 'degraded'
    FAILED = 'failed'
    RECOVERING = 'recovering'


@dataclass
class AgentNode:
    node_id: str
    host: str
    port: int
    capabilities: List[str]
    status: NodeStatus = NodeStatus.HEALTHY
    last_heartbeat: float = field(default_factory=time.time)
    current_tasks: int = 0
    max_tasks: int = 10
    failure_count: int = 0


@dataclass
class DistributedTask:
    task_id: str
    payload: Dict
    required_capability: str
    priority: int = 5
    attempts: int = 0
    max_attempts: int = 3
    checkpointed_state: Optional[Dict] = None


class DistributedAgentCluster:
    def __init__(self, heartbeat_interval: float = 5.0, failure_threshold: int = 3):
        self.nodes: Dict[str, AgentNode] = {}
        self.task_queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.heartbeat_interval = heartbeat_interval
        self.failure_threshold = failure_threshold
        self.completed_tasks: Dict[str, Any] = {}
        self.dead_letter_queue: List[DistributedTask] = []

    async def register_node(self, node: AgentNode) -> None:
        # TODO: Add the node to self.nodes, keyed by node_id
        pass

    async def submit_task(self, task: DistributedTask) -> str:
        # TODO: Add to priority queue, return task_id
        # (DistributedTask is not orderable, so enqueue a (priority, seq, task) tuple)
        pass

    async def dispatch_loop(self) -> None:
        # TODO: Continuously dispatch tasks to healthy nodes
        pass

    async def heartbeat_monitor(self) -> None:
        # TODO: Monitor node health, mark failed nodes
        pass

    async def recover_node(self, node_id: str) -> bool:
        # TODO: Attempt node recovery, reassign its tasks
        pass

    async def rebalance(self) -> None:
        # TODO: Redistribute tasks from overloaded/failed nodes
        pass

    def _select_node(self, capability: str) -> Optional[AgentNode]:
        # TODO: Least-loaded healthy node with required capability
        pass

    async def checkpoint_task(self, task: DistributedTask, state: Dict) -> None:
        # TODO: Save task state for fault recovery
        pass
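The recovery path ties `checkpoint_task` and `recover_node` together. When the heartbeat monitor declares a node failed, every task that node was running is re-queued with its last checkpoint attached. The next node can then resume the task instead of restarting it. Tasks that have used up their attempts go to the dead letter queue. Below is a minimal synchronous sketch. It assumes a hypothetical `RecoveryManager` that tracks assignments and keeps checkpoints in an in-memory dict. A real system would need durable storage to survive a coordinator crash, which "zero task loss" implies. Whether a node crash should count against `max_attempts` is a policy choice; here it does.

```python
import copy
import heapq
import itertools
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class DistributedTask:
    task_id: str
    payload: Dict
    required_capability: str
    priority: int = 5
    attempts: int = 0
    max_attempts: int = 3
    checkpointed_state: Optional[Dict] = None


class RecoveryManager:
    """Hypothetical helper: tracks task placement and the latest checkpoint per task."""

    def __init__(self) -> None:
        self.checkpoints: Dict[str, Dict] = {}  # in-memory stand-in for durable storage
        self.running: Dict[str, Tuple[str, DistributedTask]] = {}  # task_id -> (node_id, task)
        self.ready: List[Tuple[int, int, DistributedTask]] = []  # min-heap by priority
        self.dead_letter_queue: List[DistributedTask] = []
        self._seq = itertools.count()

    def assign(self, task: DistributedTask, node_id: str) -> None:
        self.running[task.task_id] = (node_id, task)

    def checkpoint_task(self, task: DistributedTask, state: Dict) -> None:
        # Deep copy so later mutation by the worker cannot alter the saved snapshot.
        snapshot = copy.deepcopy(state)
        self.checkpoints[task.task_id] = snapshot
        task.checkpointed_state = snapshot

    def reassign_from(self, failed_node_id: str) -> List[str]:
        """Requeue (or dead-letter) every task that was running on the failed node."""
        orphaned = [t for n, t in self.running.values() if n == failed_node_id]
        for task in orphaned:
            del self.running[task.task_id]
            task.attempts += 1  # policy choice: a node crash consumes an attempt
            # Resume from the last checkpoint instead of starting over.
            task.checkpointed_state = self.checkpoints.get(task.task_id)
            if task.attempts >= task.max_attempts:
                self.dead_letter_queue.append(task)
            else:
                heapq.heappush(self.ready, (task.priority, next(self._seq), task))
        return [t.task_id for t in orphaned]


rm = RecoveryManager()
t1 = DistributedTask("t1", {"prompt": "write sort"}, "code")
t2 = DistributedTask("t2", {}, "search", attempts=2)  # already failed twice
rm.assign(t1, "n1")
rm.assign(t2, "n1")
rm.checkpoint_task(t1, {"step": 4})
moved = rm.reassign_from("n1")
print(moved)                                      # ['t1', 't2']
print(rm.ready[0][2].checkpointed_state)          # {'step': 4}
print([t.task_id for t in rm.dead_letter_queue])  # ['t2']
```

Exactly-once results for idempotent tasks then depend on the worker treating `checkpointed_state` as a resume point. It also depends on the cluster ignoring a late result from a node that was declared failed after its task had already been reassigned.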