
Database Schema

Primary task storage table (`horsies_tasks`).

| Column | Type | Description |
| --- | --- | --- |
| id | VARCHAR(36) PK | UUID task identifier |
| task_name | VARCHAR(255) | Registered task name |
| queue_name | VARCHAR(100) | Queue assignment |
| priority | INT | 1-100, lower = higher priority |
| args | TEXT | JSON-serialized positional args |
| kwargs | TEXT | JSON-serialized keyword args |
| status | VARCHAR | PENDING/CLAIMED/RUNNING/COMPLETED/FAILED/CANCELLED/EXPIRED (stored as VARCHAR via `native_enum=False`) |
| sent_at | TIMESTAMP | Immutable call-site timestamp (when `.send()`/`.schedule()` was called) |
| enqueued_at | TIMESTAMP | Mutable dispatch timestamp (when task becomes claimable; updated on retry) |
| claimed_at | TIMESTAMP | When worker claimed task |
| started_at | TIMESTAMP | When execution started |
| completed_at | TIMESTAMP | When task completed |
| failed_at | TIMESTAMP | When task failed |
| result | TEXT | JSON-serialized TaskResult |
| failed_reason | TEXT | Human-readable failure message |
| error_code | TEXT | Final TaskError.error_code for failed tasks (NULL for successful, non-terminal, or worker-level failures without a TaskResult) |
| claimed | BOOLEAN | Claiming flag |
| claimed_by_worker_id | VARCHAR(255) | Worker identifier |
| good_until | TIMESTAMP | Task expiry deadline |
| retry_count | INT | Current retry attempt |
| max_retries | INT | Maximum retries allowed |
| next_retry_at | TIMESTAMP | Next retry time |
| task_options | TEXT | Serialized TaskOptions |
| worker_pid | INT | Executing process ID |
| worker_hostname | VARCHAR(255) | Executing machine |
| worker_process_name | VARCHAR(255) | Process identifier |
| claim_expires_at | TIMESTAMP | Claim lease expiry deadline |
| enqueue_sha | VARCHAR(64) | SHA-256 digest for idempotent retry |
| created_at | TIMESTAMP | Row creation time |
| updated_at | TIMESTAMP | Last update time |

Status values correspond to `TaskStatus` in the API (see Task Lifecycle for terminal states).

Indexes: queue_name, status, claimed, good_until, next_retry_at, error_code (partial, WHERE NOT NULL)
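The `enqueue_sha` column holds a SHA-256 hex digest used for idempotent retry. A minimal sketch of how such a digest might be derived; the exact input fields here are an assumption for illustration, not the library's documented formula:

```python
import hashlib
import json

def enqueue_digest(task_name, args, kwargs):
    """Derive a deterministic SHA-256 hex digest for an enqueue request.

    Hypothetical field selection: any stable serialization of the task
    identity and payload would fit the VARCHAR(64) column.
    """
    payload = json.dumps(
        {"task": task_name, "args": args, "kwargs": kwargs},
        sort_keys=True,  # stable key order so identical calls hash identically
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

digest = enqueue_digest("send_email", ["user@example.com"], {"retries": 3})
print(len(digest))  # 64 hex chars, matching VARCHAR(64)
```

Because the digest is deterministic, re-submitting the same call can be detected by comparing digests instead of payloads.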

Immutable per-attempt execution history (`horsies_task_attempts`). One row per finished execution attempt (success, failure, or worker crash). Written atomically in the same transaction as the task state transition.

Both error_code and attempt history are only guaranteed from deployment of the schema changes in version 0.1.0a26 onward. No historical backfill is performed.

| Column | Type | Description |
| --- | --- | --- |
| id | BIGSERIAL PK | Auto-increment |
| task_id | VARCHAR(36) FK | References horsies_tasks(id) with ON DELETE CASCADE |
| attempt | INT | 1-based attempt number (retry_count + 1 at finalization time) |
| outcome | VARCHAR(32) | COMPLETED, FAILED, or WORKER_FAILURE (CHECK constraint enforced) |
| will_retry | BOOLEAN | Whether a retry was scheduled after this attempt |
| started_at | TIMESTAMPTZ | When the attempt started running |
| finished_at | TIMESTAMPTZ | When the attempt finished |
| error_code | TEXT | TaskError.error_code for this attempt (NULL on success) |
| error_message | TEXT | TaskError.message for this attempt (NULL on success) |
| failed_reason | TEXT | Worker-level failure reason (NULL for domain errors and successes) |
| worker_id | VARCHAR(255) | Worker that executed this attempt |
| worker_hostname | VARCHAR(255) | Machine hostname |
| worker_pid | INT | Process ID |
| worker_process_name | VARCHAR(255) | Process identifier |
| created_at | TIMESTAMPTZ | Row creation time |

Constraints: UNIQUE (task_id, attempt), CHECK (outcome IN ('COMPLETED', 'FAILED', 'WORKER_FAILURE'))

Indexes: error_code (partial, WHERE NOT NULL), finished_at DESC

Pre-execution aborts (CLAIM_LOST, OWNERSHIP_UNCONFIRMED, WORKFLOW_CHECK_FAILED, WORKFLOW_STOPPED) do not create attempt rows.
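Since `attempt` is `retry_count + 1` at finalization time and `(task_id, attempt)` is unique, a task's history can be replayed in order. A small illustrative sketch; the row dicts are hypothetical stand-ins for fetched rows, not an ORM model:

```python
def summarize_attempts(rows):
    """Order attempt rows and report the final outcome.

    rows: list of dicts with 'attempt', 'outcome', and 'will_retry' keys,
    mirroring the horsies_task_attempts columns.
    """
    history = sorted(rows, key=lambda r: r["attempt"])
    final = history[-1]
    return {
        "attempts": len(history),
        "final_outcome": final["outcome"],
        # no success and no scheduled retry means the task gave up
        "exhausted": final["outcome"] != "COMPLETED" and not final["will_retry"],
    }

rows = [
    {"attempt": 1, "outcome": "WORKER_FAILURE", "will_retry": True},
    {"attempt": 2, "outcome": "FAILED", "will_retry": True},
    {"attempt": 3, "outcome": "COMPLETED", "will_retry": False},
]
print(summarize_attempts(rows))
# {'attempts': 3, 'final_outcome': 'COMPLETED', 'exhausted': False}
```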

Task liveness tracking (`horsies_heartbeats`).

| Column | Type | Description |
| --- | --- | --- |
| id | INT PK | Auto-increment |
| task_id | VARCHAR(36) | Associated task |
| sender_id | VARCHAR(255) | Worker/process ID |
| role | VARCHAR(20) | 'claimer' or 'runner' |
| sent_at | TIMESTAMP | Heartbeat time |
| hostname | VARCHAR(255) | Machine hostname |
| pid | INT | Process ID |

Indexes: (task_id, role, sent_at DESC)
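The `(task_id, role, sent_at DESC)` index makes the latest heartbeat per role cheap to fetch; liveness then reduces to comparing that timestamp against a timeout. A sketch of the comparison, where the 60-second timeout is an illustrative assumption rather than the library's configured value:

```python
from datetime import datetime, timedelta, timezone

def is_stale(last_sent_at, timeout_seconds=60):
    """True if the most recent heartbeat is older than the timeout."""
    return datetime.now(timezone.utc) - last_sent_at > timedelta(seconds=timeout_seconds)

fresh = datetime.now(timezone.utc) - timedelta(seconds=5)
old = datetime.now(timezone.utc) - timedelta(minutes=10)
print(is_stale(fresh), is_stale(old))  # False True
```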

Worker monitoring snapshots (`horsies_worker_states`), stored as a time series.

| Column | Type | Description |
| --- | --- | --- |
| id | INT PK | Auto-increment |
| worker_id | VARCHAR(255) | Worker identifier |
| snapshot_at | TIMESTAMP | Snapshot time |
| hostname | VARCHAR(255) | Machine hostname |
| pid | INT | Main process ID |
| processes | INT | Worker process count |
| queues | VARCHAR[] | Subscribed queues |
| tasks_running | INT | Current running count |
| tasks_claimed | INT | Current claimed count |
| memory_usage_mb | FLOAT | Memory consumption |
| cpu_percent | FLOAT | CPU usage |
| memory_percent | FLOAT | Memory usage percentage |
| max_claim_batch | INT | Max tasks claimed per batch |
| max_claim_per_worker | INT | Max tasks claimable per worker |
| cluster_wide_cap | INT | Cluster-wide in-flight cap |
| queue_priorities | JSONB | Queue priority configuration |
| queue_max_concurrency | JSONB | Per-queue concurrency limits |
| recovery_config | JSONB | Recovery configuration snapshot |
| worker_started_at | TIMESTAMP | Worker start time |

Scheduler execution tracking.

| Column | Type | Description |
| --- | --- | --- |
| schedule_name | VARCHAR(255) PK | Schedule identifier |
| last_run_at | TIMESTAMP | Last execution time |
| next_run_at | TIMESTAMP | Next scheduled time |
| last_task_id | VARCHAR(36) | Most recent task ID |
| run_count | INT | Total executions |
| config_hash | VARCHAR(64) | Configuration hash |
| updated_at | TIMESTAMP | Last state update |
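The `config_hash` column lets the scheduler detect when a schedule's configuration has changed between runs. A sketch of how such a hash could be computed; canonical-JSON-over-SHA-256 is an assumption about the scheme, suggested by the column's 64-character width:

```python
import hashlib
import json

def config_hash(schedule_config):
    """Hash a schedule's configuration canonically.

    sort_keys makes the digest independent of dict insertion order,
    so an unchanged config always maps to the same hash.
    """
    canonical = json.dumps(schedule_config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

a = config_hash({"cron": "*/5 * * * *", "queue": "default"})
b = config_hash({"queue": "default", "cron": "*/5 * * * *"})
print(a == b, len(a))  # True 64
```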
A trigger on `horsies_tasks` publishes `pg_notify` events when a task becomes claimable and when it reaches a terminal status:

```sql
CREATE FUNCTION horsies_notify_task_changes()
RETURNS trigger AS $$
BEGIN
    IF TG_OP = 'INSERT' AND NEW.status = 'PENDING' THEN
        PERFORM pg_notify('task_new', NEW.id);
        PERFORM pg_notify('task_queue_' || NEW.queue_name, NEW.id);
    ELSIF TG_OP = 'UPDATE' AND OLD.status != NEW.status THEN
        IF NEW.status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXPIRED') THEN
            PERFORM pg_notify('task_done', NEW.id);
        END IF;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER horsies_task_notify_trigger
AFTER INSERT OR UPDATE ON horsies_tasks
FOR EACH ROW
EXECUTE FUNCTION horsies_notify_task_changes();
```
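The trigger publishes on `task_new`, `task_done`, and per-queue `task_queue_<name>` channels, so a listener needs to route by channel name. A minimal routing sketch; the return values are hypothetical, and a real listener would pair this with an async Postgres driver's LISTEN support:

```python
def route_notification(channel, task_id):
    """Map a pg_notify channel from the trigger above to an action."""
    if channel == "task_new":
        return ("claimable", None, task_id)
    if channel == "task_done":
        return ("finished", None, task_id)
    if channel.startswith("task_queue_"):
        queue = channel[len("task_queue_"):]  # 'task_queue_default' -> 'default'
        return ("claimable", queue, task_id)
    return ("ignored", None, task_id)

print(route_notification("task_queue_default", "abc-123"))
# ('claimable', 'default', 'abc-123')
```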

Tables are created automatically via SQLAlchemy's `Base.metadata.create_all()`:

```python
await conn.run_sync(Base.metadata.create_all)
```

Schema creation is protected by a PostgreSQL advisory lock to prevent race conditions.

The worker’s reaper loop prunes old rows automatically every hour based on RecoveryConfig retention settings:

| Table | Config field | Default | Condition |
| --- | --- | --- | --- |
| horsies_heartbeats | heartbeat_retention_hours | 24h | sent_at older than threshold |
| horsies_worker_states | worker_state_retention_hours | 7 days | snapshot_at older than threshold |
| horsies_tasks | terminal_record_retention_hours | 30 days | Terminal status + oldest timestamp older than threshold |
| horsies_task_attempts | (none) | (none) | Cascade-deleted when parent task row is deleted |
| horsies_workflows | terminal_record_retention_hours | 30 days | Terminal status + oldest timestamp older than threshold |
| horsies_workflow_tasks | terminal_record_retention_hours | 30 days | Parent workflow is terminal and older than threshold |

Terminal statuses: COMPLETED, FAILED, CANCELLED, EXPIRED. Set any retention field to None to disable cleanup for that category. See Recovery Config for configuration details.
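Pruning reduces to comparing row timestamps against a cutoff derived from the retention setting. A sketch of that derivation; the function name is illustrative, not the library's reaper API:

```python
from datetime import datetime, timedelta, timezone

def retention_cutoff(retention_hours, now=None):
    """Rows whose timestamp is older than the returned cutoff are pruned.

    A retention of None disables cleanup, mirroring the config behaviour.
    """
    if retention_hours is None:
        return None
    now = now or datetime.now(timezone.utc)
    return now - timedelta(hours=retention_hours)

now = datetime(2024, 1, 31, 12, 0, tzinfo=timezone.utc)
print(retention_cutoff(24, now))   # 2024-01-30 12:00:00+00:00
print(retention_cutoff(None, now)) # None
```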

Source: `horsies/core/models/task_pg.py`