
Error Handling

Handle errors explicitly through pattern matching and explicit returns. Avoid panicking for flow control — panics break control flow and obscure error paths.

For an exhaustive error code reference, see errors.

This document is meant as a pattern guide rather than the definitive way to handle errors. Adjust the patterns to your needs.

| Category | Source | Examples | Auto-Retry? |
|---|---|---|---|
| Retrieval/transient errors | handle.get() or broker reads | WaitTimeout, TaskNotFound, BrokerError | No |
| Execution errors | Task panicked or worker/runtime failed | UnhandledError, WorkerCrashed | If in auto_retry_for |
| Domain errors | Your task returned TaskError::new(...) | "RATE_LIMITED", "VALIDATION_FAILED" | If in auto_retry_for |

Handle errors when:

  • The error is a retrieval error (task may still complete)
  • The error is not in auto_retry_for (won’t be retried)
  • The task has exhausted retries (final failure)
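
The three conditions above can be read as a single decision. The following is a minimal sketch of that decision, not library code: `Category`, `caller_should_handle`, and the plain string codes are names made up for illustration, and the retry bookkeeping is reduced to a `retries_exhausted` flag.

```rust
/// Local stand-in for the three categories in this guide; not a library type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Category {
    Retrieval,
    Execution,
    Domain,
}

/// Returns true when the caller, rather than the automatic retry machinery,
/// is expected to deal with the error (hypothetical helper).
pub fn caller_should_handle(
    category: Category,
    code: &str,
    auto_retry_for: &[&str],
    retries_exhausted: bool,
) -> bool {
    match category {
        // Retrieval errors are not auto-retried; the task may still complete.
        Category::Retrieval => true,
        Category::Execution | Category::Domain => {
            // A retry only happens if the code is listed and attempts remain.
            let will_retry = auto_retry_for.contains(&code) && !retries_exhausted;
            !will_retry
        }
    }
}
```

Under these assumptions, a "RATE_LIMITED" domain error with retries remaining is left to the library, while the same error after the last attempt falls to the caller.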

The library handles retries automatically when auto_retry_for matches the error:

#[task(
    "fetch_api_data",
    auto_retry_for = ["RATE_LIMITED", "TIMEOUT"],
    retry_policy = RetryPolicy::exponential(30, 3, true)?
)]
async fn fetch_api_data(input: ApiInput) -> Result<ApiResponse, TaskError> {
    // If this returns a RATE_LIMITED error, the worker automatically
    // retries up to 3 times with exponential backoff
    match call_api(&input.url).await {
        Ok(response) => Ok(response),
        Err(e) if e.is_rate_limited() => {
            Err(TaskError::new("RATE_LIMITED", "Rate limited"))
        }
        Err(e) if e.is_timeout() => {
            Err(TaskError::new("TIMEOUT", "Request timed out"))
        }
        Err(e) => Err(TaskError::new("API_ERROR", e.to_string())),
    }
}

See retry-policy for configuration details.
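
The mapping from a client error to a domain error code is what decides whether a retry happens, so it can help to keep it in one place. Below is a minimal, self-contained sketch of that idea. `ApiError`, `DomainError`, and `is_auto_retried` are hypothetical stand-ins defined here for illustration; `DomainError` only mimics the code-plus-message shape of `TaskError::new(...)` and is not the library type.

```rust
/// Hypothetical client error; stands in for whatever `call_api` returns.
#[derive(Debug)]
pub enum ApiError {
    RateLimited,
    Timeout,
    Other(String),
}

/// Simplified stand-in for a domain error (code + message only).
#[derive(Debug, PartialEq)]
pub struct DomainError {
    pub code: String,
    pub message: String,
}

impl From<ApiError> for DomainError {
    fn from(e: ApiError) -> Self {
        // Each client failure maps to exactly one stable error code.
        let (code, message) = match e {
            ApiError::RateLimited => ("RATE_LIMITED", "Rate limited".to_string()),
            ApiError::Timeout => ("TIMEOUT", "Request timed out".to_string()),
            ApiError::Other(detail) => ("API_ERROR", detail),
        };
        DomainError { code: code.to_string(), message }
    }
}

/// Whether an error's code appears in an `auto_retry_for`-style list.
pub fn is_auto_retried(err: &DomainError, auto_retry_for: &[&str]) -> bool {
    auto_retry_for.contains(&err.code.as_str())
}
```

With the list `["RATE_LIMITED", "TIMEOUT"]` from the example above, only the first two variants would match; "API_ERROR" is a final failure that the caller has to handle.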

Handling Upstream Failures in Chained Tasks


Tasks wired with args_from in workflows receive the deserialized upstream result. When using allow_failed_deps, the task may receive an error result from the upstream node:

#[task("parse_product_page")]
async fn parse_product_page(
    page_result: TaskResult<String>,
) -> Result<ProductRecord, TaskError> {
    match page_result {
        // The upstream fetch failed: surface that as a domain error
        TaskResult::Err(err) => {
            Err(TaskError::new(
                "UPSTREAM_FAILED",
                format!("Cannot parse: fetch failed - {:?}", err.error_code),
            ))
        }
        // Real parsing of `_html` is elided; placeholder values are returned
        TaskResult::Ok(_html) => {
            Ok(ProductRecord {
                product_id: "product-123".into(),
                name: "Widget Pro".into(),
                price_cents: 1999,
            })
        }
    }
}

This pattern only runs when the downstream task actually executes. With JoinType::All and allow_failed_deps = false (default), any failed dependency causes the task to be Skipped. See Workflow Semantics for join rules and failure propagation.
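
As a rough model of the rule just described, the sketch below decides what happens to a downstream node under `JoinType::All` only. `Outcome` and `downstream_outcome` are illustrative names, not library items, and other join types are deliberately not modeled; the Workflow Semantics page remains the reference for the actual rules.

```rust
/// Local stand-in for what happens to the downstream node; not a library enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    /// All dependencies succeeded; the task runs with normal inputs.
    Runs,
    /// At least one dependency failed, but the task opted in to seeing it.
    RunsWithFailedInput,
    /// At least one dependency failed and the task did not opt in.
    Skipped,
}

/// Models only the `JoinType::All` case described above (hypothetical helper).
pub fn downstream_outcome(deps_failed: &[bool], allow_failed_deps: bool) -> Outcome {
    let any_failed = deps_failed.iter().any(|&failed| failed);
    match (any_failed, allow_failed_deps) {
        (false, _) => Outcome::Runs,
        (true, true) => Outcome::RunsWithFailedInput,
        (true, false) => Outcome::Skipped,
    }
}
```

Only the `RunsWithFailedInput` case reaches the `TaskResult::Err` arm of a task like `parse_product_page`.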

use std::time::Duration;
use horsies::TaskResult;

async fn process_task(input: MyInput) -> Option<String> {
    let handle = match my_task::send(input).await {
        Ok(h) => h,
        Err(send_err) => {
            println!("Send failed: {} - {}", send_err.code, send_err.message);
            return None;
        }
    };
    let result = handle.get(Some(Duration::from_secs(5))).await;
    match result {
        TaskResult::Ok(value) => Some(value),
        TaskResult::Err(err) => {
            if err.is_transient() {
                // Task may still be running, check status again
                println!("Transient error, task may complete later");
            } else {
                handle_error(&err);
            }
            None
        }
    }
}
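
The example above calls a `handle_error` helper that this guide does not define. One possible shape for the decision behind such a helper is sketched below. Everything here is an assumption made for illustration: the `TaskErrorCode` and `TaskError` definitions are local stand-ins that only borrow names appearing in this guide (the real types may differ), and `NextStep` and `next_step_for` are hypothetical.

```rust
/// Local stand-in using error names from this guide; not the library enum.
#[derive(Debug, Clone, PartialEq)]
pub enum TaskErrorCode {
    WaitTimeout,
    TaskNotFound,
    BrokerError,
    UnhandledError,
    WorkerCrashed,
    User(String),
}

/// Local stand-in with only the two fields this guide reads.
#[derive(Debug, Clone)]
pub struct TaskError {
    pub error_code: Option<TaskErrorCode>,
    pub message: Option<String>,
}

/// What the caller decides to do next (hypothetical).
#[derive(Debug, PartialEq)]
pub enum NextStep {
    CheckAgainLater,
    ReportToUser(String),
    Alert(String),
}

pub fn next_step_for(err: &TaskError) -> NextStep {
    let message = err.message.clone().unwrap_or_else(|| "no message".to_string());
    match &err.error_code {
        // Retrieval errors: the task itself may still complete.
        Some(TaskErrorCode::WaitTimeout)
        | Some(TaskErrorCode::TaskNotFound)
        | Some(TaskErrorCode::BrokerError) => NextStep::CheckAgainLater,
        // Domain errors carry a code chosen by the task author.
        Some(TaskErrorCode::User(code)) => NextStep::ReportToUser(format!("{code}: {message}")),
        // Execution errors (panic, worker crash) go to an operator.
        Some(other) => NextStep::Alert(format!("{other:?}: {message}")),
        None => NextStep::Alert(message),
    }
}
```

Returning a value instead of acting inside the helper keeps the policy testable and leaves the caller in charge of the actual return path.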

Don’t panic for flow control.

// Wrong - panics break control flow
if result.is_err() {
    panic!("Task failed: {:?}", result.unwrap_err().error_code);
}

// Correct - explicit return and handling
if result.is_err() {
    handle_error(&result.unwrap_err());
    return None;
}

Don’t ignore errors after logging.

// Wrong - logs but continues as if nothing happened
if result.is_err() {
    println!("{:?}", result.unwrap_err().message);
}
// Code continues...

// Correct - handle and return explicitly
match result {
    TaskResult::Err(err) => {
        handle_error(&err);
        return Err(err);
    }
    TaskResult::Ok(value) => { /* use value */ }
}
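
When several results are processed in a row, one way to make ignoring an error impossible is to convert each result into a standard `Result` and propagate with `?`. The sketch below assumes a two-variant result type like the one matched above. `TaskResult`, `TaskError`, and `into_std` are local stand-ins written for this example; whether the library offers a comparable conversion is not stated in this guide.

```rust
/// Local stand-in error; the real type carries more than a message.
#[derive(Debug, PartialEq)]
pub struct TaskError {
    pub message: String,
}

/// Local stand-in with the same two variants the guide matches on.
#[derive(Debug)]
pub enum TaskResult<T> {
    Ok(T),
    Err(TaskError),
}

impl<T> TaskResult<T> {
    /// Hypothetical helper: convert into `std::result::Result` so `?` applies.
    pub fn into_std(self) -> Result<T, TaskError> {
        match self {
            TaskResult::Ok(value) => Ok(value),
            TaskResult::Err(err) => Err(err),
        }
    }
}

/// Sums the lengths of all values, stopping at the first failure.
pub fn total_length(results: Vec<TaskResult<String>>) -> Result<usize, TaskError> {
    let mut total = 0;
    for result in results {
        // `?` returns early on the first error, so it cannot be logged and forgotten.
        let value = result.into_std()?;
        total += value.len();
    }
    Ok(total)
}
```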

Don’t manually retry errors that should be auto-retried.

// Wrong - duplicates library retry logic
if error.error_code == Some(TaskErrorCode::User("RATE_LIMITED".into())) {
    manually_schedule_retry(task_id);
}

// Correct - configure auto_retry_for on the task definition
#[task(
    "my_task",
    auto_retry_for = ["RATE_LIMITED"],
    retry_policy = RetryPolicy::fixed(vec![60, 120, 300], true)?
)]
async fn my_task() -> Result<String, TaskError> { ... }