Parallel Agent Execution
Executing independent tasks in parallel dramatically reduces total latency.
Task
Implement ParallelAgentExecutor using asyncio that:
- Executes tasks concurrently with a semaphore to limit concurrency.
- Times out individual tasks.
- Supports primary + fallback pattern.
- Returns all results preserving task order.
Constraints
- Never exceed
max_concurrentsimultaneous executions. timeoutenforced per task; return error result on timeout.- Fallback only executed if primary fails or times out.
- Results ordered by original task order, not completion order.
Examples
Example 1:
Input:
tasks = [AgentTask('t1', lambda: 'a', {}), AgentTask('t2', lambda: 'b', {})]
results = asyncio.run(executor.execute_all(tasks))
[r.result for r in results]Output:
['a', 'b']Explanation: Both tasks run in parallel, results returned in order.
Starter Code
import asyncio
from typing import List, Dict, Any, Callable, Optional
from dataclasses import dataclass
import time
@dataclass
class AgentTask:
task_id: str
agent_fn: Callable
args: Dict
timeout: float = 30.0
priority: int = 5
@dataclass
class AgentResult:
task_id: str
result: Any
error: Optional[str]
duration_ms: float
agent_fn_name: str
class ParallelAgentExecutor:
def __init__(self, max_concurrent: int = 5):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
async def execute_task(self, task: AgentTask) -> AgentResult:
pass
async def execute_all(self, tasks: List[AgentTask]) -> List[AgentResult]:
# TODO: Execute all tasks in parallel with semaphore control
pass
async def execute_with_fallback(self, primary: AgentTask, fallback: AgentTask) -> AgentResult:
# TODO: Try primary, fall back on failure
pass
Python3
ReadyLines: 1Characters: 0
Ready