Skip to content

Result Handling

Every task must return Result<T, TaskError>. On the wire and when retrieved, results are wrapped in TaskResult<T>:

pub enum TaskResult<T> {
Ok(T),
Err(TaskError),
}

A TaskResult contains exactly one of:

  • Ok(value): The success value (type T)
  • Err(error): The error value (type TaskError)
use horsies::{task, TaskError};
#[task("divide")]
async fn divide(input: DivideInput) -> Result<f64, TaskError> {
if input.b == 0 {
return Err(TaskError::new(
"DIVISION_BY_ZERO",
"Cannot divide by zero",
));
}
Ok(input.a as f64 / input.b as f64)
}
use std::time::Duration;
use horsies::TaskResult;
match divide::send(input).await {
Ok(handle) => {
let result: TaskResult<f64> = handle.get(Some(Duration::from_secs(5))).await;
match result {
TaskResult::Ok(value) => println!("Result: {}", value),
TaskResult::Err(err) => {
println!("Error: {:?} - {:?}", err.error_code, err.message);
}
}
}
Err(send_err) => {
println!("Send failed: {} - {}", send_err.code, send_err.message);
}
}

TaskError is a struct with optional fields:

FieldTypePurpose
error_codeOption<TaskErrorCode>Identifies the error type
messageOption<String>Human-readable description
dataOption<serde_json::Value>Additional context
causeOption<serde_json::Value>Original cause (serialized as "exception" on wire)
Err(TaskError {
error_code: Some(TaskErrorCode::User("VALIDATION_FAILED".into())),
message: Some("Input validation failed".into()),
data: Some(serde_json::json!({"field": "email", "reason": "invalid format"})),
cause: None,
})

Convenience constructors:

// User-defined error
TaskError::new("INVALID_AMOUNT", "Amount must be positive")
// Built-in error
TaskError::builtin(OperationalErrorCode::BrokerError, "connection lost")

When the library itself encounters an error (not your task code), it uses one of four error code families via TaskErrorCode::BuiltIn(BuiltInTaskCode).

CodeWhen
UnhandledErrorTask panicked or hit an uncaught error
TaskErrorTask function returned an error
WorkerCrashedWorker died (detected via heartbeat)
BrokerErrorBroker failed while retrieving the result
WorkerResolutionErrorWorker couldn’t find the task in registry
WorkerSerializationErrorResult couldn’t be serialized
ResultDeserializationErrorStored result JSON is corrupt or could not be deserialized
WorkflowEnqueueFailedWorkflow node failed during enqueue/build
SubworkflowLoadFailedSubworkflow definition could not be loaded
CodeWhen
ArgumentTypeMismatchArguments don’t match the expected type
ReturnTypeMismatchReturned value doesn’t match declared type
WorkflowCtxMissingIdWorkflow context is missing required ID

Returned by handle.get() for issues retrieving results:

CodeWhen
WaitTimeoutTimeout elapsed while waiting for result (task may still be running)
TaskNotFoundTask ID doesn’t exist in database
WorkflowNotFoundWorkflow ID doesn’t exist in database
ResultNotAvailableResult cache is empty
ResultNotReadyResult not yet available; task is still running
CodeWhen
TaskCancelledTask was cancelled before completion
TaskExpiredTask expired before execution started
WorkflowPausedWorkflow was paused
WorkflowFailedWorkflow failed
WorkflowCancelledWorkflow was cancelled
UpstreamSkippedUpstream task in workflow was skipped
SubworkflowFailedSubworkflow failed
WorkflowSuccessCaseNotMetWorkflow success condition was not satisfied
WorkflowStoppedWorkflow was stopped
SendSuppressedSend was suppressed (e.g. during check phase)

Domain errors: Your task returns an error for business logic reasons.

#[task("transfer_funds")]
async fn transfer_funds(input: TransferInput) -> Result<String, TaskError> {
if input.amount <= 0.0 {
// This is a domain error - expected, handled
return Err(TaskError::new(
"INVALID_AMOUNT",
"Amount must be positive",
));
}
Ok("Transfer complete".into())
}

Built-in errors: Something went wrong in the infrastructure.

#[task("buggy_task")]
async fn buggy_task() -> Result<String, TaskError> {
panic!("Oops"); // Becomes UnhandledError
}

Both cases result in TaskResult::Err(...), but the error codes differ.

let result = handle.get(None).await;
// Check state
result.is_ok() // true if success
result.is_err() // true if error
result.is_transient() // true if retrieval/broker error (may resolve on retry)
result.is_terminal() // true if final (not transient)
// Access values (panics if wrong variant)
result.unwrap() // Returns Ok value
result.unwrap_err() // Returns TaskError
// Safe access
result.ok() // Option<T>
// Transform
result.map(|v| v.to_string()) // Map success value
result.and_then(|v| /* ... */) // Chain operations