
Database Schema

Primary task storage table (horsies_tasks).

| Column | Type | Description |
|---|---|---|
| id | VARCHAR(36) PK | UUID task identifier |
| task_name | VARCHAR(255) | Registered task name |
| queue_name | VARCHAR(100) | Queue assignment |
| priority | INT | 1-100; a lower value means higher priority |
| args | TEXT | JSON-serialized positional args |
| kwargs | TEXT | JSON-serialized keyword args |
| status | VARCHAR | One of PENDING, CLAIMED, RUNNING, COMPLETED, FAILED, CANCELLED, EXPIRED (stored as VARCHAR via native_enum=False). These values correspond to TaskStatus in the API (see Task Lifecycle for terminal states). |
| sent_at | TIMESTAMP | Immutable call-site timestamp (when .send()/.schedule() was called) |
| enqueued_at | TIMESTAMP | Mutable dispatch timestamp (when the task becomes claimable; updated on retry) |
| is_workflow_task | BOOLEAN | Whether the task row belongs to a workflow node; used to skip workflow-only checks on plain tasks |
| claimed_at | TIMESTAMP | When a worker claimed the task |
| started_at | TIMESTAMP | When execution started |
| completed_at | TIMESTAMP | When the task completed |
| failed_at | TIMESTAMP | When the task failed |
| result | TEXT | JSON-serialized TaskResult |
| failed_reason | TEXT | Human-readable failure message |
| error_code | TEXT | Final TaskError.error_code for failed tasks (NULL for successful and non-terminal tasks) |
| claimed | BOOLEAN | Whether the task is currently claimed by a worker |
| claimed_by_worker_id | VARCHAR(255) | Identifier of the claiming worker |
| good_until | TIMESTAMP | Task expiry deadline |
| retry_count | INT | Retries performed so far (0 on the first attempt) |
| max_retries | INT | Maximum retries allowed |
| next_retry_at | TIMESTAMP | Next retry time |
| task_options | TEXT | Serialized TaskOptions |
| worker_pid | INT | Executing process ID |
| worker_hostname | VARCHAR(255) | Executing machine hostname |
| worker_process_name | VARCHAR(255) | Process identifier |
| claim_expires_at | TIMESTAMP | Claim lease expiry deadline |
| finalizing_at | TIMESTAMPTZ | Marks that a child has finished user code and parent finalization is in progress |
| finalizing_by_worker_id | TEXT | Worker responsible for parent finalization |
| enqueue_sha | VARCHAR(64) | SHA-256 digest for idempotent retry |
| created_at | TIMESTAMP | Row creation time |
| updated_at | TIMESTAMP | Last update time |

Indexes: queue_name, status, claimed, good_until, next_retry_at, error_code (partial, WHERE NOT NULL)
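Because status is plain VARCHAR, application code has to agree on the exact strings. The sketch below uses a hypothetical `Status` enum that mirrors the stored values (it is not the library's TaskStatus), plus hypothetical `is_terminal`, `Row`, and `by_priority` helpers, to show the terminal set listed in the retention section and the "lower number wins" priority ordering with the 1-100 range check:

```python
from dataclasses import dataclass
from enum import Enum


class Status(str, Enum):
    """Hypothetical mirror of the strings stored in horsies_tasks.status."""
    PENDING = "PENDING"
    CLAIMED = "CLAIMED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    EXPIRED = "EXPIRED"


# Terminal statuses as listed on this page.
TERMINAL = frozenset({Status.COMPLETED, Status.FAILED, Status.CANCELLED, Status.EXPIRED})


def is_terminal(raw: str) -> bool:
    """Parse a raw VARCHAR value; raises ValueError on unknown strings."""
    return Status(raw) in TERMINAL


@dataclass
class Row:
    """Hypothetical in-memory view of a task row (subset of columns)."""
    id: str
    priority: int  # 1-100, lower = higher priority

    def __post_init__(self) -> None:
        if not 1 <= self.priority <= 100:
            raise ValueError(f"priority out of range: {self.priority}")


def by_priority(rows: list[Row]) -> list[Row]:
    """Order rows so the most urgent (lowest number) comes first."""
    return sorted(rows, key=lambda r: r.priority)


rows = [Row("a", 50), Row("b", 1), Row("c", 100)]
print([r.id for r in by_priority(rows)])  # ['b', 'a', 'c']
print(is_terminal("EXPIRED"), is_terminal("RUNNING"))  # True False
```

Parsing through the enum turns a typo in a stored status into an immediate error instead of a silent "not terminal".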

Immutable per-attempt execution history (horsies_task_attempts). There is one row per finished execution attempt (success, failure, or worker crash), written atomically in the same transaction as the task state transition.

Both error_code and attempt history are guaranteed only from the deployment of the version 0.1.0a26 schema changes onward. No historical backfill is performed.

| Column | Type | Description |
|---|---|---|
| id | BIGSERIAL PK | Auto-increment |
| task_id | VARCHAR(36) FK | References horsies_tasks(id) with ON DELETE CASCADE |
| attempt | INT | 1-based attempt number (retry_count + 1 at finalization time) |
| outcome | VARCHAR(32) | COMPLETED, FAILED, or WORKER_FAILURE (CHECK constraint enforced) |
| will_retry | BOOLEAN | Whether a retry was scheduled after this attempt |
| started_at | TIMESTAMPTZ | When the attempt started running |
| finished_at | TIMESTAMPTZ | When the attempt finished |
| error_code | TEXT | TaskError.error_code for this attempt (NULL on success) |
| error_message | TEXT | TaskError.message for this attempt (NULL on success) |
| failed_reason | TEXT | Worker-level failure reason (NULL for domain errors and successes) |
| worker_id | VARCHAR(255) | Worker that executed this attempt |
| worker_hostname | VARCHAR(255) | Machine hostname |
| worker_pid | INT | Process ID |
| worker_process_name | VARCHAR(255) | Process identifier |
| created_at | TIMESTAMPTZ | Row creation time |

Constraints: UNIQUE (task_id, attempt), CHECK (outcome IN ('COMPLETED', 'FAILED', 'WORKER_FAILURE'))

Indexes: error_code (partial, WHERE NOT NULL), finished_at DESC

Pre-execution aborts (CLAIM_LOST, OWNERSHIP_UNCONFIRMED, WORKFLOW_CHECK_FAILED, WORKFLOW_STOPPED) do not create attempt rows.
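The constraints above can be exercised locally. This sketch uses Python's built-in sqlite3 as a stand-in for PostgreSQL, so BIGSERIAL becomes INTEGER PRIMARY KEY and only a trimmed subset of columns is kept; `record_attempt` and the error code are hypothetical. It shows the CHECK on outcome rejecting a pre-execution abort code, the UNIQUE (task_id, attempt) pair rejecting a duplicate, and ON DELETE CASCADE removing history together with its task:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
conn.executescript("""
CREATE TABLE horsies_tasks (
    id VARCHAR(36) PRIMARY KEY,
    status VARCHAR(20) NOT NULL
);
CREATE TABLE horsies_task_attempts (
    id INTEGER PRIMARY KEY,  -- stand-in for BIGSERIAL
    task_id VARCHAR(36) NOT NULL
        REFERENCES horsies_tasks(id) ON DELETE CASCADE,
    attempt INT NOT NULL,
    outcome VARCHAR(32) NOT NULL
        CHECK (outcome IN ('COMPLETED', 'FAILED', 'WORKER_FAILURE')),
    will_retry BOOLEAN NOT NULL,
    error_code TEXT,
    UNIQUE (task_id, attempt)
);
""")


def record_attempt(task_id, retry_count, outcome, will_retry, error_code=None):
    """Hypothetical helper: insert one history row; attempt = retry_count + 1."""
    conn.execute(
        "INSERT INTO horsies_task_attempts"
        " (task_id, attempt, outcome, will_retry, error_code)"
        " VALUES (?, ?, ?, ?, ?)",
        (task_id, retry_count + 1, outcome, will_retry, error_code),
    )


conn.execute("INSERT INTO horsies_tasks VALUES ('t1', 'COMPLETED')")
record_attempt("t1", 0, "FAILED", True, "SOME_ERROR")  # hypothetical error code
record_attempt("t1", 1, "COMPLETED", False)

rejected = []
for bad in [("t1", 1, "COMPLETED", False),   # attempt 2 again: UNIQUE violation
            ("t1", 2, "CLAIM_LOST", False)]:  # not an allowed outcome: CHECK violation
    try:
        record_attempt(*bad)
    except sqlite3.IntegrityError as exc:
        rejected.append(str(exc))

conn.execute("DELETE FROM horsies_tasks WHERE id = 't1'")  # cascades to history
remaining = conn.execute("SELECT COUNT(*) FROM horsies_task_attempts").fetchone()[0]
print(len(rejected), remaining)  # 2 0
```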

Task liveness tracking (horsies_heartbeats).

| Column | Type | Description |
|---|---|---|
| id | INT PK | Auto-increment |
| task_id | VARCHAR(36) | Associated task |
| sender_id | VARCHAR(255) | Worker/process ID |
| role | VARCHAR(20) | 'claimer' or 'runner' |
| sent_at | TIMESTAMP | Heartbeat time |
| hostname | VARCHAR(255) | Machine hostname |
| pid | INT | Process ID |

Indexes: (task_id, role, sent_at DESC)
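The composite index (task_id, role, sent_at DESC) matches a "newest heartbeat for this task and role" lookup. This sketch runs that query against sqlite3 as a stand-in for PostgreSQL (sent_at is stored as ISO-8601 text here) and adds a staleness check; the `is_stale` rule and its 30-second threshold are hypothetical, not the library's recovery logic:

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE horsies_heartbeats (
    id INTEGER PRIMARY KEY,
    task_id VARCHAR(36) NOT NULL,
    sender_id VARCHAR(255) NOT NULL,
    role VARCHAR(20) NOT NULL,   -- 'claimer' or 'runner'
    sent_at TEXT NOT NULL        -- ISO-8601 UTC text in this sketch
);
CREATE INDEX ix_hb ON horsies_heartbeats (task_id, role, sent_at DESC);
""")

LATEST_SQL = """
SELECT sent_at FROM horsies_heartbeats
WHERE task_id = ? AND role = ?
ORDER BY sent_at DESC
LIMIT 1
"""


def latest_heartbeat(task_id: str, role: str) -> Optional[datetime]:
    row = conn.execute(LATEST_SQL, (task_id, role)).fetchone()
    return datetime.fromisoformat(row[0]) if row else None


def is_stale(task_id: str, role: str, now: datetime,
             threshold: timedelta = timedelta(seconds=30)) -> bool:
    """Hypothetical rule: stale if no heartbeat, or the newest one is too old."""
    last = latest_heartbeat(task_id, role)
    return last is None or now - last > threshold


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
for seconds_ago, role in [(120, "runner"), (5, "runner"), (90, "claimer")]:
    conn.execute(
        "INSERT INTO horsies_heartbeats (task_id, sender_id, role, sent_at)"
        " VALUES (?, ?, ?, ?)",
        ("t1", "worker-1", role, (now - timedelta(seconds=seconds_ago)).isoformat()),
    )

print(is_stale("t1", "runner", now))   # False: newest runner beat is 5 s old
print(is_stale("t1", "claimer", now))  # True: newest claimer beat is 90 s old
print(is_stale("t2", "runner", now))   # True: no heartbeats at all
```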

Worker monitoring snapshots, stored as a timeseries (horsies_worker_states).

| Column | Type | Description |
|---|---|---|
| id | INT PK | Auto-increment |
| worker_id | VARCHAR(255) | Worker identifier |
| snapshot_at | TIMESTAMP | Snapshot time |
| hostname | VARCHAR(255) | Machine hostname |
| pid | INT | Main process ID |
| processes | INT | Worker process count |
| queues | VARCHAR[] | Subscribed queues |
| tasks_running | INT | Number of tasks currently running |
| tasks_claimed | INT | Number of tasks currently claimed |
| memory_usage_mb | FLOAT | Parent worker process resident memory, in MB |
| children_memory_mb | FLOAT | Summed resident memory of executor child processes, in MB |
| cpu_percent | FLOAT | CPU usage percentage |
| memory_percent | FLOAT | Memory usage percentage |
| max_claim_batch | INT | Max tasks claimed per batch |
| max_claim_per_worker | INT | Max tasks claimable per worker |
| cluster_wide_cap | INT | Cluster-wide in-flight cap |
| queue_priorities | JSONB | Queue priority configuration |
| queue_max_concurrency | JSONB | Per-queue concurrency limits |
| recovery_config | JSONB | Recovery configuration snapshot |
| worker_started_at | TIMESTAMP | Worker start time |
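Since horsies_worker_states is a timeseries, a worker's current state is its newest snapshot. This pure-Python sketch assumes rows have already been fetched into a hypothetical `Snapshot` dataclass holding a subset of the columns; it keeps the newest snapshot per worker_id and adds the parent (memory_usage_mb) and executor-children (children_memory_mb) figures to get total resident memory:

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical subset of a horsies_worker_states row."""
    worker_id: str
    snapshot_at: datetime
    memory_usage_mb: float     # parent worker process RSS
    children_memory_mb: float  # summed RSS of executor children


def latest_per_worker(snaps: list[Snapshot]) -> dict[str, Snapshot]:
    """Keep only the newest snapshot for each worker_id."""
    latest: dict[str, Snapshot] = {}
    for s in snaps:
        cur = latest.get(s.worker_id)
        if cur is None or s.snapshot_at > cur.snapshot_at:
            latest[s.worker_id] = s
    return latest


def total_memory_mb(s: Snapshot) -> float:
    """Parent plus children: the two columns measure disjoint processes."""
    return s.memory_usage_mb + s.children_memory_mb


snaps = [
    Snapshot("w1", datetime(2024, 1, 1, 12, 0), 120.0, 300.0),
    Snapshot("w1", datetime(2024, 1, 1, 12, 1), 125.0, 310.0),
    Snapshot("w2", datetime(2024, 1, 1, 12, 0), 90.0, 150.0),
]
latest = latest_per_worker(snaps)
for wid, s in sorted(latest.items()):
    print(wid, total_memory_mb(s))  # w1 435.0, then w2 240.0
```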

Scheduler execution tracking.

| Column | Type | Description |
|---|---|---|
| schedule_name | VARCHAR(255) PK | Schedule identifier |
| last_run_at | TIMESTAMP | Last execution time |
| next_run_at | TIMESTAMP | Next scheduled time |
| last_task_id | VARCHAR(36) | Most recent task ID |
| run_count | INT | Total executions |
| config_hash | VARCHAR(64) | Configuration hash |
| updated_at | TIMESTAMP | Last state update |
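config_hash is VARCHAR(64), exactly the length of a hex-encoded SHA-256 digest. How horsies builds the hashed input is not documented on this page; the sketch below assumes a hypothetical `config_hash` over a canonical JSON encoding of a schedule definition (the field names are made up) and shows how comparing a stored hash with a freshly computed one would reveal that a schedule's configuration changed:

```python
import hashlib
import json


def config_hash(config: dict) -> str:
    """Hypothetical: SHA-256 over canonical JSON (sorted keys, no whitespace)."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


stored = config_hash({"task": "send_report", "cron": "0 * * * *"})
same = config_hash({"cron": "0 * * * *", "task": "send_report"})  # key order ignored
changed = config_hash({"task": "send_report", "cron": "*/5 * * * *"})

print(len(stored))        # 64, fits VARCHAR(64)
print(stored == same)     # True
print(stored == changed)  # False: configuration changed
```

Canonicalizing before hashing matters: without sorted keys, two logically identical configs could produce different digests.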
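Notification trigger on horsies_tasks, which publishes the task ID over PostgreSQL LISTEN/NOTIFY:

```sql
CREATE FUNCTION horsies_notify_task_changes()
RETURNS trigger AS $$
BEGIN
    -- New claimable task: wake global listeners and listeners for its queue.
    IF TG_OP = 'INSERT' AND NEW.status = 'PENDING' THEN
        PERFORM pg_notify('task_new', NEW.id);
        PERFORM pg_notify('task_queue_' || NEW.queue_name, NEW.id);
    -- Status change: announce only transitions into a terminal status.
    ELSIF TG_OP = 'UPDATE' AND OLD.status != NEW.status THEN
        IF NEW.status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXPIRED') THEN
            PERFORM pg_notify('task_done', NEW.id);
        END IF;
    END IF;
    -- The return value of a row-level AFTER trigger is ignored.
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Fire once per inserted or updated row.
CREATE TRIGGER horsies_task_notify_trigger
AFTER INSERT OR UPDATE ON horsies_tasks
FOR EACH ROW
EXECUTE FUNCTION horsies_notify_task_changes();
```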
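To reason about which channels a listener should subscribe to, the trigger's branching can be mirrored in plain Python. This is an illustrative sketch with a hypothetical `channels_for` helper, not code from the library. Note that, as the trigger is written, `task_new` is only sent on INSERT, so an UPDATE that moves a row back to PENDING produces no notification:

```python
from typing import Optional

TERMINAL = {"COMPLETED", "FAILED", "CANCELLED", "EXPIRED"}


def channels_for(op: str, old_status: Optional[str], new_status: str,
                 queue_name: str) -> list[str]:
    """Hypothetical mirror of horsies_notify_task_changes().

    Returns the pg_notify channels the trigger would hit; the payload on
    every channel is the task id (NEW.id).
    """
    if op == "INSERT" and new_status == "PENDING":
        return ["task_new", "task_queue_" + queue_name]
    if op == "UPDATE" and old_status != new_status and new_status in TERMINAL:
        return ["task_done"]
    return []


print(channels_for("INSERT", None, "PENDING", "emails"))         # ['task_new', 'task_queue_emails']
print(channels_for("UPDATE", "RUNNING", "COMPLETED", "emails"))  # ['task_done']
print(channels_for("UPDATE", "FAILED", "PENDING", "emails"))     # []
print(channels_for("UPDATE", "FAILED", "FAILED", "emails"))      # []
```

A client would LISTEN on these channel names (for example through asyncpg's `Connection.add_listener`) and read the task ID from the payload.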

Treat database write access as a task-execution privilege: anyone who can INSERT into horsies_tasks can run any registered task with any kwargs that pass its typed signature, and can read stored results and tracebacks. There is no separate authentication layer between the tables and the workers (the same model as Celery with the JSON serializer). Task names resolve only against the in-process registry, so rows cannot trigger arbitrary imports or code loading, but database credentials should still be scoped accordingly.

Tables are created automatically via SQLAlchemy’s Base.metadata.create_all():

```python
await conn.run_sync(Base.metadata.create_all)
```

Table creation is protected by an advisory lock so that concurrent processes do not race to create the same tables.
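PostgreSQL advisory locks are keyed by a 64-bit integer (or a pair of 32-bit integers). The key horsies actually uses is not documented here; this sketch shows one common way to derive a stable signed 64-bit key from a name (the `advisory_lock_key` helper and the name `horsies_schema_init` are hypothetical) and the transaction-scoped lock statement that would run before create_all. Python's built-in `hash()` is randomized per process, so a cryptographic digest is used instead:

```python
import hashlib

INT64_MIN, INT64_MAX = -(2**63), 2**63 - 1


def advisory_lock_key(name: str) -> int:
    """Hypothetical: map a name to a stable signed 64-bit key."""
    digest = hashlib.sha256(name.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big", signed=True)


key = advisory_lock_key("horsies_schema_init")  # hypothetical lock name
assert INT64_MIN <= key <= INT64_MAX  # valid PostgreSQL bigint

# Transaction-scoped: held until COMMIT/ROLLBACK, so concurrent processes
# issuing the same statement block here instead of racing on CREATE TABLE.
# ":key" is SQLAlchemy text() bind-parameter syntax.
LOCK_SQL = "SELECT pg_advisory_xact_lock(:key)"
print(LOCK_SQL, {"key": key})
```

With SQLAlchemy's async engine, both steps would share one transaction inside `async with engine.begin() as conn:`, running `await conn.execute(text(LOCK_SQL), {"key": key})` and then `await conn.run_sync(Base.metadata.create_all)`. Whether horsies uses a transaction-scoped or session-scoped lock is not stated on this page.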

The worker’s reaper loop prunes old rows automatically every hour based on RecoveryConfig retention settings:

| Table | Config field | Default | Condition |
|---|---|---|---|
| horsies_heartbeats | heartbeat_retention_hours | 24 hours | sent_at older than threshold |
| horsies_worker_states | worker_state_retention_hours | 7 days | snapshot_at older than threshold |
| horsies_tasks | terminal_record_retention_hours | 30 days | Terminal status + oldest timestamp older than threshold |
| horsies_task_attempts | n/a | n/a | Cascade-deleted when the parent task row is deleted |
| horsies_workflows | terminal_record_retention_hours | 30 days | Terminal status + oldest timestamp older than threshold |
| horsies_workflow_tasks | terminal_record_retention_hours | 30 days | Parent workflow is terminal and older than threshold |

Terminal statuses: COMPLETED, FAILED, CANCELLED, EXPIRED. Set any retention field to None to disable cleanup for that category. See Recovery Config for configuration details.
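The retention fields translate into time cutoffs. This sketch assumes a hypothetical `Retention` dataclass that reuses the RecoveryConfig field names and the defaults from the table, expressed in hours (7 days = 168 h, 30 days = 720 h); it is not the library's RecoveryConfig, and the task rule is simplified to a single reference timestamp:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

TERMINAL = {"COMPLETED", "FAILED", "CANCELLED", "EXPIRED"}


@dataclass
class Retention:
    """Hypothetical stand-in; None disables cleanup for that category."""
    heartbeat_retention_hours: Optional[int] = 24
    worker_state_retention_hours: Optional[int] = 7 * 24
    terminal_record_retention_hours: Optional[int] = 30 * 24


def cutoff(now: datetime, hours: Optional[int]) -> Optional[datetime]:
    """Rows older than the returned instant are prunable; None means never."""
    return None if hours is None else now - timedelta(hours=hours)


def task_prunable(status: str, ts: datetime, now: datetime, cfg: Retention) -> bool:
    """Hypothetical rule: terminal status and reference time past the cutoff."""
    limit = cutoff(now, cfg.terminal_record_retention_hours)
    return limit is not None and status in TERMINAL and ts < limit


now = datetime(2024, 3, 1, tzinfo=timezone.utc)
cfg = Retention()
old = now - timedelta(days=31)
print(task_prunable("COMPLETED", old, now, cfg))                    # True
print(task_prunable("RUNNING", old, now, cfg))                      # False: not terminal
print(task_prunable("FAILED", now - timedelta(days=29), now, cfg))  # False: too recent
print(task_prunable("COMPLETED", old, now,
                    Retention(terminal_record_retention_hours=None)))  # False: disabled
```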

horsies/core/models/task_pg.py