Retry Policy

from horsies import RetryPolicy, TaskError, TaskResult

@app.task(
    "flaky_task",
    retry_policy=RetryPolicy.fixed([60, 300, 900], auto_retry_for=["TRANSIENT_ERROR"]),
)
def flaky_task() -> TaskResult[str, TaskError]:
    # Will retry up to 3 times with delays: 1min, 5min, 15min
    ...
Field             Type                         Default          Description
auto_retry_for    list[str | BuiltInTaskCode]  (required)       Error codes that trigger automatic retries
max_retries       int                          3                Number of retry attempts (1-20)
intervals         list[int]                    [60, 300, 900]   Delay intervals in seconds
backoff_strategy  str                          "fixed"          "fixed" or "exponential"
jitter            bool                         True             Add random variation to delays

Fixed backoff uses the exact intervals from the list. The list length must match max_retries.

# Retry 3 times: wait 1min, then 5min, then 15min
RetryPolicy.fixed([60, 300, 900], auto_retry_for=["TRANSIENT_ERROR"])

# Equivalent to:
RetryPolicy(
    max_retries=3,
    intervals=[60, 300, 900],
    backoff_strategy='fixed',
    auto_retry_for=["TRANSIENT_ERROR"],
)

Exponential backoff uses a single base interval multiplied by 2^(attempt-1).

# Base 30s: retry at 30s, 60s, 120s, 240s, 480s
RetryPolicy.exponential(base_seconds=30, max_retries=5, auto_retry_for=["TRANSIENT_ERROR"])

# Equivalent to:
RetryPolicy(
    max_retries=5,
    intervals=[30],  # Single base interval
    backoff_strategy='exponential',
    auto_retry_for=["TRANSIENT_ERROR"],
)
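The schedule in the comment above follows directly from the 2^(attempt-1) formula. A standalone sketch, independent of the library:

```python
# Exponential backoff: the delay before attempt n is base * 2**(n - 1).
def exponential_schedule(base_seconds: int, max_retries: int) -> list[int]:
    return [base_seconds * 2 ** (attempt - 1) for attempt in range(1, max_retries + 1)]

print(exponential_schedule(30, 5))  # [30, 60, 120, 240, 480]
```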

When jitter=True (default), delays are randomized by +/-25%:

  • 60 second base -> 45-75 seconds actual
  • Prevents thundering herd when many tasks retry simultaneously

# Disable jitter for predictable delays
RetryPolicy.fixed([60, 300, 900], auto_retry_for=["TRANSIENT_ERROR"], jitter=False)
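The +/-25% jitter can be modelled as multiplying the delay by a uniform random factor in [0.75, 1.25]. This is a sketch of the idea, not the library's actual randomization code:

```python
import random

def jittered_delay(base_seconds: float) -> float:
    # +/-25% jitter: scale by a uniform factor in [0.75, 1.25].
    return base_seconds * random.uniform(0.75, 1.25)

# A 60s base delay lands somewhere in the 45-75s window:
delay = jittered_delay(60)
assert 45 <= delay <= 75
```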

Retries happen only when the task fails with one of the configured error codes. Configure these via auto_retry_for on RetryPolicy:

@app.task(
    "api_call",
    retry_policy=RetryPolicy.fixed([30, 60, 120], auto_retry_for=["RATE_LIMITED", "SERVICE_UNAVAILABLE"]),
)
def api_call() -> TaskResult[dict, TaskError]:
    ...

auto_retry_for accepts:

  • Error codes from TaskError: "RATE_LIMITED", "SERVICE_UNAVAILABLE"
  • Library error codes: "UNHANDLED_EXCEPTION", "WORKER_CRASHED"
  • Codes must use UPPER_SNAKE_CASE (exception class names like "TimeoutError" are rejected)
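The UPPER_SNAKE_CASE rule amounts to a simple regex check. This is an illustrative sketch of the kind of validation involved, not the library's actual validator:

```python
import re

# Uppercase letters, digits, and underscores; must start with a letter.
UPPER_SNAKE = re.compile(r"^[A-Z][A-Z0-9_]*$")

def is_valid_error_code(code: str) -> bool:
    return bool(UPPER_SNAKE.fullmatch(code))

print(is_valid_error_code("RATE_LIMITED"))  # True
print(is_valid_error_code("TimeoutError"))  # False (exception class names are rejected)
```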

Map unhandled exceptions to error codes without try/except boilerplate. When a task raises an exception, the mapper matches the exact exception class (type(exc)).

import requests

@app.task(
    "call_api",
    retry_policy=RetryPolicy.fixed([30, 60, 120], auto_retry_for=["TIMEOUT", "CONNECTION_ERROR"]),
    exception_mapper={
        TimeoutError: "TIMEOUT",
        ConnectionError: "CONNECTION_ERROR",
    },
)
def call_api() -> TaskResult[dict, TaskError]:
    # No try/except needed: TimeoutError becomes "TIMEOUT" automatically
    response = requests.get("https://api.example.com", timeout=10)
    return TaskResult(ok=response.json())

Set a global mapper on AppConfig to apply to all tasks:

config = AppConfig(
    broker=PostgresConfig(database_url="postgresql+psycopg://..."),
    exception_mapper={
        TimeoutError: "TIMEOUT",
        ConnectionError: "CONNECTION_ERROR",
        PermissionError: "PERMISSION_DENIED",
    },
    default_unhandled_error_code="UNHANDLED_EXCEPTION",
)

When an unhandled exception is caught, the resulting error code is resolved in this order:

  1. Per-task exception_mapper (exact class lookup)
  2. Global AppConfig.exception_mapper (exact class lookup)
  3. Per-task default_unhandled_error_code
  4. Global AppConfig.default_unhandled_error_code (defaults to "UNHANDLED_EXCEPTION")

Per-task mapper entries take priority over global. If the task function returns TaskResult(err=...) explicitly, the mapper is never invoked.

Only exact class matches count — subclasses are not matched. If you need to handle a subclass, map it explicitly.

When a retry is triggered, the worker proceeds as follows:

  1. Task fails with matching error code
  2. Worker checks retry_count < max_retries
  3. If retries remaining, task status set to PENDING
  4. next_retry_at calculated from retry policy
  5. Task not claimable until next_retry_at passes
  6. Worker sends delayed notification to trigger claiming

Every execution attempt writes an immutable row to horsies_task_attempts. A failure that will be retried creates an attempt with outcome='FAILED' and will_retry=True. The final attempt (whether success or terminal failure) has will_retry=False. During retries, horsies_tasks.error_code remains NULL; it is only set when the task reaches its final terminal state.

Use handle.info(include_attempts=True) to inspect the full attempt timeline. See Retrieving Results for details.
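A retried run's timeline could then be summarized like this. The attribute names on the info object (attempts, outcome, will_retry) are assumptions for illustration; check your horsies version for the exact shape:

```python
# Sketch: summarize the attempt timeline returned by
# handle.info(include_attempts=True).
def summarize_attempts(info) -> list[str]:
    lines = []
    for attempt in info.attempts:
        marker = "will retry" if attempt.will_retry else "final"
        lines.append(f"{attempt.outcome} ({marker})")
    return lines
```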

Field          Description
retry_count    Current number of retry attempts
max_retries    Maximum attempts allowed
next_retry_at  When the task becomes claimable again

Access via database or result:

result = handle.get()
if result.is_err():
    error = result.err_value
    # Check if retries exhausted
    if "retry" in str(error.data):
        print("All retries exhausted")

RetryPolicy validates its configuration at construction time:

# This raises ValueError:
RetryPolicy(
    max_retries=3,
    intervals=[60, 300],  # Only 2 intervals for 3 retries
    backoff_strategy='fixed',
    auto_retry_for=["TRANSIENT_ERROR"],
)

# This also raises ValueError:
RetryPolicy(
    max_retries=3,
    intervals=[60, 300, 900],  # Multiple intervals
    backoff_strategy='exponential',  # Exponential needs exactly one interval
    auto_retry_for=["TRANSIENT_ERROR"],
)

@app.task(
    "call_external_api",
    retry_policy=RetryPolicy.exponential(
        base_seconds=60,
        max_retries=5,
        auto_retry_for=["RATE_LIMITED", "SERVICE_UNAVAILABLE"],
    ),
)
def call_external_api() -> TaskResult[dict, TaskError]:
    try:
        response = requests.get("https://api.example.com")
        if response.status_code == 429:
            return TaskResult(err=TaskError(error_code="RATE_LIMITED"))
        return TaskResult(ok=response.json())
    except requests.Timeout:
        return TaskResult(err=TaskError(error_code="TIMEOUT"))

@app.task(
    "update_inventory",
    retry_policy=RetryPolicy.fixed([1, 2, 5], auto_retry_for=["DEADLOCK"]),  # Quick retries
)
def update_inventory(item_id: int, delta: int) -> TaskResult[None, TaskError]:
    try:
        db.update_stock(item_id, delta)
        return TaskResult(ok=None)
    except DeadlockDetected:
        return TaskResult(err=TaskError(error_code="DEADLOCK", message="Deadlock detected"))