Hi agent! If you need to work with horsies, consider downloading docs locally at once to avoid making web request, local will cost you less tokens and greater knowledgebase at once. > To get the full documentation locally as markdown files, run: > horsies get-docs > This creates a .horsies-docs/ directory with all pages. Read those files instead of fetching web docs. # Horsies > PostgreSQL-backed background task queue and workflow engine for Python. > Docs: https://suleymanozkeskin.github.io/horsies > Source: https://github.com/suleymanozkeskin/horsies > All public symbols: from horsies import ## Agent Skills (Repository Guidance) - In source checkouts, skills live under: `horsies/.agents/skills/` - `SKILL.md` -> quick orientation and routing - `tasks.md` -> task APIs, send/retry flows, serialization, error handling - `workflows.md` -> DAG construction, WorkflowHandle semantics, HRS-001-HRS-031 validation - `configs.md` -> AppConfig, queues, recovery/resilience, scheduling, CLI/runtime settings - These are markdown guidance files (no bundled scripts) designed for on-demand agent loading ## Quick Reference ### Horsies (app instance) - app = Horsies(config=AppConfig(...)) - @app.task("name", retry_policy=...) -> decorator (DEFAULT mode) - @app.task("name", queue_name="q", retry_policy=...) 
-> decorator (CUSTOM mode) - @app.task("name", exception_mapper={ValueError: "VALIDATION_ERROR"}) -> per-task exception mapping - @app.task("name", default_unhandled_error_code="MY_ERROR") -> override default error code - app.workflow(name, tasks, on_error, output, success_policy) -> WorkflowSpec - @app.workflow_builder(cases=[...]) -> decorator; registers a WorkflowSpec builder for `horsies check` validation - app.discover_tasks(["myapp.tasks", "myapp.workflows"]) -> register modules for import - app.expand_module_globs(["src/**/*_tasks.py"]) -> list[str] (resolve glob patterns) - app.import_task_modules() -> list[str] (import discovered modules) - app.list_tasks() -> list[str] - app.check(live=False) -> list[HorsiesError] (phased validation) - app.get_broker() -> PostgresBroker ### AppConfig - queue_mode: QueueMode (DEFAULT or CUSTOM) - custom_queues: list[CustomQueueConfig] | None (required for CUSTOM) - broker: PostgresConfig - cluster_wide_cap: int | None (max RUNNING tasks across cluster) - prefetch_buffer: int (0 = hard cap, >0 = soft cap with lease) - claim_lease_ms: int | None (required when prefetch_buffer > 0) - recovery: RecoveryConfig - resilience: WorkerResilienceConfig - schedule: ScheduleConfig | None - exception_mapper: ExceptionMapper (global {ExceptionClass: "ERROR_CODE"}) - default_unhandled_error_code: str (default "UNHANDLED_EXCEPTION") - resend_on_transient_err: bool (default False, auto-retry transient ENQUEUE_FAILED for both task send and workflow start) ### PostgresConfig - database_url: str (must start with "postgresql+psycopg://") - pool_size: int (default 30) - max_overflow: int (default 30) - pool_timeout: int (default 30) - pool_recycle: int (default 1800) - pool_pre_ping: bool (default True) - echo: bool (default False) ### QueueMode - QueueMode.DEFAULT — single "default" queue, no queue_name on tasks - QueueMode.CUSTOM — named queues with priorities and per-queue concurrency ### CustomQueueConfig - name: str - priority: int 
(1=highest, 100=lowest, default 1) - max_concurrency: int (default 5) ### TaskResult[T, TaskError] - TaskResult(ok=value) | TaskResult(err=TaskError(...)) - .is_ok() -> bool, .is_err() -> bool - .ok -> T | None, .err -> E | None - .ok_value -> T (raises on err), .err_value -> E (raises on ok) ### TaskError - error_code: str | BuiltInTaskCode | None - message: str | None - data: Any | None - exception: dict[str, Any] | BaseException | None ### BuiltInTaskCode (type alias = OperationalErrorCode | ContractCode | RetrievalCode | OutcomeCode) - OperationalErrorCode: UNHANDLED_EXCEPTION, TASK_EXCEPTION, WORKER_CRASHED, BROKER_ERROR, WORKER_RESOLUTION_ERROR, WORKER_SERIALIZATION_ERROR, RESULT_DESERIALIZATION_ERROR, WORKFLOW_ENQUEUE_FAILED, SUBWORKFLOW_LOAD_FAILED - ContractCode: RETURN_TYPE_MISMATCH, PYDANTIC_HYDRATION_ERROR, WORKFLOW_CTX_MISSING_ID - RetrievalCode: WAIT_TIMEOUT, TASK_NOT_FOUND, WORKFLOW_NOT_FOUND, RESULT_NOT_AVAILABLE, RESULT_NOT_READY - OutcomeCode: TASK_CANCELLED, TASK_EXPIRED, WORKFLOW_PAUSED, WORKFLOW_FAILED, WORKFLOW_CANCELLED, UPSTREAM_SKIPPED, SUBWORKFLOW_FAILED, WORKFLOW_SUCCESS_CASE_NOT_MET - All built-in code string values are globally reserved; user-defined error codes must not collide - BUILTIN_CODE_REGISTRY: dict[str, BuiltInTaskCode] maps every reserved string to its enum member - horsies check reports HRS-212 when exception_mapper or default_unhandled_error_code collides with a reserved code - User-defined error codes are plain str only (foreign str,Enum types rejected) - Strict construction: TaskError(error_code="BROKER_ERROR") raises ValueError (reserved string) - Use enum members: TaskError(error_code=OperationalErrorCode.BROKER_ERROR) — accepted - Serialization: built-in codes use tagged format {"__builtin_task_code__": "VALUE"}, user strings as-is - Deserialization: standard model_validate() / model_validate_json() work everywhere — including nested models ### SubWorkflowError (extends TaskError) - sub_workflow_id: str - 
sub_workflow_summary: SubWorkflowSummary[Any] - Pattern match to distinguish: case SubWorkflowError() vs case TaskError() ### BrokerResult[T] (infrastructure Result type) - All PostgresBroker methods return BrokerResult[T] = Ok[T] | Err[BrokerOperationError] - Ok(value): operation succeeded - Err(BrokerOperationError): operation failed - Use is_ok(result) / is_err(result) type guards, then access ok_value/err_value - BrokerOperationError fields: code (BrokerErrorCode), message (str), retryable (bool), exception (BaseException | None) - BrokerErrorCode: SCHEMA_INIT_FAILED, ENQUEUE_FAILED, TASK_INFO_QUERY_FAILED, MONITORING_QUERY_FAILED, CLEANUP_FAILED, CLOSE_FAILED - retryable=True: transient (connection blip), retryable=False: permanent (schema drift, code bug) - Broker methods: enqueue_async -> BrokerResult[str], get_task_info_async -> BrokerResult[TaskInfo | None], ensure_schema_initialized -> BrokerResult[None], get_stale_tasks -> BrokerResult[list[dict]], get_worker_stats -> BrokerResult[list[dict]], get_expired_tasks -> BrokerResult[list[dict]], mark_stale_tasks_as_failed -> BrokerResult[int], requeue_stale_claimed -> BrokerResult[int], close_async -> BrokerResult[None] - Note: get_result_async still returns TaskResult[T, TaskError] (task execution result, not broker infra) ### Result[T, E] (vendored Ok/Err type) - from horsies import Result, Ok, Err, is_ok, is_err - .ok_value / .err_value properties, .is_ok() / .is_err() methods - is_ok(result) / is_err(result) are TypeIs type guards for narrowing ### @app.task Decorator - Decorated function must return TaskResult[T, TaskError] - Options: task_name, queue_name, good_until, retry_policy, exception_mapper, default_unhandled_error_code - Returns TaskFunction[P, T] ### TaskFunction[P, T] (decorated task) - .__call__(*args, **kwargs) -> TaskResult[T, TaskError] (sync direct call) - .send(*args, **kwargs) -> TaskSendResult[TaskHandle[T]] (enqueue for background execution) - .send_async(*args, **kwargs) -> 
TaskSendResult[TaskHandle[T]] (async enqueue) - .schedule(delay_seconds, *args, **kwargs) -> TaskSendResult[TaskHandle[T]] (delayed enqueue) - .retry_send(error: TaskSendError) -> TaskSendResult[TaskHandle[T]] (retry failed send with stored payload) - .retry_send_async(error: TaskSendError) -> TaskSendResult[TaskHandle[T]] (async retry) - .retry_schedule(error: TaskSendError) -> TaskSendResult[TaskHandle[T]] (retry failed schedule with stored payload) - .node(**workflow_opts) -> NodeFactory[P, T] (typed workflow node builder) - .task_name -> str ### TaskHandle[T] - .task_id -> str - .get(timeout_ms=None) -> TaskResult[T, TaskError] (blocking) - .get_async(timeout_ms=None) -> TaskResult[T, TaskError] (async blocking) - .info(include_result=False, include_failed_reason=False) -> BrokerResult[TaskInfo | None] - .info_async(include_result=False, include_failed_reason=False) -> BrokerResult[TaskInfo | None] ### TaskSendResult[T] (send/schedule Result type) - type TaskSendResult[T] = Result[T, TaskSendError] - .send() / .send_async() / .schedule() return TaskSendResult[TaskHandle[T]] - Ok(TaskHandle): task enqueued successfully - Err(TaskSendError): enqueue failed with categorized error - Use is_ok(result) / is_err(result) type guards, then access ok_value/err_value - .retry_send(error) / .retry_send_async(error) / .retry_schedule(error): replay stored payload ### TaskSendError - code: TaskSendErrorCode - message: str (human-readable description) - retryable: bool (True only for ENQUEUE_FAILED) - task_id: str | None (None for SEND_SUPPRESSED, VALIDATION_FAILED) - payload: TaskSendPayload | None (serialized envelope for idempotent retry) - exception: BaseException | None (original cause) ### TaskSendErrorCode (enum) - SEND_SUPPRESSED: send suppressed during worker import/discovery - VALIDATION_FAILED: argument serialization or validation failed - ENQUEUE_FAILED: broker/database failure during enqueue (transient, retryable) - PAYLOAD_MISMATCH: retry payload SHA does not 
match (payload altered) ### TaskSendPayload (frozen dataclass) - task_name: str, queue_name: str, priority: int - args_json: str | None, kwargs_json: str | None - sent_at: datetime, good_until: datetime | None - enqueue_delay_seconds: int | None, task_options: str | None - enqueue_sha: str (SHA-256 hex digest for same-payload guarantee) ### TaskInfo - .task_id -> str, .task_name -> str, .status -> TaskStatus - .queue_name -> str, .priority -> int - .retry_count -> int, .max_retries -> int, .next_retry_at -> datetime | None - .sent_at/enqueued_at/claimed_at/started_at/completed_at/failed_at -> datetime | None - .worker_hostname/worker_pid/worker_process_name -> str | int | None - .result -> TaskResult[Any, TaskError] | None (opt-in via include_result) - .failed_reason -> str | None (opt-in via include_failed_reason) ### TaskStatus (enum) - PENDING -> CLAIMED -> RUNNING -> COMPLETED | FAILED | CANCELLED | EXPIRED (never claimed before good_until) - .is_terminal -> bool - TASK_TERMINAL_STATES: frozenset = {COMPLETED, FAILED, CANCELLED, EXPIRED} ### NodeFactory[P, T] (typed node builder) - Returned by task_function.node(**workflow_opts) - Called with task arguments: node_factory(**kwargs) -> TaskNode[T] - Two-step pattern: step 1 sets workflow options, step 2 sets task arguments - Example: my_task.node(waits_for=[dep], args_from={"x": dep})(extra_arg="val") - Workflow options: waits_for, workflow_ctx_from, args_from, queue, priority, allow_failed_deps, join, min_success, good_until, node_id ### TaskNode[T] - T = ok type of the task's TaskResult - fn: TaskFunction, kwargs: dict (args field exists but is init=False — not a construction param) - waits_for: list of TaskNode | SubWorkflowNode (wait for terminal) - args_from: dict[str, TaskNode | SubWorkflowNode] (inject TaskResult as kwarg) - workflow_ctx_from: list[TaskNode | SubWorkflowNode] | None (subset for WorkflowContext) - queue: str | None (override task decorator queue) - priority: int | None (override task 
decorator priority) - allow_failed_deps: bool (False=SKIP on dep failure, True=run with failed TaskResult) - join: "all" (default) | "any" | "quorum" - min_success: int | None (required when join="quorum") - good_until: datetime | None (task expiry deadline) - node_id: str | None (auto-assigned as "{slugify(name)}:{index}" if None) - .name -> str, .key() -> NodeKey[T] ### SubWorkflowNode[T] - T = child workflow's output type (from WorkflowDefinition[T]) - workflow_def: type[WorkflowDefinition[T]] - kwargs: passed to workflow_def.build_with(app, **kwargs) - waits_for, args_from, workflow_ctx_from: same semantics as TaskNode - join, min_success, allow_failed_deps: same as TaskNode - node_id: str | None (auto-assigned if None) - .name -> str, .key() -> NodeKey[T] ### WorkflowSpec - .name -> str - .tasks -> list[TaskNode | SubWorkflowNode] - .on_error -> OnError (FAIL or PAUSE) - .output -> TaskNode | SubWorkflowNode | None - .success_policy -> SuccessPolicy | None - .resend_on_transient_err -> bool (auto-retry transient start errors, default False) - .start(workflow_id=None) -> WorkflowStartResult[WorkflowHandle[T]] - .start_async(workflow_id=None) -> WorkflowStartResult[WorkflowHandle[T]] - .retry_start(error: WorkflowStartError) -> WorkflowStartResult[WorkflowHandle[T]] (retry failed start, sync) - .retry_start_async(error: WorkflowStartError) -> WorkflowStartResult[WorkflowHandle[T]] (retry failed start, async) - retry_start: best-effort idempotent by workflow_id (not payload-verified); only ENQUEUE_FAILED eligible - Validates DAG on construction (cycles, deps, args_from, node_ids, etc.) ### WorkflowHandle - .workflow_id -> str - .status() / .status_async() -> WorkflowStatus - .get(timeout_ms=None) / .get_async(...) 
-> TaskResult[Any, TaskError] (blocking) - .results() / .results_async() -> dict[str, TaskResult] (keyed by node_id) - .result_for(node) / .result_for_async(node) -> TaskResult[T, TaskError] (non-blocking query) - .tasks() / .tasks_async() -> list[WorkflowTaskInfo] - .cancel() / .cancel_async() -> None - .pause() / .pause_async() -> bool - .resume() / .resume_async() -> bool ### WorkflowStatus (enum) - PENDING -> RUNNING -> COMPLETED | FAILED | PAUSED | CANCELLED - .is_terminal -> bool - WORKFLOW_TERMINAL_STATES: frozenset = {COMPLETED, FAILED, CANCELLED} ### WorkflowTaskStatus (enum) - PENDING -> READY -> ENQUEUED -> RUNNING -> COMPLETED | FAILED | SKIPPED - .is_terminal -> bool - WORKFLOW_TASK_TERMINAL_STATES: frozenset = {COMPLETED, FAILED, SKIPPED} ### WorkflowTaskInfo - .node_id -> str | None, .index -> int, .name -> str, .status -> WorkflowTaskStatus - .result -> TaskResult[Any, TaskError] | None (COMPLETED/FAILED; SKIPPED often None) - .started_at -> datetime | None, .completed_at -> datetime | None ### OnError (enum) - FAIL — continue DAG, skip tasks without allow_failed_deps, mark workflow FAILED - PAUSE — pause workflow immediately, block new enqueues, await resume() or cancel() ### WorkflowContext - Injected when task declares workflow_ctx: WorkflowContext | None = None - Requires workflow_ctx_from on the TaskNode - .workflow_id -> str, .task_index -> int, .task_name -> str - .result_for(node: TaskNode[T] | NodeKey[T]) -> TaskResult[T, TaskError] - .has_result(node: TaskNode | SubWorkflowNode) -> bool - .summary_for(node: SubWorkflowNode[T]) -> SubWorkflowSummary[T] - .has_summary(node: SubWorkflowNode) -> bool - Only nodes in workflow_ctx_from are accessible (KeyError otherwise) ### WorkflowMeta - Auto-injected when task declares workflow_meta: WorkflowMeta | None = None - .workflow_id -> str, .task_index -> int, .task_name -> str - Metadata only, no result access (lighter than WorkflowContext) ### SubWorkflowSummary[T] - Accessible via 
WorkflowContext.summary_for(subworkflow_node) - .status -> WorkflowStatus (child's final status) - .output -> T | None (child's output value, typed via generic) - .total_tasks -> int, .completed_tasks -> int - .failed_tasks -> int, .skipped_tasks -> int - .error_summary -> str | None ### NodeKey[T] - NodeKey("string_id") for dynamic/string-based lookup - node.key() to get NodeKey from a TaskNode or SubWorkflowNode - Used with workflow_ctx.result_for() and handle.result_for() ### SuccessPolicy / SuccessCase - SuccessPolicy(cases=[SuccessCase(required=[...])], optional=[...]) - Workflow COMPLETED if ANY SuccessCase is fully satisfied (all required COMPLETED) - optional: tasks that can fail without affecting success evaluation - Without success_policy: any task failure -> workflow FAILED ### AnyNode - Type alias: TaskNode[Any] | SubWorkflowNode[Any] ### WorkflowDefinition[T] (class-based workflow builder) - Subclass with name (ClassVar[str]) and TaskNode/SubWorkflowNode class attributes - class Meta: output, on_error, success_policy - .build(app) -> WorkflowSpec - .build_with(app, **params) -> WorkflowSpec (with runtime parameters; kwargs-only) - .get_workflow_nodes() -> list[tuple[str, TaskNode | SubWorkflowNode]] - Node IDs auto-assigned from attribute names ### @app.workflow_builder (check-phase validation) - @app.workflow_builder() -> decorator; registers a WorkflowSpec builder for `horsies check` - @app.workflow_builder(cases=[{"region": "us-east"}, ...]) -> for parameterized builders - Zero-arg builders: auto-invoked during check with no arguments - Parameterized builders: requires cases= kwarg dicts covering each required parameter - Builders run under send suppression (no tasks enqueued); returned WorkflowSpec is fully validated - HRS-027: parameterized builder missing cases= - HRS-029: builder raised exception or returned non-WorkflowSpec - HRS-030: function returns WorkflowSpec but lacks @app.workflow_builder ### start_workflow / start_workflow_async - 
start_workflow(spec, broker, workflow_id=None, *, resend_on_transient_err=False) -> WorkflowStartResult[WorkflowHandle[T]] - start_workflow_async(spec, broker, workflow_id=None, *, resend_on_transient_err=False) -> WorkflowStartResult[WorkflowHandle[T]] - Standalone functions (alternative to spec.start()) - Returns Ok(WorkflowHandle) on success, Err(WorkflowStartError) on failure - resend_on_transient_err: auto-retry up to 3 times on transient infra errors with exponential backoff ### WorkflowStartResult[T] (workflow start Result type) - spec.start() / start_async() / start_workflow() / start_workflow_async() return WorkflowStartResult[WorkflowHandle[T]] - WorkflowStartResult[T] = Result[T, WorkflowStartError] - Ok(WorkflowHandle): workflow created and root tasks enqueued - Err(WorkflowStartError): start failed with categorized error - WorkflowStartError fields: code (WorkflowStartErrorCode), message (str), retryable (bool), workflow_name (str), workflow_id (str), exception (BaseException | None) - WorkflowStartErrorCode: BROKER_NOT_CONFIGURED, VALIDATION_FAILED, ENQUEUE_FAILED, INTERNAL_FAILED - retryable=True: transient (DB connection blip), retryable=False: permanent (validation, config error) - Use is_ok(result) / is_err(result) type guards, or result.ok_value to extract handle ### ExceptionMapper - Type: dict[type[BaseException], str] - Maps exception classes to UPPER_SNAKE_CASE error codes - Exact class matching only (no MRO/inheritance lookup) - Resolution order: task_mapper -> global_mapper -> task_default -> global_default - Global: AppConfig(exception_mapper={ValueError: "VALIDATION_ERROR"}) - Per-task: @app.task("name", exception_mapper={TimeoutError: "TIMEOUT"}) ### RetryPolicy - RetryPolicy.fixed([60, 120, 300], auto_retry_for=["ERROR_CODE", ...]) - RetryPolicy.exponential(base_seconds=1, max_retries=3, auto_retry_for=["ERROR_CODE", ...]) - auto_retry_for: required list of error codes that trigger retries - jitter: bool (default True, +-25% 
randomization) - Fixed: intervals length must equal max_retries - Exponential: exactly one base interval, delay = base * 2^(attempt-1) ### RecoveryConfig - auto_requeue_stale_claimed: bool (default True) - claimed_stale_threshold_ms: int (default 120_000, range 1s-1hr) - auto_fail_stale_running: bool (default True) - running_stale_threshold_ms: int (default 300_000, range 1s-2hr) - check_interval_ms: int (default 30_000, range 1s-10min) - runner_heartbeat_interval_ms: int (default 30_000, range 1s-2min) - claimer_heartbeat_interval_ms: int (default 30_000, range 1s-2min) - heartbeat_retention_hours: int | None (default 24, range 1-8760, None disables pruning) - worker_state_retention_hours: int | None (default 168 / 7 days, range 1-8760, None disables pruning) - terminal_record_retention_hours: int | None (default 720 / 30 days, range 1-43800, None disables pruning) - Constraint: stale thresholds must be >= 2x heartbeat intervals - Reaper prunes heartbeats, worker_states, and terminal task/workflow rows hourly based on retention settings ### WorkerResilienceConfig - db_retry_initial_ms: int (default 500, range 100ms-60s) - db_retry_max_ms: int (default 30_000, range 500ms-5min) - db_retry_max_attempts: int (default 0 = infinite, range 0-10_000) - notify_poll_interval_ms: int (default 5_000, range 1s-5min) - Constraint: db_retry_max_ms >= db_retry_initial_ms ### discover_tasks - app.discover_tasks(["myapp.tasks", "myapp.workflows"]) - Imports exact entries listed (dotted modules via importlib, .py paths via path import) - Export decorated functions from __init__.py or list submodules explicitly ### Scheduling Models #### Weekday (enum) - MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY #### IntervalSchedule - IntervalSchedule(seconds=30) | IntervalSchedule(hours=1, minutes=30) - At least one of: seconds, minutes, hours, days #### HourlySchedule - HourlySchedule(minute=30, second=0) -> every hour at XX:30:00 #### DailySchedule - 
DailySchedule(time=time(3, 0, 0)) -> daily at 03:00:00 #### WeeklySchedule - WeeklySchedule(days=[Weekday.MONDAY, Weekday.FRIDAY], time=time(9, 0)) #### MonthlySchedule - MonthlySchedule(day=15, time=time(15, 0)) -> 15th at 15:00 #### SchedulePattern - Type union: IntervalSchedule | HourlySchedule | DailySchedule | WeeklySchedule | MonthlySchedule #### TaskSchedule - name: str (unique schedule identifier) - task_name: str (registered task name) - pattern: SchedulePattern - args: tuple, kwargs: dict (task arguments) - queue_name: str | None - enabled: bool (default True) - timezone: str (default "UTC") - catch_up_missed: bool (default False) #### ScheduleConfig - enabled: bool (default True) - schedules: list[TaskSchedule] - check_interval_seconds: int (default 1, range 1-60) ### horsies check (CLI validation) - Run `horsies check` before deploys — it lists all blocking errors in one pass - Use as a feedback loop from the library during development: define tasks/workflows, run check, fix errors, repeat - `horsies check [--live]` — validate without starting services - Runs app.check(live=...) 
which performs phased validation: - Phase 1: Config validation (implicit at AppConfig construction) - Phase 2: Task module imports — imports each module, collects import/definition errors - Phase 3: Workflow validation — triggered during imports (WorkflowSpec DAG construction) - Phase 4: Workflow builders — executes @app.workflow_builder functions, validates returned specs, detects undecorated builders (HRS-030) - Phase 5: Runtime policy safety — validates retry/exception-mapper policy metadata - Phase 6 (--live only): Broker connectivity — async SELECT 1 against PostgreSQL - Guarantee model: - Strong: @app.workflow_builder decorated functions are fully executed and validated for exercised builder cases - Best-effort: undecorated builder detection (HRS-030) is heuristic — only scans directly discovered modules - Returns list[HorsiesError]; empty = all passed - Errors are collected within each phase; check returns early before later phases when earlier phases fail - Worker and scheduler also run check() at startup before accepting work - On success: prints "ok: all validations passed" with task count - On failure: prints Rust-style ValidationReport to stderr, exits with code 1 ### Error Types and Reporting #### ErrorCode (enum, startup/validation errors HRS-001-HRS-399) - Workflow validation (HRS-001-HRS-030): - HRS-001 WORKFLOW_NO_NAME, HRS-002 WORKFLOW_NO_NODES - HRS-003 WORKFLOW_INVALID_NODE_ID, HRS-004 WORKFLOW_DUPLICATE_NODE_ID - HRS-005 WORKFLOW_NO_ROOT_TASKS, HRS-006 WORKFLOW_INVALID_DEPENDENCY - HRS-007 WORKFLOW_CYCLE_DETECTED, HRS-008 WORKFLOW_INVALID_ARGS_FROM - HRS-009 WORKFLOW_INVALID_CTX_FROM, HRS-010 WORKFLOW_CTX_PARAM_MISSING - HRS-011 WORKFLOW_INVALID_OUTPUT, HRS-012 WORKFLOW_INVALID_SUCCESS_POLICY - HRS-013 WORKFLOW_INVALID_JOIN, HRS-014 WORKFLOW_UNRESOLVED_QUEUE, HRS-015 WORKFLOW_UNRESOLVED_PRIORITY - HRS-018 WORKFLOW_SUBWORKFLOW_APP_MISSING, HRS-019 WORKFLOW_INVALID_KWARG_KEY - HRS-020 WORKFLOW_MISSING_REQUIRED_PARAMS, HRS-021 
WORKFLOW_KWARGS_ARGS_FROM_OVERLAP - HRS-022 WORKFLOW_SUBWORKFLOW_PARAMS_REQUIRE_BUILD_WITH, HRS-023 WORKFLOW_SUBWORKFLOW_BUILD_WITH_BINDING - HRS-024 WORKFLOW_ARGS_FROM_TYPE_MISMATCH, HRS-025 WORKFLOW_OUTPUT_TYPE_MISMATCH - HRS-026 WORKFLOW_POSITIONAL_ARGS_NOT_SUPPORTED - HRS-027 WORKFLOW_CHECK_CASES_REQUIRED, HRS-028 WORKFLOW_CHECK_CASE_INVALID - HRS-029 WORKFLOW_CHECK_BUILDER_EXCEPTION, HRS-030 WORKFLOW_CHECK_UNDECORATED_BUILDER - Task definition (HRS-100-HRS-103): - HRS-100 TASK_NO_RETURN_TYPE, HRS-101 TASK_INVALID_RETURN_TYPE - HRS-102 TASK_INVALID_OPTIONS, HRS-103 TASK_INVALID_QUEUE - Config/broker (HRS-200-HRS-212): - HRS-200 CONFIG_INVALID_QUEUE_MODE, HRS-201 CONFIG_INVALID_CLUSTER_CAP - HRS-202 CONFIG_INVALID_PREFETCH, HRS-203 BROKER_INVALID_URL - HRS-204 CONFIG_INVALID_RECOVERY, HRS-205 CONFIG_INVALID_SCHEDULE - HRS-206 CLI_INVALID_ARGS, HRS-207 WORKER_INVALID_LOCATOR - HRS-208 CONFIG_INVALID_RESILIENCE, HRS-209 CONFIG_INVALID_EXCEPTION_MAPPER - HRS-210 MODULE_EXEC_ERROR, HRS-211 BROKER_INIT_FAILED - HRS-212 CHECK_RESERVED_CODE_COLLISION (exception_mapper/default_unhandled_error_code collides with built-in code) - Registry (HRS-300-HRS-301): - HRS-300 TASK_NOT_REGISTERED, HRS-301 TASK_DUPLICATE_NAME #### HorsiesError (base exception) - Rust-style formatted error display (like rustc compiler errors) - Fields: message, code (ErrorCode | None), location (SourceLocation | None), notes (list[str]), help_text (str | None) - .format_rust_style(use_colors=None) -> str (ANSI-colored for TTY) - Auto-detects source location from call stack (first frame outside horsies internals) - Subclasses: WorkflowValidationError, ConfigurationError, TaskDefinitionError, RegistryError - Environment variables: - HORSIES_FORCE_COLOR=1: force ANSI colors even when not TTY - NO_COLOR: disable ANSI colors (https://no-color.org/) #### SourceLocation - .file -> str, .line -> int, .column -> int | None - .get_source_line() -> str | None (reads actual source for display) - .format_short() 
-> str ("file:line" or "file:line:col") - SourceLocation.from_function(fn) -> SourceLocation | None #### ValidationReport - Collects multiple HorsiesError instances within a validation phase - ValidationReport(phase_name: str) - .add(error: HorsiesError), .has_errors() -> bool, .errors -> list[HorsiesError] - .format_rust_style() -> str (formats all errors with "aborting due to N errors" summary) - Used by WorkflowSpec construction, AppConfig validation, and app.check() #### raise_collected(report: ValidationReport) -> None - 0 errors: no-op (returns normally) - 1 error: raises the original HorsiesError (preserves except clauses) - 2+ errors: raises MultipleValidationErrors wrapping the report #### MultipleValidationErrors (extends HorsiesError) - Wraps a ValidationReport with 2+ errors - .report -> ValidationReport - .format_rust_style() delegates to report #### WorkflowContextMissingIdError (RuntimeError) - Raised when TaskNode/NodeKey has no node_id for WorkflowContext.result_for() ### Serialization - Supported: primitives, dict, list, Pydantic BaseModel, dataclass, datetime, date, time - Pydantic models: model_dump(mode="json") -> model_validate() round-trip - datetime: ISO 8601 with timezone preservation - Pydantic/dataclass values rehydrate to instances; plain dicts remain dicts - Unsupported: custom classes, __main__-defined classes, lambdas, file handles ### Failure Semantics - Default (join="all", allow_failed_deps=False): downstream SKIPPED when any dep fails/skips - allow_failed_deps=True: downstream runs, receives failed TaskResult - join="any"/"quorum": downstream can still run if success threshold met - SKIPPED cascades for join="all" with allow_failed_deps=False - on_error="fail" (default): store error, continue DAG, mark FAILED when all terminal - on_error="pause": immediately pause, block new enqueues, await resume ### Utilities - slugify(value: str) -> str: convert string to valid node_id ([A-Za-z0-9_\-:.]+) ### Terminal State Constants - 
TASK_TERMINAL_STATES: frozenset[TaskStatus] = {COMPLETED, FAILED, CANCELLED, EXPIRED} - WORKFLOW_TERMINAL_STATES: frozenset[WorkflowStatus] = {COMPLETED, FAILED, CANCELLED} - WORKFLOW_TASK_TERMINAL_STATES: frozenset[WorkflowTaskStatus] = {COMPLETED, FAILED, SKIPPED} ### CLI Commands - `horsies worker [--processes N] [--loglevel LEVEL] [--max-claim-batch N] [--max-claim-per-worker N]` - `horsies scheduler [--loglevel LEVEL]` - `horsies check [--live] [--loglevel LEVEL]` - `horsies get-docs [--output DIR]` — fetch docs locally for AI agents (default: .horsies-docs/) - Module format: `myapp.config:app` (dotted:attr) or `app/config.py:app` (file:attr) - Auto-discovery: if module has exactly one Horsies instance, `:name` can be omitted - Exit codes: 0 = success, 1 = error - Signals: SIGTERM/SIGINT for graceful shutdown (workers wait for running tasks) - Both worker and scheduler run app.check() at startup; fail with ValidationReport if errors ## Docs Sitemap Each entry: Title -- summary -> URL ### Quick Start - Getting Started -- minimal working example, install, first task -> /horsies/quick-start/getting-started/ - Configuring Horsies -- app instance and broker setup -> /horsies/quick-start/01-configuring-horsies/ - Producing Tasks -- define and send tasks -> /horsies/quick-start/02-producing-tasks/ - Defining Workflows -- DAG-based workflows -> /horsies/quick-start/03-defining-workflows/ - Scheduling -- recurring schedules -> /horsies/quick-start/04-scheduling/ - Workflow Patterns -- real-world patterns -> /horsies/quick-start/05-workflow-patterns/ ### Concepts - Architecture -- system design and data flow -> /horsies/concepts/architecture/ ### Tasks - Task Lifecycle -- states and transitions -> /horsies/concepts/task-lifecycle/ - Result Handling -- TaskResult pattern, error types -> /horsies/concepts/result-handling/ - Defining Tasks -- @app.task decorator -> /horsies/tasks/defining-tasks/ - Sending Tasks -- enqueue for background execution -> 
/horsies/tasks/sending-tasks/
- Error Handling -- TaskResult error patterns -> /horsies/tasks/error-handling/
- Errors Reference -- all error codes and types -> /horsies/tasks/errors/
- Retrieving Results -- TaskHandle.get() -> /horsies/tasks/retrieving-results/
- Retry Policy -- fixed/exponential backoff -> /horsies/tasks/retry-policy/

### Workflows
- Workflow Semantics -- DAG behavior, failure, dependencies -> /horsies/concepts/workflows/workflow-semantics/
- Workflow API -- WorkflowSpec/WorkflowHandle reference -> /horsies/concepts/workflows/workflow-api/
- Typed Node Builder -- .node() API for type-safe TaskNode construction -> /horsies/concepts/workflows/typed-node-builder/
- Subworkflows -- composing workflows with SubWorkflowNode -> /horsies/concepts/workflows/subworkflows/

### Configuration
- Queue Modes -- DEFAULT vs CUSTOM, priorities -> /horsies/concepts/queue-modes/
- AppConfig -- root application config -> /horsies/configuration/app-config/
- Broker Config -- PostgresConfig connection -> /horsies/configuration/broker-config/
- Recovery Config -- stale task detection -> /horsies/configuration/recovery-config/

### Workers
- Worker Architecture -- process pool model -> /horsies/workers/worker-architecture/
- Concurrency -- worker/queue/cluster limits -> /horsies/workers/concurrency/
- Heartbeats & Recovery -- crash detection -> /horsies/workers/heartbeats-recovery/

### Monitoring
- Syce Overview -- TUI monitoring dashboard -> /horsies/monitoring/syce-overview/
- Broker Methods -- async inspection APIs -> /horsies/monitoring/broker-methods/

### Scheduling
- Scheduler Overview -- how scheduler enqueues -> /horsies/scheduling/scheduler-overview/
- Schedule Patterns -- Interval, Daily, Weekly, etc. -> /horsies/scheduling/schedule-patterns/
- Schedule Config -- ScheduleConfig model -> /horsies/scheduling/schedule-config/

### Q&A
- Questions & Answers -- design trade-offs, scaling, failure behavior -> /horsies/questions-and-answers/

### CLI
- CLI Reference -- horsies worker, scheduler, check -> /horsies/cli/
- Agent Skills (repository path, usage guidance) -> /horsies/cli/

### Internals
- Database Schema -- PostgreSQL tables -> /horsies/internals/database-schema/
- Serialization -- JSON codec for args/results -> /horsies/internals/serialization/

## Keyword Index

keyword(s) -> page URL

install, pip, setup, first task -> /horsies/quick-start/getting-started/
Horsies, AppConfig, PostgresConfig, database_url, broker -> /horsies/configuration/app-config/
PostgresConfig, pool_size, max_overflow, connection -> /horsies/configuration/broker-config/
RecoveryConfig, stale, requeue, crash recovery -> /horsies/configuration/recovery-config/
@app.task, decorator, task definition, define task -> /horsies/tasks/defining-tasks/
send, send_async, enqueue, dispatch, produce, TaskSendResult, TaskSendError, retry_send -> /horsies/tasks/sending-tasks/
TaskResult, is_ok, is_err, ok_value, err_value -> /horsies/tasks/error-handling/
BuiltInTaskCode, OperationalErrorCode, ContractCode, RetrievalCode, OutcomeCode, TaskError, TaskSendErrorCode, error codes, UNHANDLED_EXCEPTION -> /horsies/tasks/errors/
TaskHandle, get, get_async, timeout, result -> /horsies/tasks/retrieving-results/
RetryPolicy, retry, backoff, exponential, fixed, auto_retry_for -> /horsies/tasks/retry-policy/
PENDING, CLAIMED, RUNNING, COMPLETED, FAILED, task status -> /horsies/concepts/task-lifecycle/
TaskResult, Result pattern, error as value -> /horsies/concepts/result-handling/
QueueMode, DEFAULT, CUSTOM, priority, CustomQueueConfig -> /horsies/concepts/queue-modes/
WorkflowSpec, DAG, TaskNode, waits_for, args_from -> /horsies/concepts/workflows/workflow-semantics/
WorkflowHandle, WorkflowTaskInfo, WorkflowStatus -> /horsies/concepts/workflows/workflow-api/
SubWorkflowNode, WorkflowDefinition, build_with, child workflow -> /horsies/concepts/workflows/subworkflows/
WorkflowContext, workflow_ctx_from, result_for, NodeKey -> /horsies/concepts/workflows/workflow-semantics/
SuccessPolicy, SuccessCase, partial success -> /horsies/concepts/workflows/workflow-semantics/
join, any, quorum, min_success, OR-join -> /horsies/concepts/workflows/workflow-semantics/
on_error, pause, resume, PAUSED -> /horsies/concepts/workflows/workflow-semantics/
allow_failed_deps, UPSTREAM_SKIPPED, recovery -> /horsies/concepts/workflows/workflow-semantics/
fan-out, fan-in, parallel, diamond -> /horsies/quick-start/05-workflow-patterns/
NodeFactory, .node(), typed node builder, type-safe -> /horsies/concepts/workflows/typed-node-builder/
SubWorkflowSummary, summary_for, has_summary -> /horsies/concepts/workflows/subworkflows/
WorkflowMeta, workflow_meta, metadata injection -> /horsies/concepts/workflows/workflow-semantics/
WorkflowDefinition, class-based workflow, build, build_with -> /horsies/concepts/workflows/subworkflows/
ExceptionMapper, exception_mapper, error code mapping -> /horsies/tasks/error-handling/
good_until, task expiry, deadline -> /horsies/tasks/defining-tasks/
schedule, delay, TaskFunction.schedule, resend_on_transient_err -> /horsies/tasks/sending-tasks/
worker, process pool, claim, execute -> /horsies/workers/worker-architecture/
concurrency, max_concurrency, cluster_wide_cap, prefetch -> /horsies/workers/concurrency/
heartbeat, crash, stale, worker recovery -> /horsies/workers/heartbeats-recovery/
syce, dashboard, TUI, monitor -> /horsies/monitoring/syce-overview/
get_queue_stats, get_worker_states, broker methods -> /horsies/monitoring/broker-methods/
TaskInfo, retry_count, next_retry_at, task metadata -> /horsies/monitoring/broker-methods/
BrokerResult, BrokerOperationError, BrokerErrorCode, retryable, Ok, Err, Result -> /horsies/monitoring/broker-methods/
ScheduleConfig, TaskSchedule, IntervalSchedule, DailySchedule -> /horsies/scheduling/schedule-config/
schedule pattern, Interval, Hourly, Daily, Weekly, Monthly -> /horsies/scheduling/schedule-patterns/
scheduler, check_interval, cron -> /horsies/scheduling/scheduler-overview/
horsies worker, horsies scheduler, horsies check, horsies get-docs, CLI -> /horsies/cli/
PostgreSQL, tables, schema, migration -> /horsies/internals/database-schema/
serialization, JSON, codec, pydantic, datetime -> /horsies/internals/serialization/
architecture, LISTEN/NOTIFY, components, data flow -> /horsies/concepts/architecture/
FAQ, questions, trade-offs, why PostgreSQL, production-ready, alpha -> /horsies/questions-and-answers/
ErrorCode, HorsiesError, WorkflowValidationError, validation -> /horsies/tasks/errors/
ValidationReport, MultipleValidationErrors, raise_collected, error collection -> /horsies/tasks/errors/
SourceLocation, Rust-style errors, format_rust_style, error display -> /horsies/tasks/errors/
HORSIES_FORCE_COLOR, NO_COLOR -> /horsies/tasks/errors/
horsies check, app.check, phased validation, startup validation -> /horsies/cli/
get-docs, AI agent, local docs, offline docs -> /horsies/cli/
agent skills, SKILL.md, tasks.md, workflows.md, configs.md, horsies/.agents/skills -> /horsies/cli/
discover_tasks, import_task_modules, expand_module_globs -> /horsies/quick-start/01-configuring-horsies/
slugify, node_id, NODE_ID_PATTERN -> /horsies/concepts/workflows/workflow-semantics/
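The keyword index uses one `keyword(s) -> page URL` entry per line, and a keyword can point at more than one page (for example, `TaskResult` appears under both error handling and result handling). A minimal sketch of turning the index into a lookup table, assuming the index text is available verbatim in this line format; `parse_keyword_index` and `pages_for` are hypothetical helpers written for illustration, not part of the horsies API.

```python
from collections import defaultdict


def parse_keyword_index(text: str) -> dict[str, list[str]]:
    """Map each keyword to the page URLs it points at (hypothetical helper).

    Expects lines shaped like ``kw1, kw2, kw3 -> /horsies/some/page/``.
    Lines without `` -> `` or whose target is not a path (such as the
    ``keyword(s) -> page URL`` format line) are skipped.
    """
    index: dict[str, list[str]] = defaultdict(list)
    for line in text.splitlines():
        keywords, sep, url = line.rpartition(" -> ")
        url = url.strip()
        if not sep or not url.startswith("/"):
            continue
        for kw in keywords.split(","):
            kw = kw.strip().lower()  # case-insensitive keys
            if kw and url not in index[kw]:
                index[kw].append(url)  # keep first-seen page order
    return dict(index)


def pages_for(index: dict[str, list[str]], keyword: str) -> list[str]:
    """Case-insensitive lookup; returns [] for unknown keywords."""
    return index.get(keyword.strip().lower(), [])


# A few entries copied from the index above.
SAMPLE = """\
keyword(s) -> page URL
TaskResult, is_ok, is_err, ok_value, err_value -> /horsies/tasks/error-handling/
TaskResult, Result pattern, error as value -> /horsies/concepts/result-handling/
RetryPolicy, retry, backoff, exponential, fixed, auto_retry_for -> /horsies/tasks/retry-policy/
"""

index = parse_keyword_index(SAMPLE)
print(pages_for(index, "TaskResult"))
# ['/horsies/tasks/error-handling/', '/horsies/concepts/result-handling/']
```

Lowercasing the keys is a design choice: it makes lookups forgiving, but it also merges keywords that differ only by case (such as `Result` and `result`), so a lookup may return pages for both.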