Implement Coordinator Agent Pattern

Medium
Agents

Coordinator Agent Pattern

A coordinator manages a fleet of worker agents, routing tasks and aggregating results.

Task

Implement CoordinatorAgent that:

  1. Registers worker agents with capabilities and load limits.
  2. Assigns tasks to available agents based on capability + load.
  3. Aggregates results from multiple parallel agents.
  4. Rebalances tasks when agents are overloaded.

Constraints

  • An agent cannot exceed max_concurrent tasks.
  • Prefer least-loaded agent when multiple agents match.
  • Aggregation merges result dicts; conflicts resolved by timestamp (latest wins).

Examples

Example 1:
Input: coord.register_worker(AgentSpec('coder', ['code'], max_concurrent=2)) coord.assign_task({'type': 'code', 'content': 'write sort'})
Output: 'coder'
Explanation: Only 'coder' has 'code' capability and is under load limit.

Starter Code

from typing import List, Dict, Any, Optional
from dataclasses import dataclass

@dataclass
class AgentSpec:
    agent_id: str
    capabilities: List[str]
    max_concurrent: int = 1
    current_load: int = 0

class CoordinatorAgent:
    def __init__(self, llm_fn: callable):
        self.llm_fn = llm_fn
        self.agents: Dict[str, AgentSpec] = {}
        self.task_queue: list = []
        self.results: Dict[str, Any] = {}

    def register_worker(self, spec: AgentSpec) -> None:
        pass

    def assign_task(self, task: Dict) -> Optional[str]:
        # TODO: Select best available agent for task
        # Return agent_id or None if no agent available
        pass

    def aggregate_results(self, task_results: List[Dict]) -> Dict:
        # TODO: Merge results from multiple agents
        pass

    def rebalance(self) -> None:
        # TODO: Redistribute tasks from overloaded agents
        pass
Lines: 1Characters: 0
Ready
The AI Interview - Master AI/ML Interviews