Implement Scalable Multi-Agent Message Router

Hard
Agents

Scalable Multi-Agent Message Router

As multi-agent systems scale, a dedicated message router becomes essential for performance and reliability.

Task

Build a MessageRouter that:

  1. Routes messages based on configurable rules with pattern matching.
  2. Supports four load-balancing strategies: round-robin, least-loaded, random, and sticky.
  3. Handles queue overflow by routing messages to a dead letter queue.
  4. Tracks detailed routing metrics.
  5. Supports message transformation via rule-level functions.
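Items 3 and 5 can be sketched together as a small helper. This is one possible approach, not a reference solution: the function name enqueue_or_dead_letter and its parameters are hypothetical, and it assumes each agent queue is an asyncio.Queue created with maxsize=max_queue_size, so that put_nowait raises asyncio.QueueFull once the threshold is reached.

```python
import asyncio
from typing import Any, Callable, Dict, List, Optional

Message = Dict[str, Any]


def enqueue_or_dead_letter(
    message: Message,
    candidates: List[str],
    queues: Dict[str, asyncio.Queue],
    dead_letters: List[Message],
    transform_fn: Optional[Callable[[Message], Message]] = None,
) -> Optional[str]:
    """Hypothetical helper: enqueue on the first candidate with room.

    Returns the chosen agent id, or None if the message was dead-lettered.
    """
    # Apply the rule-level transformation (if any) before enqueueing.
    if transform_fn is not None:
        message = transform_fn(message)

    # Try candidates in the order given; skip any whose bounded queue is full.
    for agent_id in candidates:
        try:
            queues[agent_id].put_nowait(message)
            return agent_id
        except asyncio.QueueFull:
            continue

    # Every target queue was full: park the message in the dead letter queue.
    dead_letters.append(message)
    return None
```

Using put_nowait keeps the router from blocking on a slow consumer; whether a dead-lettered message should count as "dropped" in the metrics is a design decision the task leaves open.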

Non-Functional Requirements

  • Route 10,000 messages/second in Python (asyncio).
  • Pattern matching runs in O(R×M), where R is the number of rules and M is the length of the message type.
  • Sticky routing: the same session always goes to the same agent.
  • Dead letter queue: messages go there when all target agents' queues are full.
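The sticky-routing requirement can be met by remembering the first assignment for each session. The sketch below is a minimal illustration under assumptions: the function name sticky_select is hypothetical, it assumes messages carry a session identifier, and it uses a SHA-256 hash for the initial choice (Python's built-in hash() is randomized per process for strings, so it would not give a stable assignment across restarts).

```python
import hashlib
from typing import Dict, List


def sticky_select(session_id: str, agents: List[str], sticky_map: Dict[str, str]) -> str:
    """Hypothetical helper: return the same agent for a session on every call."""
    # Reuse the remembered assignment while that agent is still a valid target.
    remembered = sticky_map.get(session_id)
    if remembered is not None and remembered in agents:
        return remembered

    # First sighting of this session: pick an agent from a stable hash.
    digest = hashlib.sha256(session_id.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % len(agents)
    sticky_map[session_id] = agents[index]
    return agents[index]
```

Storing the choice in sticky_map means a session keeps its agent even if the target list is later reordered or extended; hashing alone would remap sessions whenever len(agents) changes.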

Constraints

  • Patterns support exact match and the 'prefix.*' wildcard.
  • Queue overflow threshold: max_queue_size.
  • Metrics are updated on every route operation.
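The two pattern forms can be checked with plain string operations, which keeps each comparison linear in the length of the message type and a full scan at O(R×M). The sketch below rests on two assumptions the task does not state: 'task.*' is taken not to match the bare type 'task', and the first matching rule in insertion order wins. The name first_matching_rule is hypothetical.

```python
from typing import List, Optional, Tuple


def match_pattern(pattern: str, msg_type: str) -> bool:
    """Exact match, or prefix match when the pattern ends in '.*'."""
    if pattern.endswith(".*"):
        # Drop only the '*' and keep the dot, so 'task.*' matches
        # 'task.code' but not 'tasks.code' or the bare 'task'.
        return msg_type.startswith(pattern[:-1])
    return pattern == msg_type


def first_matching_rule(rules: List[Tuple[str, str]], msg_type: str) -> Optional[str]:
    """Scan (rule_id, pattern) pairs in order; return the first matching rule id."""
    for rule_id, pattern in rules:
        if match_pattern(pattern, msg_type):
            return rule_id
    return None
```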

Examples

Example 1:
Input:
  router.add_rule(RoutingRule('r1', 'task.*', ['agent_a', 'agent_b'], 'round_robin'))
  await router.route({'type': 'task.code', 'payload': 'write sort'})
Output: True (message routed to agent_a first, then agent_b on next call)
Explanation: Pattern 'task.*' matches 'task.code'; round-robin selects agent_a.
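The alternation described in Example 1 can be reproduced with a per-rule counter. This is a standalone sketch, not the router itself: the function name round_robin is hypothetical, and keying the counter by rule id is an assumption.

```python
from collections import defaultdict
from typing import Dict, List

# One counter per rule id (assumed keying), starting at zero.
counters: Dict[str, int] = defaultdict(int)


def round_robin(rule_id: str, targets: List[str]) -> str:
    """Return the next target for this rule, cycling through the list."""
    index = counters[rule_id] % len(targets)
    counters[rule_id] += 1
    return targets[index]


targets = ["agent_a", "agent_b"]
first = round_robin("r1", targets)
second = round_robin("r1", targets)
third = round_robin("r1", targets)
print(first, second, third)  # agent_a agent_b agent_a
```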

Starter Code

import asyncio
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from collections import defaultdict
import time
import hashlib

@dataclass
class RoutingRule:
    rule_id: str
    pattern: str           # Message type pattern ('task.*', 'result.code.*')
    target_agents: List[str]
    load_balance: str = 'round_robin'  # round_robin | least_loaded | random | sticky
    priority_boost: int = 0
    transform_fn: Optional[Callable] = None

@dataclass
class RouterMetrics:
    messages_routed: int = 0
    messages_dropped: int = 0
    avg_routing_latency_ms: float = 0.0
    rule_hit_counts: Dict[str, int] = field(default_factory=dict)
    agent_load: Dict[str, int] = field(default_factory=dict)

class MessageRouter:
    def __init__(self, max_queue_size: int = 10000):
        self.rules: List[RoutingRule] = []
        self.agent_queues: Dict[str, asyncio.Queue] = {}
        self.agent_counters: Dict[str, int] = defaultdict(int)  # for round-robin
        self.metrics = RouterMetrics()
        self.max_queue_size = max_queue_size
        self.dead_letter_queue: List[Dict] = []
        self._sticky_map: Dict[str, str] = {}  # session_id -> agent_id

    def add_rule(self, rule: RoutingRule) -> None:
        pass

    def register_agent(self, agent_id: str) -> None:
        pass

    async def route(self, message: Dict) -> bool:
        # TODO: Match rules, apply load balancing, enqueue
        pass

    def _match_pattern(self, pattern: str, msg_type: str) -> bool:
        # Support: exact match, prefix wildcard (e.g. 'task.*')
        pass

    def _select_target(self, rule: RoutingRule, message: Dict) -> Optional[str]:
        # TODO: Apply load balancing strategy
        pass

    async def consume(self, agent_id: str) -> Optional[Dict]:
        # TODO: Return next message for agent
        pass

    def get_metrics(self) -> RouterMetrics:
        pass
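
The avg_routing_latency_ms field in RouterMetrics can be updated on every route operation without storing past samples, using an incremental mean. A minimal sketch follows; the LatencyStats class and its record method are hypothetical names, not part of the starter code.

```python
from dataclasses import dataclass


@dataclass
class LatencyStats:
    """Hypothetical holder for a running average of routing latency."""
    count: int = 0
    avg_ms: float = 0.0

    def record(self, latency_ms: float) -> None:
        # Incremental mean: new_avg = old_avg + (x - old_avg) / n
        self.count += 1
        self.avg_ms += (latency_ms - self.avg_ms) / self.count
```

In route(), the latency sample could be taken as the difference between two time.perf_counter() readings, multiplied by 1000 to convert seconds to milliseconds.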