Skip to content

Sending Tasks

Enqueue tasks with ::send() or ::schedule(). Both return TaskSendResult<TaskHandle<T>> — either Ok(TaskHandle) on success or Err(TaskSendError) on failure.

match my_task::send(input).await {
Ok(handle) => println!("Task submitted: {}", handle.task_id()),
Err(send_err) => println!("Send failed: {} - {}", send_err.code, send_err.message),
}
use std::time::Duration;
match my_task::schedule(Duration::from_secs(60), input).await {
Ok(handle) => println!("Scheduled: {}", handle.task_id()),
Err(err) => println!("Schedule failed: {}", err.code),
}

Use TaskSendOptions when the deadline is dynamic for this particular send:

use chrono::{Duration, Utc};
use horsies::TaskSendOptions;
let deadline = Utc::now() + Duration::minutes(5);
let handle = my_task::with_options(
TaskSendOptions::new().good_until(deadline),
)
.send(input)
.await?;

For scheduled sends:

let handle = my_task::with_options(
TaskSendOptions::new().good_until(deadline),
)
.schedule(std::time::Duration::from_secs(60), input)
.await?;

good_until is an absolute UTC deadline. It is not relative to the schedule delay. If the scheduled task has not started by that instant, it expires instead of running.

For workflow tasks, prefer setting the deadline on the node when building the workflow spec:

let node = my_task::node()?
.set_input(input)?
.good_until(deadline);
use std::time::Duration;
match my_task::send(input).await {
Ok(handle) => {
// Wait with timeout
let result = handle.get(Some(Duration::from_secs(5))).await;
// Wait indefinitely
let result = handle.get(None).await;
}
Err(err) => println!("Send failed: {}", err.code),
}
// Send without waiting for result — drop the handle
let _ = my_task::send(input).await;

Arguments must implement Serialize and Deserialize:

use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize)]
struct OrderInput {
id: i64,
items: Vec<String>,
metadata: HashMap<String, serde_json::Value>,
}
match process_order::send(OrderInput {
id: 123,
items: vec!["a".into(), "b".into()],
metadata: HashMap::new(),
}).await {
Ok(handle) => { /* ... */ }
Err(err) => println!("Send failed: {}", err.code),
}

When send() fails with EnqueueFailed (a transient broker error), use the retry methods to replay the exact same payload. The enqueue_sha on the stored TaskSendPayload guarantees the retry carries the identical serialized payload.

match my_task::send(input).await {
Ok(handle) => { /* ... */ }
Err(send_err) if send_err.retryable => {
let task_fn = my_task::handle(&rt)?;
match task_fn.retry_send(&send_err).await {
Ok(handle) => println!("Retry succeeded: {}", handle.task_id()),
Err(retry_err) => println!("Retry failed: {}", retry_err.code),
}
}
Err(send_err) => {
println!("Permanent failure: {}", send_err.code);
}
}

Set resend_on_transient_err = true in AppConfig to have the library automatically retry transient enqueue failures (up to 3 times with exponential backoff):

let config = AppConfig {
resend_on_transient_err: true,
..AppConfig::for_database_url("postgresql://...")
};

task_name::send(args) -> TaskSendResult<TaskHandle<T>>

Section titled “task_name::send(args) -> TaskSendResult<TaskHandle<T>>”

Enqueue task for immediate execution.

ParameterTypeDescription
argsA: SerializeTask arguments

task_name::schedule(delay, args) -> TaskSendResult<TaskHandle<T>>

Section titled “task_name::schedule(delay, args) -> TaskSendResult<TaskHandle<T>>”

Enqueue task for delayed execution.

ParameterTypeDescription
delayDurationTime to wait before task becomes claimable
argsA: SerializeTask arguments
MethodReturnsDescription
.send(args)TaskSendResult<TaskHandle<T>>Enqueue immediately
.schedule(delay, args)TaskSendResult<TaskHandle<T>>Enqueue with delay
.with_options(options)TaskFunctionSendOptions<'_, A, T>Bind per-send options
.retry_send(&err)TaskSendResult<TaskHandle<T>>Retry failed send
.retry_schedule(&err)TaskSendResult<TaskHandle<T>>Retry failed schedule
.task_name()&strTask name
.queue_name()&strAssigned queue
.priority()u32Effective priority
TaskSendOptions::new().good_until(deadline)

good_until is the last valid time for the task to begin execution. A task whose deadline passes while still Pending or Claimed transitions to Expired without running user code.

Type alias: Result<T, TaskSendError>.

FieldTypeDescription
codeTaskSendErrorCodeFailure category
messageStringHuman-readable description
retryableboolWhether the caller can retry with the same payload
task_idOption<String>Generated task ID
payloadOption<TaskSendPayload>Serialized envelope for replay
CodeDescriptionRetryable
SendSuppressedSend suppressed during check phaseNo
ValidationFailedArgument serialization or validation failedNo
EnqueueFailedBroker/database failure during enqueueYes
PayloadMismatchRetry payload SHA does not matchNo
MethodReturnsDescription
.task_id()&strUnique task identifier
.get(timeout)TaskResult<T>Wait for result
.info(include_result, include_failed_reason, include_attempts)BrokerResult<Option<TaskInfo>>Fetch task metadata