Retrieving Results
Retrieve task outcomes through TaskHandle.get() / get_async(). For error handling strategy, see error-handling and ../concepts/result-handling.
How To
Basic Retrieval

    from horsies import RetrievalCode
    from instance import my_task

    send_result = my_task.send(10, 20)
    if send_result.is_err():
        raise RuntimeError(f"Send failed: {send_result.err_value.code}")
    handle = send_result.ok_value

    # Block until complete (or timeout/error)
    result = handle.get()

    if result.is_err():
        error = result.err_value
        match error.error_code:
            case RetrievalCode.WAIT_TIMEOUT:
                print(f"Timeout waiting for {handle.task_id}")
            case _:
                print(f"Error: {error.error_code} - {error.message}")
    else:
        print(f"Success: {result.ok_value}")

Async Retrieval
    send_result = await my_task.send_async(10, 20)
    if send_result.is_err():
        raise RuntimeError(f"Send failed: {send_result.err_value.code}")
    handle = send_result.ok_value
    result = await handle.get_async()

get_async() waits for broker notifications (LISTEN/NOTIFY) with a polling fallback if notifications are lost.
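The notify-then-poll behaviour described above can be illustrated with a small standalone sketch. This is not the horsies implementation: wait_with_polling_fallback, the asyncio.Event standing in for a broker notification, and the poll callable standing in for a database lookup are all hypothetical names chosen for the example.

```python
import asyncio
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


async def wait_with_polling_fallback(
    notified: asyncio.Event,          # hypothetical stand-in for a broker notification
    poll: Callable[[], Optional[T]],  # hypothetical stand-in for a result lookup
    poll_interval_s: float = 0.05,
    timeout_s: float = 1.0,
) -> Optional[T]:
    """Return the polled value once it exists, or None if timeout_s elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_s
    while True:
        value = poll()  # check the store first: the result may already be there
        if value is not None:
            return value
        remaining = deadline - loop.time()
        if remaining <= 0:
            return None
        try:
            # Wake early on a notification, otherwise after one poll interval.
            await asyncio.wait_for(
                notified.wait(), timeout=min(poll_interval_s, remaining)
            )
        except asyncio.TimeoutError:
            pass  # no notification arrived (possibly lost): poll again
        notified.clear()
```

In this sketch a lost notification only delays the result by at most one poll interval, which is the general reason for pairing notifications with a polling fallback.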
Timeouts
Specify maximum wait time in milliseconds:

    from horsies import RetrievalCode

    # Wait up to 5 seconds
    result = handle.get(timeout_ms=5000)

    if result.is_err() and result.err_value.error_code == RetrievalCode.WAIT_TIMEOUT:
        # Task didn't complete in time - may still be running
        print("Timed out, task may still complete later")

Result Caching
Results are cached on the handle:

    send_result = my_task.send(...)
    if send_result.is_err():
        raise RuntimeError(f"Send failed: {send_result.err_value.code}")
    handle = send_result.ok_value

    # First call fetches from database
    result1 = handle.get()

    # Subsequent calls return cached result
    result2 = handle.get()  # No database query

Checking Result State
    result = handle.get()

    # Check state
    if result.is_ok():
        value = result.ok_value  # Get success value
    elif result.is_err():
        error = result.err_value  # Get error object

Task Metadata and Attempt History
Use handle.info() to fetch task metadata without waiting for completion:

    from horsies import is_ok

    info_result = handle.info(include_result=True, include_failed_reason=True)
    if is_ok(info_result) and info_result.ok_value is not None:
        task_info = info_result.ok_value
        print(f"Status: {task_info.status}, Retries: {task_info.retry_count}")
        print(f"Error code: {task_info.error_code}")  # final error_code (None if not failed)

To inspect per-attempt execution history, pass include_attempts=True:

    info_result = handle.info(include_attempts=True)
    if is_ok(info_result) and info_result.ok_value is not None:
        task_info = info_result.ok_value
        for attempt in task_info.attempts or []:
            print(f"Attempt {attempt.attempt}: {attempt.outcome} "
                  f"(retry={attempt.will_retry}, error={attempt.error_code})")

TaskInfo.attempts is None when include_attempts=False (default) and [] when requested but no history exists. Attempts are ordered by attempt number descending (most recent first).
Attempt history and error_code are only available from version 0.1.0a26 onward.
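Because `task_info.attempts or []` treats None and [] the same way, code that needs to tell "history not requested" apart from "no history yet" has to check for None explicitly. The sketch below shows one way to do that. AttemptStub is a hypothetical stand-in that only mirrors the field names used above (it is not a horsies type), and the outcome strings are placeholders rather than documented values.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class AttemptStub:
    """Hypothetical stand-in mirroring the attempt fields named in this guide."""
    attempt: int
    outcome: str  # placeholder values only; real outcome values may differ
    will_retry: bool
    error_code: Optional[str] = None


def describe_attempts(attempts: Optional[List[AttemptStub]]) -> str:
    # None: the caller did not pass include_attempts=True.
    if attempts is None:
        return "history not requested"
    # Empty list: history was requested, but nothing has been recorded.
    if not attempts:
        return "requested, but no attempts recorded"
    # Descending order means the first element is the most recent attempt.
    latest = attempts[0]
    return f"{len(attempts)} attempt(s), latest is #{latest.attempt} ({latest.outcome})"
```

With a real TaskInfo, the same three-way check would be applied to task_info.attempts.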
Testing
For unit tests, call tasks directly:

    def test_my_task():
        result = my_task("test_input")  # Direct call, no queue

        assert result.is_ok()
        assert result.ok_value == "expected_output"

Things to Avoid
Don’t silently ignore errors by only logging them.

    # Wrong - logs error but takes no action
    result = handle.get()
    if result.is_err():
        print(result.err_value.message)
    # Code continues as if nothing happened...

    # Correct - handle or propagate errors
    result = handle.get()
    if result.is_err():
        # Depending on your use case, either pass to an error handler (prefer this when possible)
        your_error_handler(result.err_value)
        # or return/raise to propagate
        return result.err_value.error_code

    print(f"Success: {result.ok_value}")

For detailed error handling patterns, see error-handling.