ApiLinker
Main ApiLinker class that orchestrates the connection, mapping, and data transfer between APIs.
- class apilinker.api_linker.ApiLinker(config_path: str | None = None, source_config: Dict[str, Any] | None = None, target_config: Dict[str, Any] | None = None, mapping_config: Dict[str, Any] | None = None, schedule_config: Dict[str, Any] | None = None, error_handling_config: Dict[str, Any] | None = None, security_config: Dict[str, Any] | None = None, validation_config: Dict[str, Any] | None = None, observability_config: Dict[str, Any] | None = None, secret_manager_config: Dict[str, Any] | None = None, log_level: str = 'INFO', log_file: str | None = None)
Bases: object
Main class for connecting, mapping and transferring data between APIs.
This class orchestrates the entire process of:
1. Connecting to source and target APIs
2. Fetching data from the source
3. Mapping fields according to configuration
4. Transforming data as needed
5. Sending data to the target
6. Scheduling recurring operations
- Parameters:
config_path – Path to YAML/JSON configuration file
source_config – Direct source configuration dictionary
target_config – Direct target configuration dictionary
mapping_config – Direct mapping configuration dictionary
schedule_config – Direct scheduling configuration dictionary
log_level – Logging level (DEBUG, INFO, WARNING, ERROR)
log_file – Path to log file
- add_mapping(source: str, target: str, fields: List[Dict[str, Any]]) → None
Add a field mapping between source and target endpoints.
- Parameters:
source – Source endpoint name
target – Target endpoint name
fields – List of field mappings
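To make the fields argument concrete, here is a minimal sketch of how a list of field mappings could be applied to a single record. It is plain Python and does not call ApiLinker. The "source" and "target" keys inside each mapping entry, and the helper name apply_field_mapping, are assumptions for illustration rather than the library's documented schema.

```python
from typing import Any, Dict, List


def apply_field_mapping(
    record: Dict[str, Any], fields: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """Copy values from source keys to target keys (hypothetical helper)."""
    mapped: Dict[str, Any] = {}
    for field in fields:
        source_key = field["source"]  # assumed key name
        target_key = field["target"]  # assumed key name
        if source_key in record:
            mapped[target_key] = record[source_key]
    return mapped


# Assumed shape of the `fields` argument passed to add_mapping.
FIELDS = [
    {"source": "id", "target": "external_id"},
    {"source": "title", "target": "name"},
]

# Keys without a mapping entry ("extra" here) are dropped by this sketch.
mapped_record = apply_field_mapping(
    {"id": 7, "title": "Report", "extra": True}, FIELDS
)
```

Dropping unmapped keys is a choice made in this sketch only; the source does not state what the library does with them.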
- add_named_source(name: str, type: str, base_url: str, auth: Dict[str, Any] | None = None, endpoints: Dict[str, Any] | None = None, **kwargs: Any) → None
Register an additional named source connector for aggregation workflows.
- Parameters:
name – Logical source name used in aggregation requests
type – Type of API connector (rest, graphql, etc.)
base_url – Base URL of the API
auth – Authentication configuration
endpoints – Configured endpoints
**kwargs – Additional connector configuration
- add_schedule(type: str, **kwargs: Any) → None
Add a schedule for recurring syncs.
- Parameters:
type – Type of schedule (interval, cron)
**kwargs – Schedule-specific parameters
- add_source(type: str, base_url: str, auth: Dict[str, Any] | None = None, endpoints: Dict[str, Any] | None = None, **kwargs: Any) → None
Add a source API connector.
- Parameters:
type – Type of API connector (rest, graphql, etc.)
base_url – Base URL of the API
auth – Authentication configuration
endpoints – Configured endpoints
**kwargs – Additional configuration parameters
- add_target(type: str, base_url: str, auth: Dict[str, Any] | None = None, endpoints: Dict[str, Any] | None = None, **kwargs: Any) → None
Add a target API connector.
- Parameters:
type – Type of API connector (rest, graphql, etc.)
base_url – Base URL of the API
auth – Authentication configuration
endpoints – Configured endpoints
**kwargs – Additional configuration parameters
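As a rough sketch of how add_source and add_target might be used together, the snippet below keeps the keyword arguments in plain dictionaries and applies them to any object exposing those two methods. The type and base_url arguments follow the signatures above. The URLs, the endpoint names, and the inner "path"/"method" keys of each endpoint definition are placeholders and assumptions, and configure_connectors is a hypothetical helper.

```python
from typing import Any, Dict

# Keyword arguments for add_source / add_target. The URLs and endpoint
# definitions are placeholders; the inner keys ("path", "method") are assumed.
SOURCE_KWARGS: Dict[str, Any] = {
    "type": "rest",
    "base_url": "https://source.example.com/api",
    "endpoints": {"list_items": {"path": "/items", "method": "GET"}},
}
TARGET_KWARGS: Dict[str, Any] = {
    "type": "rest",
    "base_url": "https://target.example.com/api",
    "endpoints": {"create_item": {"path": "/items", "method": "POST"}},
}


def configure_connectors(linker: Any) -> None:
    """Register one source and one target on an ApiLinker-like object."""
    linker.add_source(**SOURCE_KWARGS)
    linker.add_target(**TARGET_KWARGS)
```

Keeping the arguments as data makes it easy to review or unit-test the configuration before an ApiLinker instance is involved.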
- add_user(username: str, role: str, api_key: str | None = None) → Dict[str, Any]
Add a user to the system with specified role.
- Parameters:
username – Username to identify the user
role – Access role (admin, operator, viewer, developer)
api_key – Optional API key for authentication
- Returns:
User data including generated API key if not provided
- aggregate_source_data(source_data: Dict[str, Any], aggregation_config: Dict[str, Any]) → List[Dict[str, Any]]
Aggregate pre-fetched source payloads using the configured mapper rules.
- Parameters:
source_data – Mapping of source name to payload
aggregation_config – Aggregation config consumed by MultiSourceAggregator
- Returns:
Aggregated record list.
- aggregate_sources(source_requests: Dict[str, Dict[str, Any]], aggregation_config: Dict[str, Any], parallel: bool = True) → List[Dict[str, Any]]
Fetch multiple sources and aggregate them into a single dataset.
- Parameters:
source_requests – Mapping of source alias to request config. Each request supports connector, endpoint, and optional params.
aggregation_config – Multi-source aggregation config.
parallel – Whether to fetch sources concurrently.
- Returns:
Aggregated record list.
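The shape of source_requests described above can be sketched as a plain dictionary plus a small pre-flight check. This is only an illustration: treating connector and endpoint as required is an assumption drawn from the parameter description, check_source_requests is a hypothetical helper, and the aliases, connector names, and endpoint names are placeholders. The contents of aggregation_config are not described in this section, so they are left out.

```python
from typing import Any, Dict, List

REQUIRED_KEYS = ("connector", "endpoint")  # assumed to be mandatory


def check_source_requests(
    source_requests: Dict[str, Dict[str, Any]]
) -> List[str]:
    """Return a list of problems; an empty list means the shape looks fine."""
    problems: List[str] = []
    for alias, request in source_requests.items():
        for key in REQUIRED_KEYS:
            if key not in request:
                problems.append(f"{alias}: missing '{key}'")
    return problems


# Placeholder aliases, connector names, and endpoint names.
source_requests = {
    "crm": {"connector": "crm_api", "endpoint": "list_contacts"},
    "billing": {
        "connector": "billing_api",
        "endpoint": "list_invoices",
        "params": {"status": "open"},  # optional per the description
    },
}
problems = check_source_requests(source_requests)
```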
- download(endpoint: str, destination_path: str, params: Dict[str, Any] | None = None, **kwargs: Any) → Dict[str, Any]
Convenience wrapper to download a streamed response from the source.
- Parameters:
endpoint – Source endpoint name to download from
destination_path – Destination file path
params – Optional query parameters
**kwargs – Additional arguments forwarded to ApiConnector.download_stream
- Returns:
Download result metadata.
- fetch(endpoint: str, params: Dict[str, Any] | None = None) → Any
Convenience wrapper to fetch data from the configured source connector.
- Parameters:
endpoint – Source endpoint name to fetch from
params – Optional parameters for the request
- Returns:
Parsed response payload from the source API
- get_credential(name: str) → Dict[str, Any] | None
Get stored API credentials.
- Parameters:
name – Name of the credential
- Returns:
Credential data if found, None otherwise
- get_error_analytics() → Dict[str, Any]
Get error analytics summary.
- Returns:
Dictionary with error statistics
- list_credentials() → List[str]
List available credential names.
- Returns:
List of credential names
- list_users() → List[Dict[str, Any]]
List all users in the system.
- Returns:
List of user data dictionaries
- load_config(config_path: str) → None
Load configuration from a YAML or JSON file.
- Parameters:
config_path – Path to the configuration file
- process_dlq(operation_type: str | None = None, limit: int = 10) → Dict[str, Any]
Process items in the Dead Letter Queue for retry.
- Parameters:
operation_type – Optional operation type to filter by
limit – Maximum number of items to process
- Returns:
Dictionary with processing results
- send(endpoint: str, data: Dict[str, Any] | List[Dict[str, Any]], **kwargs: Any) → Any
Convenience wrapper to send data to the configured target connector.
- Parameters:
endpoint – Target endpoint name to send to
data – Payload to send (single item or list)
- Returns:
Target connector response (if any)
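The fetch and send wrappers can be combined into a hand-rolled transfer when the full sync machinery is not needed. The sketch below uses only the fetch and send signatures documented in this section. The transfer helper and its transform callback are hypothetical, and it assumes the fetched payload is either one dictionary or a list of dictionaries. It is not a description of how sync is implemented.

```python
from typing import Any, Callable, Dict, List, Optional


def transfer(
    linker: Any,
    source_endpoint: str,
    target_endpoint: str,
    transform: Callable[[Dict[str, Any]], Dict[str, Any]],
    params: Optional[Dict[str, Any]] = None,
) -> Any:
    """Fetch from the source, transform each record, send to the target."""
    payload = linker.fetch(source_endpoint, params=params)
    # fetch returns Any; this sketch assumes a dict or a list of dicts.
    records: List[Dict[str, Any]] = (
        payload if isinstance(payload, list) else [payload]
    )
    transformed = [transform(record) for record in records]
    return linker.send(target_endpoint, transformed)
```

Because the helper only relies on the two method names, it can be exercised with a stand-in object in tests before pointing it at real APIs.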
- store_credential(name: str, credential_data: Dict[str, Any]) → bool
Store API credentials securely.
- Parameters:
name – Name to identify the credential
credential_data – Credential data to store
- Returns:
True if successful, False otherwise
- stream(endpoint: str, params: Dict[str, Any] | None = None, **kwargs: Any) → Any
Convenience wrapper to stream a response from the configured source.
- Parameters:
endpoint – Source endpoint name to stream from
params – Optional query parameters
**kwargs – Additional arguments forwarded to ApiConnector.stream_response
- Returns:
A generator yielding byte chunks from the source response.
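Since stream is documented as yielding byte chunks, a consumer can process the response incrementally instead of holding it in memory. The following sketch writes any iterable of byte chunks to a file. write_chunks is a hypothetical helper, and skipping empty chunks is a defensive choice in this sketch, not documented library behaviour.

```python
from typing import Iterable


def write_chunks(chunks: Iterable[bytes], destination_path: str) -> int:
    """Write byte chunks to a file and return the number of bytes written."""
    total = 0
    with open(destination_path, "wb") as handle:
        for chunk in chunks:
            if not chunk:  # skip empty chunks defensively
                continue
            handle.write(chunk)
            total += len(chunk)
    return total
```

With a configured instance, the intended call would look like `write_chunks(linker.stream("export"), "export.bin")`, where "export" is a placeholder endpoint name. For the common case of saving to disk, the download wrapper described earlier in this section is likely the more direct route.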
- sync(source_endpoint: str | None = None, target_endpoint: str | None = None, params: Dict[str, Any] | None = None, max_retries: int = 3, retry_delay: float = 1.0, retry_backoff_factor: float = 2.0, retry_status_codes: List[int] | None = None) → SyncResult
Execute a sync operation between source and target APIs.
- Parameters:
source_endpoint – Source endpoint to use (overrides mapping)
target_endpoint – Target endpoint to use (overrides mapping)
params – Additional parameters for the source API call
max_retries – Maximum number of retry attempts for transient failures
retry_delay – Initial delay between retries in seconds
retry_backoff_factor – Multiplicative factor for retry delay
retry_status_codes – HTTP status codes to retry (default: 429, 502, 503, 504)
- Returns:
Result of the sync operation
- Return type:
SyncResult
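The three retry parameters interact multiplicatively, which is easier to see with numbers. The sketch below computes the wait before each retry under the assumption of plain exponential backoff (initial delay multiplied by the factor after every attempt). Whether the library adds jitter or caps the delay is not stated in this section, and backoff_delays is a hypothetical helper.

```python
from typing import List


def backoff_delays(
    max_retries: int = 3,
    retry_delay: float = 1.0,
    retry_backoff_factor: float = 2.0,
) -> List[float]:
    """Seconds to wait before each retry, assuming pure exponential growth."""
    return [
        retry_delay * retry_backoff_factor**attempt
        for attempt in range(max_retries)
    ]


# With the documented defaults: 1 s, then 2 s, then 4 s.
delays = backoff_delays()
```

Under this assumption the defaults add up to at most 7 seconds of waiting per sync call before the operation is reported as failed.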
- class apilinker.api_linker.ErrorDetail(*, message: str, status_code: int | None = None, response_body: str | None = None, request_url: str | None = None, request_method: str | None = None, timestamp: str | None = None, error_type: str = 'general')
Bases: BaseModel
Detailed error information for API requests.
- classmethod from_apilinker_error(error: ApiLinkerError) → ErrorDetail
Convert an ApiLinkerError to ErrorDetail for backward compatibility.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class apilinker.api_linker.SyncResult(*, count: int = 0, success: bool = True, errors: List[Dict[str, Any]] = <factory>, details: Dict[str, Any] = <factory>, correlation_id: str = <factory>, source_response: Dict[str, Any] = <factory>, target_response: Dict[str, Any] = <factory>, duration_ms: int | None = None)
Bases: BaseModel
Result of a sync operation with enhanced error reporting.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
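As an illustration of how the SyncResult fields listed in the signature might be consumed, the sketch below builds a one-line log summary. It only reads the attributes count, success, errors, correlation_id, and duration_ms, so it works with any object that exposes them. summarize_sync is a hypothetical helper, and the structure of the individual error dictionaries is not assumed.

```python
from typing import Any


def summarize_sync(result: Any) -> str:
    """Build a one-line summary from SyncResult-like attributes."""
    status = "ok" if result.success else "failed"
    duration = (
        "n/a" if result.duration_ms is None else f"{result.duration_ms} ms"
    )
    return (
        f"[{result.correlation_id}] {status}: {result.count} record(s), "
        f"{len(result.errors)} error(s), duration {duration}"
    )
```

A typical use would be `print(summarize_sync(linker.sync()))` on a configured instance.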