Skip to content

Defining Tasks

Define tasks by decorating functions with @app.task() and returning TaskResult[T, TaskError]. For argument serialization rules, see sending-tasks.

Note: Task functions must be synchronous (def). async def task functions are not supported yet. Use .send_async() / .get_async() on the producer side for non-blocking I/O.

from horsies import Horsies, AppConfig, PostgresConfig, TaskResult, TaskError
config = AppConfig(
broker=PostgresConfig(database_url="postgresql+psycopg://..."),
)
app = Horsies(config)
@app.task("process_order")
def process_order(order_id: int) -> TaskResult[str, TaskError]:
# Do work
return TaskResult(ok=f"Order {order_id} processed")
@app.task("compute")
def compute(x: int) -> TaskResult[int, TaskError]:
return TaskResult(ok=x * 2)

For void tasks:

@app.task("fire_and_forget")
def fire_and_forget() -> TaskResult[None, TaskError]:
do_something()
return TaskResult(ok=None)

Domain errors (expected failures):

@app.task("validate_input")
def validate_input(data: dict) -> TaskResult[dict, TaskError]:
if not data.get("email"):
return TaskResult(err=TaskError(
error_code="MISSING_EMAIL",
message="Email is required",
data={"field": "email"},
))
return TaskResult(ok=data)

Unhandled exceptions are wrapped automatically:

@app.task("might_crash")
def might_crash() -> TaskResult[str, TaskError]:
raise ValueError("Something went wrong")
# Becomes TaskResult(err=TaskError(
# error_code=OperationalErrorCode.UNHANDLED_EXCEPTION,
# message="Unhandled exception in task might_crash: ...",
# ))

With explicit try/except:

from horsies import RetryPolicy
@app.task(
"some_api_call",
retry_policy=RetryPolicy.exponential(
base_seconds=30,
max_retries=5,
auto_retry_for=["TIMEOUT", "CONNECTION_ERROR"],
),
)
def some_api_call() -> TaskResult[dict, TaskError]:
try:
result = call_external_api()
return TaskResult(ok=result)
except TimeoutError:
return TaskResult(err=TaskError(error_code="TIMEOUT", message="Request timed out"))
except ConnectionError:
return TaskResult(err=TaskError(error_code="CONNECTION_ERROR", message="Connection failed"))

Or with the exception mapper (no try/except needed):

@app.task(
"some_api_call",
retry_policy=RetryPolicy.exponential(
base_seconds=30,
max_retries=5,
auto_retry_for=["TIMEOUT", "CONNECTION_ERROR"],
),
exception_mapper={
TimeoutError: "TIMEOUT",
ConnectionError: "CONNECTION_ERROR",
},
)
def some_api_call() -> TaskResult[dict, TaskError]:
result = call_external_api()
return TaskResult(ok=result)

For backoff strategies, jitter, and auto-retry triggers, see Retry Policy.

@app.task("urgent_task", queue_name="critical")
def urgent_task() -> TaskResult[str, TaskError]:
...
# Dotted module paths (recommended)
app.discover_tasks(["myapp.tasks", "myapp.jobs.worker_tasks"])
# Or file paths
app.discover_tasks(["tasks.py", "more_tasks.py"])

discover_tasks imports the exact entries listed. Dotted module paths use importlib.import_module(), while .py entries are imported by file path — it does not recursively scan submodules. To discover tasks in myapp.tasks.scraping, either list it explicitly or export the decorated functions from myapp.tasks.__init__.py.

Don’t omit the return type annotation.

# Wrong - raises TypeError at definition time
@app.task("bad_task")
def bad_task():
return {"status": "done"}
# Correct
@app.task("good_task")
def good_task() -> TaskResult[dict, TaskError]:
return TaskResult(ok={"status": "done"})

Don’t reuse task names.

@app.task("duplicate_name")
def task_one() -> TaskResult[str, TaskError]:
...
@app.task("duplicate_name") # Raises error
def task_two() -> TaskResult[str, TaskError]:
...
ParameterTypeRequiredDescription
task_namestrYesUnique task identifier
queue_namestrNoTarget queue (CUSTOM mode only)
retry_policyRetryPolicyNoRetry timing, backoff, and auto-retry triggers
good_untildatetime | NoneNoTask expiry deadline (set at definition time)
exception_mapperdict[type[BaseException], str]NoMaps exception classes to error codes
default_unhandled_error_codestrNoError code for unmapped exceptions (overrides global)

Returns: Decorated function that can be called directly or via .send() / .send_async().