# Workflow Semantics

## Overview

Workflows in Horsies are Directed Acyclic Graphs (DAGs) where:
- Nodes are tasks to execute
- Edges are dependencies between tasks
- Execution proceeds as dependencies are satisfied
## Workflow Status Lifecycle

```text
PENDING → RUNNING → COMPLETED
                  ↘ FAILED
                  ↘ PAUSED (if on_error="pause")
                  ↘ CANCELLED (manual)
```

Terminal workflow statuses: COMPLETED, FAILED, CANCELLED. (PAUSED is non-terminal.)
## Task Status Lifecycle

```text
PENDING → READY → ENQUEUED → RUNNING → COMPLETED
                                     ↘ FAILED
                                     ↘ SKIPPED (if upstream failed/skipped)
```

Terminal task statuses (within workflows): COMPLETED, FAILED, SKIPPED.
## Status Enums

`WorkflowStatus` values:
| Enum | Value | Terminal |
|---|---|---|
| PENDING | "PENDING" | No |
| RUNNING | "RUNNING" | No |
| COMPLETED | "COMPLETED" | Yes |
| FAILED | "FAILED" | Yes |
| PAUSED | "PAUSED" | No |
| CANCELLED | "CANCELLED" | Yes |
`WorkflowTaskStatus` values:
| Enum | Value | Terminal |
|---|---|---|
| PENDING | "PENDING" | No |
| READY | "READY" | No |
| ENQUEUED | "ENQUEUED" | No |
| RUNNING | "RUNNING" | No |
| COMPLETED | "COMPLETED" | Yes |
| FAILED | "FAILED" | Yes |
| SKIPPED | "SKIPPED" | Yes |
Use `is_terminal` or the frozenset constants to check terminal status programmatically:

```python
from horsies import (
    WorkflowStatus,
    WorkflowTaskStatus,
    WORKFLOW_TERMINAL_STATES,
    WORKFLOW_TASK_TERMINAL_STATES,
)

WorkflowStatus.COMPLETED.is_terminal     # True
WorkflowStatus.PAUSED.is_terminal        # False

WorkflowTaskStatus.SKIPPED.is_terminal   # True
WorkflowTaskStatus.ENQUEUED.is_terminal  # False

# Frozensets for use in queries or filters
WORKFLOW_TERMINAL_STATES       # frozenset({COMPLETED, FAILED, CANCELLED})
WORKFLOW_TASK_TERMINAL_STATES  # frozenset({COMPLETED, FAILED, SKIPPED})
```

## Retries and Crash Recovery

### Task retries inside workflows

Workflow tasks use the same retry mechanism as standalone tasks:
- A task retries only when it has a `retry_policy` and the failure matches `auto_retry_for` (configured on `RetryPolicy`). `auto_retry_for` is a required field on `RetryPolicy`, so retry triggers are always co-located with retry timing.
- While retrying, the workflow task remains `RUNNING` until the final attempt completes.
### Worker crash / restart

If a worker crashes mid-task, recovery behaves as follows:
- CLAIMED task (never started): requeued safely.
- RUNNING task (started): retried if the task has `WORKER_CRASHED` in `auto_retry_for` and retries remaining; otherwise marked `FAILED` with `WORKER_CRASHED`.
- Workflow reconciliation: if `tasks.status` is terminal but the corresponding `workflow_tasks.status` is still non-terminal, recovery triggers the normal completion path to update `workflow_tasks`, propagate to dependents, and finalize the workflow.
When a task is terminal but its result is missing, recovery synthesizes errors:

- `TASK_CANCELLED` for cancelled tasks
- `RESULT_NOT_AVAILABLE` for completed tasks missing results
- `WORKER_CRASHED` for failed tasks missing results
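The crash-recovery rules above amount to a small decision table. The sketch below is an illustrative model of those rules, not horsies code; the function name and status strings are made up for the example.

```python
# Illustrative model of the documented crash-recovery rules (not horsies code).
def recover_after_crash(task_state: str, crash_in_auto_retry: bool, retries_left: int) -> str:
    if task_state == "CLAIMED":
        # Never started: safe to requeue
        return "REQUEUED"
    if task_state == "RUNNING":
        # Started: retry only if WORKER_CRASHED is in auto_retry_for
        if crash_in_auto_retry and retries_left > 0:
            return "RETRY"
        return "FAILED (WORKER_CRASHED)"
    # Already terminal: reconciliation only updates workflow_tasks
    return task_state

print(recover_after_crash("CLAIMED", False, 0))   # REQUEUED
print(recover_after_crash("RUNNING", True, 2))    # RETRY
print(recover_after_crash("RUNNING", False, 0))   # FAILED (WORKER_CRASHED)
```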
## Failure Semantics

### When is a workflow marked FAILED?

A workflow is marked FAILED if any task fails (when `on_error="fail"`, the default).
The failure flow:
- Task fails → workflow stores the error, but status remains `RUNNING`
- DAG continues resolving (dependents may be SKIPPED or run with `allow_failed_deps`)
- Once all tasks reach a terminal state → workflow is marked `FAILED`
This design allows recovery handlers (`allow_failed_deps=True`) to execute before the workflow finalizes.
### on_error Policies

| Policy | Behavior |
|---|---|
| `fail` (default) | Store error, continue DAG resolution, mark FAILED when complete |
| `pause` | Immediately pause workflow, block new enqueues, await resume |
### Failure Propagation and Short-Circuiting

Horsies does not stop the entire workflow on the first failure. It resolves the DAG to a terminal state and applies failure rules per dependency. Short-circuiting happens along the failed path, not globally.
Default behavior (`join="all"`, `allow_failed_deps=False`, `on_error="fail"`):
- When a dependency fails or is skipped, any downstream task that requires all dependencies is SKIPPED.
- That skip propagates to its dependents.
- Other branches that do not depend on the failed task still run.
- The workflow is marked FAILED after all tasks are terminal.
Example:

```text
A → B → C
D        (no dependency on A)
```

If A fails:

- B is SKIPPED
- C is SKIPPED
- D still runs (independent branch)
- Workflow ends as FAILED
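The default propagation can be simulated in a few lines. This is an illustrative model of the resolution pass, not the horsies scheduler; the `deps` dict mirrors the example DAG above.

```python
# Illustrative model of default resolution (join="all", allow_failed_deps=False).
# B and C sit downstream of the failed A; D has no dependencies.
deps = {"A": [], "B": ["A"], "C": ["B"], "D": []}
status = {"A": "FAILED"}

for task in ["B", "C", "D"]:  # topological order
    if any(status[d] in ("FAILED", "SKIPPED") for d in deps[task]):
        status[task] = "SKIPPED"    # short-circuit along the failed path
    else:
        status[task] = "COMPLETED"  # independent branches still run

workflow = "FAILED" if "FAILED" in status.values() else "COMPLETED"
print(status, workflow)
```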
To run recovery logic downstream:
Set `allow_failed_deps=True` on the downstream task. It will run and receive the failed `TaskResult`.
Join rules change the short-circuit behavior:

- `join="any"`: a task can run if any dependency succeeds, even if others fail.
- `join="quorum"`: a task can run if the success threshold is met, even with some failures.
Pause behaves differently:

- `on_error="pause"` does not kill running tasks.
- It blocks new enqueues and waits for `resume()` or `cancel()`.
Result access is separate:

- `WorkflowHandle.get()` returns the workflow-level `TaskResult` (output value or error, not a status enum).
- `WorkflowHandle.result_for()` is non-blocking and may return `RESULT_NOT_READY` if the task has not completed.
### Upstream Failure and args_from

When an upstream task fails and the downstream task uses `join="all"` (the default):

- `allow_failed_deps=False` (default): the downstream task is SKIPPED. It does not execute and does not receive the failed result.
- `allow_failed_deps=True`: the downstream task runs and receives the upstream `TaskResult` with `is_err() == True`.
`on_error="fail"` does not short-circuit the DAG. The workflow stores the error and continues resolving dependents: downstream tasks are SKIPPED (or run if `allow_failed_deps=True`) before the workflow is marked FAILED.
With `join="any"`, a downstream task runs when any dependency succeeds, so a single upstream failure does not cause a skip unless all dependencies fail. With `join="quorum"`, the task runs when `min_success` dependencies succeed, so failures only matter if they make the threshold unreachable.
```python
from horsies import (
    Horsies,
    AppConfig,
    PostgresConfig,
    TaskNode,
    TaskResult,
    TaskError,
)

config = AppConfig(
    broker=PostgresConfig(
        database_url="postgresql+psycopg://user:password@localhost:5432/mydb",
    ),
)
app = Horsies(config)

@app.task("produce")
def produce() -> TaskResult[int, TaskError]:
    return TaskResult(ok=1)

@app.task("process")
def process(data: TaskResult[int, TaskError]) -> TaskResult[str, TaskError]:
    if data.is_err():
        return TaskResult(err=TaskError(
            error_code="UPSTREAM_FAILED",
            message="Upstream task failed",
        ))
    return TaskResult(ok=str(data.ok_value))

node_a: TaskNode[int] = TaskNode(fn=produce)

# Default (join="all", allow_failed_deps=False): downstream is SKIPPED when upstream fails
node_b_skip: TaskNode[str] = TaskNode(
    fn=process,
    waits_for=[node_a],
    args_from={"data": node_a},
)

# With allow_failed_deps: downstream runs and receives the failed TaskResult
node_b_recover: TaskNode[str] = TaskNode(
    fn=process,
    waits_for=[node_a],
    args_from={"data": node_a},
    allow_failed_deps=True,
)
```

The manual `is_err()` check is the intended pattern when `allow_failed_deps=True`. Without `allow_failed_deps`, the task never executes on upstream failure, so no error-handling code is needed.
## PAUSE Semantics (Cooperative Stop)

When a task fails with `on_error="pause"`:
- Immediate status change: workflow status becomes `PAUSED`
- New enqueues blocked: no new tasks will transition from PENDING → READY or READY → ENQUEUED
- Running tasks may complete: tasks already running will finish (cooperative, no force-kill)
- Already-enqueued tasks: tasks may still be claimed briefly, but paused workflows are filtered post-claim and those tasks are unclaimed (they do not execute until resume)
- Resume required: the workflow remains PAUSED until an explicit resume operation
What PAUSE blocks:
- Tasks cannot become READY while workflow is PAUSED
- READY tasks cannot be ENQUEUED while workflow is PAUSED
- Retries are blocked (retries are new enqueues)
- Downstream task propagation is halted
What PAUSE allows:
- Already-running tasks complete normally
- Tasks already claimed/started before PAUSE will finish (cooperative stop)
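The blocked/allowed split can be expressed as a transition guard. This is a hedged sketch of the documented behavior, not horsies internals; the guard function and status strings are illustrative.

```python
# Illustrative guard: PAUSED blocks new enqueues but not in-flight completion.
BLOCKED_WHEN_PAUSED = {("PENDING", "READY"), ("READY", "ENQUEUED")}

def transition_allowed(workflow_status: str, src: str, dst: str) -> bool:
    if workflow_status == "PAUSED" and (src, dst) in BLOCKED_WHEN_PAUSED:
        return False  # retries count too: a retry is a new enqueue
    return True

print(transition_allowed("PAUSED", "READY", "ENQUEUED"))     # False
print(transition_allowed("PAUSED", "RUNNING", "COMPLETED"))  # True: running tasks finish
```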
### Resume

A paused workflow can be resumed via `WorkflowHandle.resume()` or `WorkflowHandle.resume_async()`.

```python
start_result = await spec.start_async()
handle = start_result.ok_value

# ... workflow pauses due to task failure ...

# Resume the workflow
resumed = await handle.resume_async()  # Returns HandleResult[bool]; unwrap with .ok_value
```

Resume behavior:
- Guard: only works if workflow status is `PAUSED`. Returns `False` (no-op) for other states.
- Transition: sets workflow status from `PAUSED` → `RUNNING`
- Re-evaluate PENDING tasks: checks whether dependencies are terminal, marks tasks READY if so
- Enqueue READY tasks: all READY tasks are enqueued with their dependency results
What resume does NOT do:
- Does not restart failed tasks (they remain FAILED)
- Does not affect already-running tasks
- Does not change COMPLETED/FAILED/CANCELLED workflows
## Dependency Semantics

### Join Modes

`TaskNode` supports three join modes via the `join` parameter:
| Join Mode | Behavior |
|---|---|
| `all` (default) | Task runs when ALL dependencies are terminal |
| `any` | Task runs when ANY dependency succeeds (COMPLETED) |
| `quorum` | Task runs when at least `min_success` dependencies succeed |
Note: COMPLETED means the dependency returned `TaskResult.ok`. A dependency that finished execution but returned `TaskResult.err` is FAILED, not COMPLETED.
### AND-join (join="all")

Task runs when ALL dependencies are in a terminal state (COMPLETED/FAILED/SKIPPED).

Behavior matrix:
| Dependency State | `allow_failed_deps=False` | `allow_failed_deps=True` |
|---|---|---|
| All COMPLETED | Task runs normally | Task runs normally |
| Any FAILED | Task is SKIPPED | Task runs with failed TaskResult |
| Any SKIPPED | Task is SKIPPED | Task runs with UPSTREAM_SKIPPED sentinel |
### OR-join (join="any")

```python
# Task runs when any dependency succeeds
aggregator = TaskNode(
    fn=aggregate,
    waits_for=[branch_a, branch_b, branch_c],
    join="any",
)
```

Semantics:
- Task becomes READY when any dependency is COMPLETED
- Task is SKIPPED if all dependencies fail/skip (none succeeded)
- Failed/skipped deps do NOT block the task (unlike `join="all"`)
### Quorum Join (join="quorum")

```python
# Task runs when at least 2 of 3 dependencies succeed
quorum_task = TaskNode(
    fn=quorum_handler,
    waits_for=[replica_a, replica_b, replica_c],
    join="quorum",
    min_success=2,
)
```

Semantics:

- Task becomes READY when `min_success` dependencies are COMPLETED
- Task is SKIPPED if it becomes impossible to reach the threshold
  - Example: 2 of 3 deps failed, but 2 successes are needed → impossible → SKIPPED
- `min_success` must be >= 1 and <= the number of dependencies
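Quorum readiness and the impossibility check can be modeled directly. An illustrative sketch of the documented rule, not the horsies scheduler:

```python
# Illustrative quorum evaluation: READY at min_success COMPLETED deps,
# SKIPPED once the threshold can no longer be reached.
def quorum_state(dep_statuses: list[str], min_success: int) -> str:
    completed = dep_statuses.count("COMPLETED")
    # Deps that are neither succeeded nor terminally failed/skipped may still succeed
    still_possible = completed + sum(
        s not in ("COMPLETED", "FAILED", "SKIPPED") for s in dep_statuses
    )
    if completed >= min_success:
        return "READY"
    if still_possible < min_success:
        return "SKIPPED"  # e.g. 2 of 3 deps failed but 2 successes needed
    return "WAITING"

print(quorum_state(["COMPLETED", "COMPLETED", "RUNNING"], 2))  # READY
print(quorum_state(["FAILED", "FAILED", "RUNNING"], 2))        # SKIPPED
print(quorum_state(["COMPLETED", "FAILED", "RUNNING"], 2))     # WAITING
```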
### SKIPPED Propagation

When a task is SKIPPED, all its dependents are also SKIPPED (unless they have `allow_failed_deps=True`). This cascades through the DAG.

```text
A (FAILED) → B (SKIPPED) → C (SKIPPED) → D (SKIPPED)
```

### allow_failed_deps=True

Tasks with `allow_failed_deps=True` receive the `TaskResult` from dependencies:
- FAILED deps: receive the actual `TaskResult` with the original error
- SKIPPED deps: receive a sentinel `TaskResult` with `error_code=UPSTREAM_SKIPPED`
```python
from horsies import OutcomeCode, RetrievalCode

@app.task(task_name="recovery_handler")
def recovery_handler(input_result: TaskResult[Data, TaskError]) -> TaskResult[...]:
    if input_result.is_err():
        err = input_result.err_value
        if err.error_code == OutcomeCode.UPSTREAM_SKIPPED:
            # Dependency was skipped (upstream failure)
            return TaskResult(ok={"status": "skipped_upstream"})
        # Handle actual failure
        return TaskResult(ok={"recovered": True, "error": err})
    return TaskResult(ok={"passed_through": input_result.ok_value})
```

### UPSTREAM_SKIPPED Sentinel

When a dependency is SKIPPED (due to upstream failure), tasks with `allow_failed_deps=True` receive a sentinel `TaskResult` instead of missing kwargs:

```python
TaskResult(err=TaskError(
    error_code=OutcomeCode.UPSTREAM_SKIPPED,
    message="Upstream dependency was SKIPPED",
    data={"dependency_index": <index>},
))
```

This applies to both `args_from` kwargs and `workflow_ctx_from` context data.
## DAG Examples

### Linear Chain

```text
A → B → C → D
```

- If A fails: B, C, D are SKIPPED
- Workflow: FAILED
### Fan-out / Fan-in

```text
    ┌→ B ─┐
A ──┼→ C ─┼→ E
    └→ D ─┘
```

- If B fails: E is SKIPPED (waiting on B)
- C and D still complete
- Workflow: FAILED
### Diamond with Partial Failure

```text
    ┌→ B ─┐
A ──┤     ├→ D
    └→ C ─┘
```

- If B fails: D is SKIPPED (waits for both B and C)
- Even though C succeeds, D doesn’t run
- Workflow: FAILED
### Diamond with Recovery

```text
    ┌→ B ─┐
A ──┤     ├→ D (allow_failed_deps=True)
    └→ C ─┘
```

- If B fails: D still runs (receives B's error, C's success)
- D can implement recovery logic
- Workflow: FAILED (but D produced a result)
Note: `handle.get()` will return the workflow-level failure. To access D's result in this scenario, look up D's `node_id` key in `handle.results()` (for example `"pipeline:3"` if D is task index 3 in a workflow named "pipeline"), or query D's node directly with `handle.result_for(...)`.
### Multi-Branch with Nested Convergence

```text
                  ┌── ca ── e₁
       ┌── c ─────┤
       │          └── cb ── e₂
a ── b ┤
       │          ┌── da ── e₃
       └── d ─────┤
                  └── db ── e₄
```

Current behavior if `c` fails:

- `ca`, `cb` → SKIPPED
- `e₁`, `e₂` → SKIPPED
- `d`, `da`, `db`, `e₃`, `e₄` → may complete successfully
- Workflow: FAILED (because `c` failed)
## Limitations

### Not Supported: Dynamic Task Generation

Scenario: "Generate tasks at runtime based on input data"

```python
# Wanted (not implemented):
@workflow
def process_items(items: list[str]):
    for item in items:
        yield TaskNode(fn=process_item, kwargs={'item': item})
```

Current: the DAG is static at submission time. All tasks must be defined before the workflow starts.
## Support Matrix

| Pattern | Supported | Notes |
|---|---|---|
| Linear chain | Yes | Basic sequential execution |
| Fan-out (parallel branches) | Yes | Independent parallel execution |
| Fan-in (convergence) | Yes | AND-join: all branches must complete |
| Diamond pattern | Yes | Classic DAG pattern |
| AND-join with error handling | Yes | allow_failed_deps=True |
| OR-join (any branch succeeds) | Yes | join="any" parameter |
| Quorum join (N of M succeed) | Yes | join="quorum" with min_success |
| Partial success workflow | Yes | Via success_policy with success cases |
| Dynamic task generation | No | DAG is static at submission time |
## Retry Policies in Workflows

Workflow tasks honor the same retry policies configured on the `@task` decorator. When a task with a `retry_policy` is used in a workflow, those settings (including `auto_retry_for`) are preserved and applied when the workflow engine enqueues the task.
```python
from horsies import RetryPolicy

@app.task(
    retry_policy=RetryPolicy.exponential(
        base_seconds=1,
        max_retries=3,
        auto_retry_for=["TASK_EXCEPTION", "NETWORK_ERROR"],
    ),
)
def fetch_data(url: str) -> TaskResult[dict, TaskError]:
    ...

# This task will retry up to 3 times when used in a workflow
node = TaskNode(fn=fetch_data, kwargs={'url': "https://api.example.com"})
```

Behavior:

- `max_retries` from the retry policy is applied to the underlying task
- `auto_retry_for` error codes (on `RetryPolicy`) trigger automatic retries on matching failures
- Retries happen at the task level (the worker handles retry scheduling)
- A task that fails after exhausting retries propagates the failure to the workflow
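For rough intuition about the schedule implied by `RetryPolicy.exponential(base_seconds=1, max_retries=3)`: assuming a plain base × 2^attempt doubling (an assumption for illustration; the exact formula, jitter, and caps used by horsies are not specified here), the delays would be:

```python
# Assumed exponential backoff: delay = base_seconds * 2 ** attempt.
# This mirrors the common pattern; horsies' exact formula may differ.
def backoff_delays(base_seconds: float, max_retries: int) -> list[float]:
    return [base_seconds * (2 ** attempt) for attempt in range(max_retries)]

print(backoff_delays(1, 3))  # [1, 2, 4]
```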
## Success Cases (Partial Success Policy)

By default, any task failure marks the workflow as FAILED. The `success_policy` option allows defining explicit success criteria, so a workflow can be COMPLETED even if some tasks fail.
### SuccessCase and SuccessPolicy

```python
from horsies import SuccessPolicy, SuccessCase

# Define success cases
policy = SuccessPolicy(
    cases=[
        SuccessCase(required=[task_a]),  # Succeed if A completes
        SuccessCase(required=[task_b]),  # OR if B completes
    ],
    optional=[cleanup_task],  # May fail without failing workflow
)

workflow = app.workflow(
    "my_workflow",
    tasks=[task_a, task_b, cleanup_task],
    success_policy=policy,
)
```

### Semantics
- Case evaluation: a `SuccessCase` is satisfied if ALL its `required` tasks are COMPLETED.
- Workflow success: the workflow is COMPLETED if any `SuccessCase` is satisfied.
- SKIPPED counts as not satisfied: a SKIPPED task does not satisfy a required condition.
- Optional tasks: tasks in `optional` can fail without affecting success evaluation.
- Default behavior: if `success_policy` is None, any task failure → FAILED (unchanged).
### Example: Shipping Workflow

A package delivery workflow where multiple delivery outcomes are acceptable:

```python
# Define tasks
pickup = TaskNode(fn=pickup_package)
deliver_recipient = TaskNode(fn=deliver_to_recipient, waits_for=[pickup])
deliver_neighbor = TaskNode(fn=deliver_to_neighbor, waits_for=[pickup])
deliver_locker = TaskNode(fn=deliver_to_locker, waits_for=[pickup])
notify = TaskNode(fn=send_notification, waits_for=[pickup])

# Success policy: any delivery method is acceptable
policy = SuccessPolicy(
    cases=[
        SuccessCase(required=[deliver_recipient]),
        SuccessCase(required=[deliver_neighbor]),
        SuccessCase(required=[deliver_locker]),
    ],
    optional=[notify],  # Notification can fail
)

spec = app.workflow(
    name="ship_package",
    tasks=[pickup, deliver_recipient, deliver_neighbor, deliver_locker, notify],
    success_policy=policy,
)
```

Behavior:
- If `deliver_to_recipient` succeeds → workflow COMPLETED
- If `deliver_to_recipient` fails but `deliver_to_neighbor` succeeds → workflow COMPLETED
- If all delivery methods fail → workflow FAILED
- If `notify` fails but a delivery succeeds → workflow COMPLETED
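The case evaluation above can be sketched as a small function. This is an illustrative model of the documented semantics (the workflow is COMPLETED when any case's required tasks all COMPLETED), not the horsies engine:

```python
# Illustrative success_policy evaluation for the shipping example.
def evaluate(cases: list[list[str]], status: dict[str, str]) -> str:
    for required in cases:
        # A case is satisfied only if ALL required tasks COMPLETED;
        # FAILED or SKIPPED required tasks do not satisfy it.
        if all(status.get(t) == "COMPLETED" for t in required):
            return "COMPLETED"
    return "FAILED"

cases = [["deliver_recipient"], ["deliver_neighbor"], ["deliver_locker"]]

print(evaluate(cases, {"deliver_recipient": "COMPLETED"}))  # COMPLETED
print(evaluate(cases, {"deliver_recipient": "FAILED",
                       "deliver_neighbor": "COMPLETED"}))   # COMPLETED
print(evaluate(cases, {"deliver_recipient": "FAILED",
                       "deliver_neighbor": "FAILED",
                       "deliver_locker": "SKIPPED"}))       # FAILED
```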
### Error Handling with Success Policy

When no success case is satisfied:

- If a required task from any case FAILED, the workflow error contains that task's error.
- If no required task FAILED (all SKIPPED), the error is `WORKFLOW_SUCCESS_CASE_NOT_MET`.
## Best Practices

### 1. Use allow_failed_deps for Recovery

```python
# Primary task that can fail
primary = TaskNode(fn=fetch_data)

# Recovery handler receives the error
recovery = TaskNode(
    fn=recovery_handler,
    allow_failed_deps=True,
    waits_for=[primary],
    args_from={"primary": primary},
)
```

### 2. Design for Failure Visibility

Structure DAGs so failures are visible in the final result:
```python
# Final aggregator that reports all branch outcomes
final = TaskNode(
    fn=summarize_results,
    allow_failed_deps=True,
    waits_for=[branch_a, branch_b, branch_c],
)
```

### 3. Use on_error="pause" for Critical Workflows

```python
workflow = app.workflow(
    "critical_pipeline",
    tasks=[...],
    on_error="pause",  # Stop for manual review on failure
)
```

### 4. Explicit Output Task

```python
# Designate which task's result is the workflow output
workflow = app.workflow(
    "pipeline",
    tasks=[a, b, c, final],
    output=final,  # workflow.get() returns final's result
)
```

## Result Keying

Results from `WorkflowHandle.results()` and `WorkflowHandle.get()` (when no output task is specified) use unique keys in the `node_id` format.
This ensures no result collisions when the same task function is used multiple times in a workflow.
```python
# Same task used twice
fetch_a = TaskNode(fn=fetch_data, kwargs={'url': "url_a"})  # index 0
fetch_b = TaskNode(fn=fetch_data, kwargs={'url': "url_b"})  # index 1

spec = WorkflowSpec(name="parallel_fetch", tasks=[fetch_a, fetch_b], broker=broker)
start_result = await spec.start_async()
handle = start_result.ok_value

# Wait for workflow completion first
await handle.get_async(timeout_ms=30000)

# Now safe to access individual results (node_id keys)
results_handle = handle.results()
results = results_handle.ok_value  # HandleResult[dict[...]] -> unwrap
result_a = results["parallel_fetch:0"]  # First fetch
result_b = results["parallel_fetch:1"]  # Second fetch

# Typed access using handle.result_for()
# NOTE: result_for() / result_for_async() are non-blocking single queries.
# Always wait for completion first.
typed_a = handle.result_for(fetch_a)
typed_b = handle.result_for(fetch_b)
```

## WorkflowContext (Type-Safe Result Access)

Tasks can access upstream results via `WorkflowContext` using the type-safe `result_for(...)` method.
### Enabling WorkflowContext

- Set `workflow_ctx_from` on the `TaskNode` to specify which upstream results to include
- Declare a `workflow_ctx: WorkflowContext | None` parameter in the task function
Important: `workflow_ctx_from` is per-task. The context is built fresh for each task that opts in, and does not persist or propagate to downstream tasks.

Scope: inside a task function, only results from nodes listed in `workflow_ctx_from` are accessible via `workflow_ctx.result_for()`. Accessing a node not in the list raises `KeyError` with `"TaskNode id '{id}' not in workflow context"`.

This is distinct from `WorkflowHandle.result_for()`, which queries the database for any node in the workflow and returns `RESULT_NOT_READY` if the task hasn't completed.
```python
from horsies import WorkflowContext, TaskNode, TaskResult, TaskError

node_a: TaskNode[int] = TaskNode(fn=fetch_data, kwargs={'url': "url"})
node_b: TaskNode[str] = TaskNode(fn=transform, waits_for=[node_a], args_from={"data": node_a})
node_c: TaskNode[Summary] = TaskNode(
    fn=aggregate,
    waits_for=[node_a, node_b],
    workflow_ctx_from=[node_a, node_b],  # Include both in context
)

@app.task(task_name="aggregate")
def aggregate(workflow_ctx: WorkflowContext | None = None) -> TaskResult[Summary, TaskError]:
    if workflow_ctx is None:
        return TaskResult(err=TaskError(error_code="NO_CTX", message="Missing context"))

    # Type-safe access: result_for returns TaskResult[T, TaskError] matching the node's type
    a_result: TaskResult[int, TaskError] = workflow_ctx.result_for(node_a)
    b_result: TaskResult[str, TaskError] = workflow_ctx.result_for(node_b)

    if a_result.is_err():
        return TaskResult(err=a_result.err_value)

    return TaskResult(ok=Summary(
        data=a_result.ok_value,
        transformed=b_result.ok_value if b_result.is_ok() else None,
    ))
```

### Type Safety

`result_for(node: TaskNode[T] | NodeKey[T])` returns `TaskResult[T, TaskError]` where `T` matches the node's generic parameter. This enables static type checking:
```python
# Type checker knows these types
a_result: TaskResult[int, TaskError] = ctx.result_for(node_a)  # node_a is TaskNode[int]
b_result: TaskResult[str, TaskError] = ctx.result_for(node_b)  # node_b is TaskNode[str]

if a_result.is_ok():
    value: int = a_result.ok_value  # Type checker knows this is int
```

### NodeKey (Stable Lookup)

`WorkflowContext` looks up results by `node_id`, not object identity. This means you can safely use `TaskNode` instances or typed `NodeKey` objects:
```python
key_a = node_a.key()  # NodeKey[int]
result = workflow_ctx.result_for(key_a)
```

For dynamic workflows where node references are not available inside the task function, construct a `NodeKey` from the string ID:
```python
from horsies import (
    Horsies,
    AppConfig,
    PostgresConfig,
    WorkflowContext,
    NodeKey,
    TaskResult,
    TaskError,
)

config = AppConfig(
    broker=PostgresConfig(
        database_url="postgresql+psycopg://user:password@localhost:5432/mydb",
    ),
)
app = Horsies(config)

@app.task("fan_in")
def fan_in(workflow_ctx: WorkflowContext | None = None) -> TaskResult[str, TaskError]:
    if workflow_ctx is None:
        return TaskResult(err=TaskError(error_code="NO_CTX", message="Missing context"))

    i: int = 0
    result = workflow_ctx.result_for(NodeKey(f"save_{i}"))
    if result.is_err():
        return TaskResult(err=result.err_value)
    return TaskResult(ok="ok")
```

The string-based `NodeKey` pattern is common for fan-in tasks that gather results from dynamically created nodes.
If a `TaskNode` lacks `node_id`, `result_for()` raises:

```text
RuntimeError: TaskNode node_id is not set. Ensure WorkflowSpec assigns node_id
or provide an explicit node_id.
```

`node_id` is optional on `TaskNode`. If omitted, it is auto-assigned during `WorkflowSpec` construction using the format `{slugified_workflow_name}:{task_index}`.
Workflow names can contain any characters (including spaces). The `slugify()` function converts spaces to underscores and removes invalid characters:

```python
from horsies import slugify

# Workflow name with spaces
spec = app.workflow(name="My Data Pipeline", tasks=[a, b])
# Auto-generated node_ids: "My_Data_Pipeline:0", "My_Data_Pipeline:1"

# Manual slugify for custom use
safe_name = slugify("Hello World!")  # "Hello_World"
```

You can also provide an explicit `node_id` for stable external references. Explicit node_ids must match the pattern `[A-Za-z0-9_\-:.]+`.
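For intuition, the documented behavior (spaces to underscores, then drop anything outside `[A-Za-z0-9_\-:.]`) can be reproduced with a regex. This is an illustrative reimplementation, not the real `horsies.slugify`, which may handle edge cases differently:

```python
import re

# Illustrative slugify: spaces -> underscores, strip disallowed characters.
def slugify_sketch(name: str) -> str:
    return re.sub(r"[^A-Za-z0-9_\-:.]", "", name.replace(" ", "_"))

print(slugify_sketch("Hello World!"))      # Hello_World
print(slugify_sketch("My Data Pipeline"))  # My_Data_Pipeline
```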
## WorkflowMeta (Metadata Only)

For tasks that only need workflow metadata without result access, use `WorkflowMeta`:

```python
from horsies import WorkflowMeta

@app.task(task_name="my_task")
def my_task(workflow_meta: WorkflowMeta | None = None) -> TaskResult[str, TaskError]:
    if workflow_meta:
        print(f"Running in workflow {workflow_meta.workflow_id}")
        print(f"Task index: {workflow_meta.task_index}")
        print(f"Task name: {workflow_meta.task_name}")
    return TaskResult(ok="done")
```

`WorkflowMeta` is auto-injected if the task declares the parameter. Unlike `WorkflowContext`, it doesn't require `workflow_ctx_from`.
## Comparison: args_from vs workflow_ctx_from

| Feature | `args_from` | `workflow_ctx_from` |
|---|---|---|
| Use case | Direct typed parameters | Access multiple upstream results |
| Type safety | Full (typed parameter) | Full (via result_for(node)) |
| Function signature | Explicit parameters | Single WorkflowContext param |
| Best for | 1-3 dependencies | Many dependencies, aggregators |
Prefer args_from for simple cases - it’s more explicit and keeps data flow obvious.
Use workflow_ctx_from when you need to access many upstream results or want workflow metadata.
Important: when using `args_from` or `workflow_ctx_from`, positional args are not allowed. Put static values in `kwargs` instead.

Important: `kwargs` and `args_from` keys must be disjoint. If the same key appears in both, validation fails with HRS-021 (`WORKFLOW_KWARGS_ARGS_FROM_OVERLAP`).
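The disjointness rule is easy to check up front. A hedged sketch of a pre-submission check (the helper name is made up for illustration; horsies performs this validation itself and reports HRS-021):

```python
# Illustrative pre-check mirroring the documented HRS-021 rule:
# kwargs and args_from keys must not overlap.
def overlapping_keys(kwargs: dict, args_from: dict) -> set[str]:
    return set(kwargs) & set(args_from)

print(overlapping_keys({"url": "https://example.com"}, {"data": "node_a"}))  # set()
print(overlapping_keys({"data": 1}, {"data": "node_a"}))                     # {'data'}
```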
For type-safe node construction with static arguments, see Typed Node Builder.
## args_from: What the Receiving Function Gets

`args_from` delivers the upstream task's full `TaskResult[T, TaskError]`, not just the raw `T`.
```python
from horsies import (
    Horsies,
    AppConfig,
    PostgresConfig,
    TaskNode,
    TaskResult,
    TaskError,
)

config = AppConfig(
    broker=PostgresConfig(
        database_url="postgresql+psycopg://user:password@localhost:5432/mydb",
    ),
)
app = Horsies(config)

@app.task("produce_number")
def produce_number() -> TaskResult[int, TaskError]:
    return TaskResult(ok=42)

# transform receives TaskResult[int, TaskError], not int:
@app.task("transform")
def transform(data: TaskResult[int, TaskError]) -> TaskResult[str, TaskError]:
    if data.is_err():
        return TaskResult(err=data.err_value)
    value: int = data.ok_value
    return TaskResult(ok=str(value))

node_a: TaskNode[int] = TaskNode(fn=produce_number)
node_b: TaskNode[str] = TaskNode(
    fn=transform,
    waits_for=[node_a],
    args_from={"data": node_a},
)
```

## Things to Avoid

### Fire-and-Forget with result_for()

`result_for()` is non-blocking. It queries the database once and returns immediately. If the task hasn't completed, it returns `TaskResult(err=TaskError(error_code=RESULT_NOT_READY))`.
Don't do this:

```python
# Fire-and-forget: start workflow without waiting
start_result = spec.start()
handle = start_result.ok_value

# Immediately call result_for() - task likely hasn't completed yet
result = handle.result_for(node)  # Returns RESULT_NOT_READY error
```

Do this instead:

```python
start_result = spec.start()
handle = start_result.ok_value

# Option 1: Wait for workflow completion first
handle.get(timeout_ms=30000)
result = handle.result_for(node)  # RESULT_NOT_READY is still possible depending on your workflow definition

# Option 2: Check if result is ready
result = handle.result_for(node)
if result.is_err() and result.err_value.error_code == RetrievalCode.RESULT_NOT_READY:
    # Handle not-ready case: poll, wait, or skip; decide for your use case
    do_something()
```

Why this matters:
- Workflows can be long-running (minutes to hours)
- `result_for()` does not block or poll; it's a single database query
- Use `handle.get(timeout_ms=...)` to wait for completion before accessing individual task results
For class and method signatures, see Workflow API.