Auto-Scaling Agent Worker Pool

Difficulty: Hard
Topic: Agents


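Agent workloads are bursty. A fixed-size worker pool either sits idle between bursts (wasted cost) or builds a long queue during them (added latency). Adjusting the worker count dynamically balances the two.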

Task

Build a DynamicWorkerPool that:

  1. Starts with min_workers workers and scales up when the queue depth exceeds scale_up_threshold.
  2. Scales down toward min_workers when the queue is nearly empty (depth below scale_down_threshold).
  3. Enforces a cooldown between scale events to prevent thrashing.
  4. Tracks per-worker stats (tasks completed, tasks failed, average latency).
  5. Supports graceful shutdown: all queued tasks are drained before the workers stop.
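Items 1 to 3 combine into a single decision rule. Keeping it as a pure function, separate from the asyncio machinery, makes it easy to unit-test. A minimal sketch, assuming strict comparisons (grow when depth > scale_up_threshold, shrink when depth < scale_down_threshold) and reusing the constructor parameters from the starter code; decide_scale is a hypothetical helper name:

```python
# Hypothetical helper; parameter names mirror the starter code's constructor.
def decide_scale(
    queue_depth: int,
    current_workers: int,
    now: float,
    last_scale_time: float,
    *,
    min_workers: int,
    max_workers: int,
    scale_up_threshold: int,
    scale_down_threshold: int,
    scale_cooldown_s: float,
) -> int:
    """Return +1 to add one worker, -1 to remove one, or 0 to do nothing."""
    # Cooldown: no scale event of either direction inside the window.
    if now - last_scale_time < scale_cooldown_s:
        return 0
    # Scale up by one worker, never past max_workers.
    if queue_depth > scale_up_threshold and current_workers < max_workers:
        return 1
    # Scale down by one worker, never below min_workers.
    if queue_depth < scale_down_threshold and current_workers > min_workers:
        return -1
    return 0


cfg = dict(min_workers=2, max_workers=10, scale_up_threshold=5,
           scale_down_threshold=2, scale_cooldown_s=30.0)
print(decide_scale(20, 2, now=100.0, last_scale_time=0.0, **cfg))    # 1
print(decide_scale(20, 3, now=110.0, last_scale_time=100.0, **cfg))  # 0: still in cooldown
print(decide_scale(0, 3, now=200.0, last_scale_time=100.0, **cfg))   # -1
print(decide_scale(0, 2, now=200.0, last_scale_time=100.0, **cfg))   # 0: already at min
```

Because each call returns +1, -1 or 0, the pool changes by at most one worker per decision; the caller is responsible for updating last_scale_time whenever it acts on a non-zero result.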

Constraints

  • Add or remove exactly one worker per scale event.
  • Never exceed max_workers or drop below min_workers.
  • Cooldown: at least scale_cooldown_s seconds must pass between any two scale events, up or down.
  • During scale-down, remove idle workers before busy ones.
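The idle-first rule requires the pool to know which workers are currently running a task. One way to choose the worker to remove, sketched under assumptions: the pool keeps a hypothetical busy set of worker ids, creation time breaks ties (newest removed first), and a busy worker is picked only when none is idle, in which case it would be stopped after its current task. pick_worker_to_remove is a hypothetical name.

```python
from typing import Dict, Optional, Set


# Hypothetical helper: created_at maps worker_id -> creation timestamp,
# busy holds the ids of workers currently running a task.
def pick_worker_to_remove(
    created_at: Dict[str, float],
    busy: Set[str],
    min_workers: int,
) -> Optional[str]:
    """Pick one worker to terminate, or None if the pool is already at min_workers."""
    if len(created_at) <= min_workers:
        return None
    idle = [w for w in created_at if w not in busy]
    # Idle workers go first; fall back to busy ones only when none is idle.
    candidates = idle or list(created_at)
    # Tie-break (assumption): remove the most recently created candidate.
    return max(candidates, key=lambda w: created_at[w])


workers = {"w1": 1.0, "w2": 2.0, "w3": 3.0}
print(pick_worker_to_remove(workers, busy={"w3"}, min_workers=2))                 # w2
print(pick_worker_to_remove(workers, busy={"w1", "w2", "w3"}, min_workers=2))     # w3
print(pick_worker_to_remove({"w1": 1.0, "w2": 2.0}, busy=set(), min_workers=2))   # None
```

Removing the newest worker is an arbitrary choice; removing the oldest, or the one with the worst avg_latency_ms, would satisfy the constraint equally well.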

Examples

Example 1:
Input:
pool = DynamicWorkerPool(agent_fn, min_workers=2, max_workers=10, scale_up_threshold=5)
for i in range(20):
    await pool.submit({'task': i})
Output: pool.get_stats()['current_workers'] >= 3
Explanation: The queue depth exceeded scale_up_threshold (5), so at least one additional worker was spawned.

Starter Code

import asyncio
from typing import Dict, List, Any, Callable, Optional
from dataclasses import dataclass, field
import time
import uuid

@dataclass
class WorkerStats:
    worker_id: str
    tasks_completed: int = 0
    tasks_failed: int = 0
    avg_latency_ms: float = 0.0
    created_at: float = field(default_factory=time.time)

class DynamicWorkerPool:
    def __init__(
        self,
        worker_fn: Callable,
        min_workers: int = 1,
        max_workers: int = 20,
        scale_up_threshold: int = 10,
        scale_down_threshold: int = 2,
        scale_cooldown_s: float = 30.0
    ):
        self.worker_fn = worker_fn
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.scale_up_threshold = scale_up_threshold
        self.scale_down_threshold = scale_down_threshold
        self.scale_cooldown_s = scale_cooldown_s
        self.workers: Dict[str, WorkerStats] = {}
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.last_scale_time: float = 0.0

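    async def submit(self, task: Dict) -> None:
        # Enqueue the task, then give the autoscaler a chance to react.
        pass

    async def _autoscale(self) -> None:
        # Add or remove at most one worker based on queue depth, staying within
        # [min_workers, max_workers] and respecting scale_cooldown_s.
        pass

    async def _spawn_worker(self) -> str:
        # Start a worker that pulls tasks from task_queue; return its worker_id.
        pass

    async def _terminate_worker(self, worker_id: str) -> None:
        # Stop the worker (idle workers are chosen first) and remove it from self.workers.
        pass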

    def get_stats(self) -> Dict:
        return {'current_workers': len(self.workers), 'queue_depth': self.task_queue.qsize()}
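One possible way to fill in the starter code is sketched below. It is not a reference solution, and it rests on several assumptions that the task leaves open: workers are asyncio tasks started lazily by the first submit() call, so the pool can be constructed outside a running event loop; last_scale_time starts at float("-inf") rather than 0.0, and the cooldown is measured with time.monotonic(); the autoscaler runs after every submit() and after every finished task; scale-down removes the newest idle worker, and a busy worker picked for removal finishes its current task first; failed tasks count toward tasks_failed but not toward avg_latency_ms; and shutdown() is a hypothetical method name, since the starter code defines none. The demo coroutine mirrors Example 1, with a short sleep standing in for agent_fn.

```python
import asyncio
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict

@dataclass
class WorkerStats:
    worker_id: str
    tasks_completed: int = 0
    tasks_failed: int = 0
    avg_latency_ms: float = 0.0
    created_at: float = field(default_factory=time.time)

class DynamicWorkerPool:
    def __init__(self, worker_fn: Callable[[Dict], Awaitable[Any]], min_workers: int = 1,
                 max_workers: int = 20, scale_up_threshold: int = 10,
                 scale_down_threshold: int = 2, scale_cooldown_s: float = 30.0):
        self.worker_fn, self.min_workers, self.max_workers = worker_fn, min_workers, max_workers
        self.scale_up_threshold, self.scale_down_threshold = scale_up_threshold, scale_down_threshold
        self.scale_cooldown_s = scale_cooldown_s
        self.last_scale_time = float("-inf")       # assumed: first event is never blocked
        self.workers: Dict[str, WorkerStats] = {}  # live workers only
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self._tasks, self._busy = {}, set()        # worker_id -> Task; ids inside worker_fn

    async def submit(self, task: Dict) -> None:
        while len(self.workers) < self.min_workers:  # lazy start: needs a running loop
            await self._spawn_worker()
        await self.task_queue.put(task)
        await self._autoscale()

    async def _autoscale(self) -> None:
        now = time.monotonic()
        if now - self.last_scale_time < self.scale_cooldown_s:
            return                                 # cooldown covers up and down events
        depth, n = self.task_queue.qsize(), len(self.workers)
        if depth > self.scale_up_threshold and n < self.max_workers:
            await self._spawn_worker()
            self.last_scale_time = now
        elif depth < self.scale_down_threshold and n > self.min_workers:
            # Idle workers first; among candidates the newest goes (an assumed tie-break).
            idle = [w for w in self.workers if w not in self._busy] or list(self.workers)
            await self._terminate_worker(max(idle, key=lambda w: self.workers[w].created_at))
            self.last_scale_time = now

    async def _spawn_worker(self) -> str:
        wid = uuid.uuid4().hex[:8]
        self.workers[wid] = WorkerStats(worker_id=wid)
        self._tasks[wid] = asyncio.create_task(self._run(wid))
        return wid

    async def _terminate_worker(self, worker_id: str) -> None:
        self.workers.pop(worker_id)                # _run exits once its id is gone
        t = self._tasks.pop(worker_id)
        if worker_id not in self._busy and t is not asyncio.current_task():
            t.cancel()                             # idle: parked in queue.get(), no task lost

    async def _run(self, wid: str) -> None:
        stats = self.workers[wid]
        while wid in self.workers:
            task = await self.task_queue.get()
            self._busy.add(wid)
            start = time.perf_counter()
            try:
                await self.worker_fn(task)
                ms = (time.perf_counter() - start) * 1000
                stats.tasks_completed += 1         # running mean: avg += (x - avg) / n
                stats.avg_latency_ms += (ms - stats.avg_latency_ms) / stats.tasks_completed
            except Exception:
                stats.tasks_failed += 1            # failures are excluded from the latency mean
            finally:
                self._busy.discard(wid)
                self.task_queue.task_done()
            await self._autoscale()                # lets the pool shrink as the queue drains

    async def shutdown(self) -> None:              # hypothetical name, not in the starter
        await self.task_queue.join()               # drain: every queued task finishes first
        for t in self._tasks.values():
            t.cancel()                             # after join(), every worker is idle
        await asyncio.gather(*self._tasks.values(), return_exceptions=True)
        self.workers.clear()

    def get_stats(self) -> Dict:
        return {"current_workers": len(self.workers), "queue_depth": self.task_queue.qsize()}

async def demo():
    processed = []
    async def agent_fn(task: Dict) -> None:
        await asyncio.sleep(0.01)                  # stand-in for an LLM or tool call
        processed.append(task["task"])
    pool = DynamicWorkerPool(agent_fn, min_workers=2, max_workers=10, scale_up_threshold=5)
    for i in range(20):
        await pool.submit({"task": i})
    peak = pool.get_stats()
    await pool.shutdown()
    return peak, pool.get_stats(), sorted(processed)

peak, final, processed = asyncio.run(demo())
```

In the demo, the 20 submit() calls never suspend (in CPython, put() on an unbounded asyncio.Queue returns without yielding), so no worker has run when peak is captured. The sixth task pushes the queue depth past 5 and adds one worker, and the 30 s default cooldown blocks every later scale event, leaving peak["current_workers"] at 3. shutdown() then waits on task_queue.join(), so all 20 tasks are processed before the idle workers are cancelled.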
The AI Interview - Master AI/ML Interviews