Distributed Multi-Agent Coordinator

Hard
Agents

Implement distributed agent coordination:

System Model:

  • Multiple nodes, each running agents
  • Coordinator on each node
  • Tasks distributed via consistent hashing

Methods:

  1. register_local_agent(agent_id, capabilities): Local registration
  2. assign_task_distributed(task, requirements): Route task
    • Use consistent hashing on task_id
    • Check node health
    • Return assignment info
  3. _consistent_hash(key, nodes): Hash to node mapping
  4. handle_node_failure(failed_node): Rebalance tasks
  5. collect_results(task_id): Gather distributed results

Consistent Hashing: Hash(key) -> find next node in hash ring

Failure Handling:

  • Heartbeat monitoring
  • Automatic failover
  • Task reassignment

Examples

Example 1:
Input: coord = DistributedAgentCoordinator('node1', ['node1', 'node2']); coord.register_local_agent('agent1', ['math']); 'agent1' in coord.local_agents
Output: True
Explanation: Agent registered locally

Starter Code

import hashlib
import time

class DistributedAgentCoordinator:
    """
    Coordinator for distributed multi-agent system across nodes.
    """
    
    def __init__(self, node_id, all_nodes):
        self.node_id = node_id
        self.all_nodes = all_nodes
        self.local_agents = {}
        self.task_assignments = {}
        self.heartbeat_table = {}
    
    def register_local_agent(self, agent_id, capabilities):
        """Register agent on this node"""
        # Your implementation here
        pass
    
    def assign_task_distributed(self, task, requirements):
        """
        Assign task to best node using consistent hashing.
        Returns {'assigned_node': ..., 'agent_id': ..., 'local': bool}
        """
        # Your implementation here
        pass
    
    def _consistent_hash(self, key, nodes):
        """Consistent hashing for node selection"""
        # Your implementation here
        pass
    
    def handle_node_failure(self, failed_node):
        """Reassign tasks from failed node"""
        # Your implementation here
        pass
    
    def collect_results(self, task_id):
        """Collect results from potentially multiple nodes"""
        # Your implementation here
        pass
Lines: 1Characters: 0
Ready
The AI Interview - Master AI/ML Interviews