Worker Architecture
Overview
```text
┌─────────────────────────────────────────────────────────┐
│  Worker Process (Main)                                  │
│  - Subscribes to NOTIFY channels                        │
│  - Claims tasks from PostgreSQL                         │
│  - Dispatches to process pool                           │
│  - Writes results back                                  │
│                                                         │
│  ┌─────────────────────────────────────────────────┐    │
│  │ ProcessPoolExecutor                             │    │
│  │ ┌─────────┐ ┌─────────┐ ┌─────────┐             │    │
│  │ │ Child 1 │ │ Child 2 │ │ Child N │ ...         │    │
│  │ │ Task A  │ │ Task B  │ │ (idle)  │             │    │
│  │ └─────────┘ └─────────┘ └─────────┘             │    │
│  └─────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────┘
```

Process Model
Workers use Python’s `ProcessPoolExecutor`:
- Main process: Coordinates claiming, dispatching, result handling
- Child processes: Execute task code in isolation
- N workers: Configurable via the `--processes` flag
Benefits:
- GIL doesn’t block concurrent task execution
- Crashed tasks don’t kill the main worker
- Memory isolation between tasks
Execution Flow
1. Startup

```bash
# In CLI
horsies worker myapp.instance:app --processes=8
```

- Load app from module
- Import task modules (for registry population)
- Create ProcessPoolExecutor with N workers
- Initialize child processes (each loads app independently)
- Subscribe to NOTIFY channels
2. Main Loop
```python
while not stopped:
    await claim_and_dispatch_all()
    await wait_for_any_notify()
```

- Claim pass: Query for available tasks, claim them
- Dispatch: Submit claimed tasks to process pool
- Wait: Block on NOTIFY until new tasks arrive
- Repeat
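The loop above can be sketched with plain `asyncio` primitives. This is an illustration only, not Horsies internals: `LoopSketch`, its in-memory lists, and the `asyncio.Event` standing in for a PostgreSQL NOTIFY channel are all hypothetical.

```python
import asyncio


class LoopSketch:
    """Toy claim/dispatch/wait loop; every name here is hypothetical."""

    def __init__(self) -> None:
        self.pending: list[str] = []      # stands in for PENDING rows
        self.dispatched: list[str] = []   # stands in for the process pool
        self.notify = asyncio.Event()     # stands in for a NOTIFY channel
        self.stopped = False

    def enqueue(self, task_id: str) -> None:
        self.pending.append(task_id)
        self.notify.set()                 # producer side "sends NOTIFY"

    def stop(self) -> None:
        self.stopped = True
        self.notify.set()                 # wake the loop so it can exit

    async def claim_and_dispatch_all(self) -> None:
        # Claim everything currently available, then hand it off.
        claimed, self.pending = self.pending, []
        self.dispatched.extend(claimed)

    async def wait_for_any_notify(self) -> None:
        await self.notify.wait()
        self.notify.clear()

    async def run(self) -> None:
        while not self.stopped:
            await self.claim_and_dispatch_all()
            await self.wait_for_any_notify()


async def demo_main_loop() -> list[str]:
    worker = LoopSketch()
    loop_task = asyncio.create_task(worker.run())
    worker.enqueue("a")
    await asyncio.sleep(0)       # let the loop run a claim pass
    worker.enqueue("b")
    await asyncio.sleep(0.01)    # second pass picks up "b"
    worker.stop()
    await loop_task
    return worker.dispatched
```

Running `asyncio.run(demo_main_loop())` returns `["a", "b"]`: each enqueue wakes the loop, which claims whatever is pending and goes back to waiting.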
3. Claiming
```sql
WITH next AS (
  SELECT id FROM tasks
  WHERE queue_name = :queue
    AND status = 'PENDING'
    AND enqueued_at <= now()
    AND (good_until IS NULL OR good_until > now())
  ORDER BY priority ASC, enqueued_at ASC
  FOR UPDATE SKIP LOCKED
  LIMIT :limit
)
UPDATE tasks
SET status = 'CLAIMED', claimed_at = now()
FROM next
WHERE tasks.id = next.id
RETURNING tasks.id
```

Key features:
- `FOR UPDATE SKIP LOCKED`: Non-blocking concurrent claiming
- Priority ordering: Lower priority number = higher priority
- FIFO within same priority
- Expiry check: Skip tasks past `good_until`
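To make the selection rules concrete, here is a pure-Python model of the query's filter and ordering. It does not touch a database, ignores the `queue_name` filter, and cannot model `SKIP LOCKED`; `Row` and `claimable` are hypothetical names used only for this illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class Row:
    id: str
    priority: int
    enqueued_at: datetime
    good_until: Optional[datetime] = None
    status: str = "PENDING"


def claimable(rows: list[Row], now: datetime, limit: int) -> list[str]:
    """Mirror the WHERE / ORDER BY / LIMIT clauses in plain Python."""
    eligible = [
        r for r in rows
        if r.status == "PENDING"
        and r.enqueued_at <= now
        and (r.good_until is None or r.good_until > now)
    ]
    # Lower priority number first, then oldest first (FIFO).
    eligible.sort(key=lambda r: (r.priority, r.enqueued_at))
    return [r.id for r in eligible[:limit]]


now = datetime(2024, 1, 1, 12, 0, 0)
rows = [
    Row("low", priority=5, enqueued_at=now - timedelta(minutes=10)),
    Row("high-new", priority=1, enqueued_at=now - timedelta(minutes=1)),
    Row("high-old", priority=1, enqueued_at=now - timedelta(minutes=5)),
    Row("expired", priority=0, enqueued_at=now - timedelta(minutes=9),
        good_until=now - timedelta(minutes=1)),
    Row("scheduled", priority=0, enqueued_at=now + timedelta(minutes=5)),
]
order = claimable(rows, now, limit=3)
```

Here `order` is `["high-old", "high-new", "low"]`: the expired and not-yet-due rows are filtered out even though they have the best priority, and the two priority-1 rows come out oldest first.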
4. Task Execution
In child process:
- Resolve task from registry by name
- Deserialize arguments
- Start heartbeat thread
- Call task function
- Serialize result
- Stop heartbeat thread
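The heartbeat steps above follow a common pattern: start a background thread, run the task, and stop the thread whether the task returns or raises. The sketch below shows that pattern under stated assumptions. `run_with_heartbeat` and the `beat` callback are hypothetical; how Horsies actually records heartbeats is not described here.

```python
import threading
import time
from typing import Any, Callable


def run_with_heartbeat(
    fn: Callable[..., Any],
    args: tuple,
    beat: Callable[[], None],
    interval_s: float = 0.01,
) -> Any:
    """Call fn(*args) while a background thread calls beat() periodically."""
    done = threading.Event()

    def pump() -> None:
        # Event.wait() returns False on timeout and True once done is set.
        while not done.wait(interval_s):
            beat()

    thread = threading.Thread(target=pump, daemon=True)
    thread.start()
    try:
        return fn(*args)
    finally:
        # Stop the heartbeat whether the task returned or raised.
        done.set()
        thread.join()


beats: list[float] = []


def slow_add(a: int, b: int) -> int:
    time.sleep(0.1)   # simulate a task that runs for a while
    return a + b


result = run_with_heartbeat(slow_add, (2, 3), lambda: beats.append(time.monotonic()))
```

Using `try`/`finally` means a task that raises does not leave a heartbeat thread running and falsely reporting the task as alive.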
5. Result Handling
Main process receives result from child:
- Parse TaskResult (ok or err)
- Check for retry eligibility
- Update task status (COMPLETED or FAILED)
- Send NOTIFY for result waiters
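One way to picture the retry check is as a small decision function. Everything in this sketch is an assumption made for illustration: `ResultSketch` and `RetryPolicySketch` are hypothetical stand-ins (not the real `TaskResult` or retry policy classes), `max_retries` is an invented field, and the `"RETRY"` label is a placeholder for whatever state a task awaiting another attempt actually has.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ResultSketch:
    """Hypothetical stand-in for TaskResult: err is None on success."""
    ok: Any = None
    err: Optional[str] = None   # an error code such as "WORKER_CRASHED"


@dataclass
class RetryPolicySketch:
    """Hypothetical policy; max_retries is an assumed field name."""
    max_retries: int = 0
    auto_retry_for: frozenset = frozenset()


def next_status(result: ResultSketch, policy: RetryPolicySketch, retries_used: int) -> str:
    """Decide what to write back for a finished task."""
    if result.err is None:
        return "COMPLETED"
    retryable = result.err in policy.auto_retry_for
    if retryable and retries_used < policy.max_retries:
        return "RETRY"    # assumed label: schedule another attempt
    return "FAILED"


policy = RetryPolicySketch(max_retries=2, auto_retry_for=frozenset({"WORKER_CRASHED"}))
outcomes = [
    next_status(ResultSketch(ok=42), policy, retries_used=0),
    next_status(ResultSketch(err="WORKER_CRASHED"), policy, retries_used=1),
    next_status(ResultSketch(err="WORKER_CRASHED"), policy, retries_used=2),
    next_status(ResultSketch(err="SOME_OTHER_ERROR"), policy, retries_used=0),
]
```

With this policy, `outcomes` is `["COMPLETED", "RETRY", "FAILED", "FAILED"]`: an error is retried only when its code is listed and attempts remain.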
Child Process Initialization
Each child process:
- Loads app via `_child_initializer()`
- Imports task modules
- Populates local task registry
- Suppresses sends during import (prevents side effects)
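```python
def _child_initializer(app_locator, imports):
    app = _locate_app(app_locator)   # load the app in this child process
    set_current_app(app)
    app.suppress_sends(True)         # no sends while task modules are imported
    for module in imports:
        import_module(module)        # importing populates the local task registry
    app.suppress_sends(False)
```

Worker Configuration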
From `WorkerConfig`:
| Field | Description |
|---|---|
| `dsn` | SQLAlchemy database URL |
| `queues` | Queue names to process |
| `processes` | Child process count |
| `max_claim_batch` | Max claims per queue per pass |
| `max_claim_per_worker` | Total claim limit |
| `app_locator` | Path to app instance |
| `imports` | Task module paths |
Graceful Shutdown
On SIGTERM/SIGINT:
- Stop accepting new claims
- Wait for running tasks to complete
- Close process pool
- Close database connections
```python
async def stop(self):
    self._stop.set()
    await self.listener.close()
    self._executor.shutdown(wait=True)
```

Resilience
Workers recover from transient infrastructure failures without operator intervention. Configure via `WorkerResilienceConfig` in `AppConfig`.
What It Handles
- Database outages: Worker retries on connection loss during startup, claiming, and result writes. No crash, no manual restart.
- Silent NOTIFY channels: If the PostgreSQL NOTIFY mechanism stops delivering (broken listener connection, dropped trigger), the worker falls back to periodic polling so tasks are still picked up.
- Broken process pool: If child processes crash, the process pool is recreated. Tasks still in CLAIMED status are requeued back to PENDING. Tasks already in RUNNING status are handled by the recovery reaper: if the task has a retry policy with `WORKER_CRASHED` in `auto_retry_for` and retries remaining, the task is scheduled for retry; otherwise it is marked FAILED.
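The NOTIFY fallback in the list above amounts to waiting with a timeout: wake on a notification if one arrives, otherwise wake when the poll interval elapses and run a claim pass anyway. A minimal sketch, assuming an `asyncio.Event` in place of the real listener; `wait_for_notify_or_poll` is a hypothetical helper, and the 20 ms interval is only there to keep the demo fast.

```python
import asyncio


async def wait_for_notify_or_poll(notify: asyncio.Event, poll_interval_ms: int) -> str:
    """Return "notify" if woken by a notification, "poll" if the interval elapsed."""
    try:
        await asyncio.wait_for(notify.wait(), timeout=poll_interval_ms / 1000)
    except asyncio.TimeoutError:
        return "poll"       # channel was silent: fall back to a claim pass
    notify.clear()
    return "notify"


async def demo_poll_fallback() -> list[str]:
    notify = asyncio.Event()
    outcomes = [await wait_for_notify_or_poll(notify, 20)]   # nothing arrives
    notify.set()                                              # simulate a NOTIFY
    outcomes.append(await wait_for_notify_or_poll(notify, 20))
    return outcomes
```

`asyncio.run(demo_poll_fallback())` returns `["poll", "notify"]`. Either way the caller proceeds to a claim pass, so a dead listener connection delays pickup by at most one poll interval.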
Configuration
```python
from horsies import WorkerResilienceConfig

config = AppConfig(
    broker=broker,
    resilience=WorkerResilienceConfig(
        db_retry_initial_ms=500,        # initial backoff on DB errors
        db_retry_max_ms=30_000,         # backoff cap
        db_retry_max_attempts=0,        # 0 = retry forever
        notify_poll_interval_ms=5_000,  # poll fallback when NOTIFY is silent
    ),
)
```

| Field | Type | Default | Description |
|---|---|---|---|
| `db_retry_initial_ms` | int | 500 | Initial backoff delay (100–60,000 ms) |
| `db_retry_max_ms` | int | 30000 | Maximum backoff delay (500–300,000 ms) |
| `db_retry_max_attempts` | int | 0 | Max retry attempts; 0 = infinite (0–10,000) |
| `notify_poll_interval_ms` | int | 5000 | Poll interval when NOTIFY is silent (1,000–300,000 ms) |
Backoff uses exponential growth (initial_ms * 2^attempt) with ±25% jitter, capped at db_retry_max_ms. Backoff resets after each successful loop iteration.
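The backoff formula can be worked through in a few lines. This is a sketch of the stated formula, not the library's code: `backoff_delay_ms` is a hypothetical helper, and applying the cap both before and after jitter (so a delay never exceeds the maximum) is an assumption about ordering that the description does not settle.

```python
import random
from typing import Optional


def backoff_delay_ms(
    attempt: int,
    initial_ms: int = 500,
    max_ms: int = 30_000,
    jitter: float = 0.25,
    rng: Optional[random.Random] = None,
) -> float:
    """initial_ms * 2^attempt, with +/- jitter, capped at max_ms."""
    rng = rng or random.Random()
    # Cap the exponential term first so large attempt counts stay bounded.
    base = min(initial_ms * (2 ** attempt), max_ms)
    factor = 1 + rng.uniform(-jitter, jitter)
    # Assumption: jitter may not push the delay past the cap.
    return min(base * factor, max_ms)


# With jitter disabled the schedule is deterministic.
schedule = [backoff_delay_ms(a, jitter=0) for a in range(8)]
```

With the defaults and no jitter, `schedule` is `[500, 1000, 2000, 4000, 8000, 16000, 30000, 30000]`: the delay doubles until attempt 6, where 32,000 ms is clipped to the 30,000 ms cap. With the default ±25% jitter, attempt 2 lands somewhere between 1,500 and 2,500 ms.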
Module Discovery
Workers need to know where tasks are defined:

```python
# In instance.py
# Dotted module paths (recommended)
app.discover_tasks(["myapp.tasks", "myapp.jobs.worker_tasks"])

# Or file paths
app.discover_tasks(["tasks.py", "more_tasks.py"])
```

This list is passed to child processes for import.
Error Handling
| Error Type | Handling |
|---|---|
| Task exception | Wrapped in TaskResult(err=…) |
| Serialization error | WORKER_SERIALIZATION_ERROR |
| Task not found | WORKER_RESOLUTION_ERROR |
| Child crash | Detected via heartbeat, marked FAILED |
Multiple Workers
Run multiple worker processes for horizontal scaling:

```bash
# On machine 1
horsies worker myapp.instance:app --processes=8

# On machine 2
horsies worker myapp.instance:app --processes=8
```

- Each worker claims independently
- `SKIP LOCKED` prevents duplicate claims
- Advisory locks serialize certain operations
- `cluster_wide_cap` limits total concurrency