Hi agent! If you need to work with horsies-rust, consider using the docs locally.

# Horsies (Rust)

> PostgreSQL-backed background task queue and workflow engine for Rust.
> Docs: https://suleymanozkeskin.github.io/horsies-rust
> Source: https://github.com/suleymanozkeskin/horsies-rust
> Crates: horsies, horsies-macros

## Agent Skills (Repository Guidance)

- In source checkouts, skills live under: `horsies/.agents/skills/`
- `SKILL.md` -> quick orientation and routing
- `tasks.md` -> task APIs, send/retry flows, serialization, error handling
- `workflows.md` -> DAG construction, WorkflowHandle semantics, HRS-001-HRS-031 validation
- `configs.md` -> AppConfig, queues, recovery/resilience, scheduling, CLI/runtime settings

## Quick Reference

### Horsies (app instance)

- app = Horsies::new(AppConfig { ... })?
- #[task("name")] async fn(args) -> Result
- #[task("name", queue = "q")] async fn(args) -> Result
- #[blocking_task("name")] fn(args) -> Result
- #[task("name", auto_retry_for = ["CODE"], retry_policy = RetryPolicy::fixed(...))]
- task_name::register(&mut app)? -> TaskFunction
- task_name::send(args).await -> TaskSendResult
- task_name::schedule(delay, args).await -> TaskSendResult
- task_name::with_options(TaskSendOptions::new().good_until(deadline)).send(args).await
- app.discover(vec![|app| task::register(app).map(|_| ())])?
- app.check()? (static validation)
- app.check_live().await? (static validation + broker connect + schema ensure + DB ping)
- app.run_worker().await?
- app.run_worker_with(WorkerConfig { ... }).await?
- app.run_scheduler().await?
- app.get_broker().await?
  -> Arc

### AppConfig

- queue_mode: QueueMode (Default or Custom)
- custom_queues: Option (required for Custom)
- broker: PostgresConfig
- cluster_wide_cap: Option (max RUNNING tasks across cluster)
- prefetch_buffer: u32 (0 = hard cap, >0 = soft cap with lease)
- claim_lease_ms: Option (required when prefetch_buffer > 0)
- max_claim_renew_age_ms: u32 (default 180_000)
- recovery: RecoveryConfig
- resilience: WorkerResilienceConfig
- schedule: Option
- resend_on_transient_err: bool (default false)

### PostgresConfig

- database_url: String (must start with postgresql:// or postgres://)
- pool_pre_ping: bool (default true)
- pool_size: u32 (default 30)
- max_overflow: u32 (default 30)
- pool_timeout: u32 (default 30)
- pool_recycle: u32 (default 1800)
- echo: bool (default false)
- PostgresConfig::from_url(url) convenience constructor

### QueueMode & CustomQueueConfig

- QueueMode::Default -> single "default" queue, FIFO
- QueueMode::Custom -> named queues with priority + max_concurrency
- CustomQueueConfig { name, priority (1-100), max_concurrency }

### TaskResult (wire format)

- TaskResult::Ok(T) -> success
- TaskResult::Err(TaskError) -> failure
- .is_ok(), .is_err(), .unwrap(), .unwrap_err(), .ok(), .map(), .and_then()
- .is_transient() -> true for retrieval/broker errors
- JSON: {"__type": "ok"|"err", "value": ...}

### TaskError

- error_code: Option
- message: Option
- data: Option
- cause: Option (wire key: "exception")
- TaskError::new(code, message) -> user-defined error
- TaskError::builtin(code, message) -> library error

### TaskErrorCode

- TaskErrorCode::BuiltIn(BuiltInTaskCode) -> library codes
- TaskErrorCode::User(String) -> user-defined codes

### BuiltInTaskCode (4 families)

- OperationalErrorCode: UnhandledError, TaskError, WorkerCrashed, BrokerError, WorkerResolutionError, WorkerSerializationError, ResultDeserializationError, WorkflowEnqueueFailed, SubworkflowLoadFailed
- ContractCode: ArgumentTypeMismatch, ReturnTypeMismatch, WorkflowCtxMissingId
- RetrievalCode: WaitTimeout, TaskNotFound, WorkflowNotFound, ResultNotAvailable, ResultNotReady
- OutcomeCode: TaskCancelled, TaskExpired, WorkflowPaused, WorkflowFailed, WorkflowCancelled, UpstreamSkipped, SubworkflowFailed, WorkflowSuccessCaseNotMet, WorkflowStopped, SendSuppressed

### TaskSendResult = Result

- TaskSendError { code, message, retryable, task_id, payload }
- TaskSendErrorCode: SendSuppressed, ValidationFailed, EnqueueFailed, PayloadMismatch

### TaskFunction

- .send(args).await -> TaskSendResult
- .schedule(delay, args).await -> TaskSendResult
- .with_options(TaskSendOptions::new().good_until(deadline)).send(args).await
- .retry_send(&err).await -> TaskSendResult
- .retry_schedule(&err).await -> TaskSendResult
- .task_name(), .queue_name(), .priority()
- .node() -> TaskNode (for workflow building)

### TaskHandle

- .task_id() -> &str
- .get(timeout: Option).await -> TaskResult
- .info(include_result, include_failed_reason, include_attempts).await -> BrokerResult

### RetryPolicy

- RetryPolicy::fixed(intervals: Vec, jitter: bool)? -> fixed backoff
- RetryPolicy::exponential(base_seconds: u32, max_retries: u32, jitter: bool)? -> exponential backoff
- max_retries: u32 (1-20), intervals: Vec (seconds, 1-86400 each)
- BackoffStrategy::Fixed | BackoffStrategy::Exponential

### TaskRuntime

- rt.state::() -> Result<_, TaskError> (access shared state)
- rt.start::(spec).await -> WorkflowStartResult

### WorkflowDefinition trait

- type Output; type Params;
- fn name() -> &'static str
- fn definition_key() -> &'static str
- fn on_error() -> OnError { OnError::Fail }
- fn define(builder: &mut WorkflowSpecBuilder) -> Result

### WorkflowSpecBuilder

- WorkflowSpecBuilder::new(name)
- .task(TaskNode) -> TypedNodeRef
- .sub_workflow(SubWorkflowNode) -> TypedNodeRef
- .definition_key(key), .on_error(policy), .output(node_ref), .success_policy(policy)
- .build() -> Result

### TaskNode (workflow node builder)

- task_name::node()?
  -> TaskNode
- .node_id(id), .args_json(json), .kwargs_json(json)
- .waits_for(dep), .waits_for_all(deps) // accepts typed refs via Into
- .set_input(value)?, .set(task_name::params::field(), value)?
- .arg_from(task_name::params::field(), dep)
- .join_any(), .join_quorum(min_success)
- .allow_failed_deps(), .queue(q), .priority(p), .good_until(dt)

### SubWorkflowNode (workflow node builder)

- SubWorkflowNode::new(sub_workflow_name) -> Self
- SubWorkflowNode::from_definition::() -> SubWorkflowNode
- .node_id(id), .waits_for(dep), .waits_for_all(deps)
- .args_json(json), .kwargs_json(json), .set_input(value)?, .set(field, value)?, .queue(q), .priority(p)

### WorkflowFunction (registered workflow handle)

- .start().await -> WorkflowStartResult
- .start_with_id(id).await
- .retry_start(&err).await

### WorkflowTemplate (parameterized workflow)

- .start(params).await -> WorkflowStartResult
- .start_with_id(params, id).await

### WorkflowHandle

- .workflow_id() -> &str
- .get(timeout).await -> HandleResult
- .status().await -> HandleResult
- .results().await -> HandleResult

### Global workflow dispatch

- horsies::start_workflow::().await (zero params)
- horsies::start_workflow_with::(params).await

### Workflow lifecycle

- cancel_workflow(pool, workflow_id).await
- pause_workflow(pool, workflow_id).await
- resume_workflow(pool, workflow_id, registry).await
- recover_stuck_workflows(pool, registry, ...).await

### WorkflowStatus

- Pending, Running, Completed, Failed, Cancelled, Paused
- WORKFLOW_TERMINAL_STATES: [Completed, Failed, Cancelled]

### WorkflowTaskStatus

- Pending, Ready, Enqueued, Running, Completed, Failed, Skipped, Cancelled

### TaskStatus

- Pending, Claimed, Running, Completed, Failed, Cancelled, Expired
- TASK_TERMINAL_STATES: [Completed, Failed, Cancelled, Expired]

### ScheduleConfig

- enabled: bool (default true)
- schedules: Vec
- check_interval_seconds: u32 (default 1, range 1-60)

### TaskSchedule

- name, task_name, pattern: SchedulePattern, args,
  kwargs: serde_json::Value
- queue_name: Option, enabled: bool, timezone: String, catch_up_missed: bool

### SchedulePattern (enum)

- Interval(IntervalSchedule) { seconds, minutes, hours, days: Option }
- Hourly(HourlySchedule) { minute, second: u32 }
- Daily(DailySchedule) { time: NaiveTime }
- Weekly(WeeklySchedule) { days: Vec, time: NaiveTime }
- Monthly(MonthlySchedule) { day: u32, time: NaiveTime }

### RecoveryConfig (defaults)

- auto_requeue_stale_claimed: true, claimed_stale_threshold_ms: 120_000
- auto_fail_stale_running: true, running_stale_threshold_ms: 300_000
- check_interval_ms: 30_000
- runner_heartbeat_interval_ms: 30_000, claimer_heartbeat_interval_ms: 30_000
- heartbeat_retention_hours: Some(24), worker_state_retention_hours: Some(168)
- terminal_record_retention_hours: Some(720)

### WorkerResilienceConfig (defaults)

- db_retry_initial_ms: 500 (range 100-60_000)
- db_retry_max_ms: 30_000 (range 500-300_000)
- db_retry_max_attempts: 0 (infinite, range 0-10_000)
- notify_poll_interval_ms: 5_000 (range 1_000-300_000)

### WorkerConfig

- queues: Vec (default ["default"])
- concurrency: u32 (default available_parallelism)
- max_claim_batch: u32 (default 2), max_claim_per_worker: u32 (default 0 = auto)
- queue_priorities, queue_max_concurrency: HashMap
- coalesce_notifies: u32 (default 100)
- loglevel: LogLevel (Debug, Info, Warning, Error)

### PostgresBroker

- PostgresBroker::connect_with(config).await?
- PostgresBroker::from_pool(pool)
- .migrate().await, .ensure_schema_initialized().await, .health_check().await
- .enqueue(...).await -> BrokerResult
- .send_task::(resolved, args_json, kwargs_json, task_options).await
- .get_result::(task_id, timeout).await -> BrokerResult
- .get_task_info(task_id, include_result, include_failed_reason).await
- .get_task_attempts(task_id).await -> BrokerResult
- Type alias: Broker = PostgresBroker

### HRS Error Codes (startup errors)

- HRS-001-099: Workflow validation (WorkflowNoName, WorkflowNoNodes, WorkflowCycleDetected, etc.)
- HRS-100-199: Task definition (TaskInvalidOptions)
- HRS-200-299: Configuration (BrokerInvalidUrl)
- HRS-300-399: Registry (TaskNotRegistered, TaskDuplicateName)

### Database Tables

- horsies_tasks (28 columns), horsies_task_attempts, horsies_heartbeats
- horsies_worker_states, horsies_schedule_state
- NOTIFY trigger on task INSERT

### Utilities

- mask_database_url(url) -> String (replaces password with ***)
- init_tracing(level: LogLevel) (respects RUST_LOG env var)

## Sitemap

- /horsies-rust/ (home)
- /horsies-rust/quick-start/getting-started/
- /horsies-rust/quick-start/01-configuring-horsies/
- /horsies-rust/quick-start/02-producing-tasks/
- /horsies-rust/quick-start/03-defining-workflows/
- /horsies-rust/quick-start/04-scheduling/
- /horsies-rust/quick-start/05-workflow-patterns/
- /horsies-rust/questions-and-answers/
- /horsies-rust/concepts/architecture/
- /horsies-rust/concepts/task-lifecycle/
- /horsies-rust/concepts/result-handling/
- /horsies-rust/concepts/queue-modes/
- /horsies-rust/concepts/workflows/workflow-semantics/
- /horsies-rust/concepts/workflows/workflow-api/
- /horsies-rust/concepts/workflows/subworkflows/
- /horsies-rust/tasks/defining-tasks/
- /horsies-rust/tasks/sending-tasks/
- /horsies-rust/tasks/error-handling/
- /horsies-rust/tasks/errors/
- /horsies-rust/tasks/retrieving-results/
- /horsies-rust/tasks/retry-policy/
- /horsies-rust/configuration/app-config/
- /horsies-rust/configuration/broker-config/
- /horsies-rust/configuration/recovery-config/
- /horsies-rust/workers/worker-architecture/
- /horsies-rust/workers/concurrency/
- /horsies-rust/workers/heartbeats-recovery/
- /horsies-rust/monitoring/syce-overview/
- /horsies-rust/monitoring/broker-methods/
- /horsies-rust/scheduling/scheduler-overview/
- /horsies-rust/scheduling/schedule-patterns/
- /horsies-rust/scheduling/schedule-config/
- /horsies-rust/cli/
- /horsies-rust/internals/database-schema/
- /horsies-rust/internals/serialization/
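The TaskResult wire format in the quick reference ({"__type": "ok"|"err", "value": ...}) can be illustrated with a minimal std-only Rust sketch. This is a hypothetical re-implementation for orientation only: the crate itself serializes with its own types, and the exact shape of the `err` payload (here a bare `message` object) is an assumption.

```rust
// Illustrative sketch of the documented TaskResult wire tagging:
// {"__type": "ok"|"err", "value": ...}. Hypothetical; the `err`
// payload shape below is an assumption, not the crate's format.
enum WireTaskResult {
    Ok(i64),     // success value (integers only in this sketch)
    Err(String), // error-message stand-in for a full TaskError
}

fn to_wire(r: &WireTaskResult) -> String {
    match r {
        WireTaskResult::Ok(v) => format!(r#"{{"__type":"ok","value":{}}}"#, v),
        WireTaskResult::Err(m) => {
            format!(r#"{{"__type":"err","value":{{"message":"{}"}}}}"#, m)
        }
    }
}

fn main() {
    // Prints {"__type":"ok","value":42}
    println!("{}", to_wire(&WireTaskResult::Ok(42)));
    // Prints {"__type":"err","value":{"message":"boom"}}
    println!("{}", to_wire(&WireTaskResult::Err("boom".into())));
}
```

The `__type` discriminator is what `.is_ok()` / `.is_err()` dispatch on after deserialization.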
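Among the HRS-001-099 workflow-validation errors, WorkflowCycleDetected implies the spec builder rejects dependency graphs containing cycles. A self-contained DFS sketch of that kind of check follows; it is illustrative only (the crate's actual validator and its data structures are not shown here), with the graph given as (node, depends_on) pairs.

```rust
use std::collections::HashMap;

/// Build a dependency map from (node, depends_on) pairs and report
/// whether the resulting graph contains a cycle (DFS three-color check).
/// Illustrative sketch only; not the crate's validator.
fn has_cycle(edges: &[(&str, &str)]) -> bool {
    let mut deps: HashMap<&str, Vec<&str>> = HashMap::new();
    for &(node, dep) in edges {
        deps.entry(node).or_default().push(dep);
    }

    // state: 0 = unvisited, 1 = on current DFS path, 2 = fully explored
    fn visit(
        node: &str,
        deps: &HashMap<&str, Vec<&str>>,
        state: &mut HashMap<String, u8>,
    ) -> bool {
        match state.get(node).copied().unwrap_or(0) {
            1 => return true,  // back-edge to the current path: cycle
            2 => return false, // already cleared
            _ => {}
        }
        state.insert(node.to_string(), 1);
        if let Some(ds) = deps.get(node) {
            for &d in ds {
                if visit(d, deps, state) {
                    return true;
                }
            }
        }
        state.insert(node.to_string(), 2);
        false
    }

    let mut state = HashMap::new();
    let nodes: Vec<&str> = deps.keys().copied().collect();
    nodes.into_iter().any(|n| visit(n, &deps, &mut state))
}

fn main() {
    // a <- b <- c is acyclic; adding a -> c closes a loop.
    assert!(!has_cycle(&[("b", "a"), ("c", "b")]));
    assert!(has_cycle(&[("b", "a"), ("c", "b"), ("a", "c")]));
    println!("cycle checks passed");
}
```

A check like this runs at `app.check()?` time, which is why cyclic `.waits_for(...)` chains fail at startup rather than at enqueue time.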
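The `mask_database_url` utility is documented above only as replacing the password with `***`. A std-only sketch of that documented behavior (an illustrative guess at the logic, not the crate's implementation):

```rust
/// Illustrative sketch of the documented mask_database_url behavior:
/// replace the password in "scheme://user:password@host/db" with ***.
/// Not the crate's implementation; passwords containing '@' or ':'
/// are out of scope for this sketch.
fn mask_database_url(url: &str) -> String {
    if let Some(scheme_end) = url.find("://") {
        let after = &url[scheme_end + 3..];
        if let Some(at) = after.find('@') {
            if let Some(colon) = after[..at].find(':') {
                // Reassemble as scheme://user:***@rest
                return format!(
                    "{}://{}:***{}",
                    &url[..scheme_end],
                    &after[..colon],
                    &after[at..]
                );
            }
        }
    }
    // No credentials found: return the URL unchanged.
    url.to_string()
}

fn main() {
    // Prints postgresql://app:***@db.internal:5432/jobs
    println!("{}", mask_database_url("postgresql://app:s3cret@db.internal:5432/jobs"));
}
```

Useful wherever a PostgresConfig `database_url` would otherwise leak into logs.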