Worker Architecture

┌─────────────────────────────────────────────────────────┐
│  Worker Process (Main)                                  │
│  - Subscribes to NOTIFY channels                        │
│  - Claims tasks from PostgreSQL                         │
│  - Dispatches to process pool                           │
│  - Writes results back                                  │
│                                                         │
│  ┌─────────────────────────────────────────────────┐    │
│  │               ProcessPoolExecutor               │    │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐          │    │
│  │  │ Child 1 │  │ Child 2 │  │ Child N │   ...    │    │
│  │  │ Task A  │  │ Task B  │  │ (idle)  │          │    │
│  │  └─────────┘  └─────────┘  └─────────┘          │    │
│  └─────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────┘

Workers use Python’s ProcessPoolExecutor:

  • Main process: Coordinates claiming, dispatching, result handling
  • Child processes: Execute task code in isolation
  • N workers: Configurable via --processes flag

Benefits:

  • GIL doesn’t block concurrent task execution
  • Crashed tasks don’t kill the main worker
  • Memory isolation between tasks

# In CLI
horsies worker myapp.instance:app --processes=8

Startup sequence:

  1. Load app from module
  2. Import task modules (for registry population)
  3. Subscribe to NOTIFY channels
  4. Create ProcessPoolExecutor with N workers
  5. Initialize child processes (each loads app independently)

Main loop:

while not stopped:
    await claim_and_dispatch_all()
    await wait_for_any_notify()

  1. Claim pass: Query for available tasks, claim them
  2. Dispatch: Submit claimed tasks to process pool
  3. Wait: Block on NOTIFY until new tasks arrive
  4. Repeat
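
The loop above can be sketched with plain asyncio primitives. This is only an illustration under stated assumptions, not the horsies implementation: `claim_batch` and `dispatch` are hypothetical callables, and an `asyncio.Event` stands in for a PostgreSQL NOTIFY wake-up. The timeout on the wait plays the role of a polling fallback.

```python
import asyncio
from typing import Awaitable, Callable, List


async def worker_loop(
    claim_batch: Callable[[], Awaitable[List[str]]],  # hypothetical: returns claimed task ids
    dispatch: Callable[[str], None],                  # hypothetical: hands a task to the pool
    notify: asyncio.Event,                            # stand-in for a NOTIFY wake-up
    stop: asyncio.Event,
    poll_interval_s: float = 5.0,
) -> None:
    while not stop.is_set():
        # Clear the flag before claiming, so a notification that arrives
        # during the claim pass is not lost.
        notify.clear()
        # Claim pass: keep claiming until nothing is available.
        while True:
            claimed = await claim_batch()
            if not claimed:
                break
            for task_id in claimed:
                dispatch(task_id)
        # Wait: sleep until a notification arrives or the interval elapses.
        try:
            await asyncio.wait_for(notify.wait(), timeout=poll_interval_s)
        except asyncio.TimeoutError:
            pass
```

Clearing the event before the claim pass rather than after it is the design choice that matters here: a task enqueued while the claim query is running still wakes the loop on the next wait.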

Claim query:

WITH next AS (
    SELECT id FROM tasks
    WHERE queue_name = :queue
      AND status = 'PENDING'
      AND enqueued_at <= now()
      AND (good_until IS NULL OR good_until > now())
    ORDER BY priority ASC, enqueued_at ASC
    FOR UPDATE SKIP LOCKED
    LIMIT :limit
)
UPDATE tasks SET status = 'CLAIMED', claimed_at = now()
FROM next WHERE tasks.id = next.id
RETURNING tasks.id

Key features:

  • FOR UPDATE SKIP LOCKED: Non-blocking concurrent claiming
  • Priority ordering: Lower priority number = higher priority
  • FIFO within same priority
  • Expiry check: Skip tasks past good_until
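
To make the selection rules concrete, here is an in-memory sketch of the same filter, ordering, and limit. It is an illustration only: the `Row` dataclass is hypothetical, the queue filter is omitted, and row locking (`FOR UPDATE SKIP LOCKED`) has no equivalent in this single-process model.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class Row:
    # Hypothetical in-memory stand-in for a row of the tasks table.
    id: str
    priority: int
    enqueued_at: datetime
    status: str = "PENDING"
    good_until: Optional[datetime] = None


def select_claimable(rows: List[Row], now: datetime, limit: int) -> List[str]:
    """Mirror the WHERE / ORDER BY / LIMIT of the claim query."""
    eligible = [
        r for r in rows
        if r.status == "PENDING"
        and r.enqueued_at <= now
        and (r.good_until is None or r.good_until > now)
    ]
    # Lower priority number first, then oldest first (FIFO within a priority).
    eligible.sort(key=lambda r: (r.priority, r.enqueued_at))
    return [r.id for r in eligible[:limit]]
```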

In child process:

  1. Resolve task from registry by name
  2. Deserialize arguments
  3. Start heartbeat thread
  4. Call task function
  5. Serialize result
  6. Stop heartbeat thread
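
The heartbeat steps (3 to 6) can be sketched with a background thread that is started before the task function runs and stopped afterwards, even if the task raises. This is a minimal sketch, not the horsies code: `run_with_heartbeat` and the `beat` callback are hypothetical names, and a real worker would record the heartbeat in the database.

```python
import threading
from typing import Any, Callable, Tuple


def run_with_heartbeat(
    fn: Callable[..., Any],
    args: Tuple[Any, ...],
    beat: Callable[[], None],  # hypothetical: e.g. record a heartbeat timestamp
    interval_s: float = 1.0,
) -> Any:
    done = threading.Event()

    def _pump() -> None:
        # Beat immediately, then once per interval until the task finishes.
        while True:
            beat()
            if done.wait(interval_s):
                return

    thread = threading.Thread(target=_pump, daemon=True)
    thread.start()
    try:
        return fn(*args)
    finally:
        # Stop the heartbeat whether the task returned or raised.
        done.set()
        thread.join()
```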

Main process receives result from child:

  1. Parse TaskResult (ok or err)
  2. Check for retry eligibility
  3. Update task status (COMPLETED or FAILED)
  4. Send NOTIFY for result waiters

Each child process:

  1. Loads app via _child_initializer()
  2. Imports task modules
  3. Populates local task registry
  4. Suppresses sends during import (prevents side effects)
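
from importlib import import_module

def _child_initializer(app_locator, imports):
    # Runs once in each child process when the pool starts it.
    app = _locate_app(app_locator)
    set_current_app(app)
    # Importing task modules populates the registry; sends are suppressed
    # so import-time side effects cannot enqueue work.
    app.suppress_sends(True)
    for module in imports:
        import_module(module)
    app.suppress_sends(False)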

From WorkerConfig:

| Field | Description |
| --- | --- |
| dsn | SQLAlchemy database URL |
| session_dsn | Session-capable URL used for listener-capable operations |
| pgbouncer_transaction_mode | Disables child-process prepared statements for PgBouncer transaction pooling |
| queues | Queue names to process |
| processes | Child process count |
| parent_pool_size | Worker coordinator SQLAlchemy pool size |
| parent_max_overflow | Worker coordinator overflow connections |
| child_pool_min_size | Minimum per-child psycopg pool size |
| child_pool_max_size | Maximum per-child psycopg pool size |
| max_claim_batch | Max claims per queue per pass; 0 means auto-fill available capacity |
| max_claim_per_worker | Total claim limit |
| app_locator | Path to app instance |
| imports | Task module paths |

Workers require PostgreSQL LISTEN for dispatch. When using transaction-pooled PgBouncer, configure PostgresConfig.session_database_url with a direct or session-capable URL. Workers fail startup instead of silently switching to poll-only dispatch if LISTEN is unavailable.

Child database pools are initialized inside the child process, after the process pool starts. Psycopg connections are not process-safe and must not be opened in the parent and shared across forked children. Workers warm child processes before opening parent listener/coordinator sockets; child DB connections remain lazy by default when worker_child_pool_min_size=0.
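
One common way to get this "open lazily, and only in the process that uses it" behaviour is a holder that creates its resource on first use and remembers which process created it. The sketch below is a generic pattern, not the horsies implementation; `PerProcess` and its `factory` argument are hypothetical names, and the factory would be whatever opens the pool.

```python
import os
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class PerProcess(Generic[T]):
    """Create a resource lazily and never reuse it across a fork."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._pid: Optional[int] = None
        self._value: Optional[T] = None

    def get(self) -> T:
        pid = os.getpid()
        # Build on first use, or rebuild if this object was inherited
        # from a parent process with a different pid.
        if self._value is None or self._pid != pid:
            self._value = self._factory()
            self._pid = pid
        return self._value
```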

If a process pool must be replaced after startup, the worker uses a non-inheriting process start method for the replacement so live parent database sockets are not forked into new children.
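
With the standard library, a pool whose children do not inherit the parent's open descriptors can be built by passing an explicit multiprocessing context. The sketch below assumes the `spawn` start method; the text above does not say which non-inheriting method horsies picks, and `make_replacement_pool` is a hypothetical helper name.

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def make_replacement_pool(processes: int) -> ProcessPoolExecutor:
    # "spawn" starts a fresh interpreter for each child, so unnecessary
    # file descriptors (such as live database sockets) are not inherited
    # the way they would be with "fork".
    ctx = multiprocessing.get_context("spawn")
    return ProcessPoolExecutor(max_workers=processes, mp_context=ctx)
```

Children started this way re-import their code, so any per-child setup has to be done through the executor's `initializer` argument rather than relying on state copied from the parent.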

On SIGTERM/SIGINT:

  1. Stop accepting new claims
  2. Wait for running tasks to complete
  3. Close process pool
  4. Close database connections

async def stop(self):
    self._stop.set()
    await self.listener.close()
    self._executor.shutdown(wait=True)
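
How the signals reach a stop flag is not shown above. A minimal sketch using asyncio's signal handlers follows (Unix only); `run_until_signalled`, `serve`, and `cleanup` are hypothetical names, with `serve` standing in for the claim loop and `cleanup` for closing the pool and database connections.

```python
import asyncio
import signal
from typing import Awaitable, Callable


async def run_until_signalled(
    serve: Callable[[asyncio.Event], Awaitable[None]],  # hypothetical claim loop
    cleanup: Callable[[], None],                        # hypothetical: close pool and DB
) -> None:
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    signals = (signal.SIGTERM, signal.SIGINT)
    for sig in signals:
        # The handler only flips a flag; the loop decides when to exit.
        loop.add_signal_handler(sig, stop.set)
    try:
        # serve() is expected to stop claiming once `stop` is set and to
        # return only after in-flight tasks have finished.
        await serve(stop)
    finally:
        cleanup()
        for sig in signals:
            loop.remove_signal_handler(sig)
```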

Workers recover from transient infrastructure failures without operator intervention. Configure via WorkerResilienceConfig in AppConfig.

  • Database outages: Worker retries on connection loss during startup, claiming, and result writes. No crash, no manual restart.
  • Silent NOTIFY channels: If the PostgreSQL NOTIFY mechanism stops delivering (broken listener connection, dropped trigger), the worker falls back to periodic polling so tasks are still picked up.
  • Broken process pool: If child processes crash, the process pool is recreated. Tasks still in CLAIMED status are requeued back to PENDING. Tasks already in RUNNING status are recovered through the same retry policy used by the reaper: if WORKER_CRASHED is listed in auto_retry_for and retries remain, the task is scheduled for retry; otherwise it is marked FAILED with a WORKER_CRASHED result.
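
The recovery rule for a broken process pool can be written as a small decision function. This is a sketch of the rule as described, not the library's code: the function name and the `retries_left` parameter are hypothetical, and `"RETRY"` is a placeholder for "scheduled for retry" rather than a documented status.

```python
from typing import Collection


def recover_after_pool_crash(
    status: str,
    retries_left: int,
    auto_retry_for: Collection[str],
) -> str:
    if status == "CLAIMED":
        # The task never started running, so it is simply requeued.
        return "PENDING"
    if status == "RUNNING":
        # Same policy as the reaper: retry only if WORKER_CRASHED is
        # opted into and retries remain.
        if "WORKER_CRASHED" in auto_retry_for and retries_left > 0:
            return "RETRY"
        return "FAILED"
    # Other statuses are not affected by a pool crash.
    return status
```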

from horsies import WorkerResilienceConfig

config = AppConfig(
    broker=broker,
    resilience=WorkerResilienceConfig(
        db_retry_initial_ms=500,        # initial backoff on DB errors
        db_retry_max_ms=30_000,         # backoff cap
        db_retry_max_attempts=0,        # 0 = retry forever
        notify_poll_interval_ms=5_000,  # poll fallback when NOTIFY is silent
    ),
)

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| db_retry_initial_ms | int | 500 | Initial backoff delay (100–60,000 ms) |
| db_retry_max_ms | int | 30000 | Maximum backoff delay (500–300,000 ms) |
| db_retry_max_attempts | int | 0 | Max retry attempts; 0 = infinite (0–10,000) |
| notify_poll_interval_ms | int | 5000 | Poll interval when NOTIFY is silent (1,000–300,000 ms) |

Backoff uses exponential growth (initial_ms * 2^attempt) with ±25% jitter, capped at db_retry_max_ms. Backoff resets after each successful loop iteration.
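
As a worked example of that formula, the sketch below computes one delay. It rests on two assumptions the text does not settle: attempts are counted from 0, and the cap is applied after the jitter so a delay never exceeds the maximum. The function name `backoff_ms` is hypothetical.

```python
import random
from typing import Optional


def backoff_ms(
    attempt: int,
    initial_ms: int = 500,
    max_ms: int = 30_000,
    rng: Optional[random.Random] = None,
) -> float:
    rng = rng or random.Random()
    # Clamp the exponent so very large attempt counts cannot overflow
    # when the integer is multiplied by a float below.
    base = initial_ms * (2 ** min(attempt, 30))
    jittered = base * rng.uniform(0.75, 1.25)  # +/- 25% jitter
    return min(jittered, float(max_ms))
```

With the defaults, attempt 0 yields a delay between 375 and 625 ms, attempt 3 between 3,000 and 5,000 ms, and from attempt 7 onward every delay is pinned at the 30,000 ms cap.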

Workers need to know where tasks are defined:

# In instance.py
# Dotted module paths (recommended)
app.discover_tasks(["myapp.tasks", "myapp.jobs.worker_tasks"])
# Or file paths
app.discover_tasks(["tasks.py", "more_tasks.py"])

This list is passed to child processes for import.

| Error Type | Handling |
| --- | --- |
| Task exception | Wrapped in TaskResult(err=…) |
| Serialization error | WORKER_SERIALIZATION_ERROR |
| Task not found | WORKER_RESOLUTION_ERROR |
| Child crash | Detected via heartbeat, marked FAILED |
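
The first three rows can be illustrated with a child-side wrapper that classifies each failure at the step where it occurs. This is a sketch under assumptions: JSON stands in for the real serializer, a plain dict stands in for `TaskResult`, and `execute` is a hypothetical function name. A child crash cannot be caught this way, which is why it is detected from outside via the heartbeat.

```python
import json
from typing import Any, Callable, Dict


def execute(
    registry: Dict[str, Callable[..., Any]],
    name: str,
    raw_args: str,
) -> Dict[str, Any]:
    fn = registry.get(name)
    if fn is None:
        return {"err": "WORKER_RESOLUTION_ERROR"}  # task not found
    try:
        args = json.loads(raw_args)
    except ValueError:
        return {"err": "WORKER_SERIALIZATION_ERROR"}  # bad arguments
    try:
        value = fn(*args)
    except Exception as exc:
        # Task exceptions are wrapped rather than propagated.
        return {"err": f"{type(exc).__name__}: {exc}"}
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return {"err": "WORKER_SERIALIZATION_ERROR"}  # unserializable result
    return {"ok": value}
```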

Run multiple worker instances for horizontal scaling:

# On machine 1
horsies worker myapp.instance:app --processes=8
# On machine 2
horsies worker myapp.instance:app --processes=8

  • Each worker claims independently
  • SKIP LOCKED prevents duplicate claims
  • Advisory locks serialize certain operations
  • cluster_wide_cap limits total concurrency