Skip to content

Sending Tasks

Enqueue tasks with .send(), .send_async(), or .schedule(). All return a TaskSendResult[TaskHandle[T]] — a Result type that is either Ok(TaskHandle) on success or Err(TaskSendError) on failure.

from horsies import Ok, Err
from instance import my_task
match my_task.send(arg1, arg2, key="value"):
case Ok(handle):
print(f"Task submitted: {handle.task_id}")
case Err(send_err):
print(f"Send failed: {send_err.code} - {send_err.message}")
from horsies import Ok, Err
async def my_endpoint():
match await my_task.send_async(arg1, arg2):
case Ok(handle):
return {"task_id": handle.task_id}
case Err(send_err):
return {"error": send_err.message}

send_async() only enqueues the task. Use handle.get_async() if you want to wait for completion.

from horsies import Ok, Err
match my_task.schedule(60, arg1, arg2):
case Ok(handle):
print(f"Scheduled: {handle.task_id}")
case Err(err):
print(f"Schedule failed: {err.code}")
from horsies import Ok, Err
match my_task.send(arg1, arg2):
case Ok(handle):
# Blocking wait
result = handle.get()
# With timeout (milliseconds)
result = handle.get(timeout_ms=5000)
# Async wait
result = await handle.get_async(timeout_ms=5000)
case Err(err):
print(f"Send failed: {err.code}")

get_async() waits via broker notifications (LISTEN/NOTIFY) with a polling fallback.

# Send without waiting for result -- discard the TaskSendResult
my_task.send(arg1, arg2)

Arguments must be JSON-serializable. Pydantic models and dataclass instances are supported directly and rehydrated on the worker side.

from horsies import Ok, Err
match process.send(data={"key": "value", "nested": {"a": 1}}, items=[1, 2, 3]):
case Ok(handle):
result = handle.get()
case Err(err):
print(f"Send failed: {err.code}")
# Pydantic models - pass the instance to preserve type metadata
order = Order(id=123, items=["a", "b"])
match process_order.send(order=order):
case Ok(handle):
result = handle.get()
case Err(err):
print(f"Send failed: {err.code}")

Pydantic models and dataclasses must be defined in importable modules (not __main__ and not inside functions).

# Runs immediately in current process
result = my_task("arg1", "arg2")

Direct calls bypass the queue entirely. Library features do not apply:

  • No retries (retry_policy)
  • No persistence (task not recorded in database)
  • No worker distribution
  • No scheduling

Use only for unit testing. For production, always use .send() or .send_async().

Don’t call .send() at module level.

tasks.py
# Wrong - returns Err(TaskSendError(SEND_SUPPRESSED)) during worker import
result = my_task.send("test") # Err(SEND_SUPPRESSED)
# Correct - call from functions/endpoints
def process():
match my_task.send("test"):
case Ok(handle):
...
case Err(err):
...

Don’t pass non-serializable objects.

# Wrong
my_task.send(connection=db_connection)
# Correct
my_task.send(connection_url=str(db_connection.url))

When .send() fails with ENQUEUE_FAILED (a transient broker error), use the retry methods to replay the exact same payload without re-supplying arguments. The enqueue_sha on the stored TaskSendPayload guarantees the retry carries the identical serialized payload.

from horsies import Ok, Err
match my_task.send(arg1, arg2):
case Ok(handle):
result = handle.get()
case Err(err) if err.retryable:
match my_task.retry_send(err):
case Ok(handle):
result = handle.get()
case Err(retry_err):
print(f"Retry failed: {retry_err.code}")
case Err(err):
print(f"Permanent failure: {err.code}")

Retry methods only accept ENQUEUE_FAILED errors. Passing SEND_SUPPRESSED, VALIDATION_FAILED, or PAYLOAD_MISMATCH returns Err(TaskSendError(VALIDATION_FAILED)).

Set resend_on_transient_err=True in AppConfig to have the library automatically retry transient enqueue failures before returning the error:

config = AppConfig(
resend_on_transient_err=True,
# ...
)

.send(*args, **kwargs) -> TaskSendResult[TaskHandle[T]]

Section titled “.send(*args, **kwargs) -> TaskSendResult[TaskHandle[T]]”

Enqueue task for immediate execution.

ParameterTypeDescription
*argstask argsPositional arguments for the task
**kwargstask kwargsKeyword arguments for the task

Returns: TaskSendResult[TaskHandle[T]]Ok(TaskHandle) on success, Err(TaskSendError) on failure.

.send_async(*args, **kwargs) -> TaskSendResult[TaskHandle[T]]

Section titled “.send_async(*args, **kwargs) -> TaskSendResult[TaskHandle[T]]”

Async variant of .send(). Use in async code (FastAPI, etc.). This does not execute the task locally; it only enqueues.

Returns: TaskSendResult[TaskHandle[T]]

.schedule(delay, *args, **kwargs) -> TaskSendResult[TaskHandle[T]]

Section titled “.schedule(delay, *args, **kwargs) -> TaskSendResult[TaskHandle[T]]”

Enqueue task for delayed execution.

ParameterTypeDescription
delayintSeconds to wait before task becomes claimable
*argstask argsPositional arguments for the task
**kwargstask kwargsKeyword arguments for the task

Returns: TaskSendResult[TaskHandle[T]]

.retry_send(error) -> TaskSendResult[TaskHandle[T]]

Section titled “.retry_send(error) -> TaskSendResult[TaskHandle[T]]”

Retry a failed send using the stored payload from the error. Only valid for ENQUEUE_FAILED errors.

ParameterTypeDescription
errorTaskSendErrorThe error from a previous .send() call

Returns: TaskSendResult[TaskHandle[T]]

.retry_send_async(error) -> TaskSendResult[TaskHandle[T]]

Section titled “.retry_send_async(error) -> TaskSendResult[TaskHandle[T]]”

Async variant of .retry_send().

.retry_schedule(error) -> TaskSendResult[TaskHandle[T]]

Section titled “.retry_schedule(error) -> TaskSendResult[TaskHandle[T]]”

Retry a failed schedule using the stored payload. Only valid for ENQUEUE_FAILED errors that originated from .schedule().

ParameterTypeDescription
errorTaskSendErrorThe error from a previous .schedule() call

Returns: TaskSendResult[TaskHandle[T]]

Type alias: Result[T, TaskSendError]. The Ok side is TaskHandle[T] when returned from send methods.

Property/MethodTypeDescription
.is_ok()boolTrue if send succeeded
.is_err()boolTrue if send failed
.ok_valueTThe TaskHandle; raises ValueError if error
.err_valueTaskSendErrorThe error; raises ValueError if success

Use is_ok(result) / is_err(result) from horsies as type-narrowing guards.

FieldTypeDescription
codeTaskSendErrorCodeFailure category
messagestrHuman-readable description
retryableboolWhether the caller can retry with the same payload
task_idstr | NoneGenerated task ID (None for SEND_SUPPRESSED, VALIDATION_FAILED)
payloadTaskSendPayload | NoneSerialized envelope for replay (None when no serialization happened)
exceptionBaseException | NoneThe original cause, if any
CodeDescriptionRetryable
SEND_SUPPRESSEDSend suppressed during worker import/discoveryNo
VALIDATION_FAILEDArgument serialization or validation failedNo
ENQUEUE_FAILEDBroker/database failure during enqueueYes
PAYLOAD_MISMATCHRetry payload SHA does not match (payload was altered)No
Property/MethodTypeDescription
.task_idstrUnique task identifier
.get(timeout_ms=None)TaskResult[T, TaskError]Wait for result (blocking)
.get_async(timeout_ms=None)TaskResult[T, TaskError]Wait for result (async)
.info(include_result=False, include_failed_reason=False)BrokerResult[TaskInfo | None]Fetch task metadata from broker
.info_async(include_result=False, include_failed_reason=False)BrokerResult[TaskInfo | None]Async variant of .info()