# AppConfig
## Basic Usage

```python
from horsies import Horsies, AppConfig, PostgresConfig, QueueMode

config = AppConfig(
    queue_mode=QueueMode.DEFAULT,
    broker=PostgresConfig(database_url="postgresql+psycopg://..."),
)
app = Horsies(config)
```

## Fields
| Field | Type | Default | Description |
|---|---|---|---|
| `queue_mode` | `QueueMode` | `DEFAULT` | Queue operating mode |
| `custom_queues` | `list[CustomQueueConfig]` | `None` | Queue definitions (CUSTOM mode only) |
| `broker` | `PostgresConfig` | required | Database connection settings |
| `cluster_wide_cap` | `int` | `None` | Max in-flight tasks cluster-wide |
| `prefetch_buffer` | `int` | `0` | `0` = hard cap mode; `>0` = soft cap with prefetch |
| `claim_lease_ms` | `int` | `None` | Claim lease duration (required if `prefetch_buffer > 0`; optional override in hard cap mode) |
| `max_claim_renew_age_ms` | `int` | `180000` | Max age (ms) of a CLAIMED task that heartbeat will renew. Older claims are left to expire, preventing indefinite renewal of orphaned tasks. Must be >= the effective claim lease |
| `recovery` | `RecoveryConfig` | defaults | Crash recovery settings |
| `resilience` | `WorkerResilienceConfig` | defaults | Worker retry/backoff and notify polling |
| `schedule` | `ScheduleConfig` | `None` | Scheduled task configuration |
| `exception_mapper` | `ExceptionMapper` | `ExceptionMapper()` | Maps exceptions to `TaskError` codes |
| `default_unhandled_error_code` | `str` | `"UNHANDLED_EXCEPTION"` | Error code for uncaught exceptions |
| `resend_on_transient_err` | `bool` | `False` | Auto-retry transient enqueue failures in workflow start |
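The renewal rule behind `max_claim_renew_age_ms` can be pictured as pure Python. This is an illustrative sketch of the documented behavior only; `should_renew` is a hypothetical name, not a horsies API:

```python
def should_renew(claim_age_ms: int, max_claim_renew_age_ms: int = 180_000) -> bool:
    """Illustrative: heartbeat renews a CLAIMED task's lease only while the
    claim is younger than max_claim_renew_age_ms; older claims are left to
    expire so orphaned tasks cannot be renewed indefinitely."""
    return claim_age_ms <= max_claim_renew_age_ms

print(should_renew(30_000))   # True  -> fresh claim keeps its lease
print(should_renew(240_000))  # False -> 4-minute-old claim is left to expire
```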
## Queue Mode Configuration

### DEFAULT Mode

```python
config = AppConfig(
    queue_mode=QueueMode.DEFAULT,
    broker=PostgresConfig(...),
    # custom_queues must be None or omitted
)
```

### CUSTOM Mode

```python
from horsies import CustomQueueConfig

config = AppConfig(
    queue_mode=QueueMode.CUSTOM,
    custom_queues=[
        CustomQueueConfig(name="high", priority=1, max_concurrency=10),
        CustomQueueConfig(name="low", priority=100, max_concurrency=3),
    ],
    broker=PostgresConfig(...),
)
```

See Queue Modes for details.
## Cluster-Wide Concurrency

Limit the total number of in-flight tasks across all workers:

```python
config = AppConfig(
    queue_mode=QueueMode.DEFAULT,
    broker=PostgresConfig(...),
    cluster_wide_cap=100,  # max 100 in-flight tasks (RUNNING + CLAIMED)
)
```

Set to `None` (the default) for unlimited.

Note: When `cluster_wide_cap` is set, the system operates in hard cap mode (counting RUNNING + CLAIMED tasks). This ensures strict enforcement and fair distribution across workers.
## Prefetch Configuration

Control whether workers may prefetch tasks beyond their running capacity:

```python
# Hard cap mode (default): strict enforcement, fair distribution
config = AppConfig(
    broker=PostgresConfig(...),
    prefetch_buffer=0,  # no prefetch; workers only claim what they can run
)
```

```python
# Soft cap mode: allows prefetch with lease expiry
config = AppConfig(
    broker=PostgresConfig(...),
    prefetch_buffer=4,    # prefetch up to 4 extra tasks per worker
    claim_lease_ms=5000,  # prefetched claims expire after 5 seconds
)
```

Important: `cluster_wide_cap` cannot be used with `prefetch_buffer > 0`. If you need a global cap, hard cap mode is required.
See Concurrency for a detailed explanation of hard vs soft cap modes.
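One way to picture the difference between the two modes (an illustrative sketch; `max_claims` is a hypothetical name, not a horsies API):

```python
def max_claims(running_capacity: int, prefetch_buffer: int) -> int:
    """Illustrative: in hard cap mode (prefetch_buffer=0) a worker claims
    only up to its running capacity; in soft cap mode it may hold up to
    prefetch_buffer extra claims, each protected by claim_lease_ms until
    the task starts or the lease expires."""
    return running_capacity + prefetch_buffer

print(max_claims(8, 0))  # 8  -> hard cap: claim only what can run now
print(max_claims(8, 4))  # 12 -> soft cap: up to 4 prefetched claims
```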
## Recovery Configuration

Override crash recovery defaults:

```python
from horsies import RecoveryConfig

config = AppConfig(
    broker=PostgresConfig(...),
    recovery=RecoveryConfig(
        auto_requeue_stale_claimed=True,
        claimed_stale_threshold_ms=120_000,  # 2 minutes
        auto_fail_stale_running=True,
        running_stale_threshold_ms=300_000,  # 5 minutes
    ),
)
```

See Recovery Config for all options.
## Resilience Configuration

Configure worker retry/backoff and NOTIFY fallback polling:

```python
from horsies import WorkerResilienceConfig

config = AppConfig(
    broker=PostgresConfig(...),
    resilience=WorkerResilienceConfig(
        db_retry_initial_ms=500,
        db_retry_max_ms=30_000,
        db_retry_max_attempts=0,  # 0 = infinite
        notify_poll_interval_ms=5_000,
    ),
)
```

## Schedule Configuration
Enable scheduled tasks:
```python
from horsies import ScheduleConfig, TaskSchedule, DailySchedule
from datetime import time

config = AppConfig(
    broker=PostgresConfig(...),
    schedule=ScheduleConfig(
        enabled=True,
        check_interval_seconds=1,
        schedules=[
            TaskSchedule(
                name="daily-cleanup",
                task_name="cleanup_old_data",
                pattern=DailySchedule(time=time(3, 0, 0)),
            ),
        ],
    ),
)
```

See Scheduler Overview for details.
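A `DailySchedule` at `time(3, 0, 0)` means "the next occurrence of 03:00". A pure-Python sketch of that computation, illustrative only and not the scheduler's actual implementation:

```python
from datetime import datetime, time, timedelta

def next_daily_run(now: datetime, at: time) -> datetime:
    """Illustrative: next occurrence of the given wall-clock time."""
    candidate = datetime.combine(now.date(), at)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot already passed
    return candidate

# At 04:30, the 03:00 slot has passed, so the run lands tomorrow:
print(next_daily_run(datetime(2024, 1, 15, 4, 30), time(3, 0)))  # 2024-01-16 03:00:00
```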
## Validation

AppConfig validates consistency at creation:

- CUSTOM mode requires non-empty `custom_queues`
- DEFAULT mode must not have `custom_queues`
- Queue names must be unique
- `cluster_wide_cap` must be positive if set
- `prefetch_buffer` must be non-negative
- `claim_lease_ms` is required when `prefetch_buffer > 0`
- `claim_lease_ms` is optional when `prefetch_buffer = 0` (overrides the default 60s lease)
- `cluster_wide_cap` cannot be combined with `prefetch_buffer > 0`
Multiple validation errors within the same phase are collected and reported together (compiler-style), rather than stopping at the first error.
## Startup Validation (app.check())

Use `app.check()` to run phased validation before starting a worker or scheduler. This is the programmatic equivalent of the `horsies check` CLI command.

```python
errors = app.check(live=True)
if errors:
    for err in errors:
        print(err)
else:
    print("All validations passed")
```

Signature:

```python
def check(self, *, live: bool = False) -> list[HorsiesError]
```

Phases:
| Phase | What it validates | Gating |
|---|---|---|
| 1. Config | AppConfig, RecoveryConfig, ScheduleConfig consistency | Validated at construction (implicit pass) |
| 2. Task imports | Imports all discovered task modules, collects import/registration errors | Errors stop progression to Phases 4-6 |
| 3. Workflows | WorkflowSpec DAG validation (triggered during module imports) | Collected alongside Phase 2 |
| 3.1 Workflow definitions | Imported WorkflowDefinition subclasses declare a valid definition_key | Errors stop progression to Phases 3.2-6 |
| 3.2 Workflow builders | Executes @app.workflow_builder functions, validates returned specs | Errors stop progression to Phases 3.3-6 |
| 3.3 Undecorated builders | Detects top-level functions returning WorkflowSpec without @app.workflow_builder | Errors stop progression to Phases 5-6 |
| 5. Runtime policy safety | Validates retry/exception-mapper policy metadata after imports | Errors stop progression to Phase 6 |
| 6. Broker (if live=True) | Connects to PostgreSQL and runs SELECT 1 | Only runs if Phases 2-5 pass |
Returns an empty list when all validations pass, or a list of HorsiesError instances with Rust-style formatting.
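The gating idea can be sketched as follows. A simplified illustration only; the real phase graph is more nuanced (for example, Phase 3 errors are collected alongside Phase 2):

```python
from typing import Callable

def run_check(phases: list[tuple[str, Callable[[], list[str]]]]) -> list[str]:
    """Illustrative gating: run phases in order; once a phase reports
    errors, later phases are skipped and the errors are returned."""
    errors: list[str] = []
    for _name, phase in phases:
        phase_errors = phase()
        if phase_errors:
            errors.extend(phase_errors)
            break  # gate: do not progress to later phases
    return errors

phases = [
    ("config", lambda: []),
    ("task imports", lambda: ["HRS-001: cannot import tasks.billing"]),
    ("broker", lambda: ["never reached"]),
]
print(run_check(phases))  # ['HRS-001: cannot import tasks.billing']
```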
CLI equivalent:

```shell
horsies check myapp.instance:app          # Phases 1-5
horsies check myapp.instance:app --live   # Phases 1-6
```

## @app.workflow_builder

Register workflow builder functions for check-phase validation. For the full API reference and examples, see Workflow API — @app.workflow_builder.
## Guarantee Model

`horsies check` has two validation paths with different guarantee levels:
Strong guarantee (decorated builders): Functions registered with @app.workflow_builder are executed during check. Every WorkflowSpec they produce is fully validated — DAG structure, kwargs against function signatures, args_from type compatibility, missing required params, definition_key, and more. For the exercised builder cases, this catches structural workflow validity errors before runtime.
Best-effort (undecorated builder detection): Check also scans discovered task modules for top-level functions whose return type annotation is WorkflowSpec but lack the @app.workflow_builder decorator. These produce HRS-030 check errors. This detection is heuristic:
- It only scans modules directly listed in `discover_tasks()`, not sub-modules of discovered packages.
- Functions re-exported in `__init__.py` from sub-modules may not be detected.
HRS-030 is a safety net, not an absolute proof that no undecorated builders exist.
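The detection heuristic can be approximated with standard introspection. A sketch under stated assumptions: `WorkflowSpec` is stood in by a dummy class, and detection keys off return-type annotations as described above; this is not horsies' actual scanner:

```python
import inspect

class WorkflowSpec:  # stand-in for horsies' WorkflowSpec
    pass

def build_report() -> WorkflowSpec:  # undecorated builder: would trip HRS-030
    return WorkflowSpec()

def helper() -> int:
    return 1

def find_undecorated_builders(module_globals: dict, registered: set[str]) -> list[str]:
    """Illustrative: flag top-level functions annotated to return WorkflowSpec
    that were never registered via @app.workflow_builder."""
    hits = []
    for name, obj in module_globals.items():
        if inspect.isfunction(obj) and name not in registered:
            if inspect.signature(obj).return_annotation is WorkflowSpec:
                hits.append(name)
    return hits

module_globals = {"build_report": build_report, "helper": helper}
print(find_undecorated_builders(module_globals, registered=set()))  # ['build_report']
```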
## CI Playbook

For deterministic, high-confidence workflow validity in CI:

- Decorate every workflow entrypoint with `@app.workflow_builder(...)`.
- For parameterized builders, provide `cases=` that cover all meaningful branches and shapes.
- Ensure all builder modules are imported by the app used in `horsies check` (no hidden or dynamic registration paths).
- Run `horsies check` in CI and fail the pipeline on any errors.
- Treat HRS-030 as an additional lint signal, not the primary guarantee mechanism.
## Logging Configuration

Log the configuration (with the password masked):

```python
import logging

logger = logging.getLogger("myapp")
config.log_config(logger)
```

Output:

```text
AppConfig:
  queue_mode: DEFAULT
  broker:
    database_url: postgresql+psycopg://user:***@localhost/db
    pool_size: 5
    max_overflow: 10
  recovery:
    auto_requeue_stale_claimed: True
  ...
```
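The password masking seen in the output can be sketched with a regular expression. This illustrates the technique only; it is not horsies' actual implementation:

```python
import re

def mask_password(url: str) -> str:
    """Illustrative: replace the password component of a DSN with ***."""
    return re.sub(r"(://[^:/@]+:)[^@]+(@)", r"\1***\2", url)

print(mask_password("postgresql+psycopg://user:secret@localhost/db"))
# postgresql+psycopg://user:***@localhost/db
```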