Scheduler Overview
Architecture
Section titled “Architecture”+------------------------------+| Scheduler (tokio task) || - Checks schedules || - Calculates next run || - Enqueues via broker |+--------------+---------------+ | enqueue() v+------------------------------+| PostgreSQL || - tasks table || - schedule_state table |+--------------+---------------+ | NOTIFY v+------------------------------+| Workers || - Execute scheduled tasks |+------------------------------+The scheduler:
- Runs as a separate process (or tokio task) from workers
- Checks configured schedules at regular intervals
- Enqueues tasks when schedules are due
- Tracks state in database to prevent duplicates
Complete Example
Section titled “Complete Example”This is the smallest realistic scheduler setup: one registered task, one schedule, one scheduler process.
use chrono::NaiveTime;use horsies::{ task, AppConfig, DailySchedule, Horsies, ScheduleConfig, SchedulePattern, TaskError, TaskSchedule,};
#[task("cleanup_old_data")]async fn cleanup_old_data() -> Result<(), TaskError> { Ok(())}
#[tokio::main]async fn main() -> Result<(), Box<dyn std::error::Error>> { let config = AppConfig { schedule: Some(ScheduleConfig::new(vec![ TaskSchedule::new( "daily-cleanup", "cleanup_old_data", SchedulePattern::Daily(DailySchedule { time: NaiveTime::from_hms_opt(3, 0, 0).unwrap(), }), ), ])), ..AppConfig::for_database_url("postgresql://localhost/mydb") };
let mut app = Horsies::new(config)?; cleanup_old_data::register(&mut app)?;
app.run_scheduler().await?; Ok(())}In production, the scheduler usually runs in its own process and workers run in separate processes. The scheduler only enqueues; workers execute the scheduled tasks.
Configuration
Section titled “Configuration”Add a ScheduleConfig to your AppConfig:
use horsies::{ AppConfig, ScheduleConfig, TaskSchedule, SchedulePattern, DailySchedule,};use chrono::NaiveTime;
let config = AppConfig { schedule: Some(ScheduleConfig::new(vec![ TaskSchedule::new( "daily-cleanup", "cleanup_old_data", SchedulePattern::Daily(DailySchedule { time: NaiveTime::from_hms_opt(3, 0, 0).unwrap(), }), ), ])), ..AppConfig::for_database_url("postgresql://...")};Running the Scheduler
Section titled “Running the Scheduler”app.run_scheduler().await?;Run separately from workers. One scheduler per cluster is sufficient in normal deployments. Multiple schedulers can coexist safely because the scheduler loop serializes work with a PostgreSQL advisory lock. The startup path connects to PostgreSQL and ensures the Horsies schema before the scheduler loop begins.
For manual control over the scheduler lifecycle:
use horsies::spawn_scheduler;use tokio_util::sync::CancellationToken;
let cancel = CancellationToken::new();let handle = spawn_scheduler(broker, schedule_config, app_config, cancel.clone());
// Later, to stop:cancel.cancel();handle.await?;If you construct the broker yourself instead of taking it from app.get_broker().await?, ensure the schema first:
broker.ensure_schema_initialized().await?;What the Scheduler Actually Does
Section titled “What the Scheduler Actually Does”On startup, the scheduler:
- validates the configured schedules
- connects to PostgreSQL
- ensures the Horsies schema exists
- initializes or refreshes rows in
horsies_schedule_state
On each tick, it:
- acquires a PostgreSQL advisory lock
- checks which schedules are due
- enqueues deterministic task IDs for those runs
- updates
horsies_schedule_state
That advisory lock is why multiple scheduler instances do not double-enqueue the same run.
Key Concepts
Section titled “Key Concepts”Schedule Name
Section titled “Schedule Name”Each schedule has a unique name used for state tracking:
TaskSchedule::new( "daily-report", // Must be unique "generate_report", SchedulePattern::Daily(DailySchedule { time: NaiveTime::from_hms_opt(9, 0, 0).unwrap(), }),)Task Name
Section titled “Task Name”The task_name must match a registered #[task]:
#[task("generate_report")]async fn generate_report() -> Result<String, TaskError> { // ... Ok("done".into())}
TaskSchedule::new( "weekly-report", "generate_report", // Must match #[task] name SchedulePattern::Weekly(WeeklySchedule { days: vec![Weekday::Monday], time: NaiveTime::from_hms_opt(9, 0, 0).unwrap(), }),)Check Interval
Section titled “Check Interval”How often the scheduler checks for due schedules:
ScheduleConfig::new(vec![/* ... */]) .check_interval_seconds(1) // Check every secondRange: 1-60 seconds. Lower values provide better precision but more database queries.
State Tracking
Section titled “State Tracking”The schedule_state table tracks:
| Field | Purpose |
|---|---|
schedule_name | Schedule identifier |
last_run_at | When schedule last executed |
next_run_at | When schedule should run next |
last_task_id | ID of most recent enqueued task |
run_count | Total execution count |
config_hash | Detects configuration changes |
This prevents duplicate executions when:
- Scheduler restarts
- Multiple schedulers run (advisory locks serialize)
- Network issues cause delays
Catch-Up Logic
Section titled “Catch-Up Logic”When catch_up_missed=true, missed runs are executed:
TaskSchedule::new( "hourly-sync", "sync_data", SchedulePattern::Interval(IntervalSchedule { hours: Some(1), ..Default::default() }),).catch_up_missed(true) // Execute missed runsIf the scheduler was down for 3 hours, it will enqueue 3 tasks on restart, up to max_catch_up_runs for one scheduler tick.
When catch_up_missed=false (default), only the next scheduled run is executed.
Timezone Support
Section titled “Timezone Support”Each schedule can have its own timezone:
TaskSchedule::new( "morning-report", "send_report", SchedulePattern::Daily(DailySchedule { time: NaiveTime::from_hms_opt(9, 0, 0).unwrap(), }),).timezone("America/New_York") // 9 AM EasternUses IANA timezone names via chrono-tz. Default is “UTC”.
Validation
Section titled “Validation”Schedules are validated at startup:
- Task must be registered
- Queue must be valid (CUSTOM mode)
- Timezone must be a valid IANA name
// This will fail at scheduler start:TaskSchedule::new( "bad", "nonexistent_task", // Not registered SchedulePattern::Daily(DailySchedule { time: NaiveTime::from_hms_opt(9, 0, 0).unwrap(), }),)Graceful Shutdown
Section titled “Graceful Shutdown”The scheduler handles SIGTERM/SIGINT (or cancellation token) for clean shutdown. The current schedule check completes, then the scheduler exits.