Implement Failure Recovery and Checkpoint System

Hard
Agents

Agent Failure Recovery and Checkpointing

Long-running agents crash. Production systems must recover without starting over.

Task

Implement a checkpoint and recovery system:

  1. Save agent state every N steps to a CheckpointStore.
  2. On failure, load the latest valid checkpoint.
  3. Resume execution from that checkpoint (not from step 0).
  4. Validate checkpoint integrity via checksum.
  5. Handle corrupt checkpoints by falling back to earlier ones.
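
Steps 2, 4, and 5 can be sketched together as a newest-to-oldest scan that returns the first checkpoint whose checksum still matches. This is a minimal sketch, not the reference solution: MiniCheckpoint, make_checkpoint, and load_latest_valid are hypothetical names, and the checksum recipe is copied from the starter code.

```python
import hashlib
import json
from dataclasses import dataclass
from typing import Dict, List, Optional


def compute_checksum(state: Dict, step: int) -> str:
    # Same recipe as the starter code: truncated SHA-256 of state and step.
    data = json.dumps({'state': state, 'step': step}, sort_keys=True)
    return hashlib.sha256(data.encode()).hexdigest()[:16]


@dataclass
class MiniCheckpoint:  # hypothetical, trimmed-down stand-in for Checkpoint
    step_number: int
    state: Dict
    checksum: str


def make_checkpoint(step: int, state: Dict) -> MiniCheckpoint:
    return MiniCheckpoint(step, state, compute_checksum(state, step))


def load_latest_valid(checkpoints: List[MiniCheckpoint]) -> Optional[MiniCheckpoint]:
    # Walk from newest to oldest; return the first checkpoint that verifies.
    for cp in sorted(checkpoints, key=lambda c: c.step_number, reverse=True):
        if compute_checksum(cp.state, cp.step_number) == cp.checksum:
            return cp
    return None  # every checkpoint is corrupt: the caller restarts from step 0


history = [make_checkpoint(3, {'n': 3}), make_checkpoint(6, {'n': 6})]
history[1].state['n'] = 999  # simulate corruption of the newest checkpoint
latest = load_latest_valid(history)
print(latest.step_number if latest else 0)  # 3
```

Returning None rather than raising keeps the "all checkpoints corrupt" case an ordinary branch in the caller, which then starts again from step 0.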

Non-Functional Requirements

  • Checkpoint overhead < 50ms per save.
  • Recovery time < 2s from crash to resumption.
  • Zero re-processing of work already captured in the restored checkpoint.
  • Support for both in-memory and persistent backends.
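
One way to meet the two-backend requirement is to hide storage behind a small interface. The sketch below is an assumption about how that could look: StorageBackend, InMemoryBackend, and JsonLinesBackend are hypothetical names and not part of the starter code, and records are plain dicts rather than Checkpoint objects.

```python
import json
import os
import tempfile
from typing import Dict, List, Protocol


class StorageBackend(Protocol):  # hypothetical interface, not from the starter code
    def append(self, run_id: str, record: Dict) -> None: ...
    def read_all(self, run_id: str) -> List[Dict]: ...


class InMemoryBackend:
    def __init__(self) -> None:
        self._runs: Dict[str, List[Dict]] = {}

    def append(self, run_id: str, record: Dict) -> None:
        # Round-trip through JSON so later mutation of `record` cannot leak in.
        self._runs.setdefault(run_id, []).append(json.loads(json.dumps(record)))

    def read_all(self, run_id: str) -> List[Dict]:
        return list(self._runs.get(run_id, []))


class JsonLinesBackend:
    def __init__(self, directory: str) -> None:
        self._dir = directory
        os.makedirs(directory, exist_ok=True)

    def _path(self, run_id: str) -> str:
        return os.path.join(self._dir, f'{run_id}.jsonl')

    def append(self, run_id: str, record: Dict) -> None:
        # One JSON object per line; appending never rewrites older checkpoints.
        with open(self._path(run_id), 'a', encoding='utf-8') as f:
            f.write(json.dumps(record, sort_keys=True) + '\n')
            f.flush()
            os.fsync(f.fileno())  # push to disk before reporting success

    def read_all(self, run_id: str) -> List[Dict]:
        if not os.path.exists(self._path(run_id)):
            return []
        with open(self._path(run_id), encoding='utf-8') as f:
            return [json.loads(line) for line in f if line.strip()]


record = {'step_number': 3, 'state': {'n': 3}}
backends = [InMemoryBackend(), JsonLinesBackend(tempfile.mkdtemp())]
for backend in backends:
    backend.append('run_001', record)
loaded = [backend.read_all('run_001') for backend in backends]
```

The os.fsync call trades save latency for durability, so on slow disks it should be measured against the 50ms budget.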

Constraints

  • Checksum must be verified before restoring.
  • If all checkpoints are corrupt, restart from step 0.
  • Tool results must be preserved in the checkpoint (to avoid re-executing side-effecting tools).
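
The tool-result constraint can be illustrated with a small lookup-before-call helper. This is a sketch under assumptions: call_tool, send_email, and the 'step2:send_email' key scheme are hypothetical, and tool_results stands in for the Checkpoint.tool_results dict from the starter code.

```python
from typing import Any, Callable, Dict, List

calls: List[str] = []  # records every real execution of the side-effecting tool


def send_email(to: str) -> str:  # hypothetical side-effecting tool
    calls.append(to)
    return f'sent:{to}'


def call_tool(tool_results: Dict[str, Any], key: str,
              tool: Callable[..., Any], *args: Any) -> Any:
    # Reuse the stored result if this call already ran before the crash.
    if key not in tool_results:
        tool_results[key] = tool(*args)
    return tool_results[key]


# First run: the tool executes and its result lands in tool_results,
# which would be saved as part of the checkpoint.
tool_results: Dict[str, Any] = {}
first = call_tool(tool_results, 'step2:send_email', send_email, 'a@example.com')

# After recovery: tool_results is restored from the checkpoint, so replaying
# the same step returns the stored value without sending again.
restored = dict(tool_results)
second = call_tool(restored, 'step2:send_email', send_email, 'a@example.com')
```

The key has to identify the call deterministically (here a step number plus the tool name), otherwise a replayed step would miss the cache and run the tool again.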

Examples

Example 1:
Input:
# Simulate crash at step 5, recover from step 3 checkpoint
result = await agent.run_with_recovery('long task', 'run_001')
result['recovered_from_step']
Output: 3
Explanation: Step 3 was the last valid checkpoint, so execution resumes at step 4 and only steps 4-5 are re-executed.
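
The scenario in Example 1 can be reproduced with a small synchronous simulation. It is only a sketch: run_steps, SimulatedCrash, and the 'sum' state are hypothetical, checksum validation is left out for brevity, and the real method is async. It assumes a checkpoint every 3 steps and a crash injected before step 5 does its work.

```python
from typing import Dict, List, Optional


class SimulatedCrash(Exception):
    pass


def run_steps(start_after: int, state: Dict, total_steps: int,
              crash_at: Optional[int], checkpoints: Dict[int, Dict],
              executed: List[int], every_n: int = 3) -> Dict:
    for step in range(start_after + 1, total_steps + 1):
        if step == crash_at:
            raise SimulatedCrash(f'crash at step {step}')
        state['sum'] = state.get('sum', 0) + step  # the "work" of one step
        executed.append(step)
        if step % every_n == 0:
            checkpoints[step] = dict(state)  # snapshot, not a live reference
    return state


def run_with_recovery(total_steps: int, crash_at: int) -> Dict:
    checkpoints: Dict[int, Dict] = {}
    executed: List[int] = []
    recovered_from: Optional[int] = None
    try:
        state = run_steps(0, {}, total_steps, crash_at, checkpoints, executed)
    except SimulatedCrash:
        recovered_from = max(checkpoints, default=0)
        restored = dict(checkpoints.get(recovered_from, {}))
        # Resume after the checkpointed step; the crash is not re-injected.
        state = run_steps(recovered_from, restored, total_steps, None,
                          checkpoints, executed)
    return {'state': state, 'recovered_from_step': recovered_from,
            'executed': executed}


result = run_with_recovery(total_steps=7, crash_at=5)
print(result['recovered_from_step'])  # 3
print(result['executed'])  # [1, 2, 3, 4, 4, 5, 6, 7]
```

Step 4 appears twice in the executed list because it ran after the step 3 checkpoint and before the crash. Steps 1-3 are never repeated, and the replay of step 4 is the case the tool-result constraint is meant to make safe.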

Starter Code

from typing import Any, Callable, Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
import json
import hashlib
import copy

@dataclass
class Checkpoint:
    checkpoint_id: str
    run_id: str
    step_number: int
    state: Dict
    messages: List[Dict]
    tool_results: Dict
    timestamp: str
    is_valid: bool = True
    checksum: str = ''

    def __post_init__(self):
        # Compute the checksum only when the caller did not supply one.
        if not self.checksum:
            self.checksum = self._compute_checksum()

    def _compute_checksum(self) -> str:
        # Covers state and step_number only; messages and tool_results are not hashed.
        data = json.dumps({'state': self.state, 'step': self.step_number}, sort_keys=True)
        return hashlib.sha256(data.encode()).hexdigest()[:16]

class CheckpointStore:
    def __init__(self, storage_backend=None):
        self.storage_backend = storage_backend  # optional persistent backend
        self._data: Dict[str, List[Checkpoint]] = {}  # run_id -> checkpoints

    def save(self, checkpoint: Checkpoint) -> None:
        # TODO: Store the checkpoint under its run_id
        pass

    def load_latest(self, run_id: str) -> Optional[Checkpoint]:
        # TODO: Return the latest valid checkpoint, falling back to earlier ones
        pass

    def load_at_step(self, run_id: str, step: int) -> Optional[Checkpoint]:
        # TODO: Return the checkpoint saved at the given step, if any
        pass

    def validate(self, checkpoint: Checkpoint) -> bool:
        # TODO: Recompute the checksum and compare it with the stored one
        pass

class RecoverableAgent:
    def __init__(self, agent_fn: Callable, checkpoint_store: CheckpointStore,
                 checkpoint_every_n: int = 3):
        self.agent_fn = agent_fn
        self.store = checkpoint_store
        self.checkpoint_every_n = checkpoint_every_n
        self.recovery_count = 0

    async def run_with_recovery(self, task: str, run_id: str) -> Dict:
        # TODO: Run agent with checkpointing and crash recovery
        pass

    async def _run_from_checkpoint(self, checkpoint: Checkpoint, task: str) -> Dict:
        # TODO: Resume execution from checkpoint
        pass

    def _should_checkpoint(self, step: int) -> bool:
        # TODO: Return True when a checkpoint is due at this step
        pass