
Workflow Semantics

Workflows in horsies are DAGs:

  • nodes are tasks or sub-workflows
  • edges are dependencies
  • readiness is computed from dependency state, join mode, and failure policy

Workflow status lifecycle:

Pending -> Running -> Completed
                   -> Failed
                   -> Paused
                   -> Cancelled

Terminal workflow statuses:

  • Completed
  • Failed
  • Cancelled

Paused is non-terminal.

Workflow-task status lifecycle:

Pending -> Ready -> Enqueued -> Running -> Completed
                                        -> Failed

Pending/Ready/Enqueued/Running -> Skipped

Terminal workflow-task statuses:

  • Completed
  • Failed
  • Skipped

Rust currently supports two workflow error policies:

Policy   Behavior
Fail     Continue DAG resolution, then mark the workflow failed
Pause    Pause immediately and block new enqueues until resume

There is no public Continue variant in the Rust API.

With the default settings:

  • join_all()
  • allow_failed_deps(false)
  • on_error = OnError::Fail

the behavior is:

  1. a task failure does not instantly terminate the whole workflow
  2. downstream tasks that require the failed dependency are skipped
  3. independent branches continue
  4. once the DAG reaches terminal state, the workflow becomes Failed

Example:

A -> B -> C
D            (independent branch)

If A fails:

  • B is skipped
  • C is skipped
  • D still runs if it is otherwise runnable
  • workflow ends as Failed
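
The failure behavior above can be modeled in a few lines of plain Rust (no horsies dependency; `State` and `resolve` are names invented here for illustration). The sketch assumes the defaults — join_all() with allow_failed_deps(false) — and treats D as an independent branch:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum State {
    Completed,
    Failed,
    Skipped,
}

/// `nodes` is topologically ordered: (name, dependencies, outcome if it runs).
fn resolve(nodes: &[(&str, Vec<&str>, State)]) -> HashMap<String, State> {
    let mut out: HashMap<String, State> = HashMap::new();
    for (name, deps, outcome) in nodes {
        // join_all() + allow_failed_deps(false): any failed or skipped
        // dependency blocks this node, so it is skipped.
        let blocked = deps
            .iter()
            .any(|d| matches!(out[*d], State::Failed | State::Skipped));
        out.insert(name.to_string(), if blocked { State::Skipped } else { *outcome });
    }
    out
}

fn main() {
    // A -> B -> C, plus an independent branch D; A fails.
    let dag = vec![
        ("A", vec![], State::Failed),
        ("B", vec!["A"], State::Completed),
        ("C", vec!["B"], State::Completed),
        ("D", vec![], State::Completed),
    ];
    let states = resolve(&dag);
    assert_eq!(states["B"], State::Skipped);   // blocked by A's failure
    assert_eq!(states["C"], State::Skipped);   // blocked transitively
    assert_eq!(states["D"], State::Completed); // independent branch continues
}
```

The workflow itself then ends Failed because the resolved DAG contains a failed task, matching step 4 above.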

waits_for(...) means:

  • do not consider this node runnable until the dependency reaches terminal state

It does not mean “require success”. Success requirements come from join mode and allow_failed_deps.

Join mode          Meaning
join_all()         default; wait for all dependencies to become terminal
join_any()         run when any dependency completes successfully
join_quorum(min)   run when at least min dependencies complete successfully

Upstream state    allow_failed_deps(false)    allow_failed_deps(true)
all completed     runs                        runs
any failed        skipped                     runs, receives the failed TaskResult
any skipped       skipped                     runs, receives an UpstreamSkipped sentinel
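
The rows of this table can be checked against a small decision function in plain Rust (no horsies dependency; `DepState`, `Decision`, and `join_all_decision` are names invented here for illustration):

```rust
#[derive(Clone, Copy)]
enum DepState {
    Completed,
    Failed,
    Skipped,
    Pending, // not yet terminal
}

#[derive(Debug, PartialEq)]
enum Decision {
    Runs,
    Skipped,
    Wait,
}

/// join_all(): wait until every dependency is terminal, then apply the
/// allow_failed_deps rule from the table above.
fn join_all_decision(deps: &[DepState], allow_failed_deps: bool) -> Decision {
    if deps.iter().any(|d| matches!(d, DepState::Pending)) {
        return Decision::Wait; // readiness needs all dependencies terminal
    }
    let all_completed = deps.iter().all(|d| matches!(d, DepState::Completed));
    if all_completed || allow_failed_deps {
        // With allow_failed_deps(true), failed/skipped results are passed
        // through to the task instead of skipping it.
        Decision::Runs
    } else {
        Decision::Skipped
    }
}

fn main() {
    use DepState::*;
    assert_eq!(join_all_decision(&[Completed, Completed], false), Decision::Runs);
    assert_eq!(join_all_decision(&[Completed, Failed], false), Decision::Skipped);
    assert_eq!(join_all_decision(&[Completed, Failed], true), Decision::Runs);
    assert_eq!(join_all_decision(&[Completed, Skipped], true), Decision::Runs);
    assert_eq!(join_all_decision(&[Completed, Pending], false), Decision::Wait);
}
```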

```rust
let aggregate = builder.task(
    aggregate_results::node()?
        .waits_for(branch_a)
        .waits_for(branch_b)
        .waits_for(branch_c)
        .join_any(),
);
```

Behavior:

  • becomes ready when any dependency completes successfully
  • is skipped if all dependencies fail or skip

```rust
let quorum = builder.task(
    quorum_handler::node()?
        .waits_for(replica_a)
        .waits_for(replica_b)
        .waits_for(replica_c)
        .join_quorum(2),
);
```

Behavior:

  • becomes ready when at least min dependencies complete successfully
  • is skipped if the threshold becomes impossible to reach
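
Both readiness and the impossibility check can be sketched in plain Rust (no horsies dependency; `DepState`, `Decision`, and `join_quorum_decision` are names invented here). Note that join_any() behaves like a quorum of 1:

```rust
#[derive(Clone, Copy)]
enum DepState {
    Completed,
    Failed,
    Skipped,
    Pending, // not yet terminal
}

#[derive(Debug, PartialEq)]
enum Decision {
    Ready,
    Skipped,
    Wait,
}

/// join_quorum(min): ready once `min` successes exist; skipped as soon as
/// the remaining pending dependencies can no longer reach the threshold.
fn join_quorum_decision(deps: &[DepState], min: usize) -> Decision {
    let successes = deps.iter().filter(|d| matches!(d, DepState::Completed)).count();
    let pending = deps.iter().filter(|d| matches!(d, DepState::Pending)).count();
    if successes >= min {
        Decision::Ready
    } else if successes + pending < min {
        Decision::Skipped // threshold impossible to reach
    } else {
        Decision::Wait
    }
}

fn main() {
    use DepState::*;
    // Two of three replicas succeed: quorum of 2 is met.
    assert_eq!(join_quorum_decision(&[Completed, Failed, Completed], 2), Decision::Ready);
    // Two failures leave at most one success: quorum of 2 is impossible.
    assert_eq!(join_quorum_decision(&[Failed, Failed, Pending], 2), Decision::Skipped);
    // One success so far, one replica still pending: keep waiting.
    assert_eq!(join_quorum_decision(&[Completed, Failed, Pending], 2), Decision::Wait);
    // join_any() is equivalent to a quorum of 1.
    assert_eq!(join_quorum_decision(&[Failed, Completed, Pending], 1), Decision::Ready);
}
```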

Setting allow_failed_deps(true) lets a downstream node run even when upstream dependencies failed or were skipped.

The downstream task receives full TaskResult<T> values and can implement fallback or recovery logic itself.

```rust
use horsies::{task, TaskError, TaskResult};

#[task("recovery_handler")]
async fn recovery_handler(primary_result: TaskResult<String>) -> Result<String, TaskError> {
    match primary_result {
        TaskResult::Ok(v) => Ok(v),
        TaskResult::Err(_err) => Ok("fallback".into()),
    }
}
```

arg_from(...) injects the upstream result as a TaskResult<S>, not the raw success value.

```rust
let process = builder.task(
    process_user::node()?
        .waits_for(fetch)
        .arg_from(process_user::params::user(), fetch),
);
```

That means the receiving task should declare:

```rust
#[horsies::task("process_user")]
async fn process_user(user: TaskResult<User>) -> Result<ProcessedUser, TaskError> {
    // ...
}
```

workflow_ctx_from(...) selects upstream nodes whose results should be available through WorkflowContext.

```rust
let enrich = builder.task(
    enrich_user::node()?
        .waits_for(fetch)
        .workflow_ctx_from([fetch]),
);
```

Important:

  • context sources are ref-based
  • they do not add dependencies automatically
  • every context source still needs to appear in waits_for(...) / waits_for_all(...)

Workflow tasks use the same retry policy model as standalone tasks:

  • retry only when auto_retry_for matches
  • respect retry_policy

Crash recovery:

  • claimed but never-started work is requeued safely
  • running work may retry if policy allows, otherwise it fails
  • workflow reconciliation then applies the normal completion path

By default, any required task failure means the workflow ends Failed.

SuccessPolicy allows explicit partial-success rules.

```rust
use horsies::{SuccessCase, SuccessPolicy};

let policy = SuccessPolicy {
    cases: vec![
        SuccessCase {
            required_indices: vec![0],
            name: Some("delivery_a".into()),
        },
        SuccessCase {
            required_indices: vec![1],
            name: Some("delivery_b".into()),
        },
    ],
    optional_indices: Some(vec![2]),
};

builder.success_policy(policy);
```

Semantics:

  1. a case is satisfied when all of its required nodes have completed
  2. the workflow succeeds if any case is satisfied
  3. optional indices do not affect success
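
Under these rules, success evaluation reduces to a short check. A plain-Rust sketch (no horsies dependency; `workflow_succeeds` is a name invented here, and horsies' internal implementation may differ):

```rust
/// `cases` holds each case's required node indices; `completed` holds the
/// indices of workflow tasks that finished successfully. Optional indices
/// never influence the outcome, so they need no representation here.
fn workflow_succeeds(cases: &[Vec<usize>], completed: &[usize]) -> bool {
    // The workflow succeeds if any case has all of its required
    // indices among the completed tasks.
    cases
        .iter()
        .any(|required| required.iter().all(|i| completed.contains(i)))
}

fn main() {
    let cases = [vec![0], vec![1]]; // delivery_a or delivery_b
    assert!(workflow_succeeds(&cases, &[1]));    // delivery_b satisfied
    assert!(workflow_succeeds(&cases, &[0, 2])); // delivery_a satisfied; 2 is optional
    assert!(!workflow_succeeds(&cases, &[2]));   // only the optional task completed
}
```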

Dynamic task generation during workflow execution is not supported. The DAG is fixed at submission time.