Implement Parallel Agent Execution

Medium
Agents

Parallel Agent Execution

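Executing independent tasks in parallel reduces total latency: with enough concurrency slots, the batch takes roughly as long as its slowest task rather than the sum of all task durations.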

Task

Implement ParallelAgentExecutor using asyncio that:

  1. Executes tasks concurrently with a semaphore to limit concurrency.
  2. Times out individual tasks.
  3. Supports a primary + fallback pattern.
  4. Returns all results preserving task order.
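
Requirements 1 and 4 can be sketched in isolation, assuming plain coroutines rather than AgentTask objects. A semaphore caps how many coroutines run at once, and asyncio.gather returns results in the order its arguments were passed, regardless of which finishes first. The names run_limited, guarded, and work are hypothetical and exist only for illustration; the peak counter is there just to make the limit observable.

```python
import asyncio

async def run_limited(coros, max_concurrent):
    """Run coroutines with at most max_concurrent in flight, keeping input order."""
    semaphore = asyncio.Semaphore(max_concurrent)
    running = 0
    peak = 0  # highest number of coroutines observed running at once

    async def guarded(coro):
        nonlocal running, peak
        async with semaphore:  # waits here while all slots are taken
            running += 1
            peak = max(peak, running)
            try:
                return await coro
            finally:
                running -= 1

    # gather preserves argument order, not completion order.
    results = await asyncio.gather(*(guarded(c) for c in coros))
    return results, peak

async def work(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    # The slowest job is listed first, yet it stays first in the results.
    jobs = [work("a", 0.03), work("b", 0.01), work("c", 0.02)]
    return await run_limited(jobs, max_concurrent=2)

results, peak = asyncio.run(main())
print(results, peak)  # ['a', 'b', 'c'] 2
```

Creating the semaphore inside the running coroutine sidesteps event-loop binding issues on older Python versions.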

Constraints

  • Never exceed max_concurrent simultaneous executions.
  • timeout is enforced per task; a task that times out returns an error result.
  • The fallback is executed only if the primary fails or times out.
  • Results ordered by original task order, not completion order.
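
The timeout and fallback constraints can be illustrated with a small sketch, assuming zero-argument async callables instead of AgentTask objects. asyncio.wait_for enforces the time limit, and failures are turned into an error string rather than raised. The helpers call_with_timeout and with_fallback, and the (result, error) tuple shape, are hypothetical choices made for this illustration.

```python
import asyncio

async def call_with_timeout(fn, timeout):
    """Return (result, error); error is a string on failure or timeout."""
    try:
        return await asyncio.wait_for(fn(), timeout=timeout), None
    except asyncio.TimeoutError:
        return None, f"timed out after {timeout}s"
    except Exception as exc:
        return None, f"{type(exc).__name__}: {exc}"

async def with_fallback(primary, fallback, timeout):
    result, error = await call_with_timeout(primary, timeout)
    if error is None:
        return result, None  # primary succeeded, fallback is never called
    return await call_with_timeout(fallback, timeout)

calls = []  # records every fallback invocation

async def slow():
    await asyncio.sleep(1.0)  # longer than the timeout used below
    return "primary"

async def fast():
    return "primary"

async def backup():
    calls.append("backup")
    return "fallback"

rescued = asyncio.run(with_fallback(slow, backup, 0.05))
direct = asyncio.run(with_fallback(fast, backup, 0.05))
print(rescued)  # ('fallback', None)
print(direct)   # ('primary', None)
print(calls)    # ['backup']
```

The fallback ran once, for the timed-out primary only.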

Examples

Example 1:
Input:
tasks = [AgentTask('t1', lambda: 'a', {}), AgentTask('t2', lambda: 'b', {})]
results = asyncio.run(executor.execute_all(tasks))
[r.result for r in results]
Output: ['a', 'b']
Explanation: Both tasks run in parallel, results returned in order.

Starter Code

import asyncio
from typing import List, Dict, Any, Callable, Optional
from dataclasses import dataclass
import time

@dataclass
class AgentTask:
    task_id: str
    agent_fn: Callable
    args: Dict
    timeout: float = 30.0
    priority: int = 5

@dataclass
class AgentResult:
    task_id: str
    result: Any
    error: Optional[str]
    duration_ms: float
    agent_fn_name: str

class ParallelAgentExecutor:
    def __init__(self, max_concurrent: int = 5):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def execute_task(self, task: AgentTask) -> AgentResult:
        # TODO: Run one task under the semaphore, enforcing task.timeout
        pass

    async def execute_all(self, tasks: List[AgentTask]) -> List[AgentResult]:
        # TODO: Execute all tasks in parallel with semaphore control
        pass

    async def execute_with_fallback(self, primary: AgentTask, fallback: AgentTask) -> AgentResult:
        # TODO: Try primary, fall back on failure
        pass
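
One possible way to fill in the starter code is sketched below. It is not a reference solution, and it rests on several assumptions the problem statement leaves open: task.args is passed as keyword arguments, synchronous callables (such as the lambdas in Example 1) are run in a worker thread via asyncio.to_thread (Python 3.9+), the timeout clock starts once a semaphore slot is acquired, and priority is ignored. The helper _call and the demo function slow_agent are hypothetical.

```python
import asyncio
import inspect
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

@dataclass
class AgentTask:
    task_id: str
    agent_fn: Callable
    args: Dict
    timeout: float = 30.0
    priority: int = 5  # unused in this sketch

@dataclass
class AgentResult:
    task_id: str
    result: Any
    error: Optional[str]
    duration_ms: float
    agent_fn_name: str

class ParallelAgentExecutor:
    def __init__(self, max_concurrent: int = 5):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def _call(self, task: AgentTask) -> Any:
        # Assumption: args are keyword arguments for agent_fn.
        if inspect.iscoroutinefunction(task.agent_fn):
            return await task.agent_fn(**task.args)
        # Sync callables run in a thread so they do not block the event loop.
        # Note: a timed-out sync callable keeps running in its thread.
        return await asyncio.to_thread(task.agent_fn, **task.args)

    async def execute_task(self, task: AgentTask) -> AgentResult:
        name = getattr(task.agent_fn, "__name__", repr(task.agent_fn))
        result, error = None, None
        async with self.semaphore:  # never more than max_concurrent inside
            start = time.perf_counter()
            try:
                result = await asyncio.wait_for(self._call(task), timeout=task.timeout)
            except asyncio.TimeoutError:
                error = f"timeout after {task.timeout}s"
            except Exception as exc:
                error = f"{type(exc).__name__}: {exc}"
            duration_ms = (time.perf_counter() - start) * 1000.0
        return AgentResult(task.task_id, result, error, duration_ms, name)

    async def execute_all(self, tasks: List[AgentTask]) -> List[AgentResult]:
        # gather returns results in argument order, not completion order.
        return list(await asyncio.gather(*(self.execute_task(t) for t in tasks)))

    async def execute_with_fallback(self, primary: AgentTask, fallback: AgentTask) -> AgentResult:
        first = await self.execute_task(primary)
        if first.error is None:
            return first
        return await self.execute_task(fallback)

async def slow_agent():
    await asyncio.sleep(0.5)
    return "slow"

async def demo():
    executor = ParallelAgentExecutor(max_concurrent=2)
    tasks = [AgentTask("t1", lambda: "a", {}), AgentTask("t2", lambda: "b", {})]
    ordered = await executor.execute_all(tasks)
    primary = AgentTask("p", slow_agent, {}, timeout=0.05)
    backup = AgentTask("f", lambda: "backup", {})
    rescued = await executor.execute_with_fallback(primary, backup)
    return ordered, rescued

ordered, rescued = asyncio.run(demo())
print([r.result for r in ordered])      # ['a', 'b']
print(rescued.task_id, rescued.result)  # f backup
```

Because execute_task catches every exception and reports it in the error field, gather never sees a failure, so one bad task cannot discard the results of the others.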
The AI Interview - Master AI/ML Interviews