Implement circuit breaker for agent tool resilience:
States:
- CLOSED: Normal operation, calls pass through
- OPEN: Failure threshold reached, calls blocked immediately
- HALF_OPEN: Testing if service recovered (limited calls allowed)
Transitions:
- CLOSED -> OPEN: failure_count >= failure_threshold
- OPEN -> HALF_OPEN: now - last_failure > recovery_timeout
- HALF_OPEN -> CLOSED: half_open_calls_successful >= half_open_max_calls
- HALF_OPEN -> OPEN: Any failure in half-open
Methods:
can_execute(): Check if call allowed in current staterecord_success/failure(): Update statecall(func, ...): Execute with protection, returns {'result': ..., 'state': ..., 'allowed': bool}
Examples
Example 1:
Input:
cb = CircuitBreaker(failure_threshold=2); cb.record_failure(); cb.record_failure(); cb.can_execute()Output:
FalseExplanation: 2 failures >= threshold, circuit OPEN, calls blocked
Starter Code
class CircuitBreaker:
"""
Circuit breaker pattern for agent tool calls.
Prevents cascading failures.
"""
STATES = ['CLOSED', 'OPEN', 'HALF_OPEN']
def __init__(self, failure_threshold=5, recovery_timeout=30, half_open_max_calls=3):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self.state = 'CLOSED'
self.failure_count = 0
self.last_failure_time = None
self.half_open_calls = 0
def can_execute(self):
"""Check if call should be allowed"""
# Your implementation here
pass
def record_success(self):
"""Record successful call"""
# Your implementation here
pass
def record_failure(self):
"""Record failed call"""
# Your implementation here
pass
def call(self, func, *args, **kwargs):
"""Execute func with circuit breaker protection"""
# Your implementation here
passPython3
ReadyLines: 1Characters: 0
Ready