Connector

API Connector module for handling connections to different types of APIs.

class apilinker.core.connector.ApiConnector(connector_type: str, base_url: str, auth_config: AuthConfig | None = None, endpoints: Dict[str, Dict[str, Any]] | None = None, timeout: int = 30, retry_count: int = 3, retry_delay: int = 1, default_headers: Dict[str, str] | None = None, **kwargs: Any)

Bases: object

API Connector for interacting with REST APIs.

This class handles the connection to APIs, making requests, and processing responses.

Parameters:
  • connector_type – Type of connector (rest, graphql, etc.)

  • base_url – Base URL for the API

  • auth_config – Authentication configuration

  • endpoints – Dictionary of endpoint configurations

  • timeout – Request timeout in seconds

  • retry_count – Number of retries on failure

  • retry_delay – Delay between retries in seconds
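The retry_count and retry_delay parameters describe a fixed-delay retry policy. The sketch below illustrates that idea in isolation. call_with_retries and its sleep argument are hypothetical names, not part of ApiLinker, and the library's actual retry logic may differ (for example, in which errors it retries). The sketch assumes retry_count counts retries after the first attempt.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    func: Callable[[], T],
    retry_count: int = 3,
    retry_delay: float = 1,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying up to retry_count times after the first failure."""
    attempt = 0
    while True:
        try:
            return func()
        except Exception:
            if attempt >= retry_count:
                raise  # retries exhausted: surface the last error
            attempt += 1
            sleep(retry_delay)  # fixed delay between attempts
```

Under this assumption, retry_count=3 allows at most four calls in total.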

check_health() → HealthCheckResult

Check the health of the API connection.

Returns:

HealthCheckResult indicating the status.

consume_sse(endpoint_name: str, params: Dict[str, Any] | None = None, processor: Callable[[List[Dict[str, Any]]], Any] | None = None, chunk_size: int = 50, max_events: int | None = None, backpressure_buffer_size: int = 500, drop_policy: str = 'block', **stream_kwargs: Any) → Dict[str, Any]

Consume SSE events in chunks with bounded in-memory buffering.

This method adds chunked processing and explicit backpressure handling:
  • drop_policy='block' – flushes buffer chunks before accepting more data.

  • drop_policy='drop_oldest' – drops the oldest buffered events when the buffer is full.

Parameters:
  • endpoint_name – Endpoint key configured in self.endpoints.

  • params – Query parameters.

  • processor – Optional callback invoked with each chunk.

  • chunk_size – Number of events per processing chunk.

  • max_events – Maximum number of events to consume.

  • backpressure_buffer_size – Max in-memory event buffer.

  • drop_policy – block or drop_oldest.

  • **stream_kwargs – Extra kwargs forwarded to stream_sse.

Returns:

Summary dictionary with processing metrics and chunk results.
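The two drop policies can be illustrated with a bounded buffer that is independent of ApiLinker. This is a minimal sketch, not the library's implementation: consume_in_chunks, its buffer_size argument, and the keys of the returned summary are hypothetical, and the real method may flush chunks at different points.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, Iterable, List


def consume_in_chunks(
    events: Iterable[Dict[str, Any]],
    processor: Callable[[List[Dict[str, Any]]], Any],
    chunk_size: int = 50,
    buffer_size: int = 500,
    drop_policy: str = "block",
) -> Dict[str, Any]:
    """Buffer events up to buffer_size and hand them to processor in chunks."""
    if drop_policy not in ("block", "drop_oldest"):
        raise ValueError(f"unknown drop_policy: {drop_policy!r}")

    buffer: Deque[Dict[str, Any]] = deque()
    results: List[Any] = []
    dropped = 0

    def flush_chunk() -> None:
        # Remove up to chunk_size events from the front and process them.
        size = min(chunk_size, len(buffer))
        chunk = [buffer.popleft() for _ in range(size)]
        results.append(processor(chunk))

    for event in events:
        if len(buffer) >= buffer_size:
            if drop_policy == "drop_oldest":
                buffer.popleft()  # discard the oldest event to make room
                dropped += 1
            else:
                # "block": process a chunk before accepting the new event.
                while len(buffer) >= buffer_size:
                    flush_chunk()
        buffer.append(event)

    while buffer:  # process whatever is still buffered at the end
        flush_chunk()

    # Hypothetical summary keys, chosen for this sketch only.
    return {"chunks": len(results), "dropped": dropped, "results": results}
```

With 'block' no event is lost and the buffer never exceeds its limit. With 'drop_oldest' the producer is never delayed, at the cost of discarding older events.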

download_stream(endpoint_name: str, destination_path: str, params: Dict[str, Any] | None = None, chunk_size: int = 65536, resume: bool = True, read_timeout: float | None = None, progress_callback: Callable[[Dict[str, Any]], None] | None = None, extra_headers: Dict[str, str] | None = None) → Dict[str, Any]

Download a streamed HTTP response directly to disk.

Downloads are written incrementally to keep memory usage bounded. When resume is enabled and the destination file already exists, ApiLinker will attempt a byte-range request and append only the remaining bytes when the upstream API returns 206 Partial Content.
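The resume behaviour described above follows the standard HTTP Range mechanism. The sketch below shows the decision logic only. plan_resume and file_mode_for are hypothetical helpers, not ApiLinker functions. The sketch also assumes that any status other than 206 means the server sent the full body, so the file is rewritten from the start.

```python
import os
from typing import Dict, Tuple


def plan_resume(destination_path: str, resume: bool = True) -> Tuple[Dict[str, str], int]:
    """Return (extra request headers, byte offset) for a possibly resumed download."""
    if resume and os.path.exists(destination_path):
        offset = os.path.getsize(destination_path)
        if offset > 0:
            # Ask the server for everything from the current file size onward.
            return {"Range": f"bytes={offset}-"}, offset
    return {}, 0


def file_mode_for(status_code: int, offset: int) -> str:
    """Append only when the server honoured the range request (206)."""
    return "ab" if status_code == 206 and offset > 0 else "wb"
```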

fetch_data(endpoint_name: str, params: Dict[str, Any] | None = None) → Dict[str, Any] | List[Dict[str, Any]]

Fetch data from the specified endpoint.

Parameters:
  • endpoint_name – Name of the endpoint to use

  • params – Additional parameters for the request

Returns:

The parsed response data

Raises:

ApiLinkerError – On API request failure with enhanced error context
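Because fetch_data may return either a single dictionary or a list of dictionaries, callers often want to normalize the result. The helper below is a hypothetical convenience wrapper, not part of ApiLinker. It accepts any object with a compatible fetch_data method and lets ApiLinkerError propagate to the caller.

```python
from typing import Any, Dict, List, Optional, Union

Record = Dict[str, Any]


def fetch_records(
    connector: Any,
    endpoint_name: str,
    params: Optional[Dict[str, Any]] = None,
) -> List[Record]:
    """Call connector.fetch_data and always return a list of records."""
    data: Union[Record, List[Record]] = connector.fetch_data(endpoint_name, params=params)
    # A single object is wrapped so callers can always iterate.
    return data if isinstance(data, list) else [data]
```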

send_data(endpoint_name: str, data: Dict[str, Any] | List[Dict[str, Any]]) → Dict[str, Any]

Send data to the specified endpoint.

Parameters:
  • endpoint_name – Name of the endpoint to use

  • data – Data to send

Returns:

The parsed response

Raises:

ApiLinkerError – On API request failure with enhanced error context

stream_response(endpoint_name: str, params: Dict[str, Any] | None = None, chunk_size: int = 65536, read_timeout: float | None = None, progress_callback: Callable[[Dict[str, Any]], None] | None = None, resume_from: int = 0, extra_headers: Dict[str, str] | None = None) → Generator[bytes, None, None]

Stream a generic HTTP response in bounded-size chunks.

This method supports memory-efficient chunk processing, optional progress callbacks, and byte-range resume when the upstream API supports Range requests.

Parameters:
  • endpoint_name – Endpoint key configured in self.endpoints.

  • params – Optional query parameters.

  • chunk_size – Byte size for each yielded chunk.

  • read_timeout – Per-stream read timeout. Defaults to connector timeout.

  • progress_callback – Optional callback invoked with progress metadata.

  • resume_from – Byte offset to resume from using a Range request.

  • extra_headers – Extra request headers merged into the stream request.

Yields:

Raw byte chunks from the response body.
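The chunking and progress-callback pattern can be sketched against any file-like object. iter_chunks is a hypothetical stand-in, and the bytes_read key in the progress dictionary is an assumption. The metadata that ApiLinker actually passes to progress_callback is not specified here.

```python
import io
from typing import Any, BinaryIO, Callable, Dict, Generator, Optional


def iter_chunks(
    body: BinaryIO,
    chunk_size: int = 65536,
    progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
    resume_from: int = 0,
) -> Generator[bytes, None, None]:
    """Yield body in chunks of at most chunk_size bytes, reporting progress."""
    bytes_read = resume_from  # count from the resume offset, if any
    while True:
        chunk = body.read(chunk_size)
        if not chunk:
            break  # end of stream
        bytes_read += len(chunk)
        if progress_callback is not None:
            progress_callback({"bytes_read": bytes_read})  # hypothetical key
        yield chunk


# Only one chunk is held in memory at a time.
total = sum(len(chunk) for chunk in iter_chunks(io.BytesIO(b"x" * 10), chunk_size=4))
```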

stream_sse(endpoint_name: str, params: Dict[str, Any] | None = None, max_events: int | None = None, reconnect: bool = True, reconnect_delay: float = 1.0, max_reconnect_attempts: int | None = None, read_timeout: float | None = None, decode_json: bool = True) → Generator[Dict[str, Any], None, None]

Stream events from an SSE endpoint.

Supports automatic reconnection, Last-Event-ID resume, and optional JSON decoding of event payloads.

Parameters:
  • endpoint_name – Endpoint key configured in self.endpoints.

  • params – Query parameters.

  • max_events – Stop after this many dispatched events.

  • reconnect – Whether to reconnect on disconnect/error.

  • reconnect_delay – Base reconnect delay in seconds.

  • max_reconnect_attempts – Maximum reconnect attempts (None = unlimited).

  • read_timeout – Per-stream read timeout. Defaults to connector timeout.

  • decode_json – Whether to decode JSON payloads automatically.

Yields:

Parsed SSE events with keys id, event, data, retry, raw_data.
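To show how events with these keys could be produced, here is a minimal SSE line parser that follows the text/event-stream framing rules (fields separated by a colon, events terminated by a blank line). parse_sse is a hypothetical function, not the library's parser. It covers parsing only, not reconnection, and falling back to the raw string when the payload is not valid JSON is an assumption.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List, Optional


def parse_sse(lines: Iterable[str], decode_json: bool = True) -> Iterator[Dict[str, Any]]:
    """Turn raw SSE lines into event dictionaries."""
    event_id: Optional[str] = None  # the last seen id persists across events
    event_type = "message"
    retry: Optional[int] = None
    data_lines: List[str] = []

    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the event if any data was collected.
            if data_lines:
                raw_data = "\n".join(data_lines)
                data: Any = raw_data
                if decode_json:
                    try:
                        data = json.loads(raw_data)
                    except ValueError:
                        pass  # keep the raw string when it is not JSON
                yield {"id": event_id, "event": event_type, "data": data,
                       "retry": retry, "raw_data": raw_data}
            event_type, retry, data_lines = "message", None, []
            continue
        if line.startswith(":"):
            continue  # comment or keep-alive line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # strip the single optional leading space
        if field == "data":
            data_lines.append(value)
        elif field == "event":
            event_type = value
        elif field == "id":
            event_id = value
        elif field == "retry" and value.isdigit():
            retry = int(value)
```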

class apilinker.core.connector.EndpointConfig(*, path: str, method: str = 'GET', params: Dict[str, Any] = <factory>, headers: Dict[str, str] = <factory>, body_template: Dict[str, Any] | None = None, pagination: Dict[str, Any] | None = None, response_path: str | None = None, response_schema: Dict[str, Any] | None = None, request_schema: Dict[str, Any] | None = None, rate_limit: Dict[str, Any] | None = None, sse: Dict[str, Any] | None = None, streaming: Dict[str, Any] | None = None)

Bases: BaseModel

Configuration for an API endpoint.

body_template: Dict[str, Any] | None
headers: Dict[str, str]
method: str
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

pagination: Dict[str, Any] | None
params: Dict[str, Any]
path: str
rate_limit: Dict[str, Any] | None
request_schema: Dict[str, Any] | None
response_path: str | None
response_schema: Dict[str, Any] | None
sse: Dict[str, Any] | None
streaming: Dict[str, Any] | None
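As an illustration, an endpoints dictionary of the shape accepted by ApiConnector might use these field names as keys. The endpoint names, paths, and values below are made up, and unknown_fields is a hypothetical helper. Whether ApiConnector validates each entry against EndpointConfig is an assumption.

```python
from typing import Any, Dict, Set

# Field names taken from the EndpointConfig signature.
ENDPOINT_FIELDS: Set[str] = {
    "path", "method", "params", "headers", "body_template", "pagination",
    "response_path", "response_schema", "request_schema", "rate_limit",
    "sse", "streaming",
}

# Hypothetical endpoint definitions keyed by endpoint name.
endpoints: Dict[str, Dict[str, Any]] = {
    "list_users": {"path": "/users", "method": "GET", "params": {"limit": 100}},
    "create_user": {
        "path": "/users",
        "method": "POST",
        "headers": {"Content-Type": "application/json"},
    },
}


def unknown_fields(config: Dict[str, Any]) -> Set[str]:
    """Return keys that do not correspond to an EndpointConfig field."""
    return set(config) - ENDPOINT_FIELDS
```

Only path is required; every other field has a default.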