
Error Handling

Handle errors explicitly through dedicated handlers or explicit returns. Avoid raising exceptions: they break control flow and obscure error paths.

For an exhaustive error-code reference, see errors.

This document is a pattern guide rather than a definitive prescription; adapt these patterns to your needs.

| Category | Source | Examples | Auto-Retry? |
|---|---|---|---|
| Retrieval errors | handle.get() | WAIT_TIMEOUT, BROKER_ERROR | No |
| Execution errors | Task raised an exception or returned an invalid value | TASK_EXCEPTION, WORKER_CRASHED | If in auto_retry_for |
| Domain errors | User-returned error | "RATE_LIMITED", "VALIDATION_FAILED" | If in auto_retry_for |

Handle errors when:

  • The error is a retrieval error (task may still complete)
  • The error is not in auto_retry_for (won’t be retried)
  • The task has exhausted retries (final failure)
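
These rules reduce to a small predicate. The sketch below is illustrative only (the `should_handle` helper and the `RETRIEVAL_CODES` grouping are not part of the library):

```python
# Illustrative only: mirrors the rules above, not a horsies API.
RETRIEVAL_CODES = {"WAIT_TIMEOUT", "BROKER_ERROR"}

def should_handle(
    error_code: str,
    auto_retry_for: set[str],
    retries_exhausted: bool,
) -> bool:
    """Return True when caller-side handling is needed."""
    if error_code in RETRIEVAL_CODES:
        return True           # task may still complete; decide locally
    if error_code not in auto_retry_for:
        return True           # the library will not retry this error
    return retries_exhausted  # auto-retried, but now a final failure
```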

The library handles retries automatically when auto_retry_for matches the error:

from horsies import RetryPolicy, TaskResult, TaskError

@app.task(
    "fetch_api_data",
    retry_policy=RetryPolicy.exponential(
        base_seconds=30,
        max_retries=3,
        auto_retry_for=["TASK_EXCEPTION", "RATE_LIMITED"],
    ),
)
def fetch_api_data(url: str) -> TaskResult[dict, TaskError]:
    # If this returns a RATE_LIMITED error or raises an unhandled exception,
    # the worker automatically retries up to 3 times
    ...
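
As a sanity check, base_seconds=30 with max_retries=3 plausibly yields delays of 30 s, 60 s, and 120 s under a standard doubling scheme. That scheme is an assumption here (the library may add jitter or caps; see retry-policy for the exact formula):

```python
# Sketch of plain exponential doubling, not horsies' internal computation.
def exponential_delays(base_seconds: int, max_retries: int) -> list[int]:
    return [base_seconds * 2**attempt for attempt in range(max_retries)]

exponential_delays(30, 3)  # → [30, 60, 120]
```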

To avoid try/except boilerplate for mapping exceptions to error codes, use exception_mapper:

@app.task(
    "fetch_api_data",
    retry_policy=RetryPolicy.exponential(
        base_seconds=30,
        max_retries=3,
        auto_retry_for=["RATE_LIMITED", "TIMEOUT"],
    ),
    exception_mapper={
        RateLimitError: "RATE_LIMITED",
        TimeoutError: "TIMEOUT",
    },
)
def fetch_api_data(url: str) -> TaskResult[dict, TaskError]:
    # Unhandled RateLimitError/TimeoutError automatically mapped and retried
    ...

See retry-policy for configuration details.

Handling Upstream Failures in Chained Tasks

Tasks wired with args_from receive the upstream TaskResult, not just the raw success value. Use this workflow example as the canonical guard pattern:

from dataclasses import dataclass

from horsies import (
    Horsies,
    AppConfig,
    PostgresConfig,
    TaskResult,
    TaskError,
    TaskNode,
    OnError,
    Ok,
    Err,
)

config = AppConfig(
    broker=PostgresConfig(
        database_url="postgresql+psycopg://user:password@localhost:5432/mydb",
    ),
)
app = Horsies(config)

@dataclass
class ProductRecord:
    product_id: str
    name: str
    price_cents: int

@app.task("fetch_product_page")
def fetch_product_page(url: str) -> TaskResult[str, TaskError]:
    if "missing" in url:
        return TaskResult(err=TaskError(
            error_code="FETCH_FAILED",
            message=f"Could not fetch {url}",
        ))
    return TaskResult(ok="<html>...</html>")

@app.task("parse_product_page")
def parse_product_page(
    page_result: TaskResult[str, TaskError],
) -> TaskResult[ProductRecord, TaskError]:
    if page_result.is_err():
        return TaskResult(err=TaskError(
            error_code="UPSTREAM_FAILED",
            message="Cannot parse product page: fetch failed",
        ))
    return TaskResult(ok=ProductRecord(
        product_id="product-123",
        name="Widget Pro",
        price_cents=1999,
    ))

@app.task("save_product_record")
def save_product_record(
    product_result: TaskResult[ProductRecord, TaskError],
) -> TaskResult[None, TaskError]:
    if product_result.is_err():
        return TaskResult(err=TaskError(
            error_code="UPSTREAM_FAILED",
            message="Cannot save product: parse failed",
        ))
    product = product_result.ok_value
    _ = product  # persist to storage
    return TaskResult(ok=None)

fetch: TaskNode[str] = TaskNode(
    fn=fetch_product_page,
    kwargs={'url': "https://example.com/missing"},
    node_id="fetch_product_page",
)
parse: TaskNode[ProductRecord] = TaskNode(
    fn=parse_product_page,
    waits_for=[fetch],
    args_from={"page_result": fetch},
    allow_failed_deps=True,
    node_id="parse_product_page",
)
save: TaskNode[None] = TaskNode(
    fn=save_product_record,
    waits_for=[parse],
    args_from={"product_result": parse},
    allow_failed_deps=True,
    node_id="save_product_record",
)

spec = app.workflow(
    name="product_ingest",
    tasks=[fetch, parse, save],
    output=save,
    on_error=OnError.FAIL,
)

match spec.start():
    case Ok(handle):
        handle.get(timeout_ms=30000)
    case Err(start_err):
        raise RuntimeError(f"Start failed: {start_err.code}")

This pattern only runs when the downstream task actually executes. With join="all" and allow_failed_deps=False (default), any failed or skipped dependency causes the task to be SKIPPED, so this guard code never runs. With join="any" or join="quorum", the task can still run if the success threshold is met. See Workflow Semantics for default join rules and failure propagation.
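
Those join rules amount to a threshold check on successful dependencies. The helper below is an illustrative sketch of that logic, not how horsies evaluates joins internally:

```python
# Illustrative: when does a downstream task run, per join mode?
def should_run(join: str, successes: int, total_deps: int, quorum: int = 0) -> bool:
    if join == "all":
        return successes == total_deps  # every dependency must succeed
    if join == "any":
        return successes >= 1           # one success is enough
    if join == "quorum":
        return successes >= quorum      # meet the configured threshold
    raise ValueError(f"unknown join mode: {join}")
```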

Prefer explicit returns over raising exceptions:

from horsies import RetrievalCode, Ok, Err
from instance import my_task

def process_task() -> str | None:
    match my_task.send(10, 20):
        case Err(send_err):
            print(f"Send failed: {send_err.code} - {send_err.message}")
            return None
        case Ok(handle):
            pass

    result = handle.get(timeout_ms=5000)
    if result.is_err():
        error = result.err_value
        match error.error_code:
            case RetrievalCode.WAIT_TIMEOUT:
                # Task may still be running;
                # re-check status by task id if you need a definitive answer
                return None
            case RetrievalCode.TASK_NOT_FOUND:
                return None
            case _:
                # Pass to centralized handler
                handle_error(error)
                return None
    return result.ok_value

Create a handler that returns actions, not just logs.

Don’t raise exceptions for flow control.

# Wrong - exceptions break control flow
if result.is_err():
    raise RuntimeError(f"Task failed: {result.err_value.error_code}")

# Correct - explicit return and handling
if result.is_err():
    handle_error(result.err_value)
    return None

Don’t ignore errors after logging.

# Wrong - logs but continues as if nothing happened
if result.is_err():
    print(result.err_value.message)
# Code continues...

# Correct - handle and return explicitly
if result.is_err():
    handle_error(result.err_value)
    return result.err_value.error_code

Don’t manually retry errors that should be auto-retried.

# Wrong - duplicates library retry logic
if error.error_code == "RATE_LIMITED":
    manually_schedule_retry(task_id)

# Correct - configure auto_retry_for on the retry policy
@app.task(
    "my_task",
    retry_policy=RetryPolicy.fixed([60, 120, 300], auto_retry_for=["RATE_LIMITED"]),
)
def my_task() -> TaskResult[str, TaskError]:
    ...