Worker Architecture
Overview
```
+----------------------------------------------------------+
| Worker Process (single binary)                           |
| - Subscribes to NOTIFY channels                          |
| - Claims tasks from PostgreSQL                           |
| - Dispatches to tokio tasks                              |
| - Writes results back                                    |
|                                                          |
|  +----------------------------------------------------+  |
|  | Tokio Runtime + Semaphore(concurrency)             |  |
|  | +----------+ +----------+ +----------+             |  |
|  | | Task A   | | Task B   | | (idle)   | ...         |  |
|  | | (spawn)  | | (spawn)  | |          |             |  |
|  | +----------+ +----------+ +----------+             |  |
|  +----------------------------------------------------+  |
+----------------------------------------------------------+
```

Concurrency Model
Workers use the tokio async runtime with a `tokio::sync::Semaphore` for concurrency control:
- Single process: One OS process per worker, no child processes
- Async tasks: Each task runs in a `tokio::spawn` green thread
- Blocking tasks: CPU-bound work runs via `tokio::task::spawn_blocking`
- Concurrency: Configurable via `WorkerConfig.concurrency`
Benefits:
- No process pool overhead or inter-process serialization
- Lightweight task spawning (thousands of concurrent tasks)
- Shared memory between tasks (broker connection pool)
- Panics in tasks are caught without killing the worker
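The semaphore-gated dispatch described above can be illustrated with a std-threads analog. This is a simplified sketch: the real worker uses `tokio::sync::Semaphore` with async tasks, and all names here are illustrative, not horsies internals.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Minimal counting semaphore, analogous to tokio::sync::Semaphore,
// showing how the worker caps the number of in-flight tasks.
struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), cv: Condvar::new() }
    }
    // Block until a permit is free, then take it.
    fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            p = self.cv.wait(p).unwrap();
        }
        *p -= 1;
    }
    // Return the permit and wake one waiter.
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

fn main() {
    let sem = Arc::new(Semaphore::new(2)); // concurrency = 2
    let handles: Vec<_> = (0..6)
        .map(|i| {
            let sem = Arc::clone(&sem);
            thread::spawn(move || {
                sem.acquire(); // at most 2 tasks run past this point
                println!("task {i} running");
                sem.release();
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}
```

Tasks beyond the permit count queue up on `acquire` rather than spawning unboundedly, which is the same back-pressure behavior the worker gets from its semaphore.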
Execution Flow
1. Startup

```rust
// Programmatic startup
app.run_worker().await?;

// Or with custom configuration
use horsies::WorkerConfig;

let worker_config = WorkerConfig {
    concurrency: 8,
    ..Default::default()
};
app.run_worker_with(worker_config).await?;
```

- Validate configuration
- Connect to PostgreSQL (broker pool + LISTEN/NOTIFY)
- Create semaphore with configured concurrency
- Subscribe to NOTIFY channels
2. Main Loop
```
while not cancelled:
    claim_and_dispatch_all()
    wait_for_notify_or_poll()
```

- Claim pass: Query for available tasks, claim them
- Dispatch: Spawn claimed tasks into tokio
- Wait: Listen for NOTIFY until new tasks arrive (with poll fallback)
- Repeat
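The wait step above can be modeled synchronously with a channel and a receive timeout. This is a sketch of the wake-or-poll behavior only; the real worker listens on PostgreSQL NOTIFY inside tokio, and the names here are illustrative.

```rust
use std::sync::mpsc;
use std::time::Duration;

// Why the loop woke up: a notify arrived, or the poll timeout fired.
#[derive(Debug, PartialEq)]
enum Wake {
    Notify,
    PollFallback,
}

// Block until a "notify" message arrives, or fall back to a poll pass
// after the configured interval (mirrors the NOTIFY + poll fallback).
fn wait_for_notify_or_poll(rx: &mpsc::Receiver<()>, poll_interval: Duration) -> Wake {
    match rx.recv_timeout(poll_interval) {
        Ok(()) => Wake::Notify,
        Err(_) => Wake::PollFallback, // timeout (or closed channel): poll anyway
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(()).unwrap();
    // A pending notification wakes the loop immediately.
    assert_eq!(wait_for_notify_or_poll(&rx, Duration::from_millis(50)), Wake::Notify);
    // With a silent channel, the timeout triggers a poll pass instead.
    assert_eq!(wait_for_notify_or_poll(&rx, Duration::from_millis(50)), Wake::PollFallback);
}
```

Either wake reason leads back into the claim pass, so a lost NOTIFY delays a task by at most one poll interval rather than stranding it.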
3. Claiming
```sql
WITH next AS (
    SELECT id FROM tasks
    WHERE queue_name = :queue
      AND status = 'PENDING'
      AND enqueued_at <= now()
      AND (good_until IS NULL OR good_until > now())
    ORDER BY priority ASC, enqueued_at ASC
    FOR UPDATE SKIP LOCKED
    LIMIT :limit
)
UPDATE tasks SET status = 'CLAIMED', claimed_at = now()
FROM next WHERE tasks.id = next.id
RETURNING tasks.id
```

Key features:
- `FOR UPDATE SKIP LOCKED`: Non-blocking concurrent claiming
- Priority ordering: Lower priority number = higher priority
- FIFO within same priority
- Expiry check: Skip tasks past `good_until`
- A second expiry guard runs when moving `Claimed` to `Running`; if `good_until` passed in the buffer, the task becomes `Expired` and user code does not run
4. Task Execution
Inside the spawned tokio task:
- Resolve task from registry by name
- Deserialize arguments via serde
- Start heartbeat (background tokio task)
- Call task function (`async`, or `spawn_blocking` for blocking tasks)
- Serialize result
- Stop heartbeat
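The heartbeat step can be sketched with std threads: a background task periodically records a liveness timestamp while the task body runs, and stops when the task finishes. In horsies the heartbeat is a background tokio task writing to PostgreSQL; the names below are illustrative.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Spawn a background heartbeat that stores "last seen alive" (ms since
// epoch) until asked to stop.
fn start_heartbeat(
    last_beat_ms: Arc<AtomicU64>,
    stop: Arc<AtomicBool>,
    every: Duration,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        while !stop.load(Ordering::Relaxed) {
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_millis() as u64;
            last_beat_ms.store(now, Ordering::Relaxed);
            thread::sleep(every);
        }
    })
}

fn main() {
    let last_beat = Arc::new(AtomicU64::new(0));
    let stop = Arc::new(AtomicBool::new(false));
    let hb = start_heartbeat(Arc::clone(&last_beat), Arc::clone(&stop), Duration::from_millis(10));

    thread::sleep(Duration::from_millis(50)); // the "task body" runs here
    stop.store(true, Ordering::Relaxed);      // step 6: stop heartbeat
    hb.join().unwrap();
    assert!(last_beat.load(Ordering::Relaxed) > 0);
}
```

A fresh heartbeat timestamp is what lets the reaper (see Error Handling below) distinguish a slow task from a dead worker.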
5. Result Handling
After the task completes:
- Parse result (Ok or Err)
- Check for retry eligibility
- Update task status (COMPLETED or FAILED)
- Send NOTIFY for result waiters
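The steps above reduce to a small decision function. This is a hypothetical illustration: the variant names and retry-budget parameter are assumptions for clarity, not horsies' actual types.

```rust
// What the worker does with a finished task.
#[derive(Debug, PartialEq)]
enum Outcome {
    Completed,      // store COMPLETED, NOTIFY result waiters
    RetryScheduled, // retry budget remains: re-enqueue instead of failing
    Failed,         // store FAILED with the error code, NOTIFY waiters
}

// Map a task result and remaining retries to the stored outcome.
fn handle_result(result: Result<&str, &str>, retries_left: u32) -> Outcome {
    match result {
        Ok(_) => Outcome::Completed,
        Err(_) if retries_left > 0 => Outcome::RetryScheduled,
        Err(_) => Outcome::Failed,
    }
}

fn main() {
    assert_eq!(handle_result(Ok("42"), 0), Outcome::Completed);
    assert_eq!(handle_result(Err("boom"), 2), Outcome::RetryScheduled);
    assert_eq!(handle_result(Err("boom"), 0), Outcome::Failed);
}
```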
Worker Configuration
From `WorkerConfig`:
| Field | Description |
|---|---|
| `queues` | Queue names to process |
| `concurrency` | Maximum concurrent tasks |
| `max_claim_batch` | Max claims per queue per pass |
| `max_claim_per_worker` | Total claim limit |
| `queue_priorities` | Priority ordering for queues |
| `queue_max_concurrency` | Per-queue concurrency limits |
| `coalesce_notifies` | NOTIFY messages to drain per wake |
| `loglevel` | Log level for this worker |
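A fuller configuration using several of the fields above might look like the following. The field values and the exact types (e.g. how queue names are expressed) are assumptions for illustration; check the `WorkerConfig` API docs for the real shapes.

```rust
use horsies::WorkerConfig;

// Illustrative only: field types are assumed, not confirmed.
let worker_config = WorkerConfig {
    queues: vec!["default".into(), "emails".into()],
    concurrency: 16,       // at most 16 tasks in flight
    max_claim_batch: 32,   // per-queue claim limit per pass
    ..Default::default()
};
app.run_worker_with(worker_config).await?;
```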
Graceful Shutdown
On SIGTERM/SIGINT:
- Stop accepting new claims
- Wait for running tasks to complete
- Close database connections
```rust
// The worker handles signals automatically.
// Ctrl+C or SIGTERM triggers graceful shutdown.
app.run_worker().await?;
```

Resilience
Workers recover from transient infrastructure failures without operator intervention. Configure via `WorkerResilienceConfig` in `AppConfig`.
What It Handles
- Database outages: Worker retries on connection loss during startup, claiming, and result writes. No crash, no manual restart.
- Silent NOTIFY channels: If the PostgreSQL NOTIFY mechanism stops delivering (broken listener connection, dropped trigger), the worker falls back to periodic polling so tasks are still picked up.
- Task panics: Panics in `tokio::spawn` tasks are caught. The task is marked FAILED. The worker continues operating.
Configuration
```rust
use horsies::WorkerResilienceConfig;

let config = AppConfig {
    resilience: WorkerResilienceConfig {
        db_retry_initial_ms: 500,        // initial backoff on DB errors
        db_retry_max_ms: 30_000,         // backoff cap
        db_retry_max_attempts: 0,        // 0 = retry forever
        notify_poll_interval_ms: 5_000,  // poll fallback when NOTIFY is silent
    },
    ..AppConfig::for_database_url("postgresql://...")
};
```

| Field | Type | Default | Description |
|---|---|---|---|
| `db_retry_initial_ms` | `u64` | 500 | Initial backoff delay (100-60,000 ms) |
| `db_retry_max_ms` | `u64` | 30,000 | Maximum backoff delay (500-300,000 ms) |
| `db_retry_max_attempts` | `u64` | 0 | Max retry attempts; 0 = infinite (0-10,000) |
| `notify_poll_interval_ms` | `u64` | 5,000 | Poll interval when NOTIFY is silent (1,000-300,000 ms) |
Backoff uses exponential growth (`initial_ms * 2^attempt`) with jitter, capped at `db_retry_max_ms`. Backoff resets after each successful loop iteration.
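The schedule can be sketched as a pure function (jitter omitted here for determinism; the real implementation adds random jitter on top of this curve):

```rust
// Exponential backoff capped at max_ms: initial_ms * 2^attempt.
fn backoff_delay_ms(initial_ms: u64, max_ms: u64, attempt: u32) -> u64 {
    // Guard the shift so very large attempt counts saturate instead of panic.
    let factor = 1u64.checked_shl(attempt.min(63)).unwrap_or(u64::MAX);
    initial_ms.saturating_mul(factor).min(max_ms)
}

fn main() {
    // With the defaults: 500, 1000, 2000, 4000, ... capped at 30_000.
    assert_eq!(backoff_delay_ms(500, 30_000, 0), 500);
    assert_eq!(backoff_delay_ms(500, 30_000, 3), 4_000);
    assert_eq!(backoff_delay_ms(500, 30_000, 10), 30_000); // hit the cap
}
```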
Error Handling
| Error Type | Handling |
|---|---|
| Task returns `Err(TaskError)` | Stored as FAILED with error code |
| Panic in task | Caught, wrapped as `UNHANDLED_ERROR` |
| Serialization error | `WORKER_SERIALIZATION_ERROR` |
| Task not found in registry | `WORKER_RESOLUTION_ERROR` |
| Worker crash (detected via heartbeat) | Marked FAILED by reaper |
Multiple Workers
Run multiple worker instances for horizontal scaling:
```rust
// On machine 1: binary with concurrency = 8
let worker_config = WorkerConfig { concurrency: 8, ..Default::default() };
app.run_worker_with(worker_config).await?;

// On machine 2: same binary, same config
```

- Each worker claims independently
- `SKIP LOCKED` prevents duplicate claims
- Advisory locks serialize certain operations
- `cluster_wide_cap` limits total concurrency
Schema Setup
Workers ensure the Horsies schema during startup before entering the claim loop. If you want an explicit preflight step in provisioning scripts or standalone tools, call `broker.migrate().await?` or `broker.ensure_schema_initialized().await?` yourself.
```rust
let broker = PostgresBroker::connect_with(config).await?;
broker.ensure_schema_initialized().await?;
```