Sending Tasks
Sending Tasks
Section titled “Sending Tasks”Enqueue tasks with ::send() or ::schedule(). Both return TaskSendResult<TaskHandle<T>> — either Ok(TaskHandle) on success or Err(TaskSendError) on failure.
How To
Section titled “How To”Send a Task
Section titled “Send a Task”match my_task::send(input).await { Ok(handle) => println!("Task submitted: {}", handle.task_id()), Err(send_err) => println!("Send failed: {} - {}", send_err.code, send_err.message),}Delay Execution
Section titled “Delay Execution”use std::time::Duration;
match my_task::schedule(Duration::from_secs(60), input).await { Ok(handle) => println!("Scheduled: {}", handle.task_id()), Err(err) => println!("Schedule failed: {}", err.code),}Set a Deadline
Section titled “Set a Deadline”Use TaskSendOptions when the deadline is dynamic for this particular send:
use chrono::{Duration, Utc};use horsies::TaskSendOptions;
let deadline = Utc::now() + Duration::minutes(5);
let handle = my_task::with_options( TaskSendOptions::new().good_until(deadline),).send(input).await?;For scheduled sends:
let handle = my_task::with_options( TaskSendOptions::new().good_until(deadline),).schedule(std::time::Duration::from_secs(60), input).await?;good_until is an absolute UTC deadline. It is not relative to the schedule delay.
If the scheduled task has not started by that instant, it expires instead of running.
For workflow tasks, prefer setting the deadline on the node when building the workflow spec:
let node = my_task::node()? .set_input(input)? .good_until(deadline);Wait for Result
Section titled “Wait for Result”use std::time::Duration;
match my_task::send(input).await { Ok(handle) => { // Wait with timeout let result = handle.get(Some(Duration::from_secs(5))).await;
// Wait indefinitely let result = handle.get(None).await; } Err(err) => println!("Send failed: {}", err.code),}Fire and Forget
Section titled “Fire and Forget”// Send without waiting for result — drop the handlelet _ = my_task::send(input).await;Pass Complex Arguments
Section titled “Pass Complex Arguments”Arguments must implement Serialize and Deserialize:
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize)]struct OrderInput { id: i64, items: Vec<String>, metadata: HashMap<String, serde_json::Value>,}
match process_order::send(OrderInput { id: 123, items: vec!["a".into(), "b".into()], metadata: HashMap::new(),}).await { Ok(handle) => { /* ... */ } Err(err) => println!("Send failed: {}", err.code),}Retrying Failed Sends
Section titled “Retrying Failed Sends”When send() fails with EnqueueFailed (a transient broker error), use the retry methods to replay the exact same payload. The enqueue_sha on the stored TaskSendPayload guarantees the retry carries the identical serialized payload.
match my_task::send(input).await { Ok(handle) => { /* ... */ } Err(send_err) if send_err.retryable => { let task_fn = my_task::handle(&rt)?; match task_fn.retry_send(&send_err).await { Ok(handle) => println!("Retry succeeded: {}", handle.task_id()), Err(retry_err) => println!("Retry failed: {}", retry_err.code), } } Err(send_err) => { println!("Permanent failure: {}", send_err.code); }}Automatic Retry via Config
Section titled “Automatic Retry via Config”Set resend_on_transient_err = true in AppConfig to have the library automatically retry transient enqueue failures (up to 3 times with exponential backoff):
let config = AppConfig { resend_on_transient_err: true, ..AppConfig::for_database_url("postgresql://...")};API Reference
Section titled “API Reference”task_name::send(args) -> TaskSendResult<TaskHandle<T>>
Section titled “task_name::send(args) -> TaskSendResult<TaskHandle<T>>”Enqueue task for immediate execution.
| Parameter | Type | Description |
|---|---|---|
args | A: Serialize | Task arguments |
task_name::schedule(delay, args) -> TaskSendResult<TaskHandle<T>>
Section titled “task_name::schedule(delay, args) -> TaskSendResult<TaskHandle<T>>”Enqueue task for delayed execution.
| Parameter | Type | Description |
|---|---|---|
delay | Duration | Time to wait before task becomes claimable |
args | A: Serialize | Task arguments |
TaskFunction<A, T>
Section titled “TaskFunction<A, T>”| Method | Returns | Description |
|---|---|---|
.send(args) | TaskSendResult<TaskHandle<T>> | Enqueue immediately |
.schedule(delay, args) | TaskSendResult<TaskHandle<T>> | Enqueue with delay |
.with_options(options) | TaskFunctionSendOptions<'_, A, T> | Bind per-send options |
.retry_send(&err) | TaskSendResult<TaskHandle<T>> | Retry failed send |
.retry_schedule(&err) | TaskSendResult<TaskHandle<T>> | Retry failed schedule |
.task_name() | &str | Task name |
.queue_name() | &str | Assigned queue |
.priority() | u32 | Effective priority |
TaskSendOptions
Section titled “TaskSendOptions”TaskSendOptions::new().good_until(deadline)good_until is the last valid time for the task to begin execution. A task whose deadline passes while still Pending or Claimed transitions to Expired without running user code.
TaskSendResult<T>
Section titled “TaskSendResult<T>”Type alias: Result<T, TaskSendError>.
TaskSendError
Section titled “TaskSendError”| Field | Type | Description |
|---|---|---|
code | TaskSendErrorCode | Failure category |
message | String | Human-readable description |
retryable | bool | Whether the caller can retry with the same payload |
task_id | Option<String> | Generated task ID |
payload | Option<TaskSendPayload> | Serialized envelope for replay |
TaskSendErrorCode
Section titled “TaskSendErrorCode”| Code | Description | Retryable |
|---|---|---|
SendSuppressed | Send suppressed during check phase | No |
ValidationFailed | Argument serialization or validation failed | No |
EnqueueFailed | Broker/database failure during enqueue | Yes |
PayloadMismatch | Retry payload SHA does not match | No |
TaskHandle<T>
Section titled “TaskHandle<T>”| Method | Returns | Description |
|---|---|---|
.task_id() | &str | Unique task identifier |
.get(timeout) | TaskResult<T> | Wait for result |
.info(include_result, include_failed_reason, include_attempts) | BrokerResult<Option<TaskInfo>> | Fetch task metadata |