Connector
API Connector module for handling connections to different types of APIs.
- class apilinker.core.connector.ApiConnector(connector_type: str, base_url: str, auth_config: AuthConfig | None = None, endpoints: Dict[str, Dict[str, Any]] | None = None, timeout: int = 30, retry_count: int = 3, retry_delay: int = 1, default_headers: Dict[str, str] | None = None, **kwargs: Any)[source]
Bases:
objectAPI Connector for interacting with REST APIs.
This class handles the connection to APIs, making requests, and processing responses.
- Parameters:
connector_type – Type of connector (rest, graphql, etc.)
base_url – Base URL for the API
auth_config – Authentication configuration
endpoints – Dictionary of endpoint configurations
timeout – Request timeout in seconds
retry_count – Number of retries on failure
retry_delay – Delay between retries in seconds
- check_health() HealthCheckResult[source]
Check the health of the API connection.
- Returns:
HealthCheckResult indicating the status.
- consume_sse(endpoint_name: str, params: Dict[str, Any] | None = None, processor: Callable[[List[Dict[str, Any]]], Any] | None = None, chunk_size: int = 50, max_events: int | None = None, backpressure_buffer_size: int = 500, drop_policy: str = 'block', **stream_kwargs: Any) Dict[str, Any][source]
Consume SSE events in chunks with bounded in-memory buffering.
This method adds chunked processing and explicit backpressure handling: -
drop_policy='block': flushes buffer chunks before accepting more data. -drop_policy='drop_oldest': drops oldest buffered events when full.- Parameters:
endpoint_name – Endpoint key configured in
self.endpoints.params – Query parameters.
processor – Optional callback invoked with each chunk.
chunk_size – Number of events per processing chunk.
max_events – Maximum number of events to consume.
backpressure_buffer_size – Max in-memory event buffer.
drop_policy –
blockordrop_oldest.**stream_kwargs – Extra kwargs forwarded to
stream_sse.
- Returns:
Summary dictionary with processing metrics and chunk results.
- download_stream(endpoint_name: str, destination_path: str, params: Dict[str, Any] | None = None, chunk_size: int = 65536, resume: bool = True, read_timeout: float | None = None, progress_callback: Callable[[Dict[str, Any]], None] | None = None, extra_headers: Dict[str, str] | None = None) Dict[str, Any][source]
Download a streamed HTTP response directly to disk.
Downloads are written incrementally to keep memory usage bounded. When
resumeis enabled and the destination file already exists, ApiLinker will attempt a byte-range request and append only the remaining bytes when the upstream API returns206 Partial Content.
- fetch_data(endpoint_name: str, params: Dict[str, Any] | None = None) Dict[str, Any] | List[Dict[str, Any]][source]
Fetch data from the specified endpoint.
- Parameters:
endpoint_name – Name of the endpoint to use
params – Additional parameters for the request
- Returns:
The parsed response data
- Raises:
ApiLinkerError – On API request failure with enhanced error context
- send_data(endpoint_name: str, data: Dict[str, Any] | List[Dict[str, Any]]) Dict[str, Any][source]
Send data to the specified endpoint.
- Parameters:
endpoint_name – Name of the endpoint to use
data – Data to send
- Returns:
The parsed response
- Raises:
ApiLinkerError – On API request failure with enhanced error context
- stream_response(endpoint_name: str, params: Dict[str, Any] | None = None, chunk_size: int = 65536, read_timeout: float | None = None, progress_callback: Callable[[Dict[str, Any]], None] | None = None, resume_from: int = 0, extra_headers: Dict[str, str] | None = None) Generator[bytes, None, None][source]
Stream a generic HTTP response in bounded-size chunks.
This method supports memory-efficient chunk processing, optional progress callbacks, and byte-range resume when the upstream API supports
Rangerequests.- Parameters:
endpoint_name – Endpoint key configured in
self.endpoints.params – Optional query parameters.
chunk_size – Byte size for each yielded chunk.
read_timeout – Per-stream read timeout. Defaults to connector timeout.
progress_callback – Optional callback invoked with progress metadata.
resume_from – Byte offset to resume from using a
Rangerequest.extra_headers – Extra request headers merged into the stream request.
- Yields:
Raw byte chunks from the response body.
- stream_sse(endpoint_name: str, params: Dict[str, Any] | None = None, max_events: int | None = None, reconnect: bool = True, reconnect_delay: float = 1.0, max_reconnect_attempts: int | None = None, read_timeout: float | None = None, decode_json: bool = True) Generator[Dict[str, Any], None, None][source]
Stream events from an SSE endpoint.
Supports automatic reconnection, Last-Event-ID resume, and optional JSON decoding of event payloads.
- Parameters:
endpoint_name – Endpoint key configured in
self.endpoints.params – Query parameters.
max_events – Stop after this many dispatched events.
reconnect – Whether to reconnect on disconnect/error.
reconnect_delay – Base reconnect delay in seconds.
max_reconnect_attempts – Maximum reconnect attempts (None = unlimited).
read_timeout – Per-stream read timeout. Defaults to connector timeout.
decode_json – Whether to decode JSON payloads automatically.
- Yields:
Parsed SSE events with keys –
id,event,data,retry,raw_data.
- class apilinker.core.connector.EndpointConfig(*, path: str, method: str = 'GET', params: ~typing.Dict[str, ~typing.Any] = <factory>, headers: ~typing.Dict[str, str] = <factory>, body_template: ~typing.Dict[str, ~typing.Any] | None = None, pagination: ~typing.Dict[str, ~typing.Any] | None = None, response_path: str | None = None, response_schema: ~typing.Dict[str, ~typing.Any] | None = None, request_schema: ~typing.Dict[str, ~typing.Any] | None = None, rate_limit: ~typing.Dict[str, ~typing.Any] | None = None, sse: ~typing.Dict[str, ~typing.Any] | None = None, streaming: ~typing.Dict[str, ~typing.Any] | None = None)[source]
Bases:
BaseModelConfiguration for an API endpoint.
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].