Workflow API

Use this page for the exact method signatures and return types used by workflows.

`WorkflowSpec` attributes and methods:

| Attribute / Method | Type / Signature | Description |
| --- | --- | --- |
| `.name` | `str` | Workflow name |
| `.tasks` | `Sequence[TaskNode[Any] \| SubWorkflowNode[Any]]` | All nodes in the DAG |
| `.on_error` | `OnError` | Error policy (`FAIL` or `PAUSE`) |
| `.output` | `TaskNode[Any] \| SubWorkflowNode[Any] \| None` | Output node for `handle.get()` |
| `.success_policy` | `SuccessPolicy \| None` | Custom success criteria |
| `.resend_on_transient_err` | `bool` | Auto-retry transient start errors (default `False`) |
| `.start(workflow_id=None)` | `(str \| None) -> WorkflowStartResult[WorkflowHandle[T]]` | Start workflow (sync) |
| `.start_async(workflow_id=None)` | `(str \| None) -> WorkflowStartResult[WorkflowHandle[T]]` | Start workflow (async) |
| `.retry_start(error)` | `(WorkflowStartError) -> WorkflowStartResult[WorkflowHandle[T]]` | Retry a failed start (sync) |
| `.retry_start_async(error)` | `(WorkflowStartError) -> WorkflowStartResult[WorkflowHandle[T]]` | Retry a failed start (async) |

For cases where you have a WorkflowSpec and a PostgresBroker separately (e.g., in tests or custom orchestration), use the module-level functions:

```python
from horsies import start_workflow, start_workflow_async

# Sync
result = start_workflow(spec, broker, workflow_id="custom-id")

# Async
result = await start_workflow_async(spec, broker)
```
| Function | Signature |
| --- | --- |
| `start_workflow(spec, broker, workflow_id=None, sent_at=None, *, resend_on_transient_err=False)` | `-> WorkflowStartResult[WorkflowHandle[T]]` |
| `start_workflow_async(spec, broker, workflow_id=None, sent_at=None, *, resend_on_transient_err=False)` | `-> WorkflowStartResult[WorkflowHandle[T]]` |

These are equivalent to spec.start() / spec.start_async() but allow passing a broker explicitly instead of relying on the one attached to the spec.

`WorkflowStartResult[T] = Result[T, WorkflowStartError]`

Returned by `.start()`, `.start_async()`, `start_workflow()`, and `start_workflow_async()`.

| Outcome | Type | Description |
| --- | --- | --- |
| Success | `Ok(WorkflowHandle[T])` | Workflow created and root tasks enqueued |
| Failure | `Err(WorkflowStartError)` | Start failed with a categorized error |

WorkflowStartError fields:

| Field | Type | Description |
| --- | --- | --- |
| `code` | `WorkflowStartErrorCode` | Failure category |
| `message` | `str` | Human-readable description |
| `retryable` | `bool` | Whether the caller can safely retry |
| `workflow_name` | `str` | Workflow spec name |
| `workflow_id` | `str` | Generated workflow ID (always populated) |
| `exception` | `BaseException \| None` | Original cause |

WorkflowStartErrorCode values:

| Code | Retryable | When |
| --- | --- | --- |
| `BROKER_NOT_CONFIGURED` | No | `spec.start()` called without a broker |
| `VALIDATION_FAILED` | No | DAG validation, serialization, or args error |
| `ENQUEUE_FAILED` | Maybe | Schema init or DB transaction failed |
| `INTERNAL_FAILED` | No | Sync bridge or unexpected exception |

Usage:

```python
from horsies import Ok, Err

match spec.start():
    case Ok(handle):
        result = handle.get(timeout_ms=30000)  # Wait for completion
        if result.is_ok():
            print(f"Success: {result.ok_value}")
        else:
            print(f"Task failed: {result.err_value.error_code}")
    case Err(err) if err.retryable:
        # Transient infra failure — retry with stored workflow_id
        match spec.retry_start(err):
            case Ok(handle):
                result = handle.get(timeout_ms=30000)
                if result.is_err():
                    print(f"Task failed: {result.err_value.error_code}")
            case Err(retry_err):
                print(f"Retry failed: {retry_err.code} - {retry_err.message}")
    case Err(err):
        print(f"[{err.code}] {err.message}")
```

Idempotency caveat: `retry_start` is best-effort idempotent by `workflow_id`. Unlike task send, workflow start does not verify payload identity on collision. This makes it safe for transient DB retries within the same deploy, but cross-version spec drift on the same `workflow_id` will silently return the existing handle.
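The caveat above boils down to: starts are deduplicated by `workflow_id` alone. A toy model of that assumed behavior (not horsies code) makes the failure mode concrete:

```python
# Toy registry modeling start-by-id idempotency: the first start wins, and a
# retry with the same workflow_id returns the existing handle without
# comparing payloads.
_started: dict[str, dict] = {}

def start_once(workflow_id: str, payload: dict) -> dict:
    if workflow_id in _started:
        # Collision: the existing handle is returned; the payload is NOT
        # verified, so spec drift on the same id goes unnoticed.
        return _started[workflow_id]
    _started[workflow_id] = {"workflow_id": workflow_id, "payload": payload}
    return _started[workflow_id]

first = start_once("wf-1", {"tasks": ["a", "b"]})
retry = start_once("wf-1", {"tasks": ["a", "b", "c"]})  # drifted payload
assert retry is first  # same handle returned; drift silently ignored
```

If you need cross-deploy safety, generate a fresh `workflow_id` when the spec changes rather than reusing the stored one.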

`WorkflowHandle` attributes and methods:

| Attribute / Method | Type / Signature | Description |
| --- | --- | --- |
| `.workflow_id` | `str` | Workflow UUID |
| `.status()` / `.status_async()` | `-> HandleResult[WorkflowStatus]` | Current workflow status |
| `.get()` / `.get_async()` | `(timeout_ms: int \| None) -> TaskResult[OutT, TaskError]` | Block until completion or timeout |
| `.results()` / `.results_async()` | `-> HandleResult[dict[str, TaskResult[Any, TaskError]]]` | All results keyed by `node_id` |
| `.result_for()` / `.result_for_async()` | `(TaskNode[T] \| NodeKey[T]) -> TaskResult[T, TaskError]` | Single node result (non-blocking) |
| `.tasks()` / `.tasks_async()` | `-> HandleResult[list[WorkflowTaskInfo]]` | Status of all workflow tasks |
| `.cancel()` / `.cancel_async()` | `-> HandleResult[None]` | Cancel the workflow |
| `.pause()` / `.pause_async()` | `-> HandleResult[bool]` | Pause a running workflow |
| `.resume()` / `.resume_async()` | `-> HandleResult[bool]` | Resume a paused workflow |
`WorkflowTaskInfo` attributes:

| Attribute | Type | Description |
| --- | --- | --- |
| `.node_id` | `str \| None` | Node identifier (may be `None` for legacy rows) |
| `.index` | `int` | Task position in the DAG |
| `.name` | `str` | Task name |
| `.status` | `WorkflowTaskStatus` | Current task status |
| `.result` | `TaskResult[Any, TaskError] \| None` | Task result if stored (COMPLETED/FAILED; SKIPPED often `None`) |
| `.started_at` | `datetime \| None` | When execution started |
| `.completed_at` | `datetime \| None` | When execution completed |

The `.node()` method on task functions returns a `NodeFactory` for type-safe `TaskNode` creation.

| Method | Signature | Description |
| --- | --- | --- |
| `.node(...)` | `(**workflow_opts) -> NodeFactory[P, T]` | Create a factory with workflow options |
| `NodeFactory(...)` | `(*args, **kwargs) -> TaskNode[T]` | Call the factory with typed task arguments |

Workflow options (all keyword-only, all optional):

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `waits_for` | `Sequence[TaskNode \| SubWorkflowNode]` | `None` | Dependencies |
| `args_from` | `dict[str, TaskNode \| SubWorkflowNode]` | `None` | Result injection mapping |
| `workflow_ctx_from` | `Sequence[TaskNode \| SubWorkflowNode]` | `None` | Context sources |
| `node_id` | `str` | `None` | Stable identifier |
| `queue` | `str` | `None` | Queue override |
| `priority` | `int` | `None` | Priority override |
| `allow_failed_deps` | `bool` | `False` | Run despite failed deps |
| `join` | `Literal['all', 'any', 'quorum']` | `'all'` | Dependency join semantics |
| `min_success` | `int` | `None` | Required for `join='quorum'` |
| `good_until` | `datetime` | `None` | Task expiry deadline |

See Typed Node Builder for usage examples.

`@app.workflow_builder()` registers a function that builds a `WorkflowSpec` for validation during `horsies check`. Registered builders are executed under send suppression (no tasks are enqueued), so the returned `WorkflowSpec` can be fully validated: DAG structure, kwargs against function signatures, `args_from` type compatibility, etc.

```python
from horsies import Horsies, WorkflowSpec

app = Horsies(config)

# Zero-parameter builder — called automatically during check
@app.workflow_builder()
def build_etl_pipeline() -> WorkflowSpec:
    return app.workflow(
        name="etl",
        tasks=[...],
        definition_key="myapp.etl.v1",
    )

# Parameterized builder — provide cases= for check to exercise
@app.workflow_builder(cases=[
    {"region": "us-east"},
    {"region": "eu-west"},
])
def build_regional_pipeline(region: str) -> WorkflowSpec:
    return app.workflow(
        name=f"pipeline-{region}",
        tasks=[...],
        definition_key=f"myapp.pipeline.{region}.v1",
    )
```
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `cases` | `list[dict[str, Any]] \| None` | No | Kwarg dicts to invoke the builder with during `check`. Required when the builder has parameters without defaults. |

Errors:

| Code | When |
| --- | --- |
| `HRS-027` | Parameterized builder missing `cases=` |
| `HRS-029` | Builder raises an exception, returns a non-`WorkflowSpec`, or returns a spec missing required validation input like `definition_key` |
| `HRS-030` | Function returns a `WorkflowSpec` but lacks `@app.workflow_builder` |

For the guarantee model and CI usage, see Startup Validation.