Architecture
Horsies is a PostgreSQL-backed distributed task queue and workflow engine with real-time dispatch via LISTEN/NOTIFY. Producers enqueue tasks, workers claim and execute them on a tokio runtime, and results flow back through the database.
Overview
The system consists of four main components:

- Producer: Application code that sends tasks via `.send()`
- PostgreSQL Broker: Stores tasks, results, and coordinates via LISTEN/NOTIFY
- Worker: Claims and executes tasks in a tokio runtime
- Scheduler: (Optional) Enqueues tasks on a schedule
Component Relationships

```
Producer                    PostgreSQL                   Worker(s)
   │                            │                            │
   │ .send()                    │                            │
   ├───────────────────────────>│ INSERT task                │
   │                            │                            │
   │                            │ NOTIFY task_new ──────────>│
   │                            │                            │
   │                            │<──────── CLAIM task ───────┤
   │                            │                            │
   │                            │                    Execute │
   │                            │                            │
   │                            │<──────── UPDATE result ────┤
   │                            │                            │
   │ .get()                     │ NOTIFY task_done           │
   │<───────────────────────────│                            │
```

Design Decisions
PostgreSQL as the Only Backend
Unlike systems that require a separate message broker (RabbitMQ, Redis, NATS), horsies uses PostgreSQL for everything:
- Task storage: The `horsies_tasks` table holds all task data
- Message passing: LISTEN/NOTIFY replaces a separate message broker
- Coordination: Advisory locks prevent race conditions
- State tracking: Heartbeats and worker states are database tables
Tokio-Based Worker
Workers use a single-process, tokio-based async runtime:

- Async tasks run via `tokio::spawn`
- Blocking tasks (CPU-bound) run via `tokio::task::spawn_blocking` with `catch_unwind` for panic safety
- Per-worker concurrency is gated by a `tokio::sync::Semaphore`; cluster-wide and queue-level concurrency are enforced at claim time via the database
- Panics are isolated per task: a panicking task produces a `TaskError` without taking down the worker
Structured Result Contract
All tasks return `Result<T, TaskError>`, enforcing consistent handling of task outcomes and errors across the codebase.

- Built-in errors: `TaskErrorCode::BuiltIn` covers infrastructure failures (`RetrievalCode::WaitTimeout`, `OperationalErrorCode::BrokerError`, etc.). These are produced by the library itself.
- User-defined errors: `TaskError::new("YOUR_CODE", "message")` for domain-specific failures. Your error codes are preserved through serialization and available for programmatic matching on the consumer side.
Real-Time via LISTEN/NOTIFY
Workers primarily receive tasks via NOTIFY, not polling:

- PostgreSQL triggers send NOTIFY on task INSERT
- Workers subscribe via `PgListener` and wake immediately
- Result waiters listen on the `task_done` channel
- A configurable fallback poll interval (`notify_poll_interval_ms`, default 5s) ensures progress if NOTIFY is temporarily unavailable; runtime recovery comes from a combination of sqlx reconnect behavior and horsies' worker retry/poll fallback logic
User-Facing Components

| Component | Description |
|---|---|
| `Horsies` | Application instance; configure and register tasks here |
| `#[task]` / `#[blocking_task]` | Proc macros to define tasks |
| `TaskFunction<A, T>` | Typed handle for sending tasks |
| `TaskHandle<T>` | Returned by `.send()`; use it to retrieve results |
| `TaskResult<T>` | Return type wrapper; contains the success value or error |
| `Worker` | Started via `app.run_worker()` |
| `Scheduler` | Started via `app.run_scheduler()` |
Data Flow
Task Submission

1. Producer calls `task_name::send(args).await`
2. Queue and arguments are validated
3. Task row is inserted into the `horsies_tasks` table with status `PENDING`
4. PostgreSQL trigger fires NOTIFY to wake workers
5. `TaskHandle<T>` is returned to the producer
Task Execution

1. Worker receives NOTIFY and wakes
2. Worker claims the task atomically via `FOR UPDATE SKIP LOCKED`
3. Task is dispatched via `tokio::spawn` (async) or `spawn_blocking` (blocking)
4. Task function executes
5. Result is stored in the database and the status is updated to `COMPLETED` or `FAILED`
Result Retrieval

1. Producer calls `handle.get(timeout).await`
2. If the result is cached on the handle, it is returned immediately
3. Otherwise, the handle listens for the `task_done` notification or polls the database
4. `TaskResult<T>` is returned with the success value or error
Concurrency Model

- Cluster level: Optional `cluster_wide_cap` limits total in-flight work across the cluster
- Worker level: The `concurrency` setting limits concurrent executions per worker
- Queue level: `max_concurrency` applies per custom queue (Custom mode only)
- Claiming: `max_claim_batch` prevents one worker from starving others
See Concurrency for details.