Source code for apilinker.core.error_handling

"""
Advanced error handling and recovery system for APILinker.

This module provides sophisticated error handling capabilities including:
1. Circuit breakers to prevent cascading failures
2. Dead Letter Queue (DLQ) for failed operations
3. Configurable recovery strategies
4. Detailed error logging and analytics
"""

import json
import logging
import os
import time
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Tuple,
    TypeVar,
    Generic,
)

import httpx

# Type variable for circuit breaker result type
T = TypeVar("T")

# Set up module logger
logger = logging.getLogger(__name__)


class CircuitBreakerState(Enum):
    """Lifecycle states used by the circuit breaker pattern."""

    # Normal operation: requests pass through.
    CLOSED = "CLOSED"

    # Circuit is open: requests fail fast.
    OPEN = "OPEN"

    # Probing whether the service is back online.
    HALF_OPEN = "HALF_OPEN"
class ErrorCategory(Enum):
    """Buckets that errors are sorted into for handling and reporting."""

    # Network connectivity issues.
    NETWORK = "NETWORK"

    # Authentication/authorization failures (note the shortened value).
    AUTHENTICATION = "AUTH"

    # Invalid request data.
    VALIDATION = "VALIDATION"

    # Request timeout.
    TIMEOUT = "TIMEOUT"

    # API rate limit exceeded.
    RATE_LIMIT = "RATE_LIMIT"

    # Server-side errors (5xx).
    SERVER = "SERVER"

    # Client-side errors (4xx).
    CLIENT = "CLIENT"

    # Data mapping errors.
    MAPPING = "MAPPING"

    # Plugin-related errors.
    PLUGIN = "PLUGIN"

    # Anything that could not be categorized.
    UNKNOWN = "UNKNOWN"
class RecoveryStrategy(Enum):
    """Available ways of reacting to a failed operation."""

    # Simple retry.
    RETRY = "RETRY"

    # Retry with an increasing delay between attempts.
    EXPONENTIAL_BACKOFF = "EXPONENTIAL_BACKOFF"

    # Route the call through a circuit breaker.
    CIRCUIT_BREAKER = "CIRCUIT_BREAKER"

    # Use fallback data or a fallback operation.
    FALLBACK = "FALLBACK"

    # Skip this operation.
    SKIP = "SKIP"

    # Fail immediately.
    FAIL_FAST = "FAIL_FAST"
class ApiLinkerError(Exception):
    """
    Base exception class for APILinker errors with enhanced context.

    Besides the message, each instance records the error category, optional
    HTTP details (status code, response body, request URL and method),
    optional operation/correlation identifiers, a free-form context dict and
    the ISO-8601 timestamp (naive local time) at which it was created.
    """

    def __init__(
        self,
        message: str,
        error_category: ErrorCategory = ErrorCategory.UNKNOWN,
        status_code: Optional[int] = None,
        response_body: Optional[str] = None,
        request_url: Optional[str] = None,
        request_method: Optional[str] = None,
        operation_id: Optional[str] = None,
        correlation_id: Optional[str] = None,
        additional_context: Optional[Dict[str, Any]] = None,
    ):
        """
        Create an error with optional HTTP and tracing context.

        Args:
            message: Human-readable description of the failure.
            error_category: Category used for strategy selection and reporting.
            status_code: HTTP status code, if one is known.
            response_body: Response content as text, if any.
            request_url: URL of the failed request, if known.
            request_method: HTTP method of the failed request, if known.
            operation_id: Identifier of the operation that failed.
            correlation_id: Identifier used to correlate related events.
            additional_context: Extra key/value details; defaults to a new
                empty dict so instances never share one.
        """
        super().__init__(message)
        self.message = message
        self.error_category = error_category
        self.status_code = status_code
        self.response_body = response_body
        self.request_url = request_url
        self.request_method = request_method
        self.timestamp = datetime.now().isoformat()
        self.operation_id = operation_id
        self.correlation_id = correlation_id
        self.additional_context = additional_context or {}

    @staticmethod
    def _attached_request(exc: Exception, response: Any) -> Any:
        """Return the request object attached to ``exc`` or ``response``, if any."""
        for holder in (exc, response):
            if holder is None:
                continue
            try:
                request = getattr(holder, "request", None)
            except RuntimeError:
                # httpx exposes ``request`` as a property that raises
                # RuntimeError when no request was ever attached.
                continue
            if request is not None:
                return request
        return None

    @staticmethod
    def _category_for_status(status_code: int) -> ErrorCategory:
        """Map an HTTP status code to an error category (UNKNOWN outside 4xx/5xx)."""
        if 400 <= status_code < 500:
            if status_code in (401, 403):
                return ErrorCategory.AUTHENTICATION
            if status_code == 422:
                return ErrorCategory.VALIDATION
            if status_code == 429:
                return ErrorCategory.RATE_LIMIT
            return ErrorCategory.CLIENT
        if 500 <= status_code < 600:
            return ErrorCategory.SERVER
        return ErrorCategory.UNKNOWN

    @classmethod
    def from_exception(cls, exc: Exception, **kwargs) -> "ApiLinkerError":
        """
        Convert a standard exception to an ApiLinkerError with additional context.

        HTTP details are read from ``status_code``, ``response``, ``url`` and
        ``method`` attributes on ``exc`` when present. When ``exc`` carries
        them only indirectly (as ``httpx.HTTPStatusError`` does through its
        ``response`` and ``request`` attributes), the status code, URL and
        method are taken from those objects instead.

        Args:
            exc: The exception to convert.
            **kwargs: Extra constructor arguments; these take precedence over
                the values derived from ``exc``.

        Returns:
            A new instance of this class describing ``exc``.
        """
        response = getattr(exc, "response", None)
        request = cls._attached_request(exc, response)

        status_code = getattr(exc, "status_code", None)
        if status_code is None:
            status_code = getattr(response, "status_code", None)

        request_url = getattr(exc, "url", None)
        if request_url is None:
            url = getattr(request, "url", None)
            # URL objects are stringified so the value stays serializable.
            request_url = str(url) if url is not None else None

        request_method = getattr(exc, "method", None)
        if request_method is None:
            request_method = getattr(request, "method", None)

        # Exception type wins over status code when both are informative.
        error_category = ErrorCategory.UNKNOWN
        if isinstance(exc, httpx.TimeoutException):
            error_category = ErrorCategory.TIMEOUT
        elif isinstance(exc, httpx.NetworkError):
            error_category = ErrorCategory.NETWORK
        elif isinstance(status_code, int):
            error_category = cls._category_for_status(status_code)

        # Store the response as (size-limited) text. ``is not None`` rather
        # than truthiness, so falsy response objects are converted as well.
        response_body = response
        if response_body is not None and not isinstance(response_body, str):
            try:
                response_body = str(response_body)[:1000]
            except Exception:
                response_body = "<Unable to convert response to string>"

        error_kwargs = {
            "message": str(exc),
            "error_category": error_category,
            "status_code": status_code,
            "response_body": response_body,
            "request_url": request_url,
            "request_method": request_method,
        }
        error_kwargs.update(kwargs)
        return cls(**error_kwargs)

    def to_dict(self) -> Dict[str, Any]:
        """Convert error to a dictionary for logging or serialization."""
        return {
            "message": self.message,
            "error_category": self.error_category.value,
            "status_code": self.status_code,
            "response_body": self.response_body,
            "request_url": self.request_url,
            "request_method": self.request_method,
            "timestamp": self.timestamp,
            "operation_id": self.operation_id,
            "correlation_id": self.correlation_id,
            "additional_context": self.additional_context,
        }

    def __str__(self) -> str:
        """Return ``[CATEGORY] message`` followed by any known HTTP/tracing details."""
        parts = [f"[{self.error_category.value}] {self.message}"]
        if self.status_code:
            parts.append(f"Status: {self.status_code}")
        if self.request_method and self.request_url:
            parts.append(f"{self.request_method} {self.request_url}")
        if self.correlation_id:
            parts.append(f"Correlation ID: {self.correlation_id}")
        return " | ".join(parts)
class DeadLetterQueue:
    """
    Dead Letter Queue for storing failed operations for later analysis or retry.

    Each failed operation is stored as one ``<item_id>.json`` file in the
    storage directory. Items that are retried successfully are moved to a
    ``processed`` subdirectory of that directory.
    """

    def __init__(self, storage_dir: Optional[str] = None):
        """
        Initialize the Dead Letter Queue.

        Args:
            storage_dir: Directory to store DLQ items. If None, defaults to a
                'dlq' subdirectory in the current working directory. The
                directory is created if it does not exist.
        """
        self.storage_dir = storage_dir or os.path.join(os.getcwd(), "dlq")
        os.makedirs(self.storage_dir, exist_ok=True)
        logger.info(f"Initialized Dead Letter Queue at {self.storage_dir}")

    @staticmethod
    def _safe_id_part(value: Any) -> str:
        """Replace characters that are unsafe in a file name with underscores."""
        return "".join(
            ch if ch.isalnum() or ch in "-_." else "_" for ch in str(value)
        )

    def add_item(
        self,
        error: ApiLinkerError,
        payload: Any,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> str:
        """
        Add a failed item to the Dead Letter Queue.

        Args:
            error: The error that caused the operation to fail
            payload: The data that was being processed when the failure occurred
            metadata: Additional metadata about the operation

        Returns:
            ID of the DLQ item for reference
        """
        # The correlation ID ends up in a file name, so strip path separators
        # and similar characters from it.
        correlation = self._safe_id_part(error.correlation_id or "unknown")
        # A random suffix keeps IDs unique when several items with the same
        # category and correlation ID are added within the same second;
        # without it the later item would overwrite the earlier file.
        item_id = (
            f"{int(time.time())}_{error.error_category.value}"
            f"_{correlation}_{os.urandom(4).hex()}"
        )

        dlq_item = {
            "id": item_id,
            "timestamp": datetime.now().isoformat(),
            "error": error.to_dict(),
            "payload": payload,
            "metadata": metadata or {},
        }

        file_path = os.path.join(self.storage_dir, f"{item_id}.json")
        with open(file_path, "w", encoding="utf-8") as f:
            # default=str stringifies anything json cannot encode natively.
            json.dump(dlq_item, f, default=str, indent=2)

        logger.info(f"Added item to DLQ: {item_id}")
        return item_id

    def get_items(
        self,
        error_category: Optional[ErrorCategory] = None,
        since_timestamp: Optional[str] = None,
        limit: int = 100,
    ) -> List[Dict[str, Any]]:
        """
        Retrieve items from the Dead Letter Queue with optional filtering.

        Files are visited newest first (by modification time). Filters are
        applied before the limit, so up to ``limit`` matching items are
        returned even when newer non-matching items exist. Unreadable or
        malformed files are logged and skipped.

        Args:
            error_category: Filter items by error category
            since_timestamp: Only return items whose ``timestamp`` string is
                not lexicographically smaller than this value
            limit: Maximum number of items to return; values <= 0 yield []

        Returns:
            List of DLQ items matching the criteria
        """
        items: List[Dict[str, Any]] = []
        if limit <= 0:
            return items

        dlq_files = sorted(
            Path(self.storage_dir).glob("*.json"),
            key=lambda p: p.stat().st_mtime,
            reverse=True,
        )

        for file_path in dlq_files:
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    item = json.load(f)

                if (
                    error_category
                    and item["error"]["error_category"] != error_category.value
                ):
                    continue
                if since_timestamp and item["timestamp"] < since_timestamp:
                    continue
            except (OSError, json.JSONDecodeError, KeyError, TypeError) as e:
                logger.warning(f"Error reading DLQ item {file_path}: {str(e)}")
                continue

            items.append(item)
            if len(items) >= limit:
                break

        return items

    def retry_item(
        self, item_id: str, operation: Callable[[Any], Any]
    ) -> Tuple[bool, Optional[Any], Optional[ApiLinkerError]]:
        """
        Retry a specific item from the Dead Letter Queue.

        On success the item's file is moved to the ``processed``
        subdirectory. If only that move fails, the retry is still reported
        as successful (the operation did run) and the problem is logged.

        Args:
            item_id: ID of the item to retry
            operation: Function to call with the payload

        Returns:
            Tuple of (success, result, error)
        """
        file_path = os.path.join(self.storage_dir, f"{item_id}.json")

        if not os.path.exists(file_path):
            logger.error(f"DLQ item not found: {item_id}")
            return False, None, ApiLinkerError(f"DLQ item not found: {item_id}")

        # Bound before the try block so the error handler can always read it,
        # even when opening or parsing the file is what failed.
        item: Any = {}
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                item = json.load(f)

            result = operation(item["payload"])
        except Exception as e:
            stored_error = item.get("error") if isinstance(item, dict) else None
            correlation_id = (
                stored_error.get("correlation_id")
                if isinstance(stored_error, dict)
                else None
            )
            error = ApiLinkerError.from_exception(
                e,
                correlation_id=correlation_id,
                operation_id=f"dlq_retry_{item_id}",
            )
            logger.error(f"Failed to retry DLQ item: {item_id} - {error}")
            return False, None, error

        try:
            processed_dir = os.path.join(self.storage_dir, "processed")
            os.makedirs(processed_dir, exist_ok=True)
            # os.replace overwrites an existing destination on every platform.
            os.replace(file_path, os.path.join(processed_dir, f"{item_id}.json"))
        except OSError as e:
            logger.error(
                f"Retried DLQ item {item_id} but could not archive it: {str(e)}"
            )

        logger.info(f"Successfully retried DLQ item: {item_id}")
        return True, result, None
class CircuitBreaker(Generic[T]):
    """
    Guards calls to a failing dependency so failures do not cascade.

    After ``failure_threshold`` failures without an intervening success the
    breaker opens and rejects calls without running them. Once
    ``reset_timeout_seconds`` have elapsed since the last failure it becomes
    half-open and lets up to ``half_open_max_calls`` calls through: a success
    closes it again, a failure reopens it.
    """

    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,
        reset_timeout_seconds: float = 60.0,
        half_open_max_calls: int = 1,
    ):
        """
        Set up a closed breaker with zeroed counters.

        Args:
            name: Label used in log messages and error context
            failure_threshold: Failure count at which a closed breaker opens
            reset_timeout_seconds: Seconds after the last failure before an
                open breaker becomes half-open
            half_open_max_calls: Calls allowed through while half-open
        """
        # Configuration
        self.name = name
        self.failure_threshold = failure_threshold
        self.reset_timeout_seconds = reset_timeout_seconds
        self.half_open_max_calls = half_open_max_calls

        # Mutable runtime state
        self._state = CircuitBreakerState.CLOSED
        self._failure_count = 0
        self._last_failure_time = 0.0
        self._half_open_calls = 0

        logger.info(
            f"Initialized circuit breaker '{name}' with failure threshold {failure_threshold}"
        )

    @property
    def state(self) -> CircuitBreakerState:
        """Current state; reading it promotes OPEN to HALF_OPEN once the timeout elapsed."""
        if self._state == CircuitBreakerState.OPEN:
            elapsed = time.time() - self._last_failure_time
            if elapsed >= self.reset_timeout_seconds:
                self._state = CircuitBreakerState.HALF_OPEN
                self._half_open_calls = 0
                logger.info(
                    f"Circuit '{self.name}' state changed from OPEN to HALF_OPEN"
                )
        return self._state

    def _rejection(self, message: str) -> ApiLinkerError:
        """Build the error returned when a call is refused without being run."""
        return ApiLinkerError(
            message=message,
            error_category=ErrorCategory.SERVER,
            additional_context={"circuit_breaker": self.name},
        )

    def execute(
        self, operation: Callable[[], T]
    ) -> Tuple[Optional[T], Optional[ApiLinkerError]]:
        """
        Run ``operation`` unless the breaker currently refuses calls.

        Args:
            operation: Zero-argument callable to invoke

        Returns:
            ``(result, None)`` when the call succeeded, otherwise
            ``(None, error)``; exceptions raised by ``operation`` are
            converted to the error instead of propagating
        """
        snapshot = self.state

        if snapshot == CircuitBreakerState.OPEN:
            logger.warning(f"Circuit '{self.name}' is OPEN - failing fast")
            return None, self._rejection(f"Circuit breaker '{self.name}' is open")

        if snapshot == CircuitBreakerState.HALF_OPEN:
            if self._half_open_calls >= self.half_open_max_calls:
                logger.warning(
                    f"Circuit '{self.name}' is HALF_OPEN and call limit reached - failing fast"
                )
                return None, self._rejection(
                    f"Circuit breaker '{self.name}' is half-open and call limit reached"
                )
            # This call uses up one of the half-open probe slots.
            self._half_open_calls += 1

        try:
            outcome = operation()
            self._success()
            return outcome, None
        except Exception as exc:
            failure = ApiLinkerError.from_exception(
                exc, additional_context={"circuit_breaker": self.name}
            )
            self._failure()
            return None, failure

    def _success(self) -> None:
        """Close the breaker (if needed) and clear the failure counter."""
        already_closed = self._state == CircuitBreakerState.CLOSED
        if not already_closed:
            logger.info(f"Circuit '{self.name}' closing due to successful operation")
            self._state = CircuitBreakerState.CLOSED
        self._failure_count = 0

    def _failure(self) -> None:
        """Count a failure and open the breaker when the rules require it."""
        self._failure_count += 1
        self._last_failure_time = time.time()

        threshold_hit = self._failure_count >= self.failure_threshold
        if self._state == CircuitBreakerState.CLOSED and threshold_hit:
            logger.warning(
                f"Circuit '{self.name}' opening after {self._failure_count} consecutive failures"
            )
            self._state = CircuitBreakerState.OPEN
        elif self._state == CircuitBreakerState.HALF_OPEN:
            logger.warning(
                f"Circuit '{self.name}' reopening after failure in HALF_OPEN state"
            )
            self._state = CircuitBreakerState.OPEN

        logger.debug(
            f"Circuit '{self.name}' recorded failure ({self._failure_count}/{self.failure_threshold})"
        )
class ErrorRecoveryManager:
    """
    Manages error recovery strategies for different types of operations.

    Strategies are looked up per error category, with optional overrides per
    operation type, and applied in order by ``handle_error``. Operations that
    cannot be recovered are stored in the dead letter queue.
    """

    def __init__(self, dlq: Optional[DeadLetterQueue] = None):
        """
        Initialize the error recovery manager.

        Args:
            dlq: Dead letter queue for storing failed operations; a default
                ``DeadLetterQueue()`` is created when omitted
        """
        self.dlq = dlq or DeadLetterQueue()
        # One breaker per name, created lazily by get_circuit_breaker().
        self.circuit_breakers: Dict[str, CircuitBreaker] = {}
        self.default_strategies: Dict[ErrorCategory, List[RecoveryStrategy]] = {
            ErrorCategory.NETWORK: [RecoveryStrategy.EXPONENTIAL_BACKOFF],
            ErrorCategory.TIMEOUT: [RecoveryStrategy.EXPONENTIAL_BACKOFF],
            ErrorCategory.RATE_LIMIT: [RecoveryStrategy.EXPONENTIAL_BACKOFF],
            ErrorCategory.SERVER: [
                RecoveryStrategy.CIRCUIT_BREAKER,
                RecoveryStrategy.EXPONENTIAL_BACKOFF,
            ],
            ErrorCategory.CLIENT: [RecoveryStrategy.FAIL_FAST],
            ErrorCategory.AUTHENTICATION: [RecoveryStrategy.FAIL_FAST],
            ErrorCategory.VALIDATION: [RecoveryStrategy.FAIL_FAST],
            ErrorCategory.MAPPING: [RecoveryStrategy.FAIL_FAST],
            ErrorCategory.PLUGIN: [RecoveryStrategy.FAIL_FAST],
            ErrorCategory.UNKNOWN: [RecoveryStrategy.RETRY],
        }
        # Custom strategies by operation type
        self.custom_strategies: Dict[
            str, Dict[ErrorCategory, List[RecoveryStrategy]]
        ] = {}
        logger.info("Initialized ErrorRecoveryManager")

    def get_circuit_breaker(self, name: str) -> CircuitBreaker:
        """
        Get or create a circuit breaker for a specific operation.

        Args:
            name: Name of the circuit breaker

        Returns:
            CircuitBreaker instance (created with default settings on first use)
        """
        if name not in self.circuit_breakers:
            self.circuit_breakers[name] = CircuitBreaker(name)
        return self.circuit_breakers[name]

    def set_strategy(
        self,
        error_category: ErrorCategory,
        strategies: List[RecoveryStrategy],
        operation_type: Optional[str] = None,
    ) -> None:
        """
        Set recovery strategies for an error category.

        Args:
            error_category: Type of error
            strategies: List of strategies to apply in order
            operation_type: Optional specific operation type, or None to
                change the default for the category
        """
        if operation_type:
            self.custom_strategies.setdefault(operation_type, {})[
                error_category
            ] = strategies
        else:
            self.default_strategies[error_category] = strategies

        logger.debug(
            f"Set {len(strategies)} recovery strategies for {error_category.value}"
            + (f" in {operation_type}" if operation_type else "")
        )

    def get_strategies(
        self, error_category: ErrorCategory, operation_type: Optional[str] = None
    ) -> List[RecoveryStrategy]:
        """
        Get recovery strategies for an error category and operation type.

        Args:
            error_category: Type of error
            operation_type: Optional specific operation type

        Returns:
            The custom strategies for the operation type if defined, else the
            defaults for the category, else ``[RecoveryStrategy.RETRY]``
        """
        if operation_type and operation_type in self.custom_strategies:
            custom = self.custom_strategies[operation_type]
            if error_category in custom:
                return custom[error_category]

        return self.default_strategies.get(error_category, [RecoveryStrategy.RETRY])

    @staticmethod
    def _call_through_breaker(
        circuit: CircuitBreaker, operation: Callable[[Any], Any], payload: Any
    ) -> Tuple[bool, Any, Optional[ApiLinkerError]]:
        """Run the operation via the breaker; return (was_attempted, result, error)."""
        attempted = False

        def guarded_call() -> Any:
            nonlocal attempted
            attempted = True
            return operation(payload)

        result, cb_error = circuit.execute(guarded_call)
        return attempted, result, cb_error

    @staticmethod
    def _retry_operation(
        error: ApiLinkerError,
        payload: Any,
        operation: Callable[[Any], Any],
        operation_type: str,
        max_retries: int,
        retry_delay: float,
        retry_backoff_factor: float,
        use_backoff: bool,
    ) -> Tuple[bool, Any, ApiLinkerError]:
        """Retry the operation, optionally sleeping with a growing delay; return (success, result, error)."""
        current_delay = retry_delay
        for attempt in range(max_retries):
            try:
                if use_backoff and attempt > 0:
                    logger.info(
                        f"Waiting {current_delay:.2f}s before retry {attempt + 1}/{max_retries}"
                    )
                    time.sleep(current_delay)
                    current_delay *= retry_backoff_factor

                return True, operation(payload), error
            except Exception as e:
                logger.warning(f"Retry {attempt + 1}/{max_retries} failed: {str(e)}")
                # Only the final failure replaces the error that is reported.
                if attempt == max_retries - 1:
                    error = ApiLinkerError.from_exception(
                        e,
                        operation_id=f"{operation_type}_retry_{attempt}",
                        correlation_id=error.correlation_id,
                    )
        return False, None, error

    def handle_error(
        self,
        error: ApiLinkerError,
        payload: Any,
        operation: Callable[[Any], Any],
        operation_type: str,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_backoff_factor: float = 2.0,
    ) -> Tuple[bool, Any, Optional[ApiLinkerError]]:
        """
        Handle an error using appropriate recovery strategies.

        Strategies for the error's category are applied in order:

        * CIRCUIT_BREAKER: run the operation through the breaker named after
          ``operation_type``. If the breaker refuses the call, return its
          error immediately (nothing is added to the DLQ). If the call ran
          and failed, continue with the next strategy.
        * RETRY / EXPONENTIAL_BACKOFF: call the operation up to
          ``max_retries`` times, the latter sleeping between attempts.
        * SKIP: return the error without adding to the DLQ.
        * FAIL_FAST: stop applying strategies.
        * FALLBACK: not implemented; it is ignored.

        When no strategy succeeds the payload is added to the DLQ and the
        returned error's ``additional_context["dlq_id"]`` holds the item ID.

        Args:
            error: The error that occurred
            payload: Data being processed when the error occurred
            operation: Function to call for retry attempts
            operation_type: Type of operation for strategy selection
            max_retries: Maximum number of retry attempts
            retry_delay: Initial delay between retries in seconds
            retry_backoff_factor: Multiplicative factor for retry delay

        Returns:
            Tuple of (success, result, error)
        """
        logger.debug(f"Handling error for {operation_type}: {error}")

        strategies = self.get_strategies(error.error_category, operation_type)

        for strategy in strategies:
            logger.debug(f"Applying {strategy.value} strategy for {operation_type}")

            if strategy == RecoveryStrategy.CIRCUIT_BREAKER:
                circuit = self.get_circuit_breaker(operation_type)
                attempted, result, cb_error = self._call_through_breaker(
                    circuit, operation, payload
                )

                # Success is "no error", not "non-None result": an operation
                # may legitimately return None.
                if cb_error is None:
                    return True, result, None

                # The breaker refused to run the call (open, or half-open
                # with its probe budget used up): skip the other strategies.
                if not attempted:
                    return False, None, cb_error

                # The call ran and failed; keep the newest failure and fall
                # through to the next configured strategy.
                if cb_error.correlation_id is None:
                    cb_error.correlation_id = error.correlation_id
                error = cb_error

            elif strategy in (
                RecoveryStrategy.RETRY,
                RecoveryStrategy.EXPONENTIAL_BACKOFF,
            ):
                succeeded, result, error = self._retry_operation(
                    error,
                    payload,
                    operation,
                    operation_type,
                    max_retries,
                    retry_delay,
                    retry_backoff_factor,
                    use_backoff=strategy == RecoveryStrategy.EXPONENTIAL_BACKOFF,
                )
                if succeeded:
                    return True, result, None

            elif strategy == RecoveryStrategy.SKIP:
                # Skipped operations are deliberately not added to the DLQ.
                return False, None, error

            elif strategy == RecoveryStrategy.FAIL_FAST:
                # Still added to the DLQ below for analysis.
                break

            # Fallback strategy would go here - not implemented yet

        # All strategies failed: persist the payload for later inspection.
        dlq_id = self.dlq.add_item(
            error=error,
            payload=payload,
            metadata={"operation_type": operation_type},
        )

        if error.additional_context is None:
            error.additional_context = {}
        error.additional_context["dlq_id"] = dlq_id

        return False, None, error
class ErrorAnalytics:
    """
    In-memory collector that turns recorded errors into counts and rates.
    """

    def __init__(self, max_errors: int = 1000):
        """
        Start with empty statistics.

        Args:
            max_errors: Cap on the number of error dicts retained in memory
        """
        self.max_errors = max_errors
        self.errors: List[Dict[str, Any]] = []
        # Lifetime totals keyed by category value.
        self.error_counts: Dict[str, int] = {}
        # Category value -> [(timestamp, count)] samples from the last hour.
        self.error_rates: Dict[str, List[Tuple[float, int]]] = {}
        self.last_analyzed: float = time.time()

    def record_error(self, error: ApiLinkerError) -> None:
        """Store the error and update the per-category count and rate samples."""
        self.errors.append(error.to_dict())
        if len(self.errors) > self.max_errors:
            # Drop the oldest entry to stay within the cap.
            del self.errors[0]

        category = error.error_category.value
        self.error_counts[category] = self.error_counts.get(category, 0) + 1

        now = time.time()
        self.error_rates.setdefault(category, []).append((now, 1))

        # Discard samples older than one hour, for every category.
        cutoff = now - 3600
        for name, samples in list(self.error_rates.items()):
            self.error_rates[name] = [
                sample for sample in samples if sample[0] > cutoff
            ]

    def get_error_rate(
        self, category: Optional[ErrorCategory] = None, minutes: int = 5
    ) -> float:
        """
        Compute errors per minute over the trailing window.

        Args:
            category: Restrict to this category; None covers all categories
            minutes: Length of the window in minutes

        Returns:
            Errors per minute, or 0 when ``minutes`` is not positive
        """
        window_start = time.time() - (minutes * 60)

        if category:
            names = [category.value]
        else:
            names = list(self.error_rates.keys())

        total = 0
        for name in names:
            for stamp, count in self.error_rates.get(name, ()):
                if stamp > window_start:
                    total += count

        if minutes > 0:
            return total / minutes
        return 0

    def get_top_errors(self, limit: int = 5) -> List[Dict[str, Any]]:
        """Return up to ``limit`` categories ordered by descending count."""
        ranked = sorted(
            self.error_counts.items(), key=lambda pair: pair[1], reverse=True
        )
        top: List[Dict[str, Any]] = []
        for name, count in ranked[:limit]:
            top.append({"category": name, "count": count})
        return top

    def get_summary(self) -> Dict[str, Any]:
        """Return totals, per-category counts, the 5-minute rate and top categories."""
        summary: Dict[str, Any] = {}
        summary["total_errors"] = sum(self.error_counts.values())
        summary["error_counts_by_category"] = self.error_counts
        summary["recent_error_rate"] = self.get_error_rate(minutes=5)
        summary["top_errors"] = self.get_top_errors()
        return summary
# Factory function to create ErrorHandler for easy usage
def create_error_handler(
    dlq_dir: Optional[str] = None,
) -> Tuple[DeadLetterQueue, ErrorRecoveryManager, ErrorAnalytics]:
    """
    Build the three cooperating error-handling components in one call.

    The recovery manager is wired to the same dead letter queue that is
    returned.

    Args:
        dlq_dir: Directory passed to the dead letter queue for item storage

    Returns:
        Tuple of (DeadLetterQueue, ErrorRecoveryManager, ErrorAnalytics)
    """
    queue = DeadLetterQueue(dlq_dir)
    return queue, ErrorRecoveryManager(queue), ErrorAnalytics()