Latency Optimization for Agent Pipelines
Multi-step agent pipelines are often slow because steps run sequentially even when they do not depend on each other. This question focuses on reducing end-to-end latency through parallelism, caching, and speculative execution.
Task
Implement LatencyOptimizer that:
- Analyzes a DAG of pipeline steps to identify parallelism opportunities.
- Computes the critical path (bottleneck sequence).
- Executes steps in parallel groups with dependency ordering.
- Caches cacheable step results with TTL.
- Supports speculative execution for probable inputs.
- Reports latency gains vs sequential baseline.
Non-Functional Requirements
- Achieve at least a 50% latency reduction on steps off the critical path.
- A cache hit should add less than 1 ms of overhead.
- Speculative results must be invalidated if the actual input differs from the predicted one.
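As a point of reference (a minimal sketch, not the required implementation): a dict-backed TTL cache keeps the hit path to a key build plus one dictionary probe, which is comfortably under 1 ms. The key layout follows the (step_id, sorted_args_hash) convention from the constraints; the standalone function names here are illustrative.

```python
import hashlib
import json
import time

# Sketch of a dict-backed TTL cache. The hit path is one hash of the
# sorted args plus one dict lookup, keeping overhead well under 1 ms.
_cache = {}  # key -> (result, absolute expiry based on time.monotonic())

def cache_key(step_id, args):
    # Sort args so logically-equal inputs produce the same key.
    digest = hashlib.sha256(json.dumps(args, sort_keys=True).encode()).hexdigest()
    return f"{step_id}:{digest}"

def set_cached(step_id, args, result, ttl):
    # Store the result together with its absolute expiry time.
    _cache[cache_key(step_id, args)] = (result, time.monotonic() + ttl)

def get_cached(step_id, args):
    entry = _cache.get(cache_key(step_id, args))
    if entry is None:
        return None
    result, expires_at = entry
    if time.monotonic() >= expires_at:
        return None  # expired entry: treat as a miss
    return result
```

Using `time.monotonic()` rather than `time.time()` avoids TTL glitches when the wall clock is adjusted.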
Constraints
- Step dependencies form a DAG (no cycles).
- Cache key: (step_id, sorted_args_hash).
- Critical path: sequence with maximum total estimated latency.
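The critical-path definition above can be computed with dynamic programming over the dependency DAG: the longest chain ending at a step is its own latency plus the longest chain ending at any of its dependencies. A minimal sketch, using plain dicts in place of PipelineStep:

```python
# Sketch: critical path as the dependency chain with maximum total
# estimated latency, computed top-down with memoization.
def critical_path(steps):
    # steps: dicts with 'step_id', 'dependencies', 'estimated_latency_ms'
    by_id = {s['step_id']: s for s in steps}
    best = {}  # step_id -> (total_ms, path) for the longest chain ending here

    def longest(sid):
        if sid in best:
            return best[sid]
        step = by_id[sid]
        total, path = step['estimated_latency_ms'], [sid]
        for dep in step['dependencies']:
            dep_total, dep_path = longest(dep)
            if dep_total + step['estimated_latency_ms'] > total:
                total = dep_total + step['estimated_latency_ms']
                path = dep_path + [sid]
        best[sid] = (total, path)
        return best[sid]

    # The critical path is the maximum over all chain endpoints.
    return max((longest(s['step_id']) for s in steps), key=lambda t: t[0])
```

Memoization keeps this linear in the number of edges; the no-cycles constraint guarantees the recursion terminates.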
Examples
Example 1:
Input:
steps = [
    PipelineStep('search', search_fn, [], estimated_latency_ms=300),
    PipelineStep('fetch', fetch_fn, [], estimated_latency_ms=200),
    PipelineStep('analyze', analyze_fn, ['search', 'fetch'], estimated_latency_ms=400),
]
optimizer.analyze_dag(steps)

Output:
ExecutionPlan(parallel_groups=[[search, fetch], [analyze]], estimated_total_ms=700, critical_path=['search', 'analyze'])

Explanation: search and fetch run in parallel (300 ms), then analyze runs (400 ms). Critical path: search → analyze = 300 + 400 = 700 ms.
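One way to derive the parallel groups in this example is Kahn-style leveling: repeatedly peel off every step whose dependencies have already run, and each peel becomes one parallel group. A sketch with plain dicts in place of PipelineStep:

```python
# Sketch: group steps into parallel execution levels. Each group contains
# every step whose dependencies are already satisfied by earlier groups.
def parallel_groups(steps):
    remaining = {s['step_id']: set(s['dependencies']) for s in steps}
    groups, done = [], set()
    while remaining:
        ready = [sid for sid, deps in remaining.items() if deps <= done]
        if not ready:
            raise ValueError('cycle detected')  # input must be a DAG
        groups.append(sorted(ready))
        done.update(ready)
        for sid in ready:
            del remaining[sid]
    return groups
```

For the example above this yields two groups: search and fetch first, then analyze. The estimated total is the sum over groups of each group's slowest step, which matches the 700 ms critical path here.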
Starter Code
import asyncio
import time
from dataclasses import dataclass, field
from functools import lru_cache
from typing import Any, Callable, Dict, List, Optional, Tuple


@dataclass
class PipelineStep:
    step_id: str
    fn: Callable
    dependencies: List[str] = field(default_factory=list)
    is_llm_call: bool = False
    estimated_latency_ms: float = 100.0
    cacheable: bool = False
    cache_ttl: float = 300.0  # seconds


@dataclass
class ExecutionPlan:
    parallel_groups: List[List[PipelineStep]]
    estimated_total_ms: float
    critical_path: List[str]


class LatencyOptimizer:
    def __init__(self):
        self._cache: Dict[str, Tuple[Any, float]] = {}
        self.execution_history: List[Dict] = []

    def analyze_dag(self, steps: List[PipelineStep]) -> ExecutionPlan:
        # TODO: Build an execution plan with parallel groups and
        # identify the critical path (longest dependency chain).
        pass

    async def execute_optimized(self, steps: List[PipelineStep], inputs: Dict) -> Dict:
        # TODO: Execute in parallel groups, passing results forward.
        pass

    def _get_cached(self, step_id: str, args: Dict) -> Optional[Any]:
        pass

    def _set_cached(self, step_id: str, args: Dict, result: Any, ttl: float) -> None:
        pass

    def speculative_execute(self, step: PipelineStep, probable_inputs: List[Dict]) -> Dict:
        # TODO: Pre-execute the step for its most likely inputs.
        pass

    def report_optimization_gains(self) -> Dict:
        # TODO: Compare sequential vs. optimized execution times.
        pass