Worker Architecture
Overview
```
┌─────────────────────────────────────────────────────────┐
│  Worker Process (Main)                                  │
│  - Subscribes to NOTIFY channels                        │
│  - Claims tasks from PostgreSQL                         │
│  - Dispatches to process pool                           │
│  - Writes results back                                  │
│                                                         │
│  ┌─────────────────────────────────────────────────┐    │
│  │  ProcessPoolExecutor                            │    │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐            │    │
│  │  │ Child 1 │ │ Child 2 │ │ Child N │ ...        │    │
│  │  │ Task A  │ │ Task B  │ │ (idle)  │            │    │
│  │  └─────────┘ └─────────┘ └─────────┘            │    │
│  └─────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────┘
```
Process Model
Workers use Python’s `ProcessPoolExecutor`:
- Main process: Coordinates claiming, dispatching, result handling
- Child processes: Execute task code in isolation
- N workers: Configurable via the `--processes` flag
Benefits:
- GIL doesn’t block concurrent task execution
- Crashed tasks don’t kill the main worker
- Memory isolation between tasks
Execution Flow
1. Startup
```bash
# In CLI
horsies worker myapp.instance:app --processes=8
```
- Load app from module
- Import task modules (for registry population)
- Subscribe to NOTIFY channels
- Create ProcessPoolExecutor with N workers
- Initialize child processes (each loads app independently)
2. Main Loop
```python
while not stopped:
    await claim_and_dispatch_all()
    await wait_for_any_notify()
```
- Claim pass: Query for available tasks, claim them
- Dispatch: Submit claimed tasks to process pool
- Wait: Block on NOTIFY until new tasks arrive
- Repeat
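The loop above can be modeled without a database. The following is a minimal sketch, not the worker's actual implementation: an `asyncio.Event` stands in for a NOTIFY channel, a `deque` stands in for the tasks table, and `run_worker` and `demo` are hypothetical names.

```python
import asyncio
from collections import deque


async def run_worker(pending, notify, stop, dispatched):
    """Claim, dispatch, then sleep until signalled (all names are stand-ins)."""
    while not stop.is_set():
        # Claim pass: take everything that is currently available.
        while pending:
            dispatched.append(pending.popleft())  # "dispatch" = record it
        # Wait: block until a NOTIFY-like signal arrives, then re-arm it.
        await notify.wait()
        notify.clear()


async def demo():
    pending = deque(["task-a", "task-b"])  # enqueued before the worker started
    notify, stop = asyncio.Event(), asyncio.Event()
    dispatched = []
    worker = asyncio.create_task(run_worker(pending, notify, stop, dispatched))

    await asyncio.sleep(0.01)  # first claim pass drains the backlog
    pending.append("task-c")   # a producer enqueues a task ...
    notify.set()               # ... and notifies the worker
    await asyncio.sleep(0.01)

    stop.set()
    notify.set()               # wake the worker so it sees the stop flag
    await worker
    return dispatched


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Claiming before waiting means tasks that were enqueued before the worker subscribed are picked up on the first pass instead of waiting for the next notification.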
3. Claiming
```sql
WITH next AS (
    SELECT id
    FROM tasks
    WHERE queue_name = :queue
      AND status = 'PENDING'
      AND enqueued_at <= now()
      AND (good_until IS NULL OR good_until > now())
    ORDER BY priority ASC, enqueued_at ASC
    FOR UPDATE SKIP LOCKED
    LIMIT :limit
)
UPDATE tasks
SET status = 'CLAIMED', claimed_at = now()
FROM next
WHERE tasks.id = next.id
RETURNING tasks.id
```
Key features:
- `FOR UPDATE SKIP LOCKED`: Non-blocking concurrent claiming
- Priority ordering: Lower priority number = higher priority
- FIFO within same priority
- Expiry check: Skip tasks past `good_until`
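To make the eligibility and ordering rules concrete, here is a small in-memory model of the same selection logic. It is an illustrative sketch only: `Row`, `claim`, and `demo` are hypothetical names, and it does not model row locking, so it says nothing about `SKIP LOCKED` behavior under concurrency.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class Row:
    id: str
    priority: int
    enqueued_at: datetime
    good_until: Optional[datetime] = None
    status: str = "PENDING"


def claim(rows: List[Row], now: datetime, limit: int) -> List[str]:
    """Mirror the WHERE / ORDER BY / LIMIT of the claim query in plain Python."""
    eligible = [
        r for r in rows
        if r.status == "PENDING"
        and r.enqueued_at <= now                            # not scheduled for later
        and (r.good_until is None or r.good_until > now)    # not expired
    ]
    # Lower priority number first, then oldest first (FIFO within a priority).
    eligible.sort(key=lambda r: (r.priority, r.enqueued_at))
    claimed = eligible[:limit]
    for r in claimed:
        r.status = "CLAIMED"
    return [r.id for r in claimed]


def demo():
    now = datetime(2024, 1, 1, 12, 0)
    minute = timedelta(minutes=1)
    rows = [
        Row("late-low", priority=5, enqueued_at=now - 10 * minute),
        Row("urgent", priority=1, enqueued_at=now - 1 * minute),
        Row("early-low", priority=5, enqueued_at=now - 20 * minute),
        Row("expired", priority=0, enqueued_at=now - 30 * minute,
            good_until=now - 5 * minute),
        Row("scheduled", priority=0, enqueued_at=now + 5 * minute),
    ]
    first = claim(rows, now, limit=10)
    second = claim(rows, now, limit=10)  # nothing is left in PENDING
    return first, second
```

In this model the expired and future-scheduled rows are skipped even though they carry the best priority, and a second claim pass returns nothing because the claimed rows are no longer `PENDING`.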
4. Task Execution
In child process:
- Resolve task from registry by name
- Deserialize arguments
- Start heartbeat thread
- Call task function
- Serialize result
- Stop heartbeat thread
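The six steps above can be sketched as follows. This is a simplified illustration under stated assumptions: `REGISTRY`, `task`, and `run_in_child` are hypothetical names, JSON stands in for whatever serialization the library uses, and the heartbeat only appends timestamps to a list where a real worker would presumably record liveness in the database.

```python
import json
import threading
import time

REGISTRY = {}  # hypothetical name -> function registry


def task(fn):
    """Register a function under its own name (illustrative decorator)."""
    REGISTRY[fn.__name__] = fn
    return fn


@task
def add(a, b):
    return a + b


def run_in_child(task_name, args_json, beats, interval_s=0.01):
    fn = REGISTRY[task_name]       # 1. resolve task from registry by name
    args = json.loads(args_json)   # 2. deserialize arguments
    done = threading.Event()

    def heartbeat():
        # Event.wait returns False on timeout and True once `done` is set.
        while not done.wait(interval_s):
            beats.append(time.monotonic())

    hb = threading.Thread(target=heartbeat, daemon=True)
    hb.start()                     # 3. start heartbeat thread
    try:
        result = fn(*args)         # 4. call task function
        return json.dumps({"ok": result})  # 5. serialize result
    finally:
        done.set()                 # 6. stop heartbeat thread
        hb.join()
```

Stopping the heartbeat in a `finally` block ensures the thread is shut down even when the task function raises.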
5. Result Handling
Main process receives result from child:
- Parse TaskResult (ok or err)
- Check for retry eligibility
- Update task status (COMPLETED or FAILED)
- Send NOTIFY for result waiters
Child Process Initialization
Each child process:
- Loads app via `_child_initializer()`
- Imports task modules
- Populates local task registry
- Suppresses sends during import (prevents side effects)
```python
from importlib import import_module

# _locate_app and set_current_app are defined elsewhere in the library (not shown).
def _child_initializer(app_locator, imports):
    app = _locate_app(app_locator)
    set_current_app(app)
    app.suppress_sends(True)   # importing task modules must not enqueue anything
    for module in imports:
        import_module(module)  # populates this child's task registry
    app.suppress_sends(False)
```
Worker Configuration
From `WorkerConfig`:
| Field | Description |
|---|---|
| `dsn` | SQLAlchemy database URL |
| `session_dsn` | Session-capable URL used for listener-capable operations |
| `pgbouncer_transaction_mode` | Disables child-process prepared statements for PgBouncer transaction pooling |
| `queues` | Queue names to process |
| `processes` | Child process count |
| `parent_pool_size` | Worker coordinator SQLAlchemy pool size |
| `parent_max_overflow` | Worker coordinator overflow connections |
| `child_pool_min_size` | Minimum per-child psycopg pool size |
| `child_pool_max_size` | Maximum per-child psycopg pool size |
| `max_claim_batch` | Max claims per queue per pass; 0 means auto-fill available capacity |
| `max_claim_per_worker` | Total claim limit |
| `app_locator` | Path to app instance |
| `imports` | Task module paths |
Workers require PostgreSQL LISTEN for dispatch. When using transaction-pooled
PgBouncer, configure PostgresConfig.session_database_url with a direct or
session-capable URL. If LISTEN is unavailable, workers fail at startup instead of
silently switching to poll-only dispatch.
Child database pools are initialized inside the child process, after the process
pool starts. Psycopg connections are not process-safe and must not be opened in
the parent and shared across forked children. Workers warm child processes
before opening parent listener/coordinator sockets; child DB connections remain
lazy by default when worker_child_pool_min_size=0.
If a process pool must be replaced after startup, the worker uses a non-inheriting process start method for the replacement so live parent database sockets are not forked into new children.
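As an illustration of a non-inheriting start method, the standard library's `spawn` context starts each child as a fresh interpreter, which does not inherit the parent's unneeded file descriptors. The sketch below assumes `spawn`; the text above does not say which method the worker actually uses, and `non_inheriting_context` and `make_replacement_pool` are hypothetical names.

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def non_inheriting_context():
    # "spawn" launches a new Python interpreter for each child, so open
    # sockets in the parent (such as database connections) are not copied.
    return multiprocessing.get_context("spawn")


def make_replacement_pool(processes: int) -> ProcessPoolExecutor:
    # mp_context selects the start method for this executor only; the
    # process-wide default start method is left unchanged.
    return ProcessPoolExecutor(
        max_workers=processes,
        mp_context=non_inheriting_context(),
    )
```

With `fork`, by contrast, a child starts as a copy of the parent, including any connections that are open at that moment, which is the situation the paragraph above is designed to avoid.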
Graceful Shutdown
On SIGTERM/SIGINT:
- Stop accepting new claims
- Wait for running tasks to complete
- Close process pool
- Close database connections
```python
async def stop(self):
    self._stop.set()                    # stop accepting new claims
    await self.listener.close()         # close the NOTIFY listener
    self._executor.shutdown(wait=True)  # wait for running tasks, then close the pool
```
Resilience
Workers recover from transient infrastructure failures without operator intervention. Configure via `WorkerResilienceConfig` in `AppConfig`.
What It Handles
- Database outages: Worker retries on connection loss during startup, claiming, and result writes. No crash, no manual restart.
- Silent NOTIFY channels: If the PostgreSQL NOTIFY mechanism stops delivering (broken listener connection, dropped trigger), the worker falls back to periodic polling so tasks are still picked up.
- Broken process pool: If child processes crash, the process pool is recreated. Tasks still in CLAIMED status are requeued back to PENDING. Tasks already in RUNNING status are recovered through the same retry policy used by the reaper: if `WORKER_CRASHED` is listed in `auto_retry_for` and retries remain, the task is scheduled for retry; otherwise it is marked FAILED with a `WORKER_CRASHED` result.
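The broken-pool recovery rules can be written as a small decision function. This is a sketch of the policy as described above, not the library's code: `recover_after_pool_crash` is a hypothetical name, and `"RETRY_SCHEDULED"` is an illustrative outcome label rather than a documented status.

```python
from typing import Collection, Optional, Tuple


def recover_after_pool_crash(
    status: str,
    retries_left: int,
    auto_retry_for: Collection[str],
) -> Tuple[str, Optional[str]]:
    """Return (outcome, error_code) for a task caught in a pool crash."""
    if status == "CLAIMED":
        # Claimed but never started: requeue it.
        return ("PENDING", None)
    if status == "RUNNING":
        # Possibly partially executed: retry only if the task opted in.
        if "WORKER_CRASHED" in auto_retry_for and retries_left > 0:
            return ("RETRY_SCHEDULED", None)
        return ("FAILED", "WORKER_CRASHED")
    # Any other status is unaffected by the crash.
    return (status, None)
```

The split reflects the difference in risk: a CLAIMED task has not run any user code yet, while a RUNNING task may have produced side effects, so re-running it is left to the task's own retry configuration.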
Configuration
```python
from horsies import WorkerResilienceConfig

# AppConfig and broker are assumed to be imported/defined elsewhere.
config = AppConfig(
    broker=broker,
    resilience=WorkerResilienceConfig(
        db_retry_initial_ms=500,        # initial backoff on DB errors
        db_retry_max_ms=30_000,         # backoff cap
        db_retry_max_attempts=0,        # 0 = retry forever
        notify_poll_interval_ms=5_000,  # poll fallback when NOTIFY is silent
    ),
)
```
| Field | Type | Default | Description |
|---|---|---|---|
| `db_retry_initial_ms` | int | 500 | Initial backoff delay (100–60,000 ms) |
| `db_retry_max_ms` | int | 30000 | Maximum backoff delay (500–300,000 ms) |
| `db_retry_max_attempts` | int | 0 | Max retry attempts; 0 = infinite (0–10,000) |
| `notify_poll_interval_ms` | int | 5000 | Poll interval when NOTIFY is silent (1,000–300,000 ms) |
Backoff uses exponential growth (initial_ms * 2^attempt) with ±25% jitter, capped at db_retry_max_ms. Backoff resets after each successful loop iteration.
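The backoff formula can be worked through in a few lines. This is a minimal sketch under stated assumptions: `backoff_ms` is a hypothetical name, the jitter is drawn uniformly, and the cap is applied both before and after jitter so the result never exceeds `max_ms`; the exact order used by the worker is not specified above.

```python
import random


def backoff_ms(attempt, initial_ms=500, max_ms=30_000, rng=random.random):
    """Delay before retry number `attempt` (0-based), in milliseconds."""
    # Bound the exponent so "retry forever" cannot produce huge integers.
    base = min(initial_ms * 2 ** min(attempt, 30), max_ms)
    # rng() is in [0, 1), so the factor is uniform in [0.75, 1.25).
    jitter = 1.0 + (rng() * 0.5 - 0.25)
    return min(base * jitter, max_ms)


if __name__ == "__main__":
    for attempt in range(8):
        print(attempt, round(backoff_ms(attempt)))
```

With the defaults and no jitter, the delays run 500, 1000, 2000, 4000, 8000, 16000 ms and then stay at the 30000 ms cap; the jitter spreads retries from many workers so they do not all hit a recovering database at the same instant.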
Module Discovery
Workers need to know where tasks are defined:
```python
# In instance.py

# Dotted module paths (recommended)
app.discover_tasks(["myapp.tasks", "myapp.jobs.worker_tasks"])

# Or file paths
app.discover_tasks(["tasks.py", "more_tasks.py"])
```
This list is passed to child processes for import.
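For readers curious how the two reference styles differ at import time, the sketch below shows one way a dotted path and a file path can each be turned into a module using only the standard library. It is an assumption-laden illustration: `load_task_module` is a hypothetical helper, and how `discover_tasks` resolves its entries internally is not described here.

```python
import importlib
import importlib.util
from pathlib import Path
from types import ModuleType


def load_task_module(ref: str) -> ModuleType:
    """Import `ref` as a file path if it ends in .py, else as a dotted name."""
    if ref.endswith(".py"):
        path = Path(ref)
        # Build a module spec directly from the file location.
        spec = importlib.util.spec_from_file_location(path.stem, path)
        if spec is None or spec.loader is None:
            raise ImportError(f"cannot load {ref}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # runs the file's top-level code
        return module
    # Dotted names go through the normal import system and sys.modules.
    return importlib.import_module(ref)
```

A module loaded from a file path this way is not registered in `sys.modules` under a package name, so it can end up imported twice under different names; this is one plausible reason to prefer dotted paths.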
Error Handling
| Error Type | Handling |
|---|---|
| Task exception | Wrapped in TaskResult(err=…) |
| Serialization error | WORKER_SERIALIZATION_ERROR |
| Task not found | WORKER_RESOLUTION_ERROR |
| Child crash | Detected via heartbeat, marked FAILED |
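The first three rows of the table can be sketched as a single wrapper around the task call. The `TaskResult` class below is a simplified stand-in, not the library's actual type, `execute` is a hypothetical name, and JSON is assumed as the serialization format purely for illustration. Child crashes are not modeled, since they are detected from outside the child.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class TaskResult:
    ok: Any = None
    err: Optional[str] = None


def execute(registry: Dict[str, Callable], name: str, args: List[Any]) -> TaskResult:
    fn = registry.get(name)
    if fn is None:
        # Task not found in this process's registry.
        return TaskResult(err="WORKER_RESOLUTION_ERROR")
    try:
        value = fn(*args)
    except Exception as exc:
        # Task exception: wrapped in an error result instead of propagating.
        return TaskResult(err=f"{type(exc).__name__}: {exc}")
    try:
        json.dumps(value)  # check that the return value can be serialized
    except (TypeError, ValueError):
        return TaskResult(err="WORKER_SERIALIZATION_ERROR")
    return TaskResult(ok=value)
```

Returning a result object for every outcome keeps the child's contract with the main process uniform: the parent always receives either `ok` or `err` and never has to handle a raised exception from task code.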
Multiple Workers
Run multiple worker processes for horizontal scaling:
```bash
# On machine 1
horsies worker myapp.instance:app --processes=8

# On machine 2
horsies worker myapp.instance:app --processes=8
```
- Each worker claims independently
- `SKIP LOCKED` prevents duplicate claims
- Advisory locks serialize certain operations
- `cluster_wide_cap` limits total concurrency