Parallel Agent Executor

Difficulty: Medium
Category: Agents

Implement parallel agent execution with dependencies:

  1. execute_parallel(agents, dependencies): Run agents in parallel
    • Execute an agent only after all of its dependencies have completed
    • Use ThreadPoolExecutor for concurrency
    • Handle exceptions gracefully (store the exception object as that agent's result instead of raising)
  2. execute_with_timeout(func, args, timeout_sec): Run func(*args) with a time limit of timeout_sec seconds
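
One possible way to approach the timeout requirement is sketched below. It is not the reference solution: run_with_timeout is a hypothetical standalone helper, and returning the exception object on timeout is an assumption carried over from the "store exception object" rule, since the problem statement does not say what a timeout should produce.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def run_with_timeout(func, args, timeout_sec):
    """Hypothetical helper: return func(*args), or the exception object on failure or timeout."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(func, *args)
    try:
        # result() raises a timeout error if func has not finished in time,
        # and re-raises any exception that func itself raised.
        return future.result(timeout=timeout_sec)
    except Exception as exc:
        # Assumption: hand the exception object back instead of propagating it.
        return exc
    finally:
        # Do not block on a worker that is still running; a thread cannot be
        # killed, so a timed-out function keeps running in the background.
        pool.shutdown(wait=False)


print(run_with_timeout(pow, (2, 10), 1.0))   # 1024
slow = run_with_timeout(time.sleep, (0.3,), 0.05)
print(isinstance(slow, FutureTimeoutError))  # True
```

Because threads cannot be interrupted, this only bounds how long the caller waits, not how long the function runs.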

Dependency Format: a dict mapping each agent id to the list of agent ids it depends on, e.g. {'agent_a': [], 'agent_b': ['agent_a'], 'agent_c': ['agent_a']}

Execution Order:

  • agent_a runs first (no deps)
  • agent_b and agent_c run in parallel after agent_a completes
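
The ordering above can be illustrated with a small sketch that groups agent ids into "waves", where every agent in a wave depends only on agents in earlier waves. The function execution_waves is a hypothetical helper, not part of the starter code, and strict waves are a simplification: the task only requires that each agent waits for its own dependencies.

```python
def execution_waves(dependencies):
    """Group agent ids into waves; each wave depends only on earlier waves."""
    remaining = {aid: set(deps) for aid, deps in dependencies.items()}
    finished = set()
    waves = []
    while remaining:
        # An agent is ready once every one of its dependencies has finished.
        ready = sorted(aid for aid, deps in remaining.items() if deps <= finished)
        if not ready:
            # Nothing can make progress: a cycle or a dependency that is not a key.
            raise ValueError(f"cyclic or unknown dependencies: {sorted(remaining)}")
        waves.append(ready)
        finished.update(ready)
        for aid in ready:
            del remaining[aid]
    return waves


deps = {'agent_a': [], 'agent_b': ['agent_a'], 'agent_c': ['agent_a']}
print(execution_waves(deps))  # [['agent_a'], ['agent_b', 'agent_c']]
```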

Return: A dict mapping each agent_id to its result, or to the exception object if that agent failed

Examples

Example 1:
Input: pe = ParallelAgentExecutor(2); agents = [{'id':'a','func':lambda:1,'args':()}]; pe.execute_parallel(agents)
Output: {'a': 1}
Explanation: A single agent with no dependencies runs, and its return value is stored under its id

Starter Code

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

class ParallelAgentExecutor:
    """
    Execute multiple agents in parallel with dependency management.
    """
    
    def __init__(self, max_workers=4):
        self.max_workers = max_workers
        self.results = {}
    
    def execute_parallel(self, agents, dependencies=None):
        """
        Execute agents respecting dependencies.
        
        agents: list of {'id': str, 'func': callable, 'args': tuple}
        dependencies: dict of {agent_id: [list of dependency ids]}
        
        Returns: dict of {agent_id: result or exception}
        """
        # Your implementation here
        pass
    
    def _can_execute(self, agent_id, dependencies, completed):
        """Check if all dependencies are satisfied"""
        # Your implementation here
        pass
    
    def execute_with_timeout(self, func, args, timeout_sec):
        """Execute function with timeout"""
        # Your implementation here
        pass
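
For orientation, here is a minimal sketch of how the dependency-aware scheduling might look, written as a standalone function rather than as the class method. The name run_with_dependencies is hypothetical. Two behaviours are assumptions not stated in the task: a failed dependency still counts as "completed", so its dependents run anyway, and agents that can never become ready (a cycle or an unknown dependency id) get a RuntimeError stored as their result.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def run_with_dependencies(agents, dependencies=None, max_workers=4):
    """Hypothetical helper: run each agent once all of its dependencies have finished."""
    dependencies = dependencies or {}
    pending = {agent['id']: agent for agent in agents}
    results = {}   # agent_id -> result or exception object
    running = {}   # future -> agent_id

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while pending or running:
            # Submit every pending agent whose dependencies all have a stored outcome.
            ready = [aid for aid in pending
                     if all(dep in results for dep in dependencies.get(aid, []))]
            for aid in ready:
                agent = pending.pop(aid)
                future = pool.submit(agent['func'], *agent.get('args', ()))
                running[future] = aid

            if not running:
                # Assumption: nothing is running and nothing became ready,
                # so the remaining agents can never be scheduled.
                for aid in pending:
                    results[aid] = RuntimeError(f"unsatisfiable dependencies for {aid!r}")
                break

            # Block until at least one running agent finishes, then record it.
            done, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for future in done:
                aid = running.pop(future)
                try:
                    results[aid] = future.result()
                except Exception as exc:
                    results[aid] = exc  # store the exception object, do not raise
    return results


agents = [{'id': 'a', 'func': lambda: 1, 'args': ()}]
print(run_with_dependencies(agents))  # {'a': 1}
```

Waiting with FIRST_COMPLETED lets a dependent start as soon as its own dependencies finish, without waiting for unrelated agents that are still running.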
The AI Interview - Master AI/ML Interviews