
Scheduler Overview

┌─────────────────────────────┐
│      Scheduler Process      │
│  - Checks schedules         │
│  - Calculates next run      │
│  - Enqueues via broker      │
└──────────────┬──────────────┘
               │ enqueue_async()
┌──────────────┴──────────────┐
│         PostgreSQL          │
│  - tasks table              │
│  - schedule_state table     │
└──────────────┬──────────────┘
               │ NOTIFY
┌──────────────┴──────────────┐
│           Workers           │
│  - Execute scheduled tasks  │
└─────────────────────────────┘

The scheduler:

  1. Runs as a separate process/dyno from workers
  2. Checks configured schedules at regular intervals
  3. Enqueues tasks when schedules are due
  4. Tracks state in database to prevent duplicates
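
The loop above can be sketched in plain Python. `ScheduleState`, `check_schedules`, and the `enqueue` callback are illustrative stand-ins for the database row and broker call, not horsies APIs:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

@dataclass
class ScheduleState:
    """In-memory stand-in for one row of the schedule_state table."""
    name: str
    next_run_at: datetime
    run_count: int = 0

def check_schedules(states, enqueue, now):
    """One scheduler tick: enqueue each due schedule and advance its state."""
    enqueued = 0
    for state in states:
        if now >= state.next_run_at:
            enqueue(state.name)  # stands in for enqueue_async() via the broker
            state.next_run_at = now + timedelta(hours=1)  # hourly, for illustration
            state.run_count += 1
            enqueued += 1
    return enqueued

now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
states = [ScheduleState("hourly-sync", next_run_at=now)]
sent = []
check_schedules(states, sent.append, now)  # due: enqueues "hourly-sync"
check_schedules(states, sent.append, now)  # not due again: no-op
```

Advancing `next_run_at` in the same step as the enqueue is what makes a second tick (or a restarted scheduler) see the schedule as no longer due.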

Add a ScheduleConfig to your AppConfig:

from horsies.core.models.schedule import ScheduleConfig, TaskSchedule, DailySchedule
from datetime import time

config = AppConfig(
    broker=PostgresConfig(...),
    schedule=ScheduleConfig(
        enabled=True,
        check_interval_seconds=1,
        schedules=[
            TaskSchedule(
                name="daily-cleanup",
                task_name="cleanup_old_data",
                pattern=DailySchedule(time=time(3, 0, 0)),
                timezone="UTC",
            ),
        ],
    ),
)
Start the scheduler:
horsies scheduler myapp.instance:app --loglevel=INFO

Run separately from workers. One scheduler per cluster is sufficient.

Each schedule has a unique name used for state tracking:

TaskSchedule(
    name="daily-report",  # Must be unique
    task_name="generate_report",
    ...
)

The task_name must match a registered @app.task():

@app.task("generate_report")
def generate_report() -> TaskResult[str, TaskError]:
    ...

TaskSchedule(
    name="...",
    task_name="generate_report",  # Must match decorator
    ...
)

How often the scheduler checks for due schedules:

ScheduleConfig(
    check_interval_seconds=1,  # Check every second
    ...
)

Range: 1-60 seconds. Lower values provide better precision but more database queries.
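
As a quick sanity check on that tradeoff, here is the arithmetic (illustrative only, not a horsies API):

```python
def daily_checks(check_interval_seconds: int) -> int:
    """How many schedule checks (database queries) one day of polling costs."""
    if not 1 <= check_interval_seconds <= 60:
        raise ValueError("check_interval_seconds must be between 1 and 60")
    return 86_400 // check_interval_seconds

per_second = daily_checks(1)   # 86400 checks per day at a 1s interval
per_minute = daily_checks(60)  # 1440 checks per day at a 60s interval
```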

The schedule_state table tracks:

Field           Purpose
-------------   -----------------------------------
schedule_name   Schedule identifier
last_run_at     When the schedule last executed
next_run_at     When the schedule should run next
last_task_id    ID of the most recent enqueued task
run_count       Total execution count
config_hash     Detects configuration changes

This prevents duplicate executions when:

  • Scheduler restarts
  • Multiple schedulers run (advisory locks serialize)
  • Network issues cause delays
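
One way to picture the state-based guard, as a toy in-memory model (in horsies the row update and the enqueue happen in PostgreSQL, serialized by advisory locks):

```python
from datetime import datetime, timedelta, timezone

# Toy stand-in for one schedule_state row.
state = {
    "next_run_at": datetime(2024, 1, 1, 3, 0, tzinfo=timezone.utc),
    "run_count": 0,
}

def try_run(state, now):
    """Enqueue only if due, advancing next_run_at in the same step, so a
    restarted or concurrent scheduler sees the schedule as no longer due."""
    if now < state["next_run_at"]:
        return False
    state["next_run_at"] += timedelta(days=1)  # daily schedule, for illustration
    state["run_count"] += 1
    return True

now = datetime(2024, 1, 1, 3, 0, 5, tzinfo=timezone.utc)
first = try_run(state, now)   # True: due, task enqueued
second = try_run(state, now)  # False: state already advanced, no duplicate
```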

When catch_up_missed=True, missed runs are executed:

TaskSchedule(
    name="hourly-sync",
    task_name="sync_data",
    pattern=IntervalSchedule(hours=1),
    catch_up_missed=True,  # Execute missed runs
)

If the scheduler was down for 3 hours, it will enqueue 3 tasks on restart.

When catch_up_missed=False (default), only the next scheduled run is executed.
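
The difference between the two modes can be sketched as follows. `runs_on_restart` is a hypothetical helper, not a horsies API, and it assumes the default mode skips missed runs entirely and waits for the next future slot:

```python
from datetime import datetime, timedelta, timezone

def runs_on_restart(last_run_at, now, interval, catch_up_missed):
    """How many tasks to enqueue immediately after downtime (sketch)."""
    missed = int((now - last_run_at) / interval)  # full intervals elapsed
    if missed <= 0:
        return 0
    # catch_up_missed=True replays every missed run; otherwise they are
    # skipped and next_run_at simply moves to the next future slot.
    return missed if catch_up_missed else 0

last = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)  # 3 hours of downtime
caught_up = runs_on_restart(last, now, timedelta(hours=1), True)   # 3 tasks
skipped = runs_on_restart(last, now, timedelta(hours=1), False)    # 0 tasks
```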

Each schedule can have its own timezone:

TaskSchedule(
    name="morning-report",
    task_name="send_report",
    pattern=DailySchedule(time=time(9, 0, 0)),
    timezone="America/New_York",  # 9 AM Eastern
)

Uses Python’s zoneinfo module. Default is “UTC”.
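
Next-run computation for a daily schedule in a timezone could look like this stdlib-only sketch; `next_daily_run` is an illustrative helper, not a horsies function:

```python
from datetime import datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo

def next_daily_run(now_utc, at, tz):
    """Next occurrence of wall-clock time `at` in timezone `tz`."""
    local_now = now_utc.astimezone(ZoneInfo(tz))
    candidate = local_now.replace(hour=at.hour, minute=at.minute,
                                  second=at.second, microsecond=0)
    if candidate <= local_now:
        candidate += timedelta(days=1)  # wall-clock arithmetic: stays at 9 AM local
    return candidate

# 15:00 UTC on Jan 15 is 10:00 in New York (EST), already past 9 AM,
# so the next run lands on Jan 16 at 9 AM Eastern.
now_utc = datetime(2024, 1, 15, 15, 0, tzinfo=timezone.utc)
run = next_daily_run(now_utc, time(9, 0, 0), "America/New_York")
```

Because datetime arithmetic on aware datetimes is wall-clock based, the run stays pinned to 9 AM local even across a DST transition.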

Schedules are validated at startup:

  • Task must be registered
  • Queue must be valid (CUSTOM mode)
  • Required arguments must be provided
# This will fail at scheduler start:
TaskSchedule(
    name="bad",
    task_name="nonexistent_task",  # Not registered
    ...
)
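
The task-registration check can be pictured as follows (a sketch; the real validation lives inside horsies):

```python
# Names collected from @app.task() decorators (illustrative set).
registered_tasks = {"generate_report", "cleanup_old_data"}

def validate_schedules(schedules):
    """Fail fast at startup if any schedule points at an unregistered task."""
    for sched in schedules:
        if sched["task_name"] not in registered_tasks:
            raise ValueError(
                f"schedule {sched['name']!r}: task {sched['task_name']!r} is not registered"
            )

validate_schedules([{"name": "daily-report", "task_name": "generate_report"}])  # passes
```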

The scheduler handles SIGTERM/SIGINT for clean shutdown:

# Sends SIGTERM
kill <scheduler_pid>

Current schedule check completes, then scheduler exits.