Scalable Multi-Agent Message Router
As multi-agent systems scale, a dedicated message router becomes essential for performance and reliability.
Task
Build a MessageRouter that:
- Routes messages based on configurable rules with pattern matching.
- Supports 4 load balancing strategies: round-robin, least-loaded, random, sticky.
- Handles queue overflow by routing to dead letter queue.
- Tracks detailed routing metrics.
- Supports message transformation via rule-level functions.
Non-Functional Requirements
- Route 10,000 messages/second in Python (asyncio).
- Pattern matching O(R×M) where R=rules, M=message.
- Sticky routing: same session always goes to same agent.
- Dead letter queue when all target agents' queues are full.
Constraints
- Pattern supports: exact,
'prefix.*'wildcard. - Queue overflow threshold:
max_queue_size. - Metrics updated on every route operation.
Examples
Example 1:
Input:
router.add_rule(RoutingRule('r1', 'task.*', ['agent_a', 'agent_b'], 'round_robin'))
await router.route({'type': 'task.code', 'payload': 'write sort'})Output:
True (message routed to agent_a first, then agent_b on next call)Explanation: Pattern 'task.*' matches 'task.code'; round-robin selects agent_a.
Starter Code
import asyncio
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from collections import defaultdict
import time
import hashlib
@dataclass
class RoutingRule:
rule_id: str
pattern: str # Message type pattern ('task.*', 'result.code.*')
target_agents: List[str]
load_balance: str = 'round_robin' # round_robin | least_loaded | random | sticky
priority_boost: int = 0
transform_fn: Optional[Callable] = None
@dataclass
class RouterMetrics:
messages_routed: int = 0
messages_dropped: int = 0
avg_routing_latency_ms: float = 0.0
rule_hit_counts: Dict[str, int] = field(default_factory=dict)
agent_load: Dict[str, int] = field(default_factory=dict)
class MessageRouter:
def __init__(self, max_queue_size: int = 10000):
self.rules: List[RoutingRule] = []
self.agent_queues: Dict[str, asyncio.Queue] = {}
self.agent_counters: Dict[str, int] = defaultdict(int) # for round-robin
self.metrics = RouterMetrics()
self.max_queue_size = max_queue_size
self.dead_letter_queue: List[Dict] = []
self._sticky_map: Dict[str, str] = {} # session_id -> agent_id
def add_rule(self, rule: RoutingRule) -> None:
pass
def register_agent(self, agent_id: str) -> None:
pass
async def route(self, message: Dict) -> bool:
# TODO: Match rules, apply load balancing, enqueue
pass
def _match_pattern(self, pattern: str, msg_type: str) -> bool:
# Support: exact match, prefix wildcard (e.g. 'task.*')
pass
def _select_target(self, rule: RoutingRule, message: Dict) -> Optional[str]:
# TODO: Apply load balancing strategy
pass
async def consume(self, agent_id: str) -> Optional[Dict]:
# TODO: Return next message for agent
pass
def get_metrics(self) -> RouterMetrics:
pass
Python3
ReadyLines: 1Characters: 0
Ready