Serialization
Codec Module
Section titled “Codec Module”Located at horsies/core/codec/serde.py. Handles serialization (to_jsonable) and rehydration (rehydrate_value) of task arguments, keyword arguments, and results.
Serialization Functions
Section titled “Serialization Functions”All serialization and deserialization functions return SerdeResult[T] (an alias for Result[T, SerializationError]) instead of raising exceptions. Callers check the result with is_err() and handle failures explicitly.
| Function | Returns | Purpose |
|---|---|---|
to_jsonable(value) | SerdeResult[Json] | Convert a value to a JSON-serializable structure |
rehydrate_value(value) | SerdeResult[Any] | Restore typed objects from JSON structures |
args_to_json(args) | SerdeResult[str] | Serialize positional arguments |
kwargs_to_json(kwargs) | SerdeResult[str] | Serialize keyword arguments |
dumps_json(value) | SerdeResult[str] | Serialize a value to a JSON string |
loads_json(json_str) | SerdeResult[Json] | Deserialize a JSON string |
task_result_from_json(j) | SerdeResult[TaskResult[Any, TaskError]] | Deserialize a TaskResult |
serialize_error_payload(tr) | str | Serialize a TaskResult error with hardcoded fallback (never fails) |
Supported Types
Section titled “Supported Types”Native JSON Types
Section titled “Native JSON Types”str,int,float,bool,Nonelist,dict- Nested combinations of the above
Pydantic BaseModel
Section titled “Pydantic BaseModel”Pydantic models serialize with type metadata for automatic rehydration. The codec stores the module path and class name so workers can reconstruct the exact type.
from pydantic import BaseModelfrom horsies import Horsies, TaskResult, TaskError
app = Horsies(config)
class Order(BaseModel): id: int items: list[str]
@app.task('process_order')def process_order(order: Order) -> TaskResult[Order, TaskError]: return TaskResult(ok=order)Serialized form:
{ "__pydantic_model__": true, "module": "myapp.models", "qualname": "Order", "data": {"id": 1, "items": ["widget"]}}Rehydration uses model_validate() on the resolved class — Pydantic handles type coercion (including ISO strings back to datetime for model fields).
Pydantic Models in Workflows
Section titled “Pydantic Models in Workflows”Pydantic models flowing through args_from are serialized with model_dump(mode="json") and rehydrated with model_validate(). The downstream task receives the reconstructed model instance, not a dict.
from datetime import datetime, timezonefrom pydantic import BaseModelfrom horsies import ( Horsies, AppConfig, PostgresConfig, TaskResult, TaskError, TaskNode,)
config = AppConfig( broker=PostgresConfig( database_url="postgresql+psycopg://user:password@localhost:5432/mydb", ),)app = Horsies(config)
class Order(BaseModel): item: str total: float created_at: datetime
@app.task("create_order")def create_order() -> TaskResult[Order, TaskError]: return TaskResult(ok=Order( item="widget", total=9.99, created_at=datetime.now(timezone.utc), ))
@app.task("process_order")def process_order(order_result: TaskResult[Order, TaskError]) -> TaskResult[str, TaskError]: if order_result.is_err(): return TaskResult(err=order_result.err_value) order: Order = order_result.ok_value # Rehydrated Order instance, not a dict print(order.created_at) # datetime object, not a string return TaskResult(ok=f"Processed {order.item}")
# Wiringnode_create: TaskNode[Order] = TaskNode(fn=create_order)node_process: TaskNode[str] = TaskNode( fn=process_order, waits_for=[node_create], args_from={"order_result": node_create},)Dataclasses
Section titled “Dataclasses”Dataclasses serialize with the same metadata approach. Each field is recursively converted via to_jsonable, preserving nested Pydantic and dataclass types.
from dataclasses import dataclass
@dataclassclass Metrics: page_count: int total_words: intSerialized form:
{ "__dataclass__": true, "module": "myapp.models", "qualname": "Metrics", "data": {"page_count": 5, "total_words": 1200}}Rehydration reconstructs the dataclass via its constructor. Fields with init=False are set directly on the instance after construction.
Datetime Types
Section titled “Datetime Types”datetime.datetime, datetime.date, and datetime.time serialize as tagged dicts with ISO 8601 strings. This enables lossless round-trip rehydration — values come back as the correct Python type, not as plain strings.
import datetime as dt
@app.task('record_event')def record_event() -> TaskResult[dict, TaskError]: return TaskResult(ok={ 'occurred_at': dt.datetime(2025, 6, 15, 10, 30, 0, tzinfo=dt.timezone.utc), 'event_date': dt.date(2025, 6, 15), })Serialized forms:
{"__datetime__": true, "value": "2025-06-15T10:30:00+00:00"}{"__date__": true, "value": "2025-06-15"}{"__time__": true, "value": "14:30:00"}Timezone offsets are preserved. isoformat() produces the offset (e.g. +00:00, +05:30), and fromisoformat() restores it. Naive datetimes (no timezone) round-trip as naive.
Datetime types also work as fields inside dataclasses and dicts — the recursive serialization handles them automatically.
Sequences and Mappings
Section titled “Sequences and Mappings”Sequence types (e.g. tuple, list) and Mapping types (e.g. dict, OrderedDict) are recursively serialized. str, bytes, and bytearray are excluded from sequence handling.
Unsupported
Section titled “Unsupported”- Custom classes without Pydantic or dataclass decoration
- Classes defined in
__main__(not importable by workers) - Local classes defined inside functions
- File handles, connections
- Functions, lambdas
Attempting to serialize an unsupported type returns Err(SerializationError).
TaskResult Serialization
Section titled “TaskResult Serialization”# SuccessTaskResult(ok=value)# → {"__task_result__": true, "ok": <serialized_value>, "err": null}
# ErrorTaskResult(err=TaskError(...))# → {"__task_result__": true, "ok": null, "err": {"__task_error__": true, ...}}Error Codes
Section titled “Error Codes”| Code | Cause |
|---|---|
WORKER_SERIALIZATION_ERROR | Task result could not be serialized to JSON |
PYDANTIC_HYDRATION_ERROR | Task succeeded but return value could not be rehydrated to declared type |
RESULT_DESERIALIZATION_ERROR | Stored result JSON is corrupt or could not be deserialized |
Return Type Validation
Section titled “Return Type Validation”Return values are validated against declared types using Pydantic’s TypeAdapter (in horsies/core/task_decorator.py):
@app.task('typed')def typed() -> TaskResult[int, TaskError]: return TaskResult(ok='not an int') # RETURN_TYPE_MISMATCHThings to Avoid
Section titled “Things to Avoid”Don’t return bare custom classes. Use Pydantic BaseModel or @dataclass for task arguments and results. The codec needs type metadata for rehydration.
Don’t define result types in __main__. Workers import types by module path. Classes defined in the entrypoint script cannot be resolved. Move them to a separate module.
Don’t define result types inside functions. Local classes have <locals> in their qualname and cannot be imported by workers.