Agent Failure Recovery and Checkpointing
Long-running agents crash. Production systems must recover without starting over.
Task
Implement a checkpoint and recovery system:
- Save agent state every N steps to CheckpointStore.
- On failure, load the latest valid checkpoint.
- Resume execution from checkpoint (not from step 0).
- Validate checkpoint integrity via checksum.
- Handle corrupt checkpoints by falling back to earlier ones.
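The validate-then-fall-back behavior in the list above can be sketched as follows. This is a minimal illustration, not the reference solution: checkpoints are plain dicts, and the helper names compute_checksum and latest_valid are hypothetical. The checksum recipe mirrors the one in the starter code (SHA-256 over state and step, truncated to 16 hex characters).

```python
import hashlib
import json
from typing import Dict, List, Optional


def compute_checksum(state: Dict, step: int) -> str:
    """Same recipe as the starter code: SHA-256 over state and step, truncated."""
    data = json.dumps({'state': state, 'step': step}, sort_keys=True)
    return hashlib.sha256(data.encode()).hexdigest()[:16]


def latest_valid(checkpoints: List[Dict]) -> Optional[Dict]:
    """Walk checkpoints newest-first; return the first whose checksum matches."""
    for cp in sorted(checkpoints, key=lambda c: c['step'], reverse=True):
        if cp['checksum'] == compute_checksum(cp['state'], cp['step']):
            return cp
    return None  # every checkpoint is corrupt: the caller restarts from step 0


# A valid checkpoint at step 3 and a corrupt one at step 6.
good = {'step': 3, 'state': {'x': 3}}
good['checksum'] = compute_checksum(good['state'], good['step'])
bad = {'step': 6, 'state': {'x': 6}, 'checksum': 'deadbeefdeadbeef'}

chosen = latest_valid([good, bad])
```

Here chosen is the step 3 checkpoint, because the newer step 6 checkpoint fails verification and is skipped.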
Non-Functional Requirements
- Checkpoint overhead < 50ms per save.
- Recovery time < 2s from crash to resumption.
- Zero re-processing of work already captured in the restored checkpoint.
- Support for both in-memory and persistent backends.
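One way to support both backends is to hide storage behind a small interface that CheckpointStore could delegate to. The sketch below is an assumption about how storage_backend might be shaped; the Backend protocol, InMemoryBackend, and JsonLinesBackend names are hypothetical and not part of the starter code. Records are assumed to be JSON-serializable dicts.

```python
import json
import os
import tempfile
from typing import Dict, List, Protocol


class Backend(Protocol):
    """Hypothetical storage interface shared by both backends."""
    def append(self, run_id: str, record: Dict) -> None: ...
    def read_all(self, run_id: str) -> List[Dict]: ...


class InMemoryBackend:
    def __init__(self) -> None:
        self._runs: Dict[str, List[Dict]] = {}

    def append(self, run_id: str, record: Dict) -> None:
        # JSON round-trip copies the record and enforces the same
        # serializability constraint as the file backend.
        self._runs.setdefault(run_id, []).append(json.loads(json.dumps(record)))

    def read_all(self, run_id: str) -> List[Dict]:
        return list(self._runs.get(run_id, []))


class JsonLinesBackend:
    """Appends one JSON object per line to <directory>/<run_id>.jsonl."""
    def __init__(self, directory: str) -> None:
        self._dir = directory

    def _path(self, run_id: str) -> str:
        return os.path.join(self._dir, f'{run_id}.jsonl')

    def append(self, run_id: str, record: Dict) -> None:
        with open(self._path(run_id), 'a', encoding='utf-8') as f:
            f.write(json.dumps(record, sort_keys=True) + '\n')
            f.flush()
            os.fsync(f.fileno())  # durability; may dominate the save budget

    def read_all(self, run_id: str) -> List[Dict]:
        path = self._path(run_id)
        if not os.path.exists(path):
            return []
        with open(path, encoding='utf-8') as f:
            return [json.loads(line) for line in f if line.strip()]


with tempfile.TemporaryDirectory() as tmp:
    disk = JsonLinesBackend(tmp)
    disk.append('run_001', {'step': 3, 'state': {'x': 3}})
    restored = disk.read_all('run_001')
```

Whether the fsync call fits inside the 50ms save budget depends on the disk, so it is worth measuring on the target hardware. A line truncated by a crash would make json.loads raise in this sketch; a fuller version would treat that line as a corrupt checkpoint.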
Constraints
- Checksum must be verified before restoring.
- If all checkpoints are corrupt, restart from step 0.
- Tool results must be preserved in checkpoint (avoid re-execution of side-effecting tools).
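The tool-result constraint can be illustrated with a small lookup-before-call helper. This is a sketch under assumptions: call_tool_once and the 'step4:send_email' call ID scheme are hypothetical, and tool_results stands in for the dict of the same name on Checkpoint.

```python
from typing import Any, Callable, Dict


def call_tool_once(tool_results: Dict[str, Any], call_id: str,
                   tool: Callable[[], Any]) -> Any:
    """Run the tool only if no result for call_id was restored or recorded."""
    if call_id not in tool_results:
        tool_results[call_id] = tool()
    return tool_results[call_id]


sent = []  # records real side effects so re-execution would be visible


def send_email() -> Dict[str, str]:
    sent.append('email')
    return {'status': 'sent'}


# First run: the tool executes and its result is recorded.
tool_results: Dict[str, Any] = {}
call_tool_once(tool_results, 'step4:send_email', send_email)

# Simulated recovery: tool_results comes back from the checkpoint,
# so replaying the same step returns the stored result instead.
restored = dict(tool_results)
replayed = call_tool_once(restored, 'step4:send_email', send_email)
```

After the replay, sent still holds a single entry, so the email was not sent twice. This only works if call IDs are deterministic across runs, and only for results recorded before the checkpoint was saved.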
Examples
Example 1:
Input:
# Simulate crash at step 5, recover from step 3 checkpoint
result = await agent.run_with_recovery('long task', 'run_001')
result['recovered_from_step']
Output:
3
Explanation: Step 3 was the last valid checkpoint; steps 4-5 were re-executed from there.
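The scenario in Example 1 can be reproduced with a toy loop. This is a sketch, not the expected solution: the step function just accumulates a running total, the crash is simulated with a RuntimeError, and should_checkpoint, run, and the module-level run_with_recovery are hypothetical stand-ins for the methods in the starter code. It assumes a checkpoint is taken whenever the step number is a positive multiple of N (N = 3 here).

```python
import asyncio
from typing import Dict, Optional


def should_checkpoint(step: int, every_n: int = 3) -> bool:
    """Checkpoint on every positive multiple of every_n (assumed policy)."""
    return step > 0 and step % every_n == 0


async def run(total_steps: int, crash_at: Optional[int],
              start: Dict, saved: Dict[int, Dict]) -> Dict:
    """Execute steps after start['step'], saving checkpoints into saved."""
    state = dict(start)
    for step in range(state['step'] + 1, total_steps + 1):
        if step == crash_at:
            raise RuntimeError(f'simulated crash at step {step}')
        state = {'step': step, 'total': state['total'] + step}
        if should_checkpoint(step):
            saved[step] = dict(state)
    return state


async def run_with_recovery(total_steps: int, crash_at: int) -> Dict:
    saved: Dict[int, Dict] = {}
    initial = {'step': 0, 'total': 0}
    try:
        final = await run(total_steps, crash_at, initial, saved)
        recovered_from = None
    except RuntimeError:
        # Resume from the newest checkpoint, or from step 0 if none exists.
        recovered_from = max(saved) if saved else 0
        final = await run(total_steps, None, saved.get(recovered_from, initial), saved)
    return {'final': final, 'recovered_from_step': recovered_from}


result = asyncio.run(run_with_recovery(total_steps=7, crash_at=5))
```

With a crash at step 5, the only checkpoint saved so far is the one at step 3, so result['recovered_from_step'] is 3 and execution resumes at step 4.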
Starter Code
from typing import Any, Callable, Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
import json
import hashlib
import copy


@dataclass
class Checkpoint:
    checkpoint_id: str
    run_id: str
    step_number: int
    state: Dict
    messages: List[Dict]
    tool_results: Dict
    timestamp: str
    is_valid: bool = True
    checksum: str = ''

    def __post_init__(self):
        # Compute the checksum only when the caller did not supply one.
        if not self.checksum:
            self.checksum = self._compute_checksum()

    def _compute_checksum(self) -> str:
        # Covers state and step number only; truncated to 16 hex characters.
        data = json.dumps({'state': self.state, 'step': self.step_number}, sort_keys=True)
        return hashlib.sha256(data.encode()).hexdigest()[:16]


class CheckpointStore:
    def __init__(self, storage_backend=None):
        self._data: Dict[str, List[Checkpoint]] = {}  # run_id -> checkpoints

    def save(self, checkpoint: Checkpoint) -> None:
        pass

    def load_latest(self, run_id: str) -> Optional[Checkpoint]:
        pass

    def load_at_step(self, run_id: str, step: int) -> Optional[Checkpoint]:
        pass

    def validate(self, checkpoint: Checkpoint) -> bool:
        pass


class RecoverableAgent:
    def __init__(self, agent_fn: Callable, checkpoint_store: CheckpointStore,
                 checkpoint_every_n: int = 3):
        self.agent_fn = agent_fn
        self.store = checkpoint_store
        self.checkpoint_every_n = checkpoint_every_n
        self.recovery_count = 0

    async def run_with_recovery(self, task: str, run_id: str) -> Dict:
        # TODO: Run agent with checkpointing and crash recovery
        pass

    async def _run_from_checkpoint(self, checkpoint: Checkpoint, task: str) -> Dict:
        # TODO: Resume execution from checkpoint
        pass

    def _should_checkpoint(self, step: int) -> bool:
        pass
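Note that the checksum in the starter code covers only state and step_number, so corruption confined to messages or tool_results would not be detected. If broader coverage is wanted, one possible variant is sketched below. The full_checksum helper is hypothetical, and using default=str for non-JSON values is an assumption that may not suit every payload.

```python
import hashlib
import json
from typing import Any, Dict, List


def full_checksum(step: int, state: Dict[str, Any], messages: List[Dict],
                  tool_results: Dict[str, Any]) -> str:
    """SHA-256 over every restorable field, serialized with sorted keys."""
    payload = json.dumps(
        {'step': step, 'state': state, 'messages': messages,
         'tool_results': tool_results},
        sort_keys=True,
        default=str,  # assumption: stringify values json cannot encode
    )
    return hashlib.sha256(payload.encode('utf-8')).hexdigest()


msgs = [{'role': 'user', 'content': 'hi'}]
original = full_checksum(3, {'x': 1}, msgs, {'call_1': 'ok'})
tampered = full_checksum(3, {'x': 1}, msgs, {'call_1': 'tampered'})
```

The two digests differ because tool_results now feeds into the hash. Sorting keys keeps the digest stable across dict insertion order.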