API Reference

This section provides detailed API documentation for all ApiLinker modules and classes.

ApiLinker

Main ApiLinker class that orchestrates the connection, mapping, and data transfer between APIs.

class apilinker.api_linker.ApiLinker(config_path: str | None = None, source_config: Dict[str, Any] | None = None, target_config: Dict[str, Any] | None = None, mapping_config: Dict[str, Any] | None = None, schedule_config: Dict[str, Any] | None = None, error_handling_config: Dict[str, Any] | None = None, security_config: Dict[str, Any] | None = None, validation_config: Dict[str, Any] | None = None, observability_config: Dict[str, Any] | None = None, secret_manager_config: Dict[str, Any] | None = None, log_level: str = 'INFO', log_file: str | None = None)[source]

Bases: object

Main class for connecting, mapping and transferring data between APIs.

This class orchestrates the entire process of: 1. Connecting to source and target APIs 2. Fetching data from the source 3. Mapping fields according to configuration 4. Transforming data as needed 5. Sending data to the target 6. Scheduling recurring operations

Parameters:
  • config_path – Path to YAML/JSON configuration file

  • source_config – Direct source configuration dictionary

  • target_config – Direct target configuration dictionary

  • mapping_config – Direct mapping configuration dictionary

  • schedule_config – Direct scheduling configuration dictionary

  • log_level – Logging level (DEBUG, INFO, WARNING, ERROR)

  • log_file – Path to log file

add_mapping(source: str, target: str, fields: List[Dict[str, Any]]) None[source]

Add a field mapping between source and target endpoints.

Parameters:
  • source – Source endpoint name

  • target – Target endpoint name

  • fields – List of field mappings

add_named_source(name: str, type: str, base_url: str, auth: Dict[str, Any] | None = None, endpoints: Dict[str, Any] | None = None, **kwargs: Any) None[source]

Register an additional named source connector for aggregation workflows.

Parameters:
  • name – Logical source name used in aggregation requests

  • type – Type of API connector (rest, graphql, etc.)

  • base_url – Base URL of the API

  • auth – Authentication configuration

  • endpoints – Configured endpoints

  • **kwargs – Additional connector configuration

add_schedule(type: str, **kwargs: Any) None[source]

Add a schedule for recurring syncs.

Parameters:
  • type – Type of schedule (interval, cron)

  • **kwargs – Schedule-specific parameters

add_source(type: str, base_url: str, auth: Dict[str, Any] | None = None, endpoints: Dict[str, Any] | None = None, **kwargs: Any) None[source]

Add a source API connector.

Parameters:
  • type – Type of API connector (rest, graphql, etc.)

  • base_url – Base URL of the API

  • auth – Authentication configuration

  • endpoints – Configured endpoints

  • **kwargs – Additional configuration parameters

add_target(type: str, base_url: str, auth: Dict[str, Any] | None = None, endpoints: Dict[str, Any] | None = None, **kwargs: Any) None[source]

Add a target API connector.

Parameters:
  • type – Type of API connector (rest, graphql, etc.)

  • base_url – Base URL of the API

  • auth – Authentication configuration

  • endpoints – Configured endpoints

  • **kwargs – Additional configuration parameters

add_user(username: str, role: str, api_key: str | None = None) Dict[str, Any][source]

Add a user to the system with specified role.

Parameters:
  • username – Username to identify the user

  • role – Access role (admin, operator, viewer, developer)

  • api_key – Optional API key for authentication

Returns:

User data including generated API key if not provided

aggregate_source_data(source_data: Dict[str, Any], aggregation_config: Dict[str, Any]) List[Dict[str, Any]][source]

Aggregate pre-fetched source payloads using the configured mapper rules.

Parameters:
  • source_data – Mapping of source name to payload

  • aggregation_config – Aggregation config consumed by MultiSourceAggregator

Returns:

Aggregated record list.

aggregate_sources(source_requests: Dict[str, Dict[str, Any]], aggregation_config: Dict[str, Any], parallel: bool = True) List[Dict[str, Any]][source]

Fetch multiple sources and aggregate them into a single dataset.

Parameters:
  • source_requests – Mapping of source alias to request config. Each request supports connector, endpoint, and optional params.

  • aggregation_config – Multi-source aggregation config.

  • parallel – Whether to fetch sources concurrently.

Returns:

Aggregated record list.

download(endpoint: str, destination_path: str, params: Dict[str, Any] | None = None, **kwargs: Any) Dict[str, Any][source]

Convenience wrapper to download a streamed response from the source.

Parameters:
  • endpoint – Source endpoint name to download from

  • destination_path – Destination file path

  • params – Optional query parameters

  • **kwargs – Additional arguments forwarded to ApiConnector.download_stream

Returns:

Download result metadata.

fetch(endpoint: str, params: Dict[str, Any] | None = None) Any[source]

Convenience wrapper to fetch data from the configured source connector.

Parameters:
  • endpoint – Source endpoint name to fetch from

  • params – Optional parameters for the request

Returns:

Parsed response payload from the source API

get_credential(name: str) Dict[str, Any] | None[source]

Get stored API credentials.

Parameters:

name – Name of the credential

Returns:

Credential data if found, None otherwise

get_error_analytics() Dict[str, Any][source]

Get error analytics summary.

Returns:

Dictionary with error statistics

list_credentials() List[str][source]

List available credential names.

Returns:

List of credential names

list_users() List[Dict[str, Any]][source]

List all users in the system.

Returns:

List of user data dictionaries

load_config(config_path: str) None[source]

Load configuration from a YAML or JSON file.

Parameters:

config_path – Path to the configuration file

process_dlq(operation_type: str | None = None, limit: int = 10) Dict[str, Any][source]

Process items in the Dead Letter Queue for retry.

Parameters:
  • operation_type – Optional operation type to filter by

  • limit – Maximum number of items to process

Returns:

Dictionary with processing results

send(endpoint: str, data: Dict[str, Any] | List[Dict[str, Any]], **kwargs: Any) Any[source]

Convenience wrapper to send data to the configured target connector.

Parameters:
  • endpoint – Target endpoint name to send to

  • data – Payload to send (single item or list)

Returns:

Target connector response (if any)

start_scheduled_sync() None[source]

Start scheduled sync operations.

stop_scheduled_sync() None[source]

Stop scheduled sync operations.

store_credential(name: str, credential_data: Dict[str, Any]) bool[source]

Store API credentials securely.

Parameters:
  • name – Name to identify the credential

  • credential_data – Credential data to store

Returns:

True if successful, False otherwise

stream(endpoint: str, params: Dict[str, Any] | None = None, **kwargs: Any) Any[source]

Convenience wrapper to stream a response from the configured source.

Parameters:
  • endpoint – Source endpoint name to stream from

  • params – Optional query parameters

  • **kwargs – Additional arguments forwarded to ApiConnector.stream_response

Returns:

A generator yielding byte chunks from the source response.

sync(source_endpoint: str | None = None, target_endpoint: str | None = None, params: Dict[str, Any] | None = None, max_retries: int = 3, retry_delay: float = 1.0, retry_backoff_factor: float = 2.0, retry_status_codes: List[int] | None = None) SyncResult[source]

Execute a sync operation between source and target APIs.

Parameters:
  • source_endpoint – Source endpoint to use (overrides mapping)

  • target_endpoint – Target endpoint to use (overrides mapping)

  • params – Additional parameters for the source API call

  • max_retries – Maximum number of retry attempts for transient failures

  • retry_delay – Initial delay between retries in seconds

  • retry_backoff_factor – Multiplicative factor for retry delay

  • retry_status_codes – HTTP status codes to retry (default: 429, 502, 503, 504)

Returns:

Result of the sync operation

Return type:

SyncResult

class apilinker.api_linker.ErrorDetail(*, message: str, status_code: int | None = None, response_body: str | None = None, request_url: str | None = None, request_method: str | None = None, timestamp: str | None = None, error_type: str = 'general')[source]

Bases: BaseModel

Detailed error information for API requests.

error_type: str
classmethod from_apilinker_error(error: ApiLinkerError) ErrorDetail[source]

Convert an ApiLinkerError to ErrorDetail for backward compatibility.

message: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

request_method: str | None
request_url: str | None
response_body: str | None
status_code: int | None
timestamp: str | None
class apilinker.api_linker.SyncResult(*, count: int = 0, success: bool = True, errors: ~typing.List[~typing.Dict[str, ~typing.Any]] = <factory>, details: ~typing.Dict[str, ~typing.Any] = <factory>, correlation_id: str = <factory>, source_response: ~typing.Dict[str, ~typing.Any] = <factory>, target_response: ~typing.Dict[str, ~typing.Any] = <factory>, duration_ms: int | None = None)[source]

Bases: BaseModel

Result of a sync operation with enhanced error reporting.

correlation_id: str
count: int
details: Dict[str, Any]
duration_ms: int | None
errors: List[Dict[str, Any]]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

source_response: Dict[str, Any]
success: bool
target_response: Dict[str, Any]