Coordinator Agent Pattern
A coordinator manages a fleet of worker agents, routing tasks and aggregating results.
Task
Implement CoordinatorAgent that:
- Registers worker agents with capabilities and load limits.
- Assigns tasks to available agents based on capability + load.
- Aggregates results from multiple parallel agents.
- Rebalances tasks when agents are overloaded.
Constraints
- An agent cannot exceed max_concurrent tasks.
- Prefer the least-loaded agent when multiple agents match.
- Aggregation merges result dicts; conflicts resolved by timestamp (latest wins).
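The latest-wins rule can be sketched as below. The statement does not fix the shape of a result record, so this sketch assumes each entry looks like {"timestamp": float, "data": dict}; both key names and the helper name merge_latest_wins are hypothetical.

```python
from typing import Any, Dict, List


def merge_latest_wins(task_results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Merge result dicts; on a key conflict the latest timestamp wins.

    Assumed (hypothetical) record shape: {"timestamp": float, "data": {...}}.
    """
    merged: Dict[str, Any] = {}
    # Apply records oldest first so that newer values overwrite older ones.
    for record in sorted(task_results, key=lambda r: r["timestamp"]):
        merged.update(record["data"])
    return merged


results = [
    {"timestamp": 2.0, "data": {"status": "done", "lines": 12}},
    {"timestamp": 1.0, "data": {"status": "draft", "author": "coder"}},
]
merged = merge_latest_wins(results)
print(merged["status"])  # the record with timestamp 2.0 supplies this key
```

Keys that appear in only one record pass through unchanged. Because sorted is stable, records with equal timestamps are applied in input order, so the later one in the list wins; a real implementation may want an explicit tie-break instead.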
Examples
Example 1:
Input:
coord.register_worker(AgentSpec('coder', ['code'], max_concurrent=2))
coord.assign_task({'type': 'code', 'content': 'write sort'})
Output:
'coder'
Explanation: Only 'coder' has 'code' capability and is under load limit.
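One way to read the selection rule behind Example 1 is sketched below as a standalone function. The helper name pick_agent is hypothetical, "least-loaded" is taken to mean the lowest absolute current_load (the statement does not say whether load is absolute or relative to the limit), and assigning a task is assumed to occupy one slot.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class AgentSpec:
    agent_id: str
    capabilities: List[str]
    max_concurrent: int = 1
    current_load: int = 0


def pick_agent(agents: Dict[str, AgentSpec], task: Dict) -> Optional[str]:
    """Return the id of the least-loaded capable agent, or None if none has room."""
    candidates = [
        a for a in agents.values()
        if task["type"] in a.capabilities and a.current_load < a.max_concurrent
    ]
    if not candidates:
        return None
    chosen = min(candidates, key=lambda a: a.current_load)
    chosen.current_load += 1  # assumption: an assigned task occupies one slot
    return chosen.agent_id


agents = {"coder": AgentSpec("coder", ["code"], max_concurrent=2)}
task = {"type": "code", "content": "write sort"}
print(pick_agent(agents, task))  # coder
print(pick_agent(agents, task))  # coder (second slot)
print(pick_agent(agents, task))  # None (limit of 2 reached)
```

The third call returns None because the capacity check runs before the least-loaded comparison, which keeps the max_concurrent constraint from ever being exceeded.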
Starter Code
from typing import Any, Callable, Dict, List, Optional
from dataclasses import dataclass


@dataclass
class AgentSpec:
    agent_id: str
    capabilities: List[str]
    max_concurrent: int = 1
    current_load: int = 0


class CoordinatorAgent:
    def __init__(self, llm_fn: Callable[..., Any]):
        self.llm_fn = llm_fn
        self.agents: Dict[str, AgentSpec] = {}
        self.task_queue: list = []
        self.results: Dict[str, Any] = {}

    def register_worker(self, spec: AgentSpec) -> None:
        pass

    def assign_task(self, task: Dict) -> Optional[str]:
        # TODO: Select best available agent for task
        # Return agent_id or None if no agent available
        pass

    def aggregate_results(self, task_results: List[Dict]) -> Dict:
        # TODO: Merge results from multiple agents
        pass

    def rebalance(self) -> None:
        # TODO: Redistribute tasks from overloaded agents
        pass
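The statement leaves "overloaded" and the bookkeeping of assigned tasks undefined, so the following is only one possible reading of the rebalance step. It assumes a hypothetical assignments map (agent id to the list of task dicts that agent holds), treats an agent as overloaded when it holds more tasks than max_concurrent, and uses an illustrative "id" key on tasks purely for the demo.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class AgentSpec:
    agent_id: str
    capabilities: List[str]
    max_concurrent: int = 1
    current_load: int = 0


def rebalance(agents: Dict[str, AgentSpec], assignments: Dict[str, List[dict]]) -> None:
    """Move excess tasks from overloaded agents to capable agents with spare room.

    Tasks that no other agent can accept stay with their original agent.
    """
    for spec in agents.values():
        assignments.setdefault(spec.agent_id, [])

    for src in agents.values():
        held = assignments[src.agent_id]
        excess = held[src.max_concurrent:]
        del held[src.max_concurrent:]
        for task in excess:
            targets = [
                a for a in agents.values()
                if a.agent_id != src.agent_id
                and task["type"] in a.capabilities
                and len(assignments[a.agent_id]) < a.max_concurrent
            ]
            if targets:
                # Prefer the target currently holding the fewest tasks.
                dest = min(targets, key=lambda a: len(assignments[a.agent_id]))
                assignments[dest.agent_id].append(task)
            else:
                held.append(task)  # nowhere to move it; leave it in place

    # Keep current_load in sync with the assumed bookkeeping.
    for spec in agents.values():
        spec.current_load = len(assignments[spec.agent_id])


agents = {
    "a": AgentSpec("a", ["code"], max_concurrent=1),
    "b": AgentSpec("b", ["code", "test"], max_concurrent=2),
}
assignments = {
    "a": [{"type": "code", "id": 1}, {"type": "code", "id": 2}, {"type": "code", "id": 3}],
    "b": [],
}
rebalance(agents, assignments)
print({k: [t["id"] for t in v] for k, v in assignments.items()})
```

In this run agent "a" keeps task 1 and the two excess tasks move to "b", which has room for both. If "b" were already full, the excess would stay on "a", so a real implementation would likely fall back to the coordinator's task_queue instead.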