Scheduler Overview
Architecture
    ┌─────────────────────────────┐
    │      Scheduler Process      │
    │  - Checks schedules         │
    │  - Calculates next run      │
    │  - Enqueues via broker      │
    └──────────────┬──────────────┘
                   │ enqueue_async()
                   ▼
    ┌─────────────────────────────┐
    │         PostgreSQL          │
    │  - tasks table              │
    │  - schedule_state table     │
    └──────────────┬──────────────┘
                   │ NOTIFY
                   ▼
    ┌─────────────────────────────┐
    │           Workers           │
    │  - Execute scheduled tasks  │
    └─────────────────────────────┘

The scheduler:
- Runs as a separate process/dyno from workers
- Checks configured schedules at regular intervals
- Enqueues tasks when schedules are due
- Tracks state in the database to prevent duplicate enqueues
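
The check cycle described above can be sketched in plain Python. This is only an illustration of the idea with in-memory stand-ins: `IntervalEntry`, `run_check`, and the `enqueue` callback are hypothetical names, not part of the horsies API, and the real scheduler persists its state in PostgreSQL rather than in a list.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable


@dataclass
class IntervalEntry:
    """Hypothetical stand-in for one configured schedule (not a horsies class)."""
    name: str
    task_name: str
    every: timedelta
    next_run_at: datetime


def run_check(entries: list[IntervalEntry], now: datetime,
              enqueue: Callable[[str], None]) -> None:
    """One scheduler tick: enqueue each due entry, then advance its next run."""
    for entry in entries:
        if entry.next_run_at <= now:
            enqueue(entry.task_name)
            entry.next_run_at = now + entry.every


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
entries = [IntervalEntry("hourly-sync", "sync_data", timedelta(hours=1), start)]
enqueued: list[str] = []

run_check(entries, start, enqueued.append)                          # due
run_check(entries, start + timedelta(minutes=30), enqueued.append)  # not due yet
run_check(entries, start + timedelta(hours=1), enqueued.append)     # due again
print(enqueued)  # ['sync_data', 'sync_data']
```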
Configuration
Add a ScheduleConfig to your AppConfig:

    from datetime import time

    from horsies.core.models.schedule import ScheduleConfig, TaskSchedule, DailySchedule

    # AppConfig and PostgresConfig are assumed to be imported elsewhere
    config = AppConfig(
        broker=PostgresConfig(...),
        schedule=ScheduleConfig(
            enabled=True,
            check_interval_seconds=1,
            schedules=[
                TaskSchedule(
                    name="daily-cleanup",
                    task_name="cleanup_old_data",
                    pattern=DailySchedule(time=time(3, 0, 0)),
                    timezone="UTC",
                ),
            ],
        ),
    )

Running the Scheduler

    horsies scheduler myapp.instance:app --loglevel=INFO

Run the scheduler separately from workers. One scheduler per cluster is sufficient.
Key Concepts
Schedule Name

Each schedule has a unique name used for state tracking:

    TaskSchedule(
        name="daily-report",  # Must be unique
        task_name="generate_report",
        ...
    )

Task Name
The task_name must match the name of a task registered with @app.task():

    @app.task("generate_report")
    def generate_report() -> TaskResult[str, TaskError]:
        ...

    TaskSchedule(
        name="...",
        task_name="generate_report",  # Must match the decorator
        ...
    )

Check Interval
How often the scheduler checks for due schedules:

    ScheduleConfig(
        check_interval_seconds=1,  # Check every second
        ...
    )

Range: 1-60 seconds. Lower values give better timing precision but cause more database queries.
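
To get a feel for the trade-off, the sketch below counts how many check cycles a given interval produces per day. It is plain arithmetic, not horsies code; `checks_per_day` is a hypothetical helper, and how many queries each check issues is not stated here, so only the number of checks is computed. As a rough assumption, a due schedule may be noticed up to about one interval late.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def checks_per_day(check_interval_seconds: int) -> int:
    """Number of check cycles per day for an interval in the documented 1-60 range."""
    if not 1 <= check_interval_seconds <= 60:
        raise ValueError("check_interval_seconds must be between 1 and 60")
    return SECONDS_PER_DAY // check_interval_seconds


for interval in (1, 10, 60):
    print(interval, checks_per_day(interval))
# 1 86400
# 10 8640
# 60 1440
```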
State Tracking
The schedule_state table tracks:
| Field | Purpose |
|---|---|
| schedule_name | Schedule identifier |
| last_run_at | When schedule last executed |
| next_run_at | When schedule should run next |
| last_task_id | ID of most recent enqueued task |
| run_count | Total execution count |
| config_hash | Detects configuration changes |

This prevents duplicate executions when:
- Scheduler restarts
- Multiple schedulers run (advisory locks serialize their checks)
- Network issues cause delays
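
The role of persisted state can be illustrated with a small in-memory sketch. A dict stands in for the schedule_state table, and `check` is a hypothetical function, not the horsies implementation. Because next_run_at survives between checks, a second check shortly after the first (for example after a restart) does not enqueue the task again.

```python
from datetime import datetime, timedelta, timezone

# In-memory stand-in for the schedule_state table, keyed by schedule name.
state: dict[str, dict] = {}
enqueued: list[str] = []


def check(name: str, every: timedelta, now: datetime) -> bool:
    """Enqueue the schedule if it is due and record the new state."""
    row = state.setdefault(name, {
        "last_run_at": None,
        "next_run_at": now,
        "last_task_id": None,
        "run_count": 0,
    })
    if now < row["next_run_at"]:
        return False  # already handled for this period
    task_id = f"{name}-{row['run_count'] + 1}"  # made-up ID format
    enqueued.append(task_id)
    row.update(
        last_run_at=now,
        next_run_at=now + every,
        last_task_id=task_id,
        run_count=row["run_count"] + 1,
    )
    return True


t0 = datetime(2024, 1, 1, 3, 0, tzinfo=timezone.utc)
first = check("daily-cleanup", timedelta(days=1), t0)
# A restarted scheduler re-checks five seconds later and finds nothing due.
second = check("daily-cleanup", timedelta(days=1), t0 + timedelta(seconds=5))
print(first, second, enqueued)  # True False ['daily-cleanup-1']
```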
Catch-Up Logic
When catch_up_missed=True, missed runs are executed:

    TaskSchedule(
        name="hourly-sync",
        task_name="sync_data",
        pattern=IntervalSchedule(hours=1),
        catch_up_missed=True,  # Execute missed runs
    )

If the scheduler was down for 3 hours, it will enqueue 3 tasks on restart.

When catch_up_missed=False (the default), missed runs are skipped and only the next scheduled run is executed.
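
The 3-hour example can be worked through with a short sketch. `plan_runs` is a hypothetical helper, not a horsies function. Its handling of catch_up_missed=False (skip the missed slots and resume at the next future slot) is an assumption about what "only the next scheduled run" means.

```python
from datetime import datetime, timedelta, timezone


def plan_runs(next_run_at: datetime, every: timedelta, now: datetime,
              catch_up_missed: bool) -> tuple[list[datetime], datetime]:
    """Return (run times to enqueue now, next future run time)."""
    missed: list[datetime] = []
    due = next_run_at
    while due <= now:
        missed.append(due)
        due += every
    # Assumption: without catch-up, missed slots are dropped entirely.
    return (missed if catch_up_missed else [], due)


hour = timedelta(hours=1)
next_run_at = datetime(2024, 1, 1, 1, 0, tzinfo=timezone.utc)  # first missed slot
restart = datetime(2024, 1, 1, 3, 30, tzinfo=timezone.utc)     # scheduler comes back

to_enqueue, upcoming = plan_runs(next_run_at, hour, restart, catch_up_missed=True)
print(len(to_enqueue), upcoming.hour)  # 3 4

skipped, upcoming_no_catch_up = plan_runs(next_run_at, hour, restart, catch_up_missed=False)
print(len(skipped), upcoming_no_catch_up.hour)  # 0 4
```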
Timezone Support
Each schedule can have its own timezone:

    TaskSchedule(
        name="morning-report",
        task_name="send_report",
        pattern=DailySchedule(time=time(9, 0, 0)),
        timezone="America/New_York",  # 9 AM Eastern
    )

Uses Python’s zoneinfo module. Default is “UTC”.
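
To see what a per-schedule timezone means in practice, the sketch below uses the standard-library zoneinfo module directly to convert a 9 AM New York run time to UTC. `run_instant_utc` is a hypothetical helper for illustration; it does not show how horsies performs the conversion internally.

```python
from datetime import date, datetime, time, timezone
from zoneinfo import ZoneInfo


def run_instant_utc(day: date, local_time: time, tz_name: str) -> datetime:
    """Interpret local_time on the given day in tz_name and convert it to UTC."""
    local = datetime.combine(day, local_time, tzinfo=ZoneInfo(tz_name))
    return local.astimezone(timezone.utc)


winter = run_instant_utc(date(2024, 1, 15), time(9, 0, 0), "America/New_York")
summer = run_instant_utc(date(2024, 7, 15), time(9, 0, 0), "America/New_York")
print(winter.hour, summer.hour)  # 14 13
```

The same local time maps to a different UTC hour in winter and summer because of daylight saving time, which is why the timezone is stored as a name rather than a fixed offset.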
Validation
Schedules are validated at startup:
- Task must be registered
- Queue must be valid (CUSTOM mode)
- Required arguments must be provided
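
The checks above can be sketched roughly as follows. This is not the horsies validator: the dict-based schedule shape, the `queue` and `kwargs` keys, and the `registry` mapping of task names to required argument names are all assumptions made for the illustration.

```python
def validate_schedules(schedules: list[dict],
                       registry: dict[str, set[str]],
                       queues: set[str]) -> list[str]:
    """Return a list of human-readable validation errors (empty if all pass)."""
    errors: list[str] = []
    for schedule in schedules:
        name = schedule["name"]
        task_name = schedule["task_name"]
        required = registry.get(task_name)
        if required is None:
            errors.append(f"{name}: task {task_name!r} is not registered")
            continue
        queue = schedule.get("queue")
        if queue is not None and queue not in queues:
            errors.append(f"{name}: unknown queue {queue!r}")
        missing = required - set(schedule.get("kwargs", {}))
        if missing:
            errors.append(f"{name}: missing arguments {sorted(missing)}")
    return errors


registry = {"generate_report": {"report_id"}}  # task name -> required arguments
schedules = [
    {"name": "ok", "task_name": "generate_report", "kwargs": {"report_id": 1}},
    {"name": "bad", "task_name": "nonexistent_task"},
    {"name": "incomplete", "task_name": "generate_report", "queue": "reports"},
]
errors = validate_schedules(schedules, registry, queues={"default"})
for error in errors:
    print(error)
```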
    # This will fail at scheduler start:
    TaskSchedule(
        name="bad",
        task_name="nonexistent_task",  # Not registered
        ...
    )

Graceful Shutdown
The scheduler handles SIGTERM/SIGINT for clean shutdown:

    # Sends SIGTERM
    kill <scheduler_pid>

The current schedule check completes, then the scheduler exits.
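
The "finish the current check, then exit" behaviour can be sketched with the standard-library signal module. This is a generic pattern, not the horsies implementation; `request_stop`, `install_handlers`, and `run_scheduler` are hypothetical names.

```python
import signal
import threading
from typing import Callable

stop_requested = threading.Event()


def request_stop(signum, frame) -> None:
    """Signal handler: only set a flag, so a running check is never interrupted."""
    stop_requested.set()


def install_handlers() -> None:
    """Register the handler for SIGTERM and SIGINT (must run in the main thread)."""
    signal.signal(signal.SIGTERM, request_stop)
    signal.signal(signal.SIGINT, request_stop)


def run_scheduler(check_once: Callable[[], None],
                  check_interval_seconds: float) -> int:
    """Run checks until a stop is requested; return how many checks completed."""
    checks = 0
    while not stop_requested.is_set():
        check_once()  # always runs to completion
        checks += 1
        # Sleep between checks, but wake immediately if a stop is requested.
        stop_requested.wait(check_interval_seconds)
    return checks
```

In a real process, `install_handlers()` would be called once before `run_scheduler(...)`. Using an event's `wait` instead of `time.sleep` lets the loop exit promptly rather than sleeping out the full interval.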