# Error Handling
Handle errors explicitly through dedicated handlers or explicit returns. Avoid raising exceptions; they break control flow and obscure error paths.
For an exhaustive error code reference, see errors.
This document is meant as a pattern guide rather than the definitive way to handle your errors. Adjust it to your needs.
## Why and When

### Error Categories

| Category | Source | Examples | Auto-Retry? |
|---|---|---|---|
| Retrieval errors | handle.get() | WAIT_TIMEOUT, BROKER_ERROR | No |
| Execution errors | Task raised exception or returned invalid value | TASK_EXCEPTION, WORKER_CRASHED | If in auto_retry_for |
| Domain errors | User returned error | "RATE_LIMITED", "VALIDATION_FAILED" | If in auto_retry_for |
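The three categories can be told apart by where their error code comes from. A minimal sketch of that distinction, using a plain-Python stand-in for the library's error type (the `TaskError` class here and the exact sets of codes are illustrative assumptions, not the library's API):

```python
from dataclasses import dataclass

@dataclass
class TaskError:  # stand-in for the library's error type, for illustration only
    error_code: str
    message: str = ""

# Assumed groupings based on the table above
RETRIEVAL_CODES = {"WAIT_TIMEOUT", "BROKER_ERROR", "TASK_NOT_FOUND"}
EXECUTION_CODES = {"TASK_EXCEPTION", "WORKER_CRASHED"}

def categorize(error: TaskError) -> str:
    """Map an error code onto the categories in the table above."""
    if error.error_code in RETRIEVAL_CODES:
        return "retrieval"  # task may still complete; not a final failure
    if error.error_code in EXECUTION_CODES:
        return "execution"  # worker-side failure; retried if in auto_retry_for
    return "domain"         # user-returned code, e.g. "RATE_LIMITED"

print(categorize(TaskError("WAIT_TIMEOUT")))  # retrieval
print(categorize(TaskError("RATE_LIMITED")))  # domain
```

Only retrieval errors leave the task's fate ambiguous, which is why they are never auto-retried by the worker.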
### When to Handle Errors

Handle errors when:

- The error is a retrieval error (task may still complete)
- The error is not in auto_retry_for (won't be retried)
- The task has exhausted retries (final failure)
## How To

### Configure Automatic Retries

The library handles retries automatically when auto_retry_for matches the error:
```python
from horsies import RetryPolicy, TaskResult, TaskError

@app.task(
    "fetch_api_data",
    retry_policy=RetryPolicy.exponential(
        base_seconds=30,
        max_retries=3,
        auto_retry_for=["TASK_EXCEPTION", "RATE_LIMITED"],
    ),
)
def fetch_api_data(url: str) -> TaskResult[dict, TaskError]:
    # If this returns a RATE_LIMITED error or raises an unhandled exception,
    # the worker automatically retries up to 3 times
    ...
```

To avoid try/except boilerplate for mapping exceptions to error codes, use exception_mapper:

```python
@app.task(
    "fetch_api_data",
    retry_policy=RetryPolicy.exponential(
        base_seconds=30,
        max_retries=3,
        auto_retry_for=["RATE_LIMITED", "TIMEOUT"],
    ),
    exception_mapper={
        RateLimitError: "RATE_LIMITED",
        TimeoutError: "TIMEOUT",
    },
)
def fetch_api_data(url: str) -> TaskResult[dict, TaskError]:
    # Unhandled RateLimitError/TimeoutError automatically mapped and retried
    ...
```

See retry-policy for configuration details.
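For intuition, an exponential policy with base_seconds=30 and max_retries=3 produces growing delays between attempts. A rough sketch of that schedule, assuming simple doubling per attempt (the library's exact formula, jitter, and caps may differ):

```python
def exponential_delays(base_seconds: int, max_retries: int) -> list[int]:
    # Delay before retry n doubles each time: base, 2*base, 4*base, ...
    return [base_seconds * (2 ** n) for n in range(max_retries)]

print(exponential_delays(30, 3))  # [30, 60, 120]
```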
### Handling Upstream Failures in Chained Tasks

Tasks wired with args_from receive the upstream TaskResult, not just the raw success value. Use this workflow example as the canonical guard pattern:
```python
from dataclasses import dataclass

from horsies import (
    Horsies,
    AppConfig,
    PostgresConfig,
    TaskResult,
    TaskError,
    TaskNode,
    OnError,
    Ok,
    Err,
)

config = AppConfig(
    broker=PostgresConfig(
        database_url="postgresql+psycopg://user:password@localhost:5432/mydb",
    ),
)
app = Horsies(config)

@dataclass
class ProductRecord:
    product_id: str
    name: str
    price_cents: int

@app.task("fetch_product_page")
def fetch_product_page(url: str) -> TaskResult[str, TaskError]:
    if "missing" in url:
        return TaskResult(err=TaskError(
            error_code="FETCH_FAILED",
            message=f"Could not fetch {url}",
        ))
    return TaskResult(ok="<html>...</html>")

@app.task("parse_product_page")
def parse_product_page(
    page_result: TaskResult[str, TaskError],
) -> TaskResult[ProductRecord, TaskError]:
    if page_result.is_err():
        return TaskResult(err=TaskError(
            error_code="UPSTREAM_FAILED",
            message="Cannot parse product page: fetch failed",
        ))
    return TaskResult(ok=ProductRecord(
        product_id="product-123",
        name="Widget Pro",
        price_cents=1999,
    ))

@app.task("save_product_record")
def save_product_record(
    product_result: TaskResult[ProductRecord, TaskError],
) -> TaskResult[None, TaskError]:
    if product_result.is_err():
        return TaskResult(err=TaskError(
            error_code="UPSTREAM_FAILED",
            message="Cannot save product: parse failed",
        ))
    product = product_result.ok_value
    _ = product  # persist to storage
    return TaskResult(ok=None)

fetch: TaskNode[str] = TaskNode(
    fn=fetch_product_page,
    kwargs={'url': "https://example.com/missing"},
    node_id="fetch_product_page",
)
parse: TaskNode[ProductRecord] = TaskNode(
    fn=parse_product_page,
    waits_for=[fetch],
    args_from={"page_result": fetch},
    allow_failed_deps=True,
    node_id="parse_product_page",
)
save: TaskNode[None] = TaskNode(
    fn=save_product_record,
    waits_for=[parse],
    args_from={"product_result": parse},
    allow_failed_deps=True,
    node_id="save_product_record",
)

spec = app.workflow(
    name="product_ingest",
    tasks=[fetch, parse, save],
    output=save,
    on_error=OnError.FAIL,
)
match spec.start():
    case Ok(handle):
        handle.get(timeout_ms=30000)
    case Err(start_err):
        raise RuntimeError(f"Start failed: {start_err.code}")
```

This pattern only runs when the downstream task actually executes. With join="all" and allow_failed_deps=False (default), any failed or skipped dependency causes the task to be SKIPPED, so this guard code never runs. With join="any" or join="quorum", the task can still run if the success threshold is met. See Workflow Semantics for default join rules and failure propagation.
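The guard in parse_product_page and save_product_record has the same shape each time, so it can be factored into a small helper. A sketch using plain-Python stand-ins for TaskResult and TaskError (the stand-in classes and the guard_upstream helper are illustrative, not part of the library):

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")

@dataclass
class TaskError:  # stand-in for horsies.TaskError
    error_code: str
    message: str = ""

@dataclass
class TaskResult(Generic[T]):  # stand-in for horsies.TaskResult
    ok: Optional[T] = None
    err: Optional[TaskError] = None

    def is_err(self) -> bool:
        return self.err is not None

def guard_upstream(result: TaskResult[T], what: str) -> TaskResult[T]:
    """Pass a success through unchanged; normalize any upstream error to UPSTREAM_FAILED."""
    if result.is_err():
        return TaskResult(err=TaskError(
            error_code="UPSTREAM_FAILED",
            message=f"Cannot continue: {what} failed",
        ))
    return result

failed = guard_upstream(TaskResult(err=TaskError("FETCH_FAILED")), "fetch")
print(failed.err.error_code)  # UPSTREAM_FAILED
```

Each downstream task can then call the helper first and return early on an error, keeping the guard in one place.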
### Handle Errors with Explicit Returns

Prefer explicit returns over raising exceptions:
```python
from horsies import RetrievalCode, Ok, Err

from instance import my_task

def process_task() -> str | None:
    match my_task.send(10, 20):
        case Err(send_err):
            print(f"Send failed: {send_err.code} - {send_err.message}")
            return None
        case Ok(handle):
            pass

    result = handle.get(timeout_ms=5000)

    if result.is_err():
        error = result.err_value
        match error.error_code:
            case RetrievalCode.WAIT_TIMEOUT:
                # Task may still be running
                # check status again using task id if you need absolute decisions
                return None
            case RetrievalCode.TASK_NOT_FOUND:
                return None
            case _:
                # Pass to centralized handler
                handle_error(error)
                return None

    return result.ok_value
```

### Centralized Error Handler

Create a handler that returns actions, not just logs.
## Things to Avoid

Don't raise exceptions for flow control.
```python
# Wrong - exceptions break control flow
if result.is_err():
    raise RuntimeError(f"Task failed: {result.err_value.error_code}")

# Correct - explicit return and handling
if result.is_err():
    handle_error(result.err_value)
    return None
```

Don't ignore errors after logging.
```python
# Wrong - logs but continues as if nothing happened
if result.is_err():
    print(result.err_value.message)
# Code continues...

# Correct - handle and return explicitly
if result.is_err():
    handle_error(result.err_value)
    return result.err_value.error_code
```

Don't manually retry errors that should be auto-retried.
```python
# Wrong - duplicates library retry logic
if error.error_code == "RATE_LIMITED":
    manually_schedule_retry(task_id)

# Correct - configure auto_retry_for on the retry policy
@app.task(
    "my_task",
    retry_policy=RetryPolicy.fixed([60, 120, 300], auto_retry_for=["RATE_LIMITED"]),
)
def my_task() -> TaskResult[str, TaskError]:
    ...
```