
# AppConfig

```python
from horsies import Horsies, AppConfig, PostgresConfig, QueueMode

config = AppConfig(
    queue_mode=QueueMode.DEFAULT,
    broker=PostgresConfig(database_url="postgresql+psycopg://..."),
)
app = Horsies(config)
```
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `queue_mode` | `QueueMode` | `DEFAULT` | Queue operating mode |
| `custom_queues` | `list[CustomQueueConfig]` | `None` | Queue definitions (CUSTOM mode only) |
| `broker` | `PostgresConfig` | required | Database connection settings |
| `cluster_wide_cap` | `int` | `None` | Max in-flight tasks cluster-wide |
| `prefetch_buffer` | `int` | `0` | 0 = hard cap mode, >0 = soft cap with prefetch |
| `claim_lease_ms` | `int` | `None` | Claim lease duration (required if `prefetch_buffer > 0`; optional override in hard cap mode) |
| `max_claim_renew_age_ms` | `int` | `180000` | Max age (ms) of a CLAIMED task that heartbeat will renew. Older claims are left to expire, preventing indefinite renewal of orphaned tasks. Must be >= the effective claim lease |
| `recovery` | `RecoveryConfig` | defaults | Crash recovery settings |
| `resilience` | `WorkerResilienceConfig` | defaults | Worker retry/backoff and notify polling |
| `schedule` | `ScheduleConfig` | `None` | Scheduled task configuration |
| `exception_mapper` | `ExceptionMapper` | `ExceptionMapper()` | Maps exceptions to `TaskError` codes |
| `default_unhandled_error_code` | `str` | `"UNHANDLED_EXCEPTION"` | Error code for uncaught exceptions |
| `resend_on_transient_err` | `bool` | `False` | Auto-retry transient enqueue failures in workflow start |
In `DEFAULT` mode, `custom_queues` must be omitted:

```python
config = AppConfig(
    queue_mode=QueueMode.DEFAULT,
    broker=PostgresConfig(...),
    # custom_queues must be None or omitted
)
```

In `CUSTOM` mode, define the queues explicitly:

```python
from horsies import CustomQueueConfig

config = AppConfig(
    queue_mode=QueueMode.CUSTOM,
    custom_queues=[
        CustomQueueConfig(name="high", priority=1, max_concurrency=10),
        CustomQueueConfig(name="low", priority=100, max_concurrency=3),
    ],
    broker=PostgresConfig(...),
)
```

See Queue Modes for details.

Limit total in-flight tasks across all workers:

```python
config = AppConfig(
    queue_mode=QueueMode.DEFAULT,
    broker=PostgresConfig(...),
    cluster_wide_cap=100,  # max 100 in-flight tasks (RUNNING + CLAIMED)
)
```

Set to None (default) for unlimited.

Note: When cluster_wide_cap is set, the system operates in hard cap mode (counts RUNNING + CLAIMED tasks). This ensures strict enforcement and fair distribution across workers.

Control whether workers can prefetch tasks beyond their running capacity:

```python
# Hard cap mode (default) - strict enforcement, fair distribution
config = AppConfig(
    broker=PostgresConfig(...),
    prefetch_buffer=0,  # no prefetch; workers only claim what they can run
)

# Soft cap mode - allows prefetch with lease expiry
config = AppConfig(
    broker=PostgresConfig(...),
    prefetch_buffer=4,    # prefetch up to 4 extra tasks per worker
    claim_lease_ms=5000,  # prefetched claims expire after 5 seconds
)
```

Important: cluster_wide_cap cannot be used with prefetch_buffer > 0. If you need a global cap, hard cap mode is required.

See Concurrency for a detailed explanation of hard vs. soft cap modes.
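The two soft-cap ideas above can be sketched in plain Python. The helper names (`max_claims`, `claim_expired`) are illustrative, not part of the horsies API:

```python
# Soft cap sketch: a worker may hold up to running capacity plus
# prefetch_buffer claims, and a claim older than claim_lease_ms is
# considered expired and eligible for re-claim by another worker.

def max_claims(running_capacity: int, prefetch_buffer: int) -> int:
    return running_capacity + prefetch_buffer

def claim_expired(claimed_at_ms: int, now_ms: int, claim_lease_ms: int) -> bool:
    return now_ms - claimed_at_ms > claim_lease_ms
```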

Override crash recovery defaults:

```python
from horsies import RecoveryConfig

config = AppConfig(
    broker=PostgresConfig(...),
    recovery=RecoveryConfig(
        auto_requeue_stale_claimed=True,
        claimed_stale_threshold_ms=120_000,  # 2 minutes
        auto_fail_stale_running=True,
        running_stale_threshold_ms=300_000,  # 5 minutes
    ),
)
```

See Recovery Config for all options.

Configure worker retry/backoff and NOTIFY fallback polling:

```python
from horsies import WorkerResilienceConfig

config = AppConfig(
    broker=PostgresConfig(...),
    resilience=WorkerResilienceConfig(
        db_retry_initial_ms=500,
        db_retry_max_ms=30_000,
        db_retry_max_attempts=0,  # 0 = infinite
        notify_poll_interval_ms=5_000,
    ),
)
```
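The backoff shape these settings describe can be sketched as exponential growth from `db_retry_initial_ms` capped at `db_retry_max_ms`. The doubling factor is an assumption for illustration, not a documented horsies constant:

```python
# Illustrative exponential backoff: delay doubles per attempt, starting at
# the initial value and never exceeding the maximum.

def retry_delay_ms(attempt: int, initial_ms: int = 500, max_ms: int = 30_000) -> int:
    """Delay before retry number `attempt` (1-based)."""
    return min(initial_ms * 2 ** (attempt - 1), max_ms)
```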

Enable scheduled tasks:

```python
from horsies import ScheduleConfig, TaskSchedule, DailySchedule
from datetime import time

config = AppConfig(
    broker=PostgresConfig(...),
    schedule=ScheduleConfig(
        enabled=True,
        check_interval_seconds=1,
        schedules=[
            TaskSchedule(
                name="daily-cleanup",
                task_name="cleanup_old_data",
                pattern=DailySchedule(time=time(3, 0, 0)),
            ),
        ],
    ),
)
```

See Scheduler Overview for details.
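A minimal sketch of what `DailySchedule(time=time(3, 0, 0))` resolves to: the next 03:00 strictly after "now". `next_daily_run` is an illustrative helper, not the horsies API:

```python
from datetime import datetime, time, timedelta

def next_daily_run(now: datetime, at: time) -> datetime:
    """Next occurrence of wall-clock time `at` strictly after `now`."""
    candidate = now.replace(hour=at.hour, minute=at.minute,
                            second=at.second, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # already passed today; use tomorrow
    return candidate
```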

AppConfig validates consistency at creation:

  • CUSTOM mode requires non-empty custom_queues
  • DEFAULT mode must not have custom_queues
  • Queue names must be unique
  • cluster_wide_cap must be positive if set
  • prefetch_buffer must be non-negative
  • claim_lease_ms is required when prefetch_buffer > 0
  • claim_lease_ms is optional when prefetch_buffer = 0 (overrides the default 60s lease)
  • cluster_wide_cap cannot be combined with prefetch_buffer > 0

Multiple validation errors within the same phase are collected and reported together (compiler-style), rather than stopping at the first error.
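The collection behavior can be sketched with a plain dict standing in for `AppConfig` (the rules mirror the list above; the function and messages are illustrative):

```python
# Compiler-style validation sketch: every rule in the phase is checked and
# all failures are returned together, instead of raising on the first one.

def validate(cfg: dict) -> list[str]:
    errors: list[str] = []
    if cfg.get("queue_mode") == "CUSTOM" and not cfg.get("custom_queues"):
        errors.append("CUSTOM mode requires non-empty custom_queues")
    cap = cfg.get("cluster_wide_cap")
    if cap is not None and cap <= 0:
        errors.append("cluster_wide_cap must be positive if set")
    if cfg.get("prefetch_buffer", 0) > 0 and cap is not None:
        errors.append("cluster_wide_cap cannot be combined with prefetch_buffer > 0")
    return errors
```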

Use app.check() to run phased validation before starting a worker or scheduler. This is the programmatic equivalent of the horsies check CLI command.

```python
errors = app.check(live=True)
if errors:
    for err in errors:
        print(err)
else:
    print("All validations passed")
```

Signature:

```python
def check(self, *, live: bool = False) -> list[HorsiesError]
```

Phases:

| Phase | What it validates | Gating |
| --- | --- | --- |
| 1. Config | `AppConfig`, `RecoveryConfig`, `ScheduleConfig` consistency | Validated at construction (implicit pass) |
| 2. Task imports | Imports all discovered task modules, collects import/registration errors | Errors stop progression to Phases 4-6 |
| 3. Workflows | `WorkflowSpec` DAG validation (triggered during module imports) | Collected alongside Phase 2 |
| 3.1 Workflow definitions | Imported `WorkflowDefinition` subclasses declare a valid `definition_key` | Errors stop progression to Phases 3.2-6 |
| 3.2 Workflow builders | Executes `@app.workflow_builder` functions, validates returned specs | Errors stop progression to Phases 3.3-6 |
| 3.3 Undecorated builders | Detects top-level functions returning `WorkflowSpec` without `@app.workflow_builder` | Errors stop progression to Phases 5-6 |
| 5. Runtime policy safety | Validates retry/exception-mapper policy metadata after imports | Errors stop progression to Phase 6 |
| 6. Broker (if `live=True`) | Connects to PostgreSQL and runs `SELECT 1` | Only runs if Phases 2-5 pass |

Returns an empty list when all validations pass, or a list of HorsiesError instances with Rust-style formatting.

CLI equivalent:

```sh
horsies check myapp.instance:app        # Phases 1-5
horsies check myapp.instance:app --live # Phases 1-6
```

Register workflow builder functions for check-phase validation. For full API reference and examples, see Workflow API — @app.workflow_builder.

horsies check has two validation paths with different guarantee levels:

Strong guarantee (decorated builders): Functions registered with @app.workflow_builder are executed during check. Every WorkflowSpec they produce is fully validated — DAG structure, kwargs against function signatures, args_from type compatibility, missing required params, definition_key, and more. For the exercised builder cases, this catches structural workflow validity errors before runtime.

Best-effort (undecorated builder detection): Check also scans discovered task modules for top-level functions whose return type annotation is WorkflowSpec but that lack the @app.workflow_builder decorator. These produce HRS-030 check errors. This detection is heuristic:

  • It only scans modules directly listed in discover_tasks(), not sub-modules of discovered packages.
  • Functions re-exported in __init__.py from sub-modules may not be detected.

HRS-030 is a safety net, not an absolute proof that no undecorated builders exist.
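The shape of this heuristic can be sketched with `inspect`. The registry set and the annotation check here are assumptions about the mechanism, not the horsies implementation:

```python
import inspect

def find_undecorated(module, registered: set[str]) -> list[str]:
    """Top-level functions annotated to return WorkflowSpec but not registered."""
    hits = []
    for name, fn in inspect.getmembers(module, inspect.isfunction):
        ret = getattr(fn, "__annotations__", {}).get("return")
        # Annotations may be strings (forward refs) or actual classes.
        ann = ret if isinstance(ret, str) else getattr(ret, "__name__", "")
        if ann == "WorkflowSpec" and name not in registered:
            hits.append(name)
    return hits
```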

For deterministic, high-confidence workflow validity in CI:

  1. Decorate every workflow entrypoint with @app.workflow_builder(...).
  2. For parameterized builders, provide cases= that cover all meaningful branches and shapes.
  3. Ensure all builder modules are imported by the app used in horsies check (no hidden or dynamic registration paths).
  4. Run horsies check in CI and fail the pipeline on any errors.
  5. Treat HRS-030 as additional lint signal, not the primary guarantee mechanism.

Log the configuration (with masked password):

```python
import logging

logger = logging.getLogger("myapp")
config.log_config(logger)
```

Output:

```
AppConfig:
  queue_mode: DEFAULT
  broker:
    database_url: postgresql+psycopg://user:***@localhost/db
    pool_size: 5
    max_overflow: 10
  recovery:
    auto_requeue_stale_claimed: True
  ...
```
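The masking shown above can be sketched with the standard library; `mask_url` is an illustrative helper, not what `log_config` actually calls:

```python
from urllib.parse import urlsplit

def mask_url(url: str) -> str:
    """Replace the password component of a database URL with '***'."""
    parts = urlsplit(url)
    if parts.password is None:
        return url  # no credentials to hide
    netloc = parts.netloc.replace(":" + parts.password + "@", ":***@", 1)
    return url.replace(parts.netloc, netloc, 1)
```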