Getting Started
Prerequisites
Section titled “Prerequisites”- PostgreSQL 12+
- Python 3.13+
Installation
Section titled “Installation”uv add horsies1. Create the App Instance
Section titled “1. Create the App Instance”Create instance.py:
from horsies import Horsies, AppConfig, PostgresConfig, TaskResult, TaskError
config = AppConfig( broker=PostgresConfig( database_url="postgresql+psycopg://user:password@localhost:5432/mydb", ),)
app = Horsies(config)2. Define Tasks
Section titled “2. Define Tasks”Add task definitions to instance.py:
@app.task("add_numbers")def add_numbers(a: int, b: int) -> TaskResult[int, TaskError]: return TaskResult(ok=a + b)
@app.task("process_data")def process_data(data: dict) -> TaskResult[str, TaskError]: if not data: return TaskResult(err=TaskError( error_code="EMPTY_DATA", message="Data cannot be empty", )) return TaskResult(ok=f"Processed {len(data)} items")
# Register task modules for worker discoveryapp.discover_tasks(["instance"])Requirements:
- Every task returns
TaskResult[T, TaskError] TaskResult(ok=value)for successTaskResult(err=TaskError(...))for domain errorsapp.discover_tasks([...])registers modules for workers
3. Send Tasks
Section titled “3. Send Tasks”Create producer.py:
from horsies import Ok, Errfrom instance import app, add_numbers, process_data
# Send and wait for resultmatch add_numbers.send(5, 3): case Ok(handle): result = handle.get() if result.is_ok(): do_something(result.ok_value) else: handle_error(result.err_value) case Err(send_err): print(f"Send failed: {send_err.code} - {send_err.message}")
# Send with blocking timeoutmatch process_data.send(data={"key": "value"}): case Ok(handle): result = handle.get(timeout_ms=5000) case Err(send_err): print(f"Send failed: {send_err.code}")4. Run the Worker
Section titled “4. Run the Worker”horsies worker instance:app --processes=8 --loglevel=INFOThe worker connects to PostgreSQL, subscribes to notifications, claims tasks, executes them, and stores results.
5. Async Usage
Section titled “5. Async Usage”For async frameworks (FastAPI, etc.):
from horsies import Ok, Errfrom instance import add_numbers
async def my_endpoint(): match await add_numbers.send_async(10, 20): case Ok(handle): result = await handle.get_async(timeout_ms=10000) return {"result": result.ok_value if result.is_ok() else None} case Err(send_err): return {"error": send_err.message}These async APIs are producer-side I/O helpers; task execution still happens in worker processes.
Project Structure
Section titled “Project Structure”Simplest start — an overview of components:
myproject/├── instance.py # App configuration and Horsies instance├── tasks.py # Task definitions (@app.task)├── workflows.py # Workflow definitions (WorkflowDefinition, TaskNode)├── producer.py # Code that sends tasks / starts workflows├── worker.py # Custom worker entry point (optional)└── scheduler.py # Scheduled task configuration (optional)Adjust to your project needs. Isolate entry points, configs, and workflow definitions.
Next Steps
Section titled “Next Steps”- Task Lifecycle — task states and transitions
- Queue Modes — multiple queues
- Scheduler Overview — scheduled tasks
- Recovery Config — crash recovery