Source code for apilinker.api_linker

"""
Main ApiLinker class that orchestrates the connection, mapping, and data transfer between APIs.
"""

from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import os
import time
import uuid
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple, TypedDict, Union

import yaml
from pydantic import BaseModel, Field

from apilinker.core.auth import AuthManager
from apilinker.core.aggregation import MultiSourceAggregator
from apilinker.core.connector import ApiConnector
from apilinker.core.error_handling import (
    ApiLinkerError,
    CircuitBreaker,
    DeadLetterQueue,
    ErrorCategory,
    RecoveryStrategy,
    create_error_handler,
)
from apilinker.core.logger import setup_logger
from apilinker.core.mapper import FieldMapper
from apilinker.core.scheduler import Scheduler
from apilinker.core.security import (
    EncryptionLevel,
)
from apilinker.core.security_integration import (
    SecurityManager,
    integrate_security_with_auth_manager,
)
from apilinker.core.validation import (
    validate_payload_against_schema,
    is_validator_available,
)
from apilinker.core.provenance import ProvenanceRecorder
from apilinker.core.idempotency import InMemoryDeduplicator, generate_idempotency_key
from apilinker.core.state_store import (
    FileStateStore,
    SQLiteStateStore,
    StateStore,
    now_iso,
)
from apilinker.core.observability import (
    ObservabilityConfig,
    TelemetryManager,
)
from apilinker.core.secrets import (
    SecretManager,
    SecretManagerConfig,
    SecretNotFoundError,
    SecretAccessError,
)


# Legacy error detail class kept for backward compatibility
[docs] class ErrorDetail(BaseModel): """Detailed error information for API requests.""" message: str status_code: Optional[int] = None response_body: Optional[str] = None request_url: Optional[str] = None request_method: Optional[str] = None timestamp: Optional[str] = None error_type: str = "general"
[docs] @classmethod def from_apilinker_error(cls, error: ApiLinkerError) -> "ErrorDetail": """Convert an ApiLinkerError to ErrorDetail for backward compatibility.""" return cls( message=error.message, status_code=error.status_code, response_body=error.response_body, request_url=error.request_url, request_method=error.request_method, timestamp=error.timestamp, error_type=error.error_category.value.lower(), )
[docs] class SyncResult(BaseModel): """Result of a sync operation with enhanced error reporting.""" count: int = 0 success: bool = True errors: List[Dict[str, Any]] = Field(default_factory=list) details: Dict[str, Any] = Field(default_factory=dict) correlation_id: str = Field(default_factory=lambda: str(uuid.uuid4())) source_response: Dict[str, Any] = Field(default_factory=dict) target_response: Dict[str, Any] = Field(default_factory=dict) duration_ms: Optional[int] = None
[docs] class ApiLinker: """ Main class for connecting, mapping and transferring data between APIs. This class orchestrates the entire process of: 1. Connecting to source and target APIs 2. Fetching data from the source 3. Mapping fields according to configuration 4. Transforming data as needed 5. Sending data to the target 6. Scheduling recurring operations Args: config_path: Path to YAML/JSON configuration file source_config: Direct source configuration dictionary target_config: Direct target configuration dictionary mapping_config: Direct mapping configuration dictionary schedule_config: Direct scheduling configuration dictionary log_level: Logging level (DEBUG, INFO, WARNING, ERROR) log_file: Path to log file """ def __init__( self, config_path: Optional[str] = None, source_config: Optional[Dict[str, Any]] = None, target_config: Optional[Dict[str, Any]] = None, mapping_config: Optional[Dict[str, Any]] = None, schedule_config: Optional[Dict[str, Any]] = None, error_handling_config: Optional[Dict[str, Any]] = None, security_config: Optional[Dict[str, Any]] = None, validation_config: Optional[Dict[str, Any]] = None, observability_config: Optional[Dict[str, Any]] = None, secret_manager_config: Optional[Dict[str, Any]] = None, log_level: str = "INFO", log_file: Optional[str] = None, ) -> None: # Initialize logger self.logger = setup_logger(log_level, log_file) self.logger.info("Initializing ApiLinker") # Initialize components self.source: Optional[ApiConnector] = None self.target: Optional[ApiConnector] = None self.sources: Dict[str, ApiConnector] = {} self.mapper = FieldMapper() self.multi_source_aggregator = MultiSourceAggregator(self.mapper) self.scheduler = Scheduler() self.validation_config = validation_config or {"strict_mode": False} self.provenance = ProvenanceRecorder() self.deduplicator = InMemoryDeduplicator() self.state_store: Optional[StateStore] = None # Initialize observability self.telemetry = self._initialize_observability(observability_config) # Initialize secret management self.secret_manager = self._initialize_secret_manager(secret_manager_config) # Initialize security system self.security_manager = self._initialize_security(security_config) # Initialize auth manager and integrate with security self.auth_manager = AuthManager() integrate_security_with_auth_manager(self.security_manager, self.auth_manager) # Initialize error handling system self.dlq, self.error_recovery_manager, self.error_analytics = ( create_error_handler() ) # Load configuration if provided if config_path: self.load_config(config_path) else: # Set up direct configurations if provided if source_config: self.add_source(**source_config) if target_config: self.add_target(**target_config) if mapping_config: self.add_mapping(**mapping_config) if schedule_config: self.add_schedule(**schedule_config) if error_handling_config: self._configure_error_handling(error_handling_config) if security_config: self._configure_security(security_config)
[docs] def load_config(self, config_path: str) -> None: """ Load configuration from a YAML or JSON file. Args: config_path: Path to the configuration file """ self.logger.info(f"Loading configuration from {config_path}") # Resolve environment variables in config path config_path = os.path.expandvars(config_path) if not os.path.exists(config_path): raise FileNotFoundError(f"Configuration file not found: {config_path}") with open(config_path, "r") as f: config = yaml.safe_load(f) # Set up components from config if "source" in config: self.add_source(**config["source"]) if "sources" in config: sources_config = config["sources"] if isinstance(sources_config, dict): for source_name, source_config in sources_config.items(): self.add_named_source(source_name, **source_config) else: for source_config in sources_config: named_source_config = dict(source_config) source_name = named_source_config.pop("name") self.add_named_source(source_name, **named_source_config) if "target" in config: self.add_target(**config["target"]) if "mapping" in config: if isinstance(config["mapping"], list): for mapping in config["mapping"]: self.add_mapping(**mapping) else: self.add_mapping(**config["mapping"]) if "schedule" in config: self.add_schedule(**config["schedule"]) # Configure error handling if specified if "error_handling" in config: self._configure_error_handling(config["error_handling"]) # Configure security if specified if "security" in config: self._configure_security(config["security"]) # Configure secret management if specified if "secrets" in config: self.secret_manager = self._initialize_secret_manager(config["secrets"]) # Validation configuration if "validation" in config: self.validation_config = config["validation"] if "logging" in config: log_config = config["logging"] log_level = log_config.get("level", "INFO") log_file = log_config.get("file") self.logger = setup_logger(log_level, log_file) # Provenance options if "provenance" in config: prov_cfg = config["provenance"] output_dir = prov_cfg.get("output_dir") jsonl_log = prov_cfg.get("jsonl_log") self.provenance = ProvenanceRecorder( output_dir=output_dir, jsonl_log_path=jsonl_log ) # Idempotency if "idempotency" in config: self.idempotency_config = config["idempotency"] else: self.idempotency_config = {"enabled": False, "salt": ""} # State store if "state" in config: st_cfg = config["state"] st_type = st_cfg.get("type", "file") if st_type == "file": path = st_cfg.get("path", ".apilinker/state.json") default_last_sync = st_cfg.get("default_last_sync") self.state_store = FileStateStore( path, default_last_sync=default_last_sync ) elif st_type == "sqlite": path = st_cfg.get("path", ".apilinker/state.db") default_last_sync = st_cfg.get("default_last_sync") self.state_store = SQLiteStateStore( path, default_last_sync=default_last_sync )
def _create_connector( self, type: str, base_url: str, auth: Optional[Dict[str, Any]] = None, endpoints: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> ApiConnector: """Create a connector while resolving auth secrets consistently.""" if auth: auth = self._resolve_auth_secrets(auth) auth_config = self.auth_manager.configure_auth(auth) else: auth_config = None return ApiConnector( connector_type=type, base_url=base_url, auth_config=auth_config, endpoints=endpoints or {}, **kwargs, )
[docs] def add_source( self, type: str, base_url: str, auth: Optional[Dict[str, Any]] = None, endpoints: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> None: """ Add a source API connector. Args: type: Type of API connector (rest, graphql, etc.) base_url: Base URL of the API auth: Authentication configuration endpoints: Configured endpoints **kwargs: Additional configuration parameters """ self.logger.info(f"Adding source connector: {type} for {base_url}") self.source = self._create_connector(type, base_url, auth, endpoints, **kwargs) self.sources["source"] = self.source
[docs] def add_named_source( self, name: str, type: str, base_url: str, auth: Optional[Dict[str, Any]] = None, endpoints: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> None: """ Register an additional named source connector for aggregation workflows. Args: name: Logical source name used in aggregation requests type: Type of API connector (rest, graphql, etc.) base_url: Base URL of the API auth: Authentication configuration endpoints: Configured endpoints **kwargs: Additional connector configuration """ self.logger.info( f"Adding named source connector '{name}': {type} for {base_url}" ) connector = self._create_connector(type, base_url, auth, endpoints, **kwargs) self.sources[name] = connector if name == "source": self.source = connector
[docs] def add_target( self, type: str, base_url: str, auth: Optional[Dict[str, Any]] = None, endpoints: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> None: """ Add a target API connector. Args: type: Type of API connector (rest, graphql, etc.) base_url: Base URL of the API auth: Authentication configuration endpoints: Configured endpoints **kwargs: Additional configuration parameters """ self.logger.info(f"Adding target connector: {type} for {base_url}") self.target = self._create_connector(type, base_url, auth, endpoints, **kwargs)
[docs] def add_mapping( self, source: str, target: str, fields: List[Dict[str, Any]] ) -> None: """ Add a field mapping between source and target endpoints. Args: source: Source endpoint name target: Target endpoint name fields: List of field mappings """ self.logger.info( f"Adding mapping from {source} to {target} with {len(fields)} fields" ) self.mapper.add_mapping(source, target, fields)
[docs] def add_schedule(self, type: str, **kwargs: Any) -> None: """ Add a schedule for recurring syncs. Args: type: Type of schedule (interval, cron) **kwargs: Schedule-specific parameters """ self.logger.info(f"Adding schedule: {type}") self.scheduler.add_schedule(type, **kwargs)
def _initialize_security( self, config: Optional[Dict[str, Any]] = None ) -> SecurityManager: """ Initialize the security system based on provided configuration. Args: config: Security configuration dictionary Returns: SecurityManager instance """ if not config: config = {} # Extract security configuration master_password = config.get("master_password") storage_path = config.get("credential_storage_path") encryption_level = config.get("encryption_level", "none") encryption_key = config.get("encryption_key") enable_access_control = config.get("enable_access_control", False) # Initialize security manager security_manager = SecurityManager( master_password=master_password, storage_path=storage_path, encryption_level=encryption_level, encryption_key=encryption_key, enable_access_control=enable_access_control, ) # Set up initial users if access control is enabled if enable_access_control and "users" in config: for user_config in config["users"]: username = user_config.get("username") role = user_config.get("role", "viewer") api_key = user_config.get("api_key") if username: security_manager.add_user(username, role, api_key) return security_manager def _initialize_observability( self, config: Optional[Dict[str, Any]] = None ) -> TelemetryManager: """ Initialize the observability system based on provided configuration. Args: config: Observability configuration dictionary Returns: TelemetryManager instance """ if not config: config = {} # Create observability configuration obs_config = ObservabilityConfig( enabled=config.get("enabled", True), service_name=config.get("service_name", "apilinker"), enable_tracing=config.get("enable_tracing", True), enable_metrics=config.get("enable_metrics", True), export_to_console=config.get("export_to_console", False), export_to_prometheus=config.get("export_to_prometheus", False), prometheus_host=config.get("prometheus_host", "0.0.0.0"), prometheus_port=config.get("prometheus_port", 9090), ) return TelemetryManager(obs_config) def _initialize_secret_manager( self, config: Optional[Dict[str, Any]] = None ) -> Optional[SecretManager]: """ Initialize the secret management system based on provided configuration. Args: config: Secret manager configuration dictionary Returns: SecretManager instance or None if not configured """ if not config: return None try: # Create secret manager configuration from apilinker.core.secrets import SecretProvider, RotationStrategy provider_str = config.get("provider", "env") provider = SecretProvider(provider_str) rotation_str = config.get("rotation_strategy", "manual") rotation_strategy = RotationStrategy(rotation_str) secret_config = SecretManagerConfig( provider=provider, vault_config=config.get("vault"), aws_config=config.get("aws"), azure_config=config.get("azure"), gcp_config=config.get("gcp"), rotation_strategy=rotation_strategy, rotation_interval_days=config.get("rotation_interval_days", 90), cache_ttl_seconds=config.get("cache_ttl_seconds", 300), enable_least_privilege=config.get("enable_least_privilege", True), ) self.logger.info(f"Initialized secret manager with provider: {provider}") return SecretManager(secret_config) except Exception as e: self.logger.warning(f"Failed to initialize secret manager: {e}") return None def _resolve_secret(self, value: Any) -> Any: """ Resolve a secret reference to its actual value. Secret references can be specified as: - String starting with "secret://" (e.g., "secret://api-key") - Dict with "secret" key (e.g., {"secret": "api-key"}) Args: value: Value that may contain a secret reference Returns: Resolved secret value or original value if not a secret reference """ if not self.secret_manager: return value # Handle string secret references if isinstance(value, str) and value.startswith("secret://"): secret_name = value[9:] # Remove "secret://" prefix try: secret_value = self.secret_manager.get_secret(secret_name) self.logger.debug(f"Retrieved secret: {secret_name}") return secret_value except (SecretNotFoundError, SecretAccessError) as e: self.logger.error(f"Failed to retrieve secret '{secret_name}': {e}") raise # Handle dict secret references if isinstance(value, dict) and "secret" in value: secret_name = value["secret"] version = value.get("version") try: secret_value = self.secret_manager.get_secret(secret_name, version) self.logger.debug(f"Retrieved secret: {secret_name}") return secret_value except (SecretNotFoundError, SecretAccessError) as e: self.logger.error(f"Failed to retrieve secret '{secret_name}': {e}") raise return value def _resolve_auth_secrets(self, auth_config: Dict[str, Any]) -> Dict[str, Any]: """ Recursively resolve secret references in authentication configuration. Args: auth_config: Authentication configuration that may contain secret references Returns: Authentication configuration with resolved secrets """ if not self.secret_manager: return auth_config resolved_config = {} for key, value in auth_config.items(): if isinstance(value, dict): # Recursively resolve nested dicts resolved_config[key] = self._resolve_auth_secrets(value) elif isinstance(value, list): # Resolve each item in list resolved_config[key] = [ # type: ignore[assignment] ( self._resolve_secret(item) if isinstance(item, (str, dict)) else item ) for item in value ] else: # Resolve individual value resolved_config[key] = self._resolve_secret(value) return resolved_config def _configure_security(self, config: Dict[str, Any]) -> None: """ Configure security features based on provided configuration. Args: config: Security configuration dictionary """ self.logger.info("Configuring security features") # Update encryption level if specified if "encryption_level" in config: encryption_level = config["encryption_level"] try: if isinstance(encryption_level, str): encryption_level = EncryptionLevel[encryption_level.upper()] self.security_manager.request_encryption.encryption_level = ( encryption_level ) self.logger.debug(f"Updated encryption level to {encryption_level}") except (KeyError, ValueError): self.logger.warning(f"Invalid encryption level: {encryption_level}") # Add users if specified and access control is enabled if self.security_manager.enable_access_control and "users" in config: for user_config in config["users"]: username = user_config.get("username") role = user_config.get("role", "viewer") api_key = user_config.get("api_key") if username: self.security_manager.add_user(username, role, api_key) self.logger.debug(f"Added user {username} with role {role}") def _configure_error_handling(self, config: Dict[str, Any]) -> None: """ Configure the error handling system based on provided configuration. Args: config: Error handling configuration dictionary """ self.logger.info("Configuring error handling system") # Configure circuit breakers if "circuit_breakers" in config: for cb_name, cb_config in config["circuit_breakers"].items(): failure_threshold = cb_config.get("failure_threshold", 5) reset_timeout = cb_config.get("reset_timeout_seconds", 60) half_open_max_calls = cb_config.get("half_open_max_calls", 1) # Create and register circuit breaker circuit: CircuitBreaker = CircuitBreaker( name=cb_name, failure_threshold=failure_threshold, reset_timeout_seconds=reset_timeout, half_open_max_calls=half_open_max_calls, ) self.error_recovery_manager.circuit_breakers[cb_name] = circuit self.logger.debug(f"Configured circuit breaker: {cb_name}") # Configure recovery strategies if "recovery_strategies" in config: for category_name, strategies in config["recovery_strategies"].items(): try: error_category = ErrorCategory[category_name.upper()] strategy_list = [ RecoveryStrategy[str(s).upper()] for s in strategies ] self.error_recovery_manager.set_strategy( error_category, strategy_list ) self.logger.debug( f"Configured recovery strategies for {category_name}: {strategies}" ) except (KeyError, ValueError) as e: self.logger.warning( f"Invalid recovery strategy configuration: {str(e)}" ) # Configure DLQ if "dlq" in config: dlq_dir = config["dlq"].get("directory") if dlq_dir: self.dlq = DeadLetterQueue(dlq_dir) self.error_recovery_manager.dlq = self.dlq self.logger.info(f"Configured Dead Letter Queue at {dlq_dir}")
[docs] def get_error_analytics(self) -> Dict[str, Any]: """ Get error analytics summary. Returns: Dictionary with error statistics """ return self.error_analytics.get_summary()
[docs] def add_user( self, username: str, role: str, api_key: Optional[str] = None ) -> Dict[str, Any]: """ Add a user to the system with specified role. Args: username: Username to identify the user role: Access role (admin, operator, viewer, developer) api_key: Optional API key for authentication Returns: User data including generated API key if not provided """ if not self.security_manager.enable_access_control: raise ValueError( "Access control is not enabled. Enable it in the security configuration." ) return self.security_manager.add_user(username, role, api_key)
[docs] def list_users(self) -> List[Dict[str, Any]]: """ List all users in the system. Returns: List of user data dictionaries """ if not self.security_manager.enable_access_control: raise ValueError( "Access control is not enabled. Enable it in the security configuration." ) users = [] for username in self.security_manager.access_control.users: user_data = self.security_manager.access_control.get_user(username) # Remove sensitive data like API key if "api_key" in user_data: user_data["api_key"] = "*" * 8 # Mask API key users.append(user_data) return users
[docs] def store_credential(self, name: str, credential_data: Dict[str, Any]) -> bool: """ Store API credentials securely. Args: name: Name to identify the credential credential_data: Credential data to store Returns: True if successful, False otherwise """ return self.security_manager.store_credential(name, credential_data)
[docs] def get_credential(self, name: str) -> Optional[Dict[str, Any]]: """ Get stored API credentials. Args: name: Name of the credential Returns: Credential data if found, None otherwise """ return self.security_manager.get_credential(name)
[docs] def list_credentials(self) -> List[str]: """ List available credential names. Returns: List of credential names """ return self.security_manager.list_credentials()
[docs] def process_dlq( self, operation_type: Optional[str] = None, limit: int = 10 ) -> Dict[str, Any]: """ Process items in the Dead Letter Queue for retry. Args: operation_type: Optional operation type to filter by limit: Maximum number of items to process Returns: Dictionary with processing results """ self.logger.info( f"Processing Dead Letter Queue (type={operation_type}, limit={limit})" ) # Get DLQ items items = self.dlq.get_items(limit=limit) class DLQResults(TypedDict): total_processed: int successful: int failed: int items: List[Dict[str, Any]] results: DLQResults = { "total_processed": 0, "successful": 0, "failed": 0, "items": [], } for item in items: # Skip if not matching the requested operation type if ( operation_type and item.get("metadata", {}).get("operation_type") != operation_type ): continue item_id = item.get("id", "unknown") payload = item.get("payload", {}) metadata = item.get("metadata", {}) retry_result = { "id": item_id, "success": False, "message": "Operation not retried", } # Determine what type of operation this is and how to retry it if "endpoint" in payload and "source_" in metadata.get( "operation_type", "" ): # This is a source operation try: self.source.fetch_data( payload.get("endpoint"), payload.get("params") ) retry_result["success"] = True retry_result["message"] = "Successfully retried source operation" results["successful"] += 1 except Exception as e: retry_result["message"] = ( f"Failed to retry source operation: {str(e)}" ) results["failed"] += 1 elif "endpoint" in payload and "target_" in metadata.get( "operation_type", "" ): # This is a target operation try: self.target.send_data(payload.get("endpoint"), payload.get("data")) retry_result["success"] = True retry_result["message"] = "Successfully retried target operation" results["successful"] += 1 except Exception as e: retry_result["message"] = ( f"Failed to retry target operation: {str(e)}" ) results["failed"] += 1 else: # Unknown operation type retry_result["message"] = "Unknown operation type - cannot retry" results["failed"] += 1 results["total_processed"] += 1 results["items"].append(retry_result) self.logger.info( f"DLQ processing complete: {results['successful']} successful, {results['failed']} failed" ) return dict(results)
[docs] def fetch( self, endpoint: str, params: Optional[Dict[str, Any]] = None, ) -> Any: """ Convenience wrapper to fetch data from the configured source connector. Args: endpoint: Source endpoint name to fetch from params: Optional parameters for the request Returns: Parsed response payload from the source API """ if not self.source: raise ValueError("Source connector is not configured") return self.source.fetch_data(endpoint, params)
[docs] def stream( self, endpoint: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> Any: """ Convenience wrapper to stream a response from the configured source. Args: endpoint: Source endpoint name to stream from params: Optional query parameters **kwargs: Additional arguments forwarded to ``ApiConnector.stream_response`` Returns: A generator yielding byte chunks from the source response. """ if not self.source: raise ValueError("Source connector is not configured") return self.source.stream_response(endpoint, params, **kwargs)
[docs] def download( self, endpoint: str, destination_path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> Dict[str, Any]: """ Convenience wrapper to download a streamed response from the source. Args: endpoint: Source endpoint name to download from destination_path: Destination file path params: Optional query parameters **kwargs: Additional arguments forwarded to ``ApiConnector.download_stream`` Returns: Download result metadata. """ if not self.source: raise ValueError("Source connector is not configured") return self.source.download_stream( endpoint, destination_path, params=params, **kwargs )
[docs] def send( self, endpoint: str, data: Union[Dict[str, Any], List[Dict[str, Any]]], **kwargs: Any, ) -> Any: """ Convenience wrapper to send data to the configured target connector. Args: endpoint: Target endpoint name to send to data: Payload to send (single item or list) Returns: Target connector response (if any) """ if not self.target: raise ValueError("Target connector is not configured") return self.target.send_data(endpoint, data, **kwargs)
def _resolve_source_connector(self, name: str) -> ApiConnector: """Resolve a source connector by logical name.""" connector = self.sources.get(name) if connector: return connector if name == "source" and self.source: return self.source raise ValueError(f"Source connector '{name}' is not registered")
[docs] def aggregate_source_data( self, source_data: Dict[str, Any], aggregation_config: Dict[str, Any], ) -> List[Dict[str, Any]]: """ Aggregate pre-fetched source payloads using the configured mapper rules. Args: source_data: Mapping of source name to payload aggregation_config: Aggregation config consumed by ``MultiSourceAggregator`` Returns: Aggregated record list. """ return self.multi_source_aggregator.aggregate(source_data, aggregation_config)
[docs] def aggregate_sources( self, source_requests: Dict[str, Dict[str, Any]], aggregation_config: Dict[str, Any], parallel: bool = True, ) -> List[Dict[str, Any]]: """ Fetch multiple sources and aggregate them into a single dataset. Args: source_requests: Mapping of source alias to request config. Each request supports ``connector``, ``endpoint``, and optional ``params``. aggregation_config: Multi-source aggregation config. parallel: Whether to fetch sources concurrently. Returns: Aggregated record list. """ if not source_requests: return [] def _fetch_source( alias: str, request_config: Dict[str, Any] ) -> Tuple[str, Any]: connector_name = request_config.get("connector", alias) endpoint = request_config["endpoint"] params = request_config.get("params") connector = self._resolve_source_connector(connector_name) return alias, connector.fetch_data(endpoint, params) source_payloads: Dict[str, Any] = {} if parallel and len(source_requests) > 1: max_workers = min(8, len(source_requests)) with ThreadPoolExecutor(max_workers=max_workers) as executor: future_map = { executor.submit(_fetch_source, alias, request_config): alias for alias, request_config in source_requests.items() } for future in as_completed(future_map): alias, payload = future.result() source_payloads[alias] = payload else: for alias, request_config in source_requests.items(): resolved_alias, payload = _fetch_source(alias, request_config) source_payloads[resolved_alias] = payload return self.aggregate_source_data(source_payloads, aggregation_config)
[docs] def sync( self, source_endpoint: Optional[str] = None, target_endpoint: Optional[str] = None, params: Optional[Dict[str, Any]] = None, max_retries: int = 3, retry_delay: float = 1.0, retry_backoff_factor: float = 2.0, retry_status_codes: Optional[List[int]] = None, ) -> SyncResult: """ Execute a sync operation between source and target APIs. Args: source_endpoint: Source endpoint to use (overrides mapping) target_endpoint: Target endpoint to use (overrides mapping) params: Additional parameters for the source API call max_retries: Maximum number of retry attempts for transient failures retry_delay: Initial delay between retries in seconds retry_backoff_factor: Multiplicative factor for retry delay retry_status_codes: HTTP status codes to retry (default: 429, 502, 503, 504) Returns: SyncResult: Result of the sync operation """ if not self.source or not self.target: raise ValueError( "Source and target connectors must be configured before syncing" ) # If no endpoints specified, use the first mapping if not source_endpoint or not target_endpoint: mapping = self.mapper.get_first_mapping() if not mapping: raise ValueError("No mapping configured and no endpoints specified") source_endpoint = mapping["source"] target_endpoint = mapping["target"] # Generate correlation ID for this sync operation correlation_id = str(uuid.uuid4()) start_time = time.time() # Wrap entire sync operation in distributed tracing with self.telemetry.trace_sync( source_endpoint, target_endpoint, correlation_id ): # Start provenance self.provenance.start_run( correlation_id=correlation_id, config_path=( config_path if (config_path := getattr(self, "_last_config_path", None)) else None ), source_endpoint=source_endpoint, target_endpoint=target_endpoint, ) # Default retry status codes if none provided if retry_status_codes is None: retry_status_codes = [429, 502, 503, 504] # Common transient failures self.logger.info( f"[{correlation_id}] Starting sync from {source_endpoint} to {target_endpoint}" ) # Initialize result object sync_result = SyncResult(correlation_id=correlation_id) # Get circuit breaker for source endpoint source_circuit_name = f"source_{source_endpoint}" source_cb = self.error_recovery_manager.get_circuit_breaker( source_circuit_name ) # Check if user has permission for this operation if self.security_manager.enable_access_control: current_user = getattr(self, "current_user", None) if current_user and not self.security_manager.check_permission( current_user, "run_sync" ): raise PermissionError( f"User {current_user} does not have permission to run sync operations" ) # Merge last_sync into params from state store if not provided if params is None: effective_params = None else: effective_params = dict(params) if self.state_store: # Ensure we have a dict if we are going to inject need_inject = (effective_params is None) or ( "updated_since" not in effective_params ) if need_inject: last_sync = self.state_store.get_last_sync(source_endpoint) if last_sync: effective_params = dict(effective_params or {}) effective_params["updated_since"] = last_sync # Always use standard non-encrypted call source_data, source_error = source_cb.execute( lambda: self.source.fetch_data(source_endpoint, effective_params) ) # If circuit breaker failed, try recovery strategies if source_error: # Create payload for retry fetch_payload = {"endpoint": source_endpoint, "params": params} # Apply recovery strategies success, result, error = self.error_recovery_manager.handle_error( error=source_error, payload=fetch_payload, operation=lambda p: self.source.fetch_data( p["endpoint"], p["params"] ), operation_type=source_circuit_name, max_retries=max_retries, retry_delay=retry_delay, retry_backoff_factor=retry_backoff_factor, ) if success: source_data = result else: # Record the error for analytics self.error_analytics.record_error(error) # Update result with error details end_time = time.time() sync_result.duration_ms = int((end_time - start_time) * 1000) sync_result.success = False sync_result.errors.append(error.to_dict()) self.logger.error(f"[{correlation_id}] Sync failed: {error}") return sync_result try: # Map fields according to configuration transformed_data = self.mapper.map_data( source_endpoint, target_endpoint, source_data ) # Optional strict validation against target request schema (if defined in connector) if ( self.validation_config.get("strict_mode") and is_validator_available() ): target_endpoint_cfg = ( self.target.endpoints.get(target_endpoint) if self.target else None ) if target_endpoint_cfg and target_endpoint_cfg.request_schema: if isinstance(transformed_data, list): for item in transformed_data: valid, diffs = validate_payload_against_schema( item, target_endpoint_cfg.request_schema ) if not valid: raise ApiLinkerError( message="Strict mode: target payload failed schema validation", error_category=ErrorCategory.VALIDATION, status_code=0, additional_context={"diffs": diffs}, ) else: valid, diffs = validate_payload_against_schema( transformed_data, target_endpoint_cfg.request_schema ) if not valid: raise ApiLinkerError( message="Strict mode: target payload failed schema validation", error_category=ErrorCategory.VALIDATION, status_code=0, additional_context={"diffs": diffs}, ) # Record source data metrics source_count = len(source_data) if isinstance(source_data, list) else 1 sync_result.details["source_count"] = source_count # Get circuit breaker for target endpoint target_circuit_name = f"target_{target_endpoint}" target_cb = self.error_recovery_manager.get_circuit_breaker( target_circuit_name ) # Idempotency: skip payloads we've already sent during replays def _send(): # If idempotency enabled, de-duplicate per item if self.idempotency_config.get("enabled") and isinstance( transformed_data, list ): salt = self.idempotency_config.get("salt", "") filtered = [] for item in transformed_data: key = generate_idempotency_key(item, salt=salt) if not self.deduplicator.has_seen(target_endpoint, key): self.deduplicator.mark_seen(target_endpoint, key) filtered.append(item) payload: Union[Dict[str, Any], List[Dict[str, Any]]] = filtered else: payload = transformed_data return self.target.send_data(target_endpoint, payload) # Always use standard non-encrypted call target_result, target_error = target_cb.execute(_send) # If circuit breaker failed, try recovery strategies if target_error: # Create payload for retry send_payload = { "endpoint": target_endpoint, "data": transformed_data, } # Apply recovery strategies success, result, error = self.error_recovery_manager.handle_error( error=target_error, payload=send_payload, operation=lambda p: self.target.send_data( p["endpoint"], p["data"] ), operation_type=target_circuit_name, max_retries=max_retries, retry_delay=retry_delay, retry_backoff_factor=retry_backoff_factor, ) if success: target_result = result else: # Record the error for analytics self.error_analytics.record_error(error) # Update result with error details end_time = time.time() sync_result.duration_ms = int((end_time - start_time) * 1000) sync_result.success = False sync_result.errors.append(error.to_dict()) self.logger.error(f"[{correlation_id}] Sync failed: {error}") return sync_result # Update result with success information sync_result.count = ( len(transformed_data) if isinstance(transformed_data, list) else 1 ) sync_result.success = True # Set target response directly if isinstance(target_result, dict): sync_result.target_response = target_result else: sync_result.target_response = {} # Calculate duration end_time = time.time() sync_result.duration_ms = int((end_time - start_time) * 1000) self.logger.info( f"[{correlation_id}] Sync completed successfully: {sync_result.count} items transferred in {sync_result.duration_ms}ms" ) # Update last_sync checkpoint if self.state_store: self.state_store.set_last_sync(source_endpoint, now_iso()) # Complete provenance self.provenance.complete_run( True, sync_result.count, sync_result.details ) # Record telemetry metrics self.telemetry.record_sync_completion( source_endpoint, target_endpoint, True, sync_result.count ) return sync_result except Exception as e: # Convert to ApiLinkerError error = ApiLinkerError.from_exception( e, error_category=ErrorCategory.MAPPING, correlation_id=correlation_id, operation_id=f"mapping_{source_endpoint}_to_{target_endpoint}", ) # Record the error for analytics self.error_analytics.record_error(error) # Update result end_time = time.time() sync_result.duration_ms = int((end_time - start_time) * 1000) sync_result.success = False sync_result.errors.append(error.to_dict()) self.logger.error( f"[{correlation_id}] Sync failed during mapping: {error}" ) # Record error in provenance self.provenance.record_error( error.message, category=error.error_category.value, status_code=error.status_code, endpoint=target_endpoint, ) self.provenance.complete_run(False, 0, {}) # Record telemetry metrics self.telemetry.record_sync_completion( source_endpoint, target_endpoint, False, 0 ) self.telemetry.record_error( error.error_category.value, "sync", error.message ) return sync_result
[docs] def start_scheduled_sync(self) -> None: """Start scheduled sync operations.""" self.logger.info("Starting scheduled sync") self.scheduler.start(self.sync)
[docs] def stop_scheduled_sync(self) -> None: """Stop scheduled sync operations.""" self.logger.info("Stopping scheduled sync") self.scheduler.stop()
def _with_retries( self, operation: Callable[[], Any], operation_name: str, max_retries: int, retry_delay: float, retry_backoff_factor: float, retry_status_codes: List[int], correlation_id: str, ) -> Tuple[Any, Optional[ErrorDetail]]: """ Execute an operation with configurable retry logic for transient failures. Args: operation: Callable function to execute operation_name: Name of operation for logging max_retries: Maximum number of retry attempts retry_delay: Initial delay between retries in seconds retry_backoff_factor: Multiplicative factor for retry delay retry_status_codes: HTTP status codes that should trigger a retry correlation_id: Correlation ID for tracing Returns: Tuple of (result, error_detail) - If successful, error_detail will be None """ current_delay = retry_delay for attempt in range(max_retries + 1): try: if attempt > 0: self.logger.info( f"[{correlation_id}] Retry attempt {attempt}/{max_retries} for {operation_name} after {current_delay:.2f}s delay" ) time.sleep(current_delay) current_delay *= retry_backoff_factor result = operation() if attempt > 0: self.logger.info( f"[{correlation_id}] Retry succeeded for {operation_name}" ) return result, None except Exception as e: status_code = getattr(e, "status_code", None) response_body = getattr(e, "response", None) request_url = getattr(e, "url", None) request_method = getattr(e, "method", None) # Convert response to string if it's not already if response_body and not isinstance(response_body, str): try: response_body = str(response_body)[:1000] # Limit size except: response_body = "<Unable to convert response to string>" error_detail = ErrorDetail( message=str(e), status_code=status_code, response_body=response_body, request_url=request_url, request_method=request_method, timestamp=datetime.now().isoformat(), error_type=( "transient_error" if status_code in retry_status_codes else "api_error" ), ) # Check if this is a retryable error is_retryable = ( status_code in retry_status_codes if status_code else False ) if is_retryable and attempt < max_retries: self.logger.warning( f"[{correlation_id}] {operation_name} failed with retryable error (status: {status_code}): {str(e)}" ) else: # Either not retryable or out of retries log_level = logging.WARNING if is_retryable else logging.ERROR retry_msg = ( "out of retry attempts" if is_retryable else "non-retryable error" ) self.logger.log( log_level, f"[{correlation_id}] {operation_name} failed with {retry_msg}: {str(e)}", ) return None, error_detail # We should never reach here, but just in case fallback_error = ErrorDetail( message=f"Unknown error during {operation_name}", timestamp=datetime.now().isoformat(), error_type="unknown_error", ) return None, fallback_error