Skip to content

Getting Started

  • PostgreSQL 12+
  • Python 3.13+
Terminal window
uv add horsies

Create instance.py:

from horsies import Horsies, AppConfig, PostgresConfig, TaskResult, TaskError
config = AppConfig(
broker=PostgresConfig(
database_url="postgresql+psycopg://user:password@localhost:5432/mydb",
),
)
app = Horsies(config)

Add task definitions to instance.py:

@app.task("add_numbers")
def add_numbers(a: int, b: int) -> TaskResult[int, TaskError]:
return TaskResult(ok=a + b)
@app.task("process_data")
def process_data(data: dict) -> TaskResult[str, TaskError]:
if not data:
return TaskResult(err=TaskError(
error_code="EMPTY_DATA",
message="Data cannot be empty",
))
return TaskResult(ok=f"Processed {len(data)} items")
# Register task modules for worker discovery
app.discover_tasks(["instance"])

Requirements:

  • Every task returns TaskResult[T, TaskError]
  • TaskResult(ok=value) for success
  • TaskResult(err=TaskError(...)) for domain errors
  • app.discover_tasks([...]) registers modules for workers

Create producer.py:

from horsies import Ok, Err
from instance import app, add_numbers, process_data
# Send and wait for result
match add_numbers.send(5, 3):
case Ok(handle):
result = handle.get()
if result.is_ok():
do_something(result.ok_value)
else:
handle_error(result.err_value)
case Err(send_err):
print(f"Send failed: {send_err.code} - {send_err.message}")
# Send with blocking timeout
match process_data.send(data={"key": "value"}):
case Ok(handle):
result = handle.get(timeout_ms=5000)
case Err(send_err):
print(f"Send failed: {send_err.code}")
Terminal window
horsies worker instance:app --processes=8 --loglevel=INFO

The worker connects to PostgreSQL, subscribes to notifications, claims tasks, executes them, and stores results.

For async frameworks (FastAPI, etc.):

from horsies import Ok, Err
from instance import add_numbers
async def my_endpoint():
match await add_numbers.send_async(10, 20):
case Ok(handle):
result = await handle.get_async(timeout_ms=10000)
return {"result": result.ok_value if result.is_ok() else None}
case Err(send_err):
return {"error": send_err.message}

These async APIs are producer-side I/O helpers; task execution still happens in worker processes.

Simplest start — an overview of components:

myproject/
├── instance.py # App configuration and Horsies instance
├── tasks.py # Task definitions (@app.task)
├── workflows.py # Workflow definitions (WorkflowDefinition, TaskNode)
├── producer.py # Code that sends tasks / starts workflows
├── worker.py # Custom worker entry point (optional)
└── scheduler.py # Scheduled task configuration (optional)

Adjust to your project needs. Isolate entry points, configs, and workflow definitions.