Architecture
Horsies is a PostgreSQL-backed distributed task queue and workflow engine with real-time dispatch via LISTEN/NOTIFY. Producers enqueue tasks, workers claim and execute them in isolated processes, and results flow back through the database.
Async APIs (send_async, get_async, start_async, etc.) are producer-side I/O helpers; they do not change where tasks execute.
Overview
The system consists of four main components:
- Producer: Application code that sends tasks via .send() or .send_async()
- PostgreSQL Broker: Stores tasks and results, and coordinates workers via LISTEN/NOTIFY
- Worker: Claims and executes tasks in a process pool
- Scheduler: (Optional) Enqueues tasks on a schedule
Component Relationships
    Producer                 PostgreSQL                      Worker(s)
       │                          │                              │
       │ .send() / .send_async()  │                              │
       ├─────────────────────────>│ INSERT task                  │
       │                          │                              │
       │                          │ NOTIFY task_new ────────────>│
       │                          │                              │
       │                          │<───────── CLAIM task ────────┤
       │                          │                              │
       │                          │                              │ Execute
       │                          │                              │
       │                          │<───────── UPDATE result ─────┤
       │                          │                              │
       │ .get() / .get_async()    │ NOTIFY task_done             │
       │<─────────────────────────│                              │

Design Decisions
PostgreSQL as the Only Backend
Unlike Celery (which typically uses Redis or RabbitMQ), Horsies uses PostgreSQL for everything:
- Task storage: The tasks table holds all task data
- Message passing: LISTEN/NOTIFY replaces a separate message broker
- Coordination: Advisory locks prevent race conditions
- State tracking: Heartbeats and worker states are database tables
Process Pool Execution
Workers use Python’s ProcessPoolExecutor rather than threads:
- Each task runs in an isolated child process
- GIL contention is avoided for CPU-bound tasks
- Crashed tasks don’t take down the worker
- Child processes send their own heartbeats
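The isolation property can be illustrated with the standard library alone. The sketch below is not Horsies' worker code: to stay short and runnable anywhere it starts each task in a fresh interpreter via subprocess instead of a ProcessPoolExecutor, and the helper name run_isolated and the task snippets are hypothetical. It shows a child that dies abruptly being reported as a failure while the parent keeps working.

```python
import subprocess
import sys


def run_isolated(snippet: str, timeout: float = 30.0) -> tuple[bool, str]:
    """Run a Python snippet in a separate interpreter process.

    Returns (succeeded, captured stdout). A crash in the child only
    affects the child; the parent just sees a non-zero exit code.
    """
    proc = subprocess.run(
        [sys.executable, "-c", snippet],
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    return proc.returncode == 0, proc.stdout.strip()


# A well-behaved task and one that kills its own process abruptly.
ok_result = run_isolated("print(6 * 7)")
crashed_result = run_isolated("import os; os._exit(1)")

# The parent is still alive and can keep running further tasks.
after_crash = run_isolated("print('still working')")
```

A thread-based worker could not offer the same guarantee, since an abrupt exit inside a thread ends the whole process.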
Type-Safe Results
All tasks must return TaskResult[T, TaskError]:
- Forces explicit error handling at definition time
- Distinguishes domain errors (returned by the task) from infrastructure errors (raised by the library)
- Enables typed result retrieval
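To make the "errors as return values" idea concrete, here is a minimal sketch of a result type in the same spirit. Result, DomainError, and divide are hypothetical stand-ins written for this example; the real TaskResult and TaskError classes may expose a different interface.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")
E = TypeVar("E")


@dataclass(frozen=True)
class DomainError:
    """Hypothetical stand-in for TaskError: a code plus a message."""
    code: str
    message: str


@dataclass(frozen=True)
class Result(Generic[T, E]):
    """Hypothetical stand-in for TaskResult[T, TaskError]."""
    ok: Optional[T] = None
    err: Optional[E] = None

    def is_ok(self) -> bool:
        return self.err is None


def divide(a: float, b: float) -> Result[float, DomainError]:
    # A domain error is returned as a value, not raised.
    if b == 0:
        return Result(err=DomainError("DIVISION_BY_ZERO", "b must be non-zero"))
    return Result(ok=a / b)


good = divide(10, 4)
bad = divide(1, 0)
```

Because the error is part of the return type, a caller has to check is_ok() (or the equivalent) before using the value, which is the explicit handling the list above describes.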
Real-Time via LISTEN/NOTIFY
Workers don’t poll for tasks in the normal case:
- PostgreSQL triggers send NOTIFY on task INSERT
- Workers subscribe and wake immediately
- Result waiters listen on task_done channel
- Fallback polling handles edge cases (lost notifications)
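The combination of notification-driven wakeups and fallback polling can be sketched as a single wait loop. This is an in-memory illustration under stated assumptions, not the library's implementation: a threading.Event stands in for a PostgreSQL notification, and wait_for_task, fetch_pending, and the timing values are hypothetical.

```python
import threading
import time
from typing import Callable, Optional


def wait_for_task(
    notified: threading.Event,
    fetch_pending: Callable[[], Optional[str]],
    poll_interval: float = 0.05,
    max_wait: float = 1.0,
) -> Optional[str]:
    """Return a pending task id, waking on a notification or a poll tick."""
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        # Returns at once if notified, otherwise after poll_interval.
        notified.wait(timeout=poll_interval)
        notified.clear()
        task_id = fetch_pending()
        if task_id is not None:
            return task_id
    return None


pending = ["task-1"]


def fetch_pending() -> Optional[str]:
    return pending.pop(0) if pending else None


event = threading.Event()

# Normal path: the notification arrives, so the waiter wakes immediately.
event.set()
via_notify = wait_for_task(event, fetch_pending)

# Edge case: the row exists but the notification was lost.
pending.append("task-2")
via_poll = wait_for_task(event, fetch_pending)

# Nothing pending at all: the wait gives up after max_wait.
nothing = wait_for_task(event, fetch_pending, max_wait=0.1)
```

The poll interval bounds how long a task can sit unnoticed after a lost notification, so it trades database load against worst-case latency.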
User-Facing Components
| Component | Description |
|---|---|
| Horsies | Application instance - configure and register tasks here |
| @app.task | Decorator to define tasks |
| TaskHandle | Returned by .send() - use to retrieve results |
| TaskResult | Return type of all tasks - contains success or error |
| Worker (CLI) | Started via horsies worker command |
| Scheduler (CLI) | Started via horsies scheduler command |
Data Flow
Task Submission
- Producer calls task.send(args)
- Queue and arguments are validated
- Task row inserted into tasks table with status PENDING
- PostgreSQL trigger fires NOTIFY to wake workers
- TaskHandle returned to producer
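The submission steps above can be sketched with an in-memory stand-in for the broker. Everything here is illustrative: InMemoryBroker, its fields, and the choice of JSON for argument serialization are assumptions made for the example, and a plain string id plays the role of the TaskHandle.

```python
import json
import uuid
from dataclasses import dataclass, field


@dataclass
class InMemoryBroker:
    """Hypothetical stand-in for the PostgreSQL broker."""
    valid_queues: set[str]
    tasks: dict[str, dict] = field(default_factory=dict)
    notifications: list[str] = field(default_factory=list)

    def submit(self, task_name: str, args: list, queue: str = "default") -> str:
        # Validate the queue and make sure the arguments serialize.
        if queue not in self.valid_queues:
            raise ValueError(f"unknown queue: {queue!r}")
        payload = json.dumps(args)  # raises TypeError for unserializable args

        # Insert the task row with status PENDING.
        task_id = str(uuid.uuid4())
        self.tasks[task_id] = {
            "name": task_name,
            "queue": queue,
            "args": payload,
            "status": "PENDING",
        }

        # In PostgreSQL a trigger would issue the NOTIFY; here it is recorded.
        self.notifications.append("task_new")

        # The id plays the role of the handle returned to the producer.
        return task_id


broker = InMemoryBroker(valid_queues={"default"})
handle_id = broker.submit("add", [1, 2])
```

Validating before the insert means a bad queue name or unserializable argument fails in the producer, before any row exists for a worker to pick up.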
Task Execution
- Worker receives NOTIFY and wakes
- Worker claims task atomically
- Task dispatched to child process in the pool
- Child process executes task function
- Result stored in database, status updated to COMPLETED or FAILED
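What "claims task atomically" buys can be shown with a conditional update: the status check and the status change happen in one statement, so two workers cannot both win. The sketch uses an in-memory SQLite database purely so that it runs without a server. The table layout, the claim and finish helpers, and the single intermediate RUNNING status are simplifications for illustration, and the actual claim query Horsies issues against PostgreSQL is not shown here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks (id INTEGER PRIMARY KEY, status TEXT NOT NULL, worker TEXT)"
)
conn.execute("INSERT INTO tasks (status) VALUES ('PENDING')")
conn.commit()


def claim(conn: sqlite3.Connection, task_id: int, worker: str) -> bool:
    """Try to claim a task; only one caller can win."""
    # A task that is no longer PENDING matches zero rows, so a second
    # claimer changes nothing and learns that it lost.
    cur = conn.execute(
        "UPDATE tasks SET status = 'RUNNING', worker = ? "
        "WHERE id = ? AND status = 'PENDING'",
        (worker, task_id),
    )
    conn.commit()
    return cur.rowcount == 1


def finish(conn: sqlite3.Connection, task_id: int, succeeded: bool) -> None:
    """Record the terminal status once the child process is done."""
    status = "COMPLETED" if succeeded else "FAILED"
    conn.execute("UPDATE tasks SET status = ? WHERE id = ?", (status, task_id))
    conn.commit()


first = claim(conn, 1, "worker-a")
second = claim(conn, 1, "worker-b")
finish(conn, 1, succeeded=True)
final_row = conn.execute("SELECT status, worker FROM tasks WHERE id = 1").fetchone()
```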
Result Retrieval
- Producer calls handle.get()
- If result cached on handle, return immediately
- Otherwise, listen for task_done notification or poll database
- Return TaskResult with success value or error
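The retrieval steps above amount to a cache in front of a wait loop. The sketch below is a hypothetical stand-in for that logic, not the TaskHandle class itself: HandleSketch, its db_reads counter, and the fetch_result callback are invented for the example, a short sleep stands in for waiting on the task_done channel, and what the real library returns on timeout is not covered.

```python
import time
from typing import Callable, Optional


class HandleSketch:
    """Hypothetical stand-in for result retrieval on a task handle."""

    def __init__(self, fetch_result: Callable[[], Optional[str]]):
        self._fetch_result = fetch_result  # reads the result, None if not ready
        self._cached: Optional[str] = None
        self.db_reads = 0

    def get(self, timeout: float = 1.0, poll_interval: float = 0.01) -> Optional[str]:
        # A result already cached on the handle is returned without I/O.
        if self._cached is not None:
            return self._cached
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            self.db_reads += 1
            result = self._fetch_result()
            if result is not None:
                self._cached = result
                return result
            # A real waiter would block on the task_done channel here;
            # sleeping stands in for that wait plus the polling fallback.
            time.sleep(poll_interval)
        return None


# The result becomes available on the third read.
answers = iter([None, None, "ok: 3"])
handle = HandleSketch(lambda: next(answers))
first_get = handle.get()
second_get = handle.get()
```

The second call never touches the database, which is why repeated get() calls on a finished task are cheap.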
Concurrency Model
- Cluster level: Optional cluster_wide_cap limits total RUNNING tasks
- Worker level: processes setting limits concurrent executions per worker
- Queue level: max_concurrency per custom queue (CUSTOM mode only)
- Claiming: max_claim_batch prevents one worker from starving others
See Concurrency for details.
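One way to picture how the four limits interact is to treat each as an upper bound and let the tightest one win. That combination rule is an assumption made for this sketch, as are the function claimable_slots and the running_* counters; only the setting names come from the list above.

```python
from typing import Optional


def claimable_slots(
    processes: int,
    running_on_worker: int,
    max_claim_batch: int,
    cluster_wide_cap: Optional[int] = None,
    running_in_cluster: int = 0,
    max_concurrency: Optional[int] = None,
    running_in_queue: int = 0,
) -> int:
    """How many tasks a worker may claim now (assumed rule: tightest limit wins)."""
    # Worker level: never claim more than the free process slots.
    limit = processes - running_on_worker
    # Claiming: cap the batch so other workers get a share.
    limit = min(limit, max_claim_batch)
    # Cluster level: optional cap on RUNNING tasks across all workers.
    if cluster_wide_cap is not None:
        limit = min(limit, cluster_wide_cap - running_in_cluster)
    # Queue level: optional per-queue cap.
    if max_concurrency is not None:
        limit = min(limit, max_concurrency - running_in_queue)
    return max(limit, 0)


batch_limited = claimable_slots(processes=8, running_on_worker=3, max_claim_batch=2)
cluster_limited = claimable_slots(
    processes=8, running_on_worker=3, max_claim_batch=10,
    cluster_wide_cap=20, running_in_cluster=19,
)
queue_full = claimable_slots(
    processes=8, running_on_worker=0, max_claim_batch=10,
    max_concurrency=4, running_in_queue=4,
)
```

In the first call the worker has five free slots but may take only two, which leaves the remaining pending tasks for other workers.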