
Database Schema

The `horsies_tasks` table is the primary task storage table.

| Column | Type | Description |
|---|---|---|
| id | VARCHAR(36) PK | UUID task identifier |
| task_name | VARCHAR(255) | Registered task name |
| queue_name | VARCHAR(100) | Queue assignment |
| priority | INT | 1-100, lower = higher priority |
| args | TEXT | JSON-serialized positional args |
| kwargs | TEXT | JSON-serialized keyword args |
| status | VARCHAR | PENDING / CLAIMED / RUNNING / COMPLETED / FAILED / CANCELLED / EXPIRED |

These values correspond to TaskStatus in the API (see Task Lifecycle for terminal states).

| Column | Type | Description |
|---|---|---|
| sent_at | TIMESTAMP | Immutable call-site timestamp (when `.send()` / `.schedule()` was called) |
| enqueued_at | TIMESTAMP | Mutable dispatch timestamp (when task becomes claimable; updated on retry) |
| claimed_at | TIMESTAMP | When worker claimed task |
| started_at | TIMESTAMP | When execution started |
| completed_at | TIMESTAMP | When task completed |
| failed_at | TIMESTAMP | When task failed |
| result | TEXT | JSON-serialized TaskResult |
| failed_reason | TEXT | Human-readable failure message |
| error_code | TEXT | Final TaskError.error_code for failed tasks (NULL for successful, non-terminal, or worker-level failures without a TaskResult) |
| claimed | BOOLEAN | Claiming flag |
| claimed_by_worker_id | VARCHAR(255) | Worker identifier |
| good_until | TIMESTAMP | Task expiry deadline |
| retry_count | INT | Current retry attempt |
| max_retries | INT | Maximum retries allowed |
| next_retry_at | TIMESTAMP | Next retry time |
| task_options | TEXT | Serialized TaskOptions |
| worker_pid | INT | Executing process ID |
| worker_hostname | VARCHAR(255) | Executing machine |
| worker_process_name | VARCHAR(255) | Process identifier |
| claim_expires_at | TIMESTAMP | Claim lease expiry deadline |
| enqueue_sha | VARCHAR(64) | SHA-256 digest for idempotent retry |
| created_at | TIMESTAMP | Row creation time |
| updated_at | TIMESTAMP | Last update time |

Indexes: queue_name, status, claimed, good_until, next_retry_at, error_code (partial, WHERE NOT NULL)
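These indexes serve the hot claiming path. As a minimal sketch (illustrative only; the broker's actual queries live in `horsies/src/broker/postgres.rs` and may differ, and the worker id is a placeholder), a skip-locked claim against this table could look like:

```sql
-- Claim one claimable task without blocking other workers.
-- 'worker-1' and 'default' are placeholder values for illustration.
UPDATE horsies_tasks
SET status = 'CLAIMED',
    claimed = TRUE,
    claimed_at = now(),
    claimed_by_worker_id = 'worker-1'
WHERE id = (
    SELECT id
    FROM horsies_tasks
    WHERE queue_name = 'default'
      AND status = 'PENDING'
      AND claimed = FALSE
      AND (good_until IS NULL OR good_until > now())
    ORDER BY priority ASC, enqueued_at ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id;
```

`FOR UPDATE SKIP LOCKED` lets concurrent workers claim different rows instead of queueing on the same lock.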

The `horsies_task_attempts` table holds immutable per-attempt execution history: one row per finished execution attempt (success, failure, or worker crash), written atomically in the same transaction as the task state transition.

| Column | Type | Description |
|---|---|---|
| id | BIGSERIAL PK | Auto-increment |
| task_id | VARCHAR(36) FK | References horsies_tasks(id) with ON DELETE CASCADE |
| attempt | INT | 1-based attempt number (retry_count + 1 at finalization time) |
| outcome | VARCHAR(32) | COMPLETED, FAILED, or WORKER_FAILURE (CHECK constraint enforced) |
| will_retry | BOOLEAN | Whether a retry was scheduled after this attempt |
| started_at | TIMESTAMPTZ | When the attempt started running |
| finished_at | TIMESTAMPTZ | When the attempt finished |
| error_code | TEXT | TaskError.error_code for this attempt (NULL on success) |
| error_message | TEXT | TaskError.message for this attempt (NULL on success) |
| failed_reason | TEXT | Worker-level failure reason (NULL for domain errors and successes) |
| worker_id | VARCHAR(255) | Worker that executed this attempt |
| worker_hostname | VARCHAR(255) | Machine hostname |
| worker_pid | INT | Process ID |
| worker_process_name | VARCHAR(255) | Process identifier |
| created_at | TIMESTAMPTZ | Row creation time |

Constraints: UNIQUE (task_id, attempt), CHECK (outcome IN ('COMPLETED', 'FAILED', 'WORKER_FAILURE'))

Indexes: error_code (partial, WHERE NOT NULL), finished_at DESC

Pre-execution aborts (CLAIM_LOST, OWNERSHIP_UNCONFIRMED, WORKFLOW_CHECK_FAILED, WORKFLOW_STOPPED) do not create attempt rows.
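Because of the UNIQUE (task_id, attempt) constraint, a task's full retry history can be read back in order. An illustrative query (not necessarily one the broker itself issues):

```sql
-- Attempt history for one task, oldest first; $1 is the task id.
SELECT attempt, outcome, will_retry, error_code, started_at, finished_at
FROM horsies_task_attempts
WHERE task_id = $1
ORDER BY attempt;
```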

The `horsies_heartbeats` table tracks task liveness.

| Column | Type | Description |
|---|---|---|
| id | INT PK | Auto-increment |
| task_id | VARCHAR(36) | Associated task |
| sender_id | VARCHAR(255) | Worker/process ID |
| role | VARCHAR(20) | 'claimer' or 'runner' |
| sent_at | TIMESTAMP | Heartbeat time |
| hostname | VARCHAR(255) | Machine hostname |
| pid | INT | Process ID |

Indexes: (task_id, role, sent_at DESC)
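This composite index supports fetching the most recent heartbeat per role for a task, for example with `DISTINCT ON`. A sketch (illustrative; `$1` is the task id):

```sql
-- Latest heartbeat per role for one task; the
-- (task_id, role, sent_at DESC) index serves this directly.
SELECT DISTINCT ON (role) role, sender_id, sent_at, hostname, pid
FROM horsies_heartbeats
WHERE task_id = $1
ORDER BY role, sent_at DESC;
```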

The `horsies_worker_states` table stores worker monitoring snapshots (time series).

| Column | Type | Description |
|---|---|---|
| id | INT PK | Auto-increment |
| worker_id | VARCHAR(255) | Worker identifier |
| snapshot_at | TIMESTAMP | Snapshot time |
| hostname | VARCHAR(255) | Machine hostname |
| pid | INT | Main process ID |
| processes | INT | Worker concurrency level |
| queues | VARCHAR[] | Subscribed queues |
| tasks_running | INT | Current running count |
| tasks_claimed | INT | Current claimed count |
| memory_usage_mb | FLOAT | Memory consumption |
| cpu_percent | FLOAT | CPU usage |
| memory_percent | FLOAT | Memory usage percentage |
| max_claim_batch | INT | Max tasks claimed per batch |
| max_claim_per_worker | INT | Max tasks claimable per worker |
| cluster_wide_cap | INT | Cluster-wide in-flight cap |
| queue_priorities | JSONB | Queue priority configuration |
| queue_max_concurrency | JSONB | Per-queue concurrency limits |
| recovery_config | JSONB | Recovery configuration snapshot |
| worker_started_at | TIMESTAMP | Worker start time |

Scheduler execution tracking.

| Column | Type | Description |
|---|---|---|
| schedule_name | VARCHAR(255) PK | Schedule identifier |
| last_run_at | TIMESTAMP | Last execution time |
| next_run_at | TIMESTAMP | Next scheduled time |
| last_task_id | VARCHAR(36) | Most recent task ID |
| run_count | INT | Total executions |
| config_hash | VARCHAR(64) | Configuration hash |
| updated_at | TIMESTAMP | Last state update |

The `horsies_workflows` table stores workflow instance rows.

| Column | Type | Description |
|---|---|---|
| id | VARCHAR(36) PK | Workflow UUID |
| name | VARCHAR(255) | Workflow name |
| status | VARCHAR | RUNNING, COMPLETED, FAILED, PAUSED, CANCELLED |
| on_error | VARCHAR | FAIL or PAUSE |
| output_task_index | INT | Output node index |
| success_policy | TEXT | Serialized success policy JSON |
| definition_key | VARCHAR(255) | Stable workflow definition key |
| parent_workflow_id | VARCHAR(36) | Parent workflow if this is a child |
| parent_task_index | INT | Parent workflow task index for child workflows |
| depth | INT | Workflow nesting depth |
| root_workflow_id | VARCHAR(36) | Root workflow ID |
| sent_at | TIMESTAMP | Workflow creation timestamp |
| started_at | TIMESTAMP | When workflow entered running state |
| completed_at | TIMESTAMP | Completion timestamp |
| updated_at | TIMESTAMP | Last update time |
| created_at | TIMESTAMP | Row creation time |

The `horsies_workflow_tasks` table holds per-node state for workflow execution.

| Column | Type | Description |
|---|---|---|
| id | VARCHAR(36) PK | Row identifier |
| workflow_id | VARCHAR(36) FK | Parent workflow |
| task_index | INT | Node index within the workflow |
| node_id | VARCHAR(255) | Stable node identifier |
| task_name | VARCHAR(255) | Registered task name or `__sub_workflow:*` marker |
| task_args | TEXT | Serialized positional input JSON |
| task_kwargs | TEXT | Serialized keyword input JSON |
| queue_name | VARCHAR(100) | Resolved queue |
| priority | INT | Resolved priority |
| dependencies | INT[] | Upstream dependency indices |
| args_from | JSONB | Field-name to dependency-index mapping |
| workflow_ctx_from | TEXT[] | Node IDs used for workflow context |
| allow_failed_deps | BOOLEAN | Whether downstream runs after upstream failure |
| join_type | VARCHAR | ALL, ANY, QUORUM |
| min_success | INT | Quorum threshold |
| task_options | TEXT | Serialized task options JSON |
| status | VARCHAR | PENDING, READY, ENQUEUED, RUNNING, COMPLETED, FAILED, SKIPPED |
| is_subworkflow | BOOLEAN | Whether this node is a child workflow |
| sub_workflow_name | VARCHAR(255) | Child workflow name if subworkflow |
| sub_definition_key | VARCHAR(255) | Child workflow definition key if subworkflow |
| task_id | VARCHAR(36) | Linked row in horsies_tasks for task nodes |
| sub_workflow_id | VARCHAR(36) | Linked row in horsies_workflows for subworkflow nodes |
| started_at | TIMESTAMP | When the node was enqueued/launched |
| completed_at | TIMESTAMP | Completion timestamp |
| created_at | TIMESTAMP | Row creation time |
| updated_at | TIMESTAMP | Last update time |
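A quick way to gauge workflow progress from this table is a per-status rollup of its nodes. An illustrative query (`$1` is the workflow id):

```sql
-- Count workflow nodes by status for one workflow.
SELECT status, count(*) AS nodes
FROM horsies_workflow_tasks
WHERE workflow_id = $1
GROUP BY status;
```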
A PostgreSQL trigger publishes task lifecycle events via `pg_notify`:

```sql
CREATE FUNCTION horsies_notify_task_changes()
RETURNS trigger AS $$
BEGIN
    IF TG_OP = 'INSERT' AND NEW.status = 'PENDING' THEN
        PERFORM pg_notify('task_new', NEW.id);
        PERFORM pg_notify('task_queue_' || NEW.queue_name, NEW.id);
    ELSIF TG_OP = 'UPDATE' AND OLD.status != NEW.status THEN
        IF NEW.status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXPIRED') THEN
            PERFORM pg_notify('task_done', NEW.id);
        END IF;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER horsies_task_notify_trigger
AFTER INSERT OR UPDATE ON horsies_tasks
FOR EACH ROW
EXECUTE FUNCTION horsies_notify_task_changes();
```
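Clients that want push-style wakeups can subscribe to these channels with `LISTEN`; the notification payload is the task id. For example (the queue name `default` is assumed here for illustration):

```sql
-- Wake on terminal state changes (COMPLETED/FAILED/CANCELLED/EXPIRED).
LISTEN task_done;

-- Wake on new PENDING tasks for a specific queue
-- (channel name is 'task_queue_' || queue_name).
LISTEN task_queue_default;
```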

The schema is managed through embedded SQL migrations. High-level Horsies entry points ensure the schema is initialized automatically on first use.

```rust
let broker = PostgresBroker::connect_with(config).await?;
broker.ensure_schema_initialized().await?;
```

Applications already using Horsies typically do not need a separate migration step:

```rust
let broker = app.get_broker().await?;
// schema already ensured by the high-level app path
broker.health_check().await?;
```

The worker’s reaper loop prunes old rows automatically every hour based on RecoveryConfig retention settings:

| Table | Config field | Default | Condition |
|---|---|---|---|
| horsies_heartbeats | heartbeat_retention_hours | 24h | sent_at older than threshold |
| horsies_worker_states | worker_state_retention_hours | 7 days | snapshot_at older than threshold |
| horsies_tasks | terminal_record_retention_hours | 30 days | Terminal status + oldest timestamp older than threshold |
| horsies_task_attempts | — | — | Cascade-deleted when parent task row is deleted |
| horsies_workflows | terminal_record_retention_hours | 30 days | Terminal status + oldest timestamp older than threshold |
| horsies_workflow_tasks | terminal_record_retention_hours | 30 days | Parent workflow is terminal and older than threshold |

Terminal statuses: COMPLETED, FAILED, CANCELLED, EXPIRED. Set any retention field to `None` to disable cleanup for that category. See Recovery Config for configuration details.
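The two simplest pruning conditions above reduce to plain threshold deletes. An illustrative equivalent using the default retention values (the reaper derives the actual thresholds from RecoveryConfig at runtime):

```sql
-- Prune heartbeats older than the default 24-hour retention window.
DELETE FROM horsies_heartbeats
WHERE sent_at < now() - interval '24 hours';

-- Prune worker state snapshots older than the default 7-day window.
DELETE FROM horsies_worker_states
WHERE snapshot_at < now() - interval '7 days';
```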

Source: `horsies/src/broker/postgres.rs` (schema creation and queries)