Workflow Semantics
Overview
Workflows in horsies are DAGs:
- nodes are tasks or sub-workflows
- edges are dependencies
- readiness is computed from dependency state, join mode, and failure policy
Workflow Status Lifecycle
Pending -> Running -> Completed | Failed | Paused | Cancelled

Terminal workflow statuses:
- Completed
- Failed
- Cancelled
Paused is non-terminal.
Workflow Task Status Lifecycle
Pending -> Ready -> Enqueued -> Running -> Completed | Failed
Pending / Ready / Enqueued / Running -> Skipped

Terminal workflow-task statuses:
- Completed
- Failed
- Skipped
OnError
The Rust API currently supports two workflow error policies:
| Policy | Behavior |
|---|---|
| Fail | Continue DAG resolution, then mark the workflow failed |
| Pause | Pause immediately and block new enqueues until resume |
There is no public Continue variant in the Rust API.
Failure Semantics
With the default settings:
- join_all()
- allow_failed_deps(false)
- on_error = OnError::Fail
the behavior is:
- a task failure does not instantly terminate the whole workflow
- downstream tasks that require the failed dependency are skipped
- independent branches continue
- once the DAG reaches a terminal state, the workflow becomes Failed
Example:
A -> B -> C
A -> D

If A fails:
- B is skipped
- C is skipped
- D still runs if it is otherwise runnable
- the workflow ends as Failed
Dependency Semantics
waits_for(...)

waits_for(...) means:
- do not consider this node runnable until the dependency reaches terminal state
It does not mean “require success”. Success requirements come from join mode and allow_failed_deps.
Join Modes
| Join Mode | Meaning |
|---|---|
| join_all() | default; wait for all dependencies to become terminal |
| join_any() | run when any dependency completes successfully |
| join_quorum(min) | run when at least min dependencies complete successfully |
All-join
| Upstream state | allow_failed_deps(false) | allow_failed_deps(true) |
|---|---|---|
| all completed | runs | runs |
| any failed | skipped | runs, receives failed TaskResult |
| any skipped | skipped | runs, receives UpstreamSkipped sentinel |
Any-join
```rust
let aggregate = builder.task(
    aggregate_results::node()?
        .waits_for(branch_a)
        .waits_for(branch_b)
        .waits_for(branch_c)
        .join_any(),
);
```

Behavior:
- becomes ready when any dependency completes successfully
- is skipped if all dependencies fail or skip
Quorum
```rust
let quorum = builder.task(
    quorum_handler::node()?
        .waits_for(replica_a)
        .waits_for(replica_b)
        .waits_for(replica_c)
        .join_quorum(2),
);
```

Behavior:
- becomes ready when min dependencies complete successfully
- is skipped if the threshold becomes impossible to reach
allow_failed_deps(true)
This lets a downstream node run even when upstream dependencies failed or were skipped.
The downstream task receives full TaskResult<T> values and can implement fallback or recovery logic itself.
```rust
use horsies::{task, TaskError, TaskResult};

#[task("recovery_handler")]
async fn recovery_handler(primary_result: TaskResult<String>) -> Result<String, TaskError> {
    match primary_result {
        TaskResult::Ok(v) => Ok(v),
        TaskResult::Err(_err) => Ok("fallback".into()),
    }
}
```

Data Flow Semantics
.arg_from(...)
arg_from(...) injects the upstream result as a TaskResult<S>, not the raw success value.
```rust
let process = builder.task(
    process_user::node()?
        .waits_for(fetch)
        .arg_from(process_user::params::user(), fetch),
);
```

That means the receiving task should declare:
```rust
#[horsies::task("process_user")]
async fn process_user(user: TaskResult<User>) -> Result<ProcessedUser, TaskError> {
    // ...
}
```

.workflow_ctx_from(...)
workflow_ctx_from(...) selects upstream nodes whose results should be available through WorkflowContext.
```rust
let enrich = builder.task(
    enrich_user::node()?
        .waits_for(fetch)
        .workflow_ctx_from([fetch]),
);
```

Important:
- context sources are ref-based
- they do not add dependencies automatically
- every context source still needs to appear in waits_for(...) / waits_for_all(...)
Retries and Crash Recovery
Workflow tasks use the same retry policy model as standalone tasks:
- retry only when auto_retry_for matches
- respect retry_policy
Crash recovery:
- claimed but never-started work is requeued safely
- running work may retry if policy allows, otherwise it fails
- workflow reconciliation then applies the normal completion path
Success Policies
By default, any required task failure means the workflow ends Failed.
SuccessPolicy allows explicit partial-success rules.
```rust
use horsies::{SuccessCase, SuccessPolicy};

let policy = SuccessPolicy {
    cases: vec![
        SuccessCase {
            required_indices: vec![0],
            name: Some("delivery_a".into()),
        },
        SuccessCase {
            required_indices: vec![1],
            name: Some("delivery_b".into()),
        },
    ],
    optional_indices: Some(vec![2]),
};

builder.success_policy(policy);
```

Semantics:
- a case is satisfied when all its required nodes completed
- the workflow succeeds if any case is satisfied
- optional indices do not affect success
Limits
Dynamic task generation during workflow execution is not supported. The DAG is fixed at submission time.