Source code for apilinker.core.scheduler

"""
Scheduler for running API syncs on defined intervals or cron schedules.
"""

import logging
import threading
import time
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Optional, TypeVar, cast

from croniter import croniter

logger = logging.getLogger(__name__)

# Type variables for callbacks
T = TypeVar("T")
R = TypeVar("R")


[docs] class Scheduler: """ Scheduler for running API sync operations on defined schedules. This class handles scheduling of sync operations based on: - Interval: Run every N minutes/hours - Cron: Run according to cron expression - One-time: Run once at a specific time """ def __init__(self) -> None: self.schedule_type: Optional[str] = None self.schedule_config: Dict[str, Any] = {} self.running: bool = False self.thread: Optional[threading.Thread] = None self.last_run: Optional[datetime] = None logger.debug("Initialized Scheduler")
[docs] def add_schedule(self, type: str, **kwargs) -> None: """ Configure the schedule for sync operations. Args: type: Type of schedule ('interval', 'cron', or 'once') **kwargs: Schedule-specific parameters """ self.schedule_type = type self.schedule_config = kwargs # Validate schedule config based on type if type == "interval": if not any(k in kwargs for k in ["seconds", "minutes", "hours", "days"]): raise ValueError( "Interval schedule must specify seconds, minutes, hours, or days" ) elif type == "cron": if "expression" not in kwargs: raise ValueError("Cron schedule must specify an expression") # Validate cron expression try: croniter(kwargs["expression"], datetime.now()) except Exception as e: raise ValueError(f"Invalid cron expression: {str(e)}") elif type == "once": if "datetime" not in kwargs: raise ValueError("One-time schedule must specify a datetime") else: raise ValueError(f"Unsupported schedule type: {type}") logger.info(f"Added {type} schedule: {kwargs}")
[docs] def get_schedule_info(self) -> str: """Get human-readable description of the current schedule.""" if not self.schedule_type: return "No schedule configured" if self.schedule_type == "interval": interval_parts = [] for unit in ["seconds", "minutes", "hours", "days"]: if unit in self.schedule_config: value = self.schedule_config[unit] if value == 1: interval_parts.append( f"1 {unit[:-1]}" ) # Remove 's' for singular else: interval_parts.append(f"{value} {unit}") return f"Every {', '.join(interval_parts)}" elif self.schedule_type == "cron": return f"Cron: {self.schedule_config['expression']}" elif self.schedule_type == "once": return f"Once at {self.schedule_config['datetime']}" return "Unknown schedule"
def _calculate_next_run(self) -> datetime: """Calculate the next scheduled run time based on configuration.""" now = datetime.now() if self.schedule_type == "interval": interval = timedelta( seconds=self.schedule_config.get("seconds", 0), minutes=self.schedule_config.get("minutes", 0), hours=self.schedule_config.get("hours", 0), days=self.schedule_config.get("days", 0), ) if self.last_run: return self.last_run + interval else: return now + interval elif self.schedule_type == "cron" and "expression" in self.schedule_config: cron = croniter(self.schedule_config["expression"], now) return cast(datetime, cron.get_next(datetime)) elif self.schedule_type == "once" and "datetime" in self.schedule_config: return cast(datetime, self.schedule_config["datetime"]) # Default fallback return now + timedelta(hours=1) def _scheduler_loop( self, callback: Callable[..., Any], *args: Any, **kwargs: Any ) -> None: """ Main scheduler loop that runs the callback at scheduled times. Args: callback: Function to call on schedule ``*args``, ``**kwargs``: Arguments to pass to the callback """ self.running = True # One-time run for "once" schedule type if self.schedule_type == "once": target_time = self.schedule_config["datetime"] now = datetime.now() if target_time > now: # Sleep until target time sleep_seconds = (target_time - now).total_seconds() logger.info(f"Scheduled to run once in {sleep_seconds:.1f} seconds") # Wait for scheduled time or until stopped time_elapsed = 0 while self.running and time_elapsed < sleep_seconds: time.sleep(min(1, sleep_seconds - time_elapsed)) time_elapsed += 1 if self.running: self.last_run = datetime.now() try: logger.info("Running scheduled sync") callback(*args, **kwargs) except Exception as e: logger.error(f"Error in scheduled sync: {str(e)}") else: logger.warning("One-time schedule is in the past, not running") self.running = False return # Recurring schedule (interval or cron) while self.running: # Calculate next run time next_run = self._calculate_next_run() now = datetime.now() if next_run > now: # Sleep until next run time sleep_seconds = (next_run - now).total_seconds() logger.info(f"Next sync scheduled in {sleep_seconds:.1f} seconds") # Wait for next run time or until stopped time_elapsed = 0 while self.running and time_elapsed < sleep_seconds: time.sleep(min(1, sleep_seconds - time_elapsed)) time_elapsed += 1 # Run the callback if still running if self.running: self.last_run = datetime.now() try: logger.info("Running scheduled sync") callback(*args, **kwargs) except Exception as e: logger.error(f"Error in scheduled sync: {str(e)}")
[docs] def start(self, callback: Callable[..., Any], *args: Any, **kwargs: Any) -> None: """ Start the scheduler with the provided callback function. Args: callback: Function to call on schedule ``*args``, ``**kwargs``: Arguments to pass to the callback """ if not self.schedule_type: raise ValueError("Schedule not configured") if self.running: logger.warning("Scheduler is already running") return # Set running flag before starting thread self.running = True # Start the scheduler in a separate thread self.thread = threading.Thread( target=self._scheduler_loop, args=(callback,) + args, kwargs=kwargs, daemon=True, ) self.thread.start() logger.info("Scheduler started")
[docs] def stop(self) -> None: """Stop the scheduler.""" if not self.running: logger.warning("Scheduler is not running") return self.running = False if self.thread: self.thread.join(timeout=2.0) self.thread = None logger.info("Scheduler stopped")