Reliability and Fault Tolerance¶
GraphBit provides comprehensive reliability features to ensure robust production workflows. This guide covers error handling, fault tolerance, recovery strategies, and building resilient workflow systems.
Overview¶
Reliability in GraphBit encompasses:

- Error Handling: Graceful handling of failures and exceptions
- Fault Tolerance: Continuing operation despite component failures
- Recovery Strategies: Automatic and manual recovery mechanisms
- Circuit Breakers: Preventing cascading failures
- Retry Logic: Intelligent retry patterns for transient failures
- Health Monitoring: Continuous system health assessment
Error Handling Patterns¶
Basic Error Handling¶
import graphbit
import time
import os
# Initialize GraphBit
graphbit.init()
def safe_workflow_execution(workflow, executor, max_retries=3):
"""Execute workflow with comprehensive error handling."""
for attempt in range(max_retries + 1):
try:
print(f"Execution attempt {attempt + 1}/{max_retries + 1}")
result = executor.execute(workflow)
if result.is_completed():
print("✅ Workflow executed successfully")
return result
else:
error_msg = result.error()
print(f"❌ Workflow failed: {error_msg}")
if attempt < max_retries:
wait_time = 2 ** attempt # Exponential backoff
print(f"⏳ Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
print("❌ Max retries exceeded")
return result
except Exception as e:
print(f"❌ Execution exception: {e}")
if attempt < max_retries:
wait_time = 2 ** attempt
print(f"⏳ Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
print("❌ Max retries exceeded")
raise e
return None
def create_fault_tolerant_workflow():
"""Create workflow with built-in fault tolerance."""
workflow = graphbit.Workflow("Fault Tolerant Workflow")
# Input validator with error handling
validator = graphbit.Node.agent(
name="Input Validator",
prompt="""
Validate this input and handle any issues gracefully:
Input: {input}
If the input is invalid:
1. Identify the specific issues
2. Suggest corrections if possible
3. Return a status indicating validation result
If valid, return the input with validation confirmation.
""",
agent_id="validator"
)
# Robust processor with fallback logic
processor = graphbit.Node.agent(
name="Robust Processor",
prompt="""
Process this validated input with error resilience:
Input: {validated_input}
If processing encounters issues:
1. Try alternative processing methods
2. Provide partial results if possible
3. Report any limitations or warnings
Always return some form of useful output.
""",
agent_id="processor"
)
# Error recovery node
recovery_handler = graphbit.Node.agent(
name="Recovery Handler",
prompt="""
Handle any errors or partial results:
Results: {processed_results}
If there are errors or incomplete results:
1. Attempt data recovery
2. Fill in missing information where possible
        3. Flag areas that need manual review
        If the results are complete, pass them through unchanged.
        """,
agent_id="recovery_handler"
)
# Build fault-tolerant chain
validator_id = workflow.add_node(validator)
processor_id = workflow.add_node(processor)
recovery_id = workflow.add_node(recovery_handler)
workflow.connect(validator_id, processor_id)
workflow.connect(processor_id, recovery_id)
return workflow
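The two helpers compose naturally. A minimal sketch, assuming `OPENAI_API_KEY` is set and reusing the same executor construction shown later in this guide:

llm_config = graphbit.LlmConfig.openai(
    api_key=os.getenv("OPENAI_API_KEY"),
    model="gpt-4o-mini"
)
executor = graphbit.Executor(llm_config)

workflow = create_fault_tolerant_workflow()

# Retries with exponential backoff are handled inside safe_workflow_execution
result = safe_workflow_execution(workflow, executor, max_retries=3)
if result and result.is_completed():
    print(result.output())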
Circuit Breaker Pattern¶
Implementing Circuit Breakers¶
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
"""Circuit breaker for preventing cascading failures."""
def __init__(self, failure_threshold=5, recovery_timeout=60, timeout_seconds=30):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout # seconds
self.timeout_seconds = timeout_seconds
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
def can_execute(self):
"""Check if execution is allowed."""
if self.state == CircuitState.CLOSED:
return True
elif self.state == CircuitState.OPEN:
            if self.last_failure_time and \
                    (datetime.now() - self.last_failure_time).total_seconds() >= self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
return True
return False
elif self.state == CircuitState.HALF_OPEN:
return True
return False
def record_success(self):
"""Record successful execution."""
self.failure_count = 0
self.state = CircuitState.CLOSED
def record_failure(self):
"""Record failed execution."""
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
def get_state(self):
"""Get current circuit breaker state."""
return self.state
class ReliableExecutor:
"""Executor with circuit breaker protection."""
def __init__(self, base_executor, circuit_breaker=None):
self.base_executor = base_executor
self.circuit_breaker = circuit_breaker or CircuitBreaker()
def execute(self, workflow):
"""Execute workflow with circuit breaker protection."""
if not self.circuit_breaker.can_execute():
raise Exception(f"Circuit breaker is {self.circuit_breaker.get_state().value}")
try:
start_time = time.time()
result = self.base_executor.execute(workflow)
duration = time.time() - start_time
if result.is_completed():
self.circuit_breaker.record_success()
return result
else:
self.circuit_breaker.record_failure()
return result
except Exception as e:
self.circuit_breaker.record_failure()
raise e
def create_circuit_breaker_executor():
"""Create executor with circuit breaker protection."""
# Base executor
llm_config = graphbit.LlmConfig.openai(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o-mini"
)
base_executor = graphbit.Executor(llm_config)
# Circuit breaker configuration
circuit_breaker = CircuitBreaker(
failure_threshold=3, # Open after 3 failures
recovery_timeout=30, # Try again after 30 seconds
timeout_seconds=60 # Individual execution timeout
)
reliable_executor = ReliableExecutor(base_executor, circuit_breaker)
return reliable_executor
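A short usage sketch (the `workflow` variable is assumed to be built as shown above). After three consecutive failures the breaker opens, and further calls are rejected until the recovery timeout elapses:

reliable_executor = create_circuit_breaker_executor()

try:
    result = reliable_executor.execute(workflow)
except Exception as e:
    # Raised by the workflow itself, or by the breaker once it is open
    print(f"Execution rejected or failed: {e}")

print(f"Breaker state: {reliable_executor.circuit_breaker.get_state().value}")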
Retry Strategies¶
Advanced Retry Logic¶
import random
from typing import Callable, Optional
class RetryStrategy:
"""Base class for retry strategies."""
def get_wait_time(self, attempt: int) -> float:
"""Get wait time for given attempt number."""
raise NotImplementedError
class ExponentialBackoff(RetryStrategy):
"""Exponential backoff retry strategy."""
def __init__(self, base_delay=1.0, max_delay=60.0, multiplier=2.0):
self.base_delay = base_delay
self.max_delay = max_delay
self.multiplier = multiplier
def get_wait_time(self, attempt: int) -> float:
delay = self.base_delay * (self.multiplier ** attempt)
return min(delay, self.max_delay)
class JitteredBackoff(RetryStrategy):
"""Exponential backoff with jitter to avoid thundering herd."""
def __init__(self, base_delay=1.0, max_delay=60.0, multiplier=2.0, jitter_ratio=0.1):
self.base_delay = base_delay
self.max_delay = max_delay
self.multiplier = multiplier
self.jitter_ratio = jitter_ratio
def get_wait_time(self, attempt: int) -> float:
delay = self.base_delay * (self.multiplier ** attempt)
# Add jitter
jitter = delay * self.jitter_ratio * random.random()
delay += jitter
return min(delay, self.max_delay)
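To see the difference between the two strategies, this short sketch prints the wait times each produces for the first few attempts:

expo = ExponentialBackoff(base_delay=1.0, max_delay=60.0)
jittered = JitteredBackoff(base_delay=1.0, max_delay=60.0, jitter_ratio=0.1)

for attempt in range(5):
    print(
        f"attempt {attempt}: "
        f"exponential={expo.get_wait_time(attempt):.1f}s, "
        f"jittered={jittered.get_wait_time(attempt):.1f}s"
    )
# exponential yields 1, 2, 4, 8, 16 seconds; jittered adds up to 10% random extra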
class RetryableExecutor:
"""Executor with configurable retry strategies."""
def __init__(self, base_executor, retry_strategy=None, max_retries=3):
self.base_executor = base_executor
self.retry_strategy = retry_strategy or ExponentialBackoff()
self.max_retries = max_retries
def execute(self, workflow, retry_condition: Optional[Callable] = None):
"""Execute workflow with retry logic."""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
result = self.base_executor.execute(workflow)
if result.is_completed():
if attempt > 0:
print(f"✅ Workflow succeeded on attempt {attempt + 1}")
return result
else:
# Check if this failure should trigger a retry
if retry_condition and not retry_condition(result):
print(f"❌ Non-retryable failure: {result.error()}")
return result
if attempt < self.max_retries:
wait_time = self.retry_strategy.get_wait_time(attempt)
print(f"⏳ Workflow failed, retrying in {wait_time:.1f}s (attempt {attempt + 1})")
time.sleep(wait_time)
else:
print(f"❌ Workflow failed after {self.max_retries + 1} attempts")
return result
except Exception as e:
last_exception = e
if attempt < self.max_retries:
wait_time = self.retry_strategy.get_wait_time(attempt)
print(f"⏳ Exception occurred, retrying in {wait_time:.1f}s: {e}")
time.sleep(wait_time)
else:
print(f"❌ Exception after {self.max_retries + 1} attempts: {e}")
raise e
if last_exception:
raise last_exception
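The `retry_condition` hook lets you skip retries for failures that will never succeed. A minimal sketch, assuming `llm_config` and `workflow` are defined as in the earlier examples; the keyword check is an illustration, not a GraphBit API:

def is_transient_failure(result):
    # Treat the failure as retryable only if it looks transient
    message = (result.error() or "").lower()
    return any(k in message for k in ("timeout", "rate limit", "connection"))

retry_executor = RetryableExecutor(
    base_executor=graphbit.Executor(llm_config),
    retry_strategy=JitteredBackoff(base_delay=1.0, max_delay=30.0),
    max_retries=4
)
result = retry_executor.execute(workflow, retry_condition=is_transient_failure)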
Health Monitoring and Recovery¶
Health Check System¶
class HealthChecker:
"""Comprehensive health monitoring system."""
def __init__(self):
self.health_checks = {}
self.health_history = []
def register_health_check(self, name: str, check_func: Callable, critical: bool = True):
"""Register a health check function."""
self.health_checks[name] = {
"func": check_func,
"critical": critical,
"last_result": None,
"last_check": None
}
def run_health_checks(self):
"""Run all registered health checks."""
results = {}
overall_healthy = True
for name, check_config in self.health_checks.items():
try:
start_time = time.time()
result = check_config["func"]()
duration = (time.time() - start_time) * 1000
check_result = {
"healthy": bool(result),
"duration_ms": duration,
"timestamp": datetime.now(),
"details": result if isinstance(result, dict) else {}
}
results[name] = check_result
check_config["last_result"] = check_result
check_config["last_check"] = datetime.now()
# Update overall health
if check_config["critical"] and not check_result["healthy"]:
overall_healthy = False
except Exception as e:
check_result = {
"healthy": False,
"error": str(e),
"timestamp": datetime.now()
}
results[name] = check_result
check_config["last_result"] = check_result
check_config["last_check"] = datetime.now()
if check_config["critical"]:
overall_healthy = False
health_report = {
"overall_healthy": overall_healthy,
"timestamp": datetime.now(),
"checks": results
}
self.health_history.append(health_report)
# Keep only last 100 health checks
if len(self.health_history) > 100:
self.health_history = self.health_history[-100:]
return health_report
def create_health_checks():
"""Create standard health checks for GraphBit."""
def check_graphbit_system():
"""Check GraphBit system health."""
try:
system_info = graphbit.get_system_info()
health_check = graphbit.health_check()
return {
"system_available": True,
"runtime_initialized": system_info.get("runtime_initialized", False),
"health_check": health_check
}
except Exception as e:
return False
def check_llm_connectivity():
"""Check LLM provider connectivity."""
try:
config = graphbit.LlmConfig.openai(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o-mini"
)
client = graphbit.LlmClient(config)
# Simple test completion
            response = client.complete("Respond with the single word OK")
return {
"provider_accessible": True,
"response_received": len(response) > 0
}
except Exception as e:
return False
# Create health checker and register checks
health_checker = HealthChecker()
health_checker.register_health_check("graphbit_system", check_graphbit_system, critical=True)
health_checker.register_health_check("llm_connectivity", check_llm_connectivity, critical=True)
return health_checker
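Running the checks returns a report dictionary. A small sketch of how it can be inspected, for example from a scheduled job or a readiness endpoint:

health_checker = create_health_checks()
report = health_checker.run_health_checks()

print(f"Overall healthy: {report['overall_healthy']}")
for name, check in report["checks"].items():
    status = "ok" if check["healthy"] else "failing"
    print(f"  {name}: {status}")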
Fallback and Degradation Strategies¶
Graceful Degradation¶
class FallbackWorkflow:
"""Workflow with multiple fallback levels."""
def __init__(self, name):
self.name = name
self.primary_workflow = None
self.fallback_workflows = []
def set_primary_workflow(self, workflow):
"""Set the primary workflow."""
self.primary_workflow = workflow
def add_fallback_workflow(self, workflow, priority=1):
"""Add a fallback workflow with priority."""
self.fallback_workflows.append({
"workflow": workflow,
"priority": priority
})
# Sort by priority (lower numbers = higher priority)
self.fallback_workflows.sort(key=lambda x: x["priority"])
def execute(self, executor):
"""Execute workflow with fallback chain."""
# Try primary workflow first
if self.primary_workflow:
try:
print("🚀 Attempting primary workflow...")
result = executor.execute(self.primary_workflow)
if result.is_completed():
print("✅ Primary workflow succeeded")
return result
else:
print(f"❌ Primary workflow failed: {result.error()}")
except Exception as e:
print(f"❌ Primary workflow exception: {e}")
# Try fallback workflows in priority order
for i, fallback in enumerate(self.fallback_workflows):
try:
print(f"🔄 Attempting fallback {i+1}...")
result = executor.execute(fallback["workflow"])
if result.is_completed():
print(f"✅ Fallback {i+1} succeeded")
return result
else:
print(f"❌ Fallback {i+1} failed: {result.error()}")
except Exception as e:
print(f"❌ Fallback {i+1} exception: {e}")
print("❌ All workflows failed")
return None
def create_degraded_processing_workflows():
"""Create workflows with different levels of processing."""
# High-quality processing (primary)
primary_workflow = graphbit.Workflow("High Quality Processing")
primary_processor = graphbit.Node.agent(
name="Advanced Processor",
prompt="""
Perform comprehensive analysis of this input:
Input: {input}
Provide:
1. Detailed analysis
2. Multiple perspectives
3. Confidence scores
4. Recommendations
5. Risk assessment
""",
agent_id="advanced_processor"
)
primary_workflow.add_node(primary_processor)
# Basic processing (fallback)
fallback_workflow = graphbit.Workflow("Basic Processing")
basic_processor = graphbit.Node.agent(
name="Basic Processor",
prompt="Provide a simple summary of: {input}",
agent_id="basic_processor"
)
fallback_workflow.add_node(basic_processor)
# Emergency processing (last resort)
emergency_workflow = graphbit.Workflow("Emergency Processing")
emergency_processor = graphbit.Node.transform(
name="Emergency Processor",
transformation="uppercase" # Simple transformation as last resort
)
emergency_workflow.add_node(emergency_processor)
# Create fallback workflow
fallback_system = FallbackWorkflow("Degraded Processing System")
fallback_system.set_primary_workflow(primary_workflow)
fallback_system.add_fallback_workflow(fallback_workflow, priority=1)
fallback_system.add_fallback_workflow(emergency_workflow, priority=2)
return fallback_system
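Executing the fallback system works the same way as executing a single workflow. A minimal sketch, reusing the executor construction shown earlier in this guide:

llm_config = graphbit.LlmConfig.openai(
    api_key=os.getenv("OPENAI_API_KEY"),
    model="gpt-4o-mini"
)
executor = graphbit.Executor(llm_config)

fallback_system = create_degraded_processing_workflows()
result = fallback_system.execute(executor)

if result is None:
    # All levels failed; route the input to manual handling
    print("Queueing input for manual review")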
Production Reliability Patterns¶
Complete Reliability Stack¶
class ProductionExecutor:
"""Production-ready executor with full reliability stack."""
def __init__(self, llm_config):
# Base executor
self.base_executor = graphbit.Executor(llm_config)
# Reliability components
self.circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
self.retry_strategy = JitteredBackoff(base_delay=1.0, max_delay=30.0)
self.health_checker = create_health_checks()
# Metrics
self.execution_count = 0
self.success_count = 0
self.failure_count = 0
def execute(self, workflow, execution_id=None):
"""Execute workflow with full reliability features."""
self.execution_count += 1
# Generate execution ID if not provided
if execution_id is None:
execution_id = f"prod_exec_{int(time.time())}_{self.execution_count}"
# Health check before execution
health_report = self.health_checker.run_health_checks()
if not health_report["overall_healthy"]:
self.failure_count += 1
raise Exception("System health check failed")
# Circuit breaker check
if not self.circuit_breaker.can_execute():
self.failure_count += 1
raise Exception(f"Circuit breaker is {self.circuit_breaker.get_state().value}")
# Retry loop
max_retries = 3
for attempt in range(max_retries + 1):
try:
# Execute workflow
result = self.base_executor.execute(workflow)
if result.is_completed():
self.success_count += 1
self.circuit_breaker.record_success()
return result
else:
self.failure_count += 1
self.circuit_breaker.record_failure()
if attempt < max_retries:
wait_time = self.retry_strategy.get_wait_time(attempt)
print(f"⏳ Retrying in {wait_time:.1f}s (attempt {attempt + 1})")
time.sleep(wait_time)
else:
return result
except Exception as e:
self.failure_count += 1
self.circuit_breaker.record_failure()
if attempt < max_retries:
wait_time = self.retry_strategy.get_wait_time(attempt)
print(f"⏳ Retrying after exception in {wait_time:.1f}s: {e}")
time.sleep(wait_time)
else:
raise e
def get_reliability_metrics(self):
"""Get reliability metrics."""
success_rate = (self.success_count / self.execution_count) * 100 if self.execution_count > 0 else 0
return {
"total_executions": self.execution_count,
"successful_executions": self.success_count,
"failed_executions": self.failure_count,
"success_rate_percent": success_rate,
"circuit_breaker_state": self.circuit_breaker.get_state().value
}
def create_production_executor():
"""Create production-ready executor."""
llm_config = graphbit.LlmConfig.openai(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o-mini"
)
return ProductionExecutor(llm_config)
Best Practices¶
1. Reliability Design Principles¶
def get_reliability_best_practices():
"""Get best practices for building reliable workflows."""
best_practices = {
"fail_fast": "Detect and report failures quickly",
"graceful_degradation": "Provide reduced functionality when components fail",
"idempotency": "Ensure operations can be safely repeated",
"timeout_management": "Set appropriate timeouts for all operations",
"resource_cleanup": "Always clean up resources, even on failure",
"monitoring": "Continuously monitor system health and performance",
"testing": "Test failure scenarios regularly"
}
for practice, description in best_practices.items():
print(f"✅ {practice.replace('_', ' ').title()}: {description}")
return best_practices
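As an illustration of the timeout_management practice above, here is a standard-library sketch that enforces a hard deadline around an execution. Note that the underlying call keeps running in its worker thread after the timeout; this only bounds how long the caller waits:

from concurrent.futures import ThreadPoolExecutor, TimeoutError

def execute_with_timeout(executor, workflow, timeout_seconds=60):
    """Fail fast if an execution exceeds the given deadline."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(executor.execute, workflow)
    try:
        return future.result(timeout=timeout_seconds)
    except TimeoutError:
        print(f"Execution exceeded {timeout_seconds}s, failing fast")
        raise
    finally:
        # Don't block on the worker; the underlying call may still be running
        pool.shutdown(wait=False)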
2. Error Classification¶
def classify_error_type(error):
"""Classify errors for appropriate handling."""
error_message = str(error).lower()
# Transient errors - should retry
if any(keyword in error_message for keyword in [
"timeout", "network", "connection", "temporary", "rate limit"
]):
return "transient"
# Permanent errors - should not retry
if any(keyword in error_message for keyword in [
"authentication", "permission", "not found", "invalid"
]):
return "permanent"
# System errors - may need recovery
if any(keyword in error_message for keyword in [
"memory", "disk", "resource", "capacity"
]):
return "system"
# Unknown errors - handle conservatively
return "unknown"
Usage Examples¶
Production Reliability Example¶
def example_production_usage():
"""Example of production reliability patterns."""
# Initialize GraphBit
graphbit.init()
# Create production executor
prod_executor = create_production_executor()
# Create reliable workflow
workflow = create_fault_tolerant_workflow()
try:
# Execute with full reliability features
result = prod_executor.execute(workflow, execution_id="example_prod_run")
if result.is_completed():
print(f"✅ Production execution successful: {result.output()}")
else:
print(f"❌ Production execution failed: {result.error()}")
# Print reliability metrics
metrics = prod_executor.get_reliability_metrics()
print(f"📊 Reliability Metrics: {metrics}")
except Exception as e:
print(f"❌ Production execution exception: {e}")
if __name__ == "__main__":
example_production_usage()
What's Next¶
- Learn about Monitoring for tracking reliability metrics
- Explore Performance optimization for reliable systems
- Check Validation for ensuring system correctness
- See LLM Providers for provider-specific reliability features