Auto-Scaling Agent Worker Pool
Bursty agent workloads require dynamic worker management to balance cost and latency.
Task
Build DynamicWorkerPool that:
- Starts with min_workers and scales up when queue depth > scale_up_threshold.
- Scales down to min_workers when queue nearly empty.
- Enforces cooldown between scale events to prevent thrashing.
- Tracks per-worker stats.
- Supports graceful shutdown (drain queue first).
Constraints
- Scale one worker at a time.
- Never exceed max_workers or go below min_workers.
- Cooldown: minimum seconds between any scale events.
- Idle workers removed first during scale-down.
Examples
Example 1:
Input:
pool = DynamicWorkerPool(agent_fn, min_workers=2, max_workers=10, scale_up_threshold=5)
for i in range(20): await pool.submit({'task': i})Output:
pool.get_stats()['current_workers'] >= 3Explanation: Queue exceeded threshold; additional worker spawned.
Starter Code
import asyncio
from typing import Dict, List, Any, Callable, Optional
from dataclasses import dataclass, field
import time, uuid
@dataclass
class WorkerStats:
worker_id: str
tasks_completed: int = 0
tasks_failed: int = 0
avg_latency_ms: float = 0.0
created_at: float = field(default_factory=time.time)
class DynamicWorkerPool:
def __init__(
self,
worker_fn: Callable,
min_workers: int = 1,
max_workers: int = 20,
scale_up_threshold: int = 10,
scale_down_threshold: int = 2,
scale_cooldown_s: float = 30.0
):
self.worker_fn = worker_fn
self.min_workers = min_workers
self.max_workers = max_workers
self.scale_up_threshold = scale_up_threshold
self.scale_down_threshold = scale_down_threshold
self.scale_cooldown_s = scale_cooldown_s
self.workers: Dict[str, WorkerStats] = {}
self.task_queue: asyncio.Queue = asyncio.Queue()
self.last_scale_time: float = 0.0
async def submit(self, task: Dict) -> None:
pass
async def _autoscale(self) -> None:
pass
async def _spawn_worker(self) -> str:
pass
async def _terminate_worker(self, worker_id: str) -> None:
pass
def get_stats(self) -> Dict:
return {'current_workers': len(self.workers), 'queue_depth': self.task_queue.qsize()}
Python3
ReadyLines: 1Characters: 0
Ready