Skip to content

Defining Tasks

Define tasks by decorating functions with @app.task() and returning TaskResult[T, TaskError]. For argument serialization rules, see sending-tasks.

Note: Task functions must be synchronous (def). async def task functions are not supported yet. Use .send_async() / .get_async() on the producer side for non-blocking I/O.

from horsies import Horsies, AppConfig, PostgresConfig, TaskResult, TaskError, JsonValue
config = AppConfig(
broker=PostgresConfig(database_url="postgresql+psycopg://..."),
)
app = Horsies(config)
@app.task("process_order")
def process_order(*, order_id: int) -> TaskResult[str, TaskError]:
# Do work
return TaskResult(ok=f"Order {order_id} processed")

Every task parameter must be keyword-only: declare them after a bare *,. The leading * forces callers to pass arguments by name.

@app.task("send_email")
def send_email(*, to: str, subject: str) -> TaskResult[None, TaskError]:
...

The queue transmits arguments by name only — the signature is the wire contract. A positionally-bindable parameter (def f(order_id: int)) is rejected at registration, so the error surfaces when the app loads, not at send time. With keyword-only parameters, a positional call (process_order.send(1)) also fails the type checker at the call site rather than at runtime.

@app.task("compute")
def compute(*, x: int) -> TaskResult[int, TaskError]:
return TaskResult(ok=x * 2)

For void tasks:

@app.task("fire_and_forget")
def fire_and_forget() -> TaskResult[None, TaskError]:
do_something()
return TaskResult(ok=None)

Domain errors (expected failures):

@app.task("validate_input")
def validate_input(*, data: dict[str, JsonValue]) -> TaskResult[dict[str, JsonValue], TaskError]:
if not data.get("email"):
return TaskResult(err=TaskError(
error_code="MISSING_EMAIL",
message="Email is required",
data={"field": "email"},
))
return TaskResult(ok=data)

Unhandled exceptions are wrapped automatically:

@app.task("might_crash")
def might_crash() -> TaskResult[str, TaskError]:
raise ValueError("Something went wrong")
# Becomes TaskResult(err=TaskError(
# error_code=OperationalErrorCode.UNHANDLED_EXCEPTION,
# message="Unhandled exception in task might_crash: ...",
# ))

With explicit try/except:

from horsies import RetryPolicy, TaskResult, TaskError, JsonValue
@app.task(
"some_api_call",
retry_policy=RetryPolicy.exponential(
base_seconds=30,
max_retries=5,
auto_retry_for=["TIMEOUT", "CONNECTION_ERROR"],
),
)
def some_api_call() -> TaskResult[dict[str, JsonValue], TaskError]:
try:
result = call_external_api()
return TaskResult(ok=result)
except TimeoutError:
return TaskResult(err=TaskError(error_code="TIMEOUT", message="Request timed out"))
except ConnectionError:
return TaskResult(err=TaskError(error_code="CONNECTION_ERROR", message="Connection failed"))

Or with the exception mapper (no try/except needed):

@app.task(
"some_api_call",
retry_policy=RetryPolicy.exponential(
base_seconds=30,
max_retries=5,
auto_retry_for=["TIMEOUT", "CONNECTION_ERROR"],
),
exception_mapper={
TimeoutError: "TIMEOUT",
ConnectionError: "CONNECTION_ERROR",
},
)
def some_api_call() -> TaskResult[dict[str, JsonValue], TaskError]:
result = call_external_api()
return TaskResult(ok=result)

For backoff strategies, jitter, and auto-retry triggers, see Retry Policy.

@app.task("urgent_task", queue_name="critical")
def urgent_task() -> TaskResult[str, TaskError]:
...

timeout_ms bounds task execution time, measured from the moment a worker dispatches the task to a child process. Minimum 1000 (1 second).

@app.task("resize_video", timeout_ms=120_000)
def resize_video(*, video_id: str) -> TaskResult[str, TaskError]:
...

On expiry, the worker kills the child process and the task fails with TASK_TIMEOUT. If the deadline elapses before user code starts, the task is requeued instead. Timeouts are terminal by default; opt into retries by adding "TASK_TIMEOUT" to auto_retry_for:

@app.task(
"resize_video",
timeout_ms=120_000,
retry_policy=RetryPolicy.fixed([60, 300], auto_retry_for=["TASK_TIMEOUT"]),
)
def resize_video(*, video_id: str) -> TaskResult[str, TaskError]:
...

Killing the child restarts the worker’s process pool: sibling tasks running on the same worker are interrupted and recovered through crash recovery (requeued, or retried per their own policies). Size timeout_ms as a hang backstop well above normal runtime, not as routine control flow.

# Dotted module paths (recommended)
app.discover_tasks(["myapp.tasks", "myapp.jobs.worker_tasks"])
# Or file paths
app.discover_tasks(["tasks.py", "more_tasks.py"])

discover_tasks imports the exact entries listed. Dotted module paths use importlib.import_module(), while .py entries are imported by file path — it does not recursively scan submodules. To discover tasks in myapp.tasks.scraping, either list it explicitly or export the decorated functions from myapp.tasks.__init__.py.

Don’t omit the return type annotation.

# Wrong - raises TypeError at definition time
@app.task("bad_task")
def bad_task():
return {"status": "done"}
# Correct
@app.task("good_task")
def good_task() -> TaskResult[dict[str, JsonValue], TaskError]:
return TaskResult(ok={"status": "done"})

Don’t reuse task names.

@app.task("duplicate_name")
def task_one() -> TaskResult[str, TaskError]:
...
@app.task("duplicate_name") # Raises error
def task_two() -> TaskResult[str, TaskError]:
...
ParameterTypeRequiredDescription
task_namestrYesUnique task identifier
queue_namestrNoTarget queue (CUSTOM mode only)
retry_policyRetryPolicyNoRetry timing, backoff, and auto-retry triggers
exception_mapperdict[type[BaseException], str]NoMaps exception classes to error codes
default_unhandled_error_codestrNoError code for unmapped exceptions (overrides global)
timeout_msintNoExecution time limit in milliseconds (>= 1000), measured from dispatch. On expiry the child process is killed and the task fails with TASK_TIMEOUT

Returns: Decorated function that can be called directly or via .send() / .send_async().

good_until is not accepted here — task expiry deadlines are a per-send concern. See Sending Tasks — Set a Task Deadline.