Latency Optimization for Multi-Step Agent Pipelines

Hard
Agents

Multi-step agent pipelines are often slow because steps run sequentially even when their dependencies would allow concurrency. This question focuses on maximizing parallelism and caching to reduce end-to-end latency.

Task

Implement LatencyOptimizer that:

  1. Analyzes a DAG of pipeline steps to identify parallelism opportunities.
  2. Computes the critical path (bottleneck sequence).
  3. Executes steps in parallel groups with dependency ordering.
  4. Caches cacheable step results with TTL.
  5. Supports speculative execution for probable inputs.
  6. Reports latency gains vs sequential baseline.
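
Requirements 1 and 3 hinge on grouping steps into topological "levels": every step in a level depends only on steps from earlier levels, so a whole level can run concurrently. A minimal sketch of that grouping (the helper name `parallel_groups` and the plain-dict input are illustrative, not part of the required API):

```python
def parallel_groups(deps_by_id):
    # deps_by_id: step_id -> list of dependency step_ids (must form a DAG).
    indegree = {s: len(d) for s, d in deps_by_id.items()}
    dependents = {s: [] for s in deps_by_id}
    for step, deps in deps_by_id.items():
        for d in deps:
            dependents[d].append(step)

    # Kahn's algorithm, but flushed level-by-level: each "ready" wave
    # becomes one parallel group.
    ready = [s for s, n in indegree.items() if n == 0]
    groups = []
    while ready:
        groups.append(sorted(ready))
        next_ready = []
        for step in ready:
            for dependent in dependents[step]:
                indegree[dependent] -= 1
                if indegree[dependent] == 0:
                    next_ready.append(dependent)
        ready = next_ready
    return groups
```

For the DAG in Example 1 this yields `[['fetch', 'search'], ['analyze']]`: both roots in one group, then the join step.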

Non-Functional Requirements

  • Achieve at least 50% latency reduction on non-critical-path steps.
  • A cache hit should add less than 1 ms of overhead.
  • Speculative results must be invalidated if actual input differs.
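
The invalidation requirement means speculative results are single-use guesses: a precomputed result may be returned only when the actual input matches the input it was speculated for, and stale guesses should be discarded. One possible shape (the `SpeculationStore` class is a hypothetical helper, not part of the required interface):

```python
class SpeculationStore:
    """Holds results pre-computed for probable inputs of one step."""

    def __init__(self):
        self._results = {}

    def store(self, inputs, result):
        # Freeze the input dict into a hashable, order-independent key.
        self._results[tuple(sorted(inputs.items()))] = result

    def take(self, actual_inputs):
        # Return the speculative result only on an exact input match;
        # either way, drop all remaining guesses -- they are now stale.
        key = tuple(sorted(actual_inputs.items()))
        result = self._results.pop(key, None)
        self._results.clear()
        return result
```

`take` returning `None` signals a speculation miss, and the caller falls back to executing the step normally.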

Constraints

  • Step dependencies form a DAG (no cycles).
  • Cache key: (step_id, sorted_args_hash).
  • Critical path: sequence with maximum total estimated latency.
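
The `(step_id, sorted_args_hash)` cache key must be deterministic regardless of argument ordering. A minimal sketch, assuming JSON-serializable args (the helper name `cache_key` is illustrative):

```python
import hashlib
import json

def cache_key(step_id, args):
    # sort_keys makes equivalent dicts serialize identically, so
    # {'a': 1, 'b': 2} and {'b': 2, 'a': 1} produce the same key.
    payload = json.dumps(args, sort_keys=True, default=str)
    args_hash = hashlib.sha256(payload.encode()).hexdigest()
    return f"{step_id}:{args_hash}"
```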

Examples

Example 1:
Input:
steps = [
    PipelineStep('search', search_fn, [], estimated_latency_ms=300),
    PipelineStep('fetch', fetch_fn, [], estimated_latency_ms=200),
    PipelineStep('analyze', analyze_fn, ['search', 'fetch'], estimated_latency_ms=400),
]
optimizer.analyze_dag(steps)
Output: ExecutionPlan(parallel_groups=[[search,fetch],[analyze]], estimated_total_ms=700, critical_path=['search','analyze'])
Explanation: Search and fetch run in parallel (300ms), then analyze runs (400ms). Critical path: search→analyze=700ms.
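
The critical path in the example is the dependency chain with the maximum summed estimated latency. It can be found with a memoized longest-path walk over the DAG; a sketch under the same `(step_id, deps, latency_ms)` shape as the example (the helper name `critical_path` is illustrative):

```python
def critical_path(steps):
    # steps: list of (step_id, dep_ids, latency_ms) tuples forming a DAG.
    by_id = {s[0]: s for s in steps}
    finish = {}  # earliest finish time assuming unlimited parallelism
    pred = {}    # predecessor on the longest chain ending at each step

    def finish_time(sid):
        if sid in finish:
            return finish[sid]
        _, deps, latency = by_id[sid]
        slowest = max(deps, key=finish_time, default=None)
        start = finish_time(slowest) if slowest else 0.0
        pred[sid] = slowest
        finish[sid] = start + latency
        return finish[sid]

    for step_id in by_id:
        finish_time(step_id)

    # Walk back from the latest-finishing step to recover the path.
    end = max(finish, key=finish.get)
    path, cur = [], end
    while cur:
        path.append(cur)
        cur = pred[cur]
    return finish[end], path[::-1]
```

On the example this returns `(700.0, ['search', 'analyze'])`, matching the expected plan.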

Starter Code

import asyncio
from typing import List, Dict, Any, Optional, Callable, Tuple
from dataclasses import dataclass, field
import time
from functools import lru_cache

@dataclass
class PipelineStep:
    step_id: str
    fn: Callable
    dependencies: List[str] = field(default_factory=list)
    is_llm_call: bool = False
    estimated_latency_ms: float = 100.0
    cacheable: bool = False
    cache_ttl: float = 300.0  # seconds

@dataclass
class ExecutionPlan:
    parallel_groups: List[List[PipelineStep]]
    estimated_total_ms: float
    critical_path: List[str]

class LatencyOptimizer:
    def __init__(self):
        self._cache: Dict[str, Tuple[Any, float]] = {}
        self.execution_history: List[Dict] = []

    def analyze_dag(self, steps: List[PipelineStep]) -> ExecutionPlan:
        # TODO: Build execution plan with parallel groups
        # Identify critical path (longest dependency chain)
        pass

    async def execute_optimized(self, steps: List[PipelineStep], inputs: Dict) -> Dict:
        # TODO: Execute in parallel groups, pass results forward
        pass

    def _get_cached(self, step_id: str, args: Dict) -> Optional[Any]:
        pass

    def _set_cached(self, step_id: str, args: Dict, result: Any, ttl: float) -> None:
        pass

    def speculative_execute(self, step: PipelineStep, probable_inputs: List[Dict]) -> Dict:
        # TODO: Pre-execute step for most likely inputs
        pass

    def report_optimization_gains(self) -> Dict:
        # Compare sequential vs optimized execution times
        pass
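
For `execute_optimized`, one common pattern is to await each parallel group with `asyncio.gather`, feeding accumulated results forward to later groups. A minimal sketch, decoupled from the starter classes (the helper name `run_groups` and the `fns[step](results)` calling convention are assumptions for illustration):

```python
import asyncio

async def run_groups(groups, fns):
    # groups: list of lists of step ids, in dependency order.
    # fns: step_id -> async callable taking the results dict so far.
    results = {}
    for group in groups:
        # All steps in a group are independent, so they run concurrently.
        outputs = await asyncio.gather(*(fns[step](results) for step in group))
        for step, out in zip(group, outputs):
            results[step] = out
    return results
```

Each group's wall-clock cost is its slowest member, which is how the example's `search`/`fetch` pair costs 300 ms rather than 500 ms.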