Skip to content

Migrating to 0.1.2

0.1.2 lands the strict-serde redesign. The wire stops carrying class identity. Every parameter and return type must classify into a concrete shape; the decoder uses the declared type — not metadata embedded on the value — to materialize objects.

This page lists every user-facing change against 0.1.1 and the mechanical fix for each.

Variadic and positional-only task parameters are rejected at registration

Section titled “Variadic and positional-only task parameters are rejected at registration”

*args (VAR_POSITIONAL), **kwargs (VAR_KEYWORD), and positional-only parameters (def f(x, /):) raise SignatureValidationError at @app.task time.

# Wrong
@app.task("bad")
def bad(*args, **kwargs) -> TaskResult[int, TaskError]:
...
# Correct
@app.task("good")
def good(*, x: int, y: int) -> TaskResult[int, TaskError]:
return TaskResult(ok=x + y)

These are rejected by the signature validator:

BannedFix
AnyUse a concrete type, or JsonValue for raw JSON data
objectSame as above
bare dict, list, tupleParameterize: dict[str, JsonValue], list[int], tuple[str, int]
set, frozensetJSON arrays don’t preserve set semantics; use list[T]
bytesWrap in a model with explicit base64 fields, or pass str
TypeVarDefine a wrapper task per concrete instantiation
bare BaseModelUse a concrete subclass
TypedDictReplace with BaseModel or @dataclass
pathlib.PurePath (and subclasses)Pass str and convert at the boundary
Tasks without return annotationDeclare -> TaskResult[T, TaskError]

JsonValue (re-exported as horsies.JsonValue) is the only untyped fence. Use it only at task boundary positions or inside BaseModel / @dataclass fields:

from horsies import JsonValue
@app.task("validate_input")
def validate_input(
*,
data: dict[str, JsonValue],
) -> TaskResult[dict[str, JsonValue], TaskError]:
...

.send() / .send_async() / .schedule() are keyword-only

Section titled “.send() / .send_async() / .schedule() are keyword-only”

Positional arguments to send/schedule are rejected with Err(TaskSendError(code=VALIDATION_FAILED)).

# Wrong
match my_task.send(5, 3):
...
# Correct
match my_task.send(a=5, b=3):
...

The user-facing contract is keyword-only. The wrapper may still accept *args at the Python signature level for compatibility and typing, but strict-serde rejects non-empty positional args before enqueue.

TaskNode no longer accepts an args field. Pass all data through kwargs.

# Wrong
node = TaskNode(fn=step_task, args=("hello",))
# Correct
node = TaskNode(fn=step_task, kwargs={"step": "hello"})

Subworkflow build_with receives typed TaskResult

Section titled “Subworkflow build_with receives typed TaskResult”

In-process subworkflow handoff no longer round-trips through a JSON envelope. build_with(parent_result) receives the typed TaskResult[T, TaskError] directly. Existing user code that already declared the typed parameter needs no change; code that declared Any or relied on a wire envelope must declare the concrete TaskResult[ParentOk, TaskError].

TaskSchedule(args=...) rejected at enqueue

Section titled “TaskSchedule(args=...) rejected at enqueue”

The scheduler service rejects schedules carrying positional args with Err(BrokerOperationError(code=ENQUEUE_FAILED)). Use kwargs only.

# Wrong
TaskSchedule(
name="sync-us-east",
task_name="process_region",
pattern=IntervalSchedule(hours=1),
args=("us-east",),
)
# Correct
TaskSchedule(
name="sync-us-east",
task_name="process_region",
pattern=IntervalSchedule(hours=1),
kwargs={"region": "us-east"},
)

broker.get_result(...) / broker.get_result_async(...) removed

Section titled “broker.get_result(...) / broker.get_result_async(...) removed”

The broker no longer exposes a typed result fetch. Use app.get_result(...) / app.get_result_async(...) for typed decode, or broker.get_raw_result_record_async(...) for the raw envelope.

# Wrong
result = await broker.get_result_async(task_id, timeout_ms=5000)
if result.is_ok():
...
# Correct (typed)
outer = await app.get_result_async(task_id, timeout_ms=5000)
if is_err(outer):
# infrastructure failure (INVALID_JSON_PAYLOAD, NO_TYPE_AVAILABLE, BROKER_ERROR)
handle_broker_error(outer.err_value)
else:
task_result = outer.ok_value # TaskResult[Any, TaskError]
if task_result.is_ok():
...

The return shape is now BrokerResult[TaskResult[Any, TaskError]] — the outer BrokerResult carries infrastructure errors; the inner TaskResult carries the domain result.

CodeEnumWhen
INVALID_JSON_PAYLOADBrokerErrorCodeStored result JSON is malformed, not a JSON object, has an invalid strict-serde envelope, or fails app-level typed slot decode
NO_TYPE_AVAILABLEBrokerErrorCodeapp.get_result_async ok-slot decode needs ok_type but task_name is not in the local registry
NO_TYPE_AVAILABLEContractCodeOutputless workflow per-node decode failed for a terminal task name not registered locally
RESULT_DESERIALIZATION_ERROROperationalErrorCodeEnvelope shape invalid or ok slot does not match ok_type (existed pre-strict-serde; semantics narrowed)

Failed-task results decode without an ok_type, so BrokerErrorCode.NO_TYPE_AVAILABLE only fires on the success path. Reading failed tasks across processes that don’t import the user code still works.

PYDANTIC_HYDRATION_ERROR is retained as a legacy enum member on ContractCode but is no longer emitted by any production path. New code should not match on it. The strict-serde equivalents are RETURN_TYPE_MISMATCH (encode-side return type mismatch), RESULT_DESERIALIZATION_ERROR (handle/workflow decode failure folded into TaskResult.err), and BrokerErrorCode.INVALID_JSON_PAYLOAD (app-level result read failure).

The stored result column shape changed from the legacy __task_result__ / __pydantic_model__ / __dataclass__ / __datetime__ per-value class tags to a single envelope:

{
"__h_task_result__": true,
"ok": <typed JSON value>,
"err": null
}

See Serialization for the full envelope shape (including outputless workflow terminals).

  • In-flight rows from a pre-strict-serde worker are not consumable by a 0.1.2 worker/result reader. A 0.1.2 worker rejects rows carrying positional args, and strict result readers reject legacy result envelopes. Drop or drain pre-strict-serde rows before upgrading.
  • The __horsies_* smuggle path is closed. Any payload carrying reserved __h_* or __builtin_task_code__ keys at user-controlled positions is rejected at decode time.
TaskError.exception: BaseException | FlattenedException | None

In-process the value is a live BaseException. On the wire it flattens to a FlattenedException TypedDict (type, module, message, repr, traceback). Code accessing .exception should switch on the concrete type rather than assuming dict[str, Any].

Reserved built-in codes are rejected as strings

Section titled “Reserved built-in codes are rejected as strings”

TaskError(error_code="WORKER_RESOLUTION_ERROR") raises ValueError at construction time. Built-in codes must be passed as enum members:

# Wrong
TaskError(error_code="WORKER_RESOLUTION_ERROR")
# Correct
from horsies import OperationalErrorCode
TaskError(error_code=OperationalErrorCode.WORKER_RESOLUTION_ERROR)

User-defined codes remain plain str (must not collide with any reserved built-in code; horsies check catches statically visible collisions via HRS-212).

The canonical app.get_broker() sets both app._broker = broker AND broker.app = app. Direct construction (PostgresBroker(config.broker) + manual app._broker = broker) leaves broker.app as None, which breaks consumer-side outputless workflow decode (terminal node lookup needs broker.app.tasks).

If you maintain test fixtures that bypass app.get_broker():

# Wrong
app = Horsies(config)
broker = PostgresBroker(config.broker)
app._broker = broker
# Correct
app = Horsies(config)
broker = PostgresBroker(config.broker)
app._broker = broker
broker.app = app # required for outputless workflow per-node decode
# Or simply:
app = Horsies(config)
broker = app.get_broker()
  • Audit @app.task signatures: replace Any / object / bare containers / set / frozenset / Callable / bytes / TypeVar / TypedDict / pathlib.PurePath with concrete types or JsonValue.
  • Search for positional .send( / .send_async( / .schedule( calls and rewrite as kwargs.
  • Search for TaskNode(args= / TaskSchedule(args= and migrate to kwargs=.
  • Replace broker.get_result(...) / broker.get_result_async(...) with app.get_result(...) / app.get_result_async(...); update callers to unwrap the outer BrokerResult.
  • Drop or drain in-flight pre-strict-serde rows before upgrading workers.
  • Replace string-form built-in error codes (e.g. "BROKER_ERROR", "WORKER_RESOLUTION_ERROR") with the corresponding enum member.
  • If you match on PYDANTIC_HYDRATION_ERROR, add RESULT_DESERIALIZATION_ERROR and RETURN_TYPE_MISMATCH to the match arms.