async_task_runner.py
Source: src/sunholo/invoke/async_task_runner.py
Classes
AsyncTaskRunner
No docstring available.
- __init__(self, retry_enabled: bool = False, retry_kwargs: dict = None, timeout: int = 120, max_concurrency: int = 20, heartbeat_extends_timeout: bool = False, hard_timeout: int = None, callbacks: Optional[Dict[str, Callable]] = None, shared_state: Optional[Dict[str, Any]] = None, use_default_callbacks: bool = True, verbose: bool = True)
- Initialize AsyncTaskRunner with configurable timeout behavior and callbacks.
By default, AsyncTaskRunner uses built-in callbacks that automatically manage task state, making it easy to use without any configuration. Just create, add tasks, and get results!
Args:
retry_enabled: Whether to enable retries globally.
retry_kwargs: Global retry configuration for tenacity.
timeout: Base timeout for tasks in seconds (default: 120).
max_concurrency: Maximum number of concurrent tasks (default: 20).
heartbeat_extends_timeout: If True, heartbeats reset the timeout timer.
hard_timeout: Maximum absolute timeout in seconds, regardless of heartbeats. If None, defaults to timeout * 5 when heartbeat_extends_timeout=True.
callbacks: Dict of custom callbacks to override defaults:
- on_heartbeat: async (context: CallbackContext) -> None
- on_task_start: async (context: CallbackContext) -> None
- on_task_complete: async (context: CallbackContext) -> None
- on_task_error: async (context: CallbackContext) -> None
- on_retry: async (context: CallbackContext) -> None
- on_timeout: async (context: CallbackContext) -> None
shared_state: Custom shared state dict. If None, creates default structure with:
- results: Dict[str, Any] - Task results by task name
- errors: Dict[str, str] - Error messages by task name
- completed: List[str] - Completed task names
- started: List[str] - Started task names
- retries: List[str] - Retry attempt records
- timed_out: List[str] - Timed out task names
use_default_callbacks: If True (default), use built-in callbacks that:
- Automatically populate shared_state with results and errors
- Log task progress with emojis (🚀 start, ✅ complete, ❌ error, etc.)
- Track task lifecycle (started, completed, retried, timed out)
Set to False for full manual control.
verbose: If True (default), default callbacks print status messages. If False, default callbacks work silently (still populate state).
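The hard_timeout default described in the Args can be written out as a small function. This is a minimal sketch, not the library's code: resolve_hard_timeout is a hypothetical helper name, and returning None in fixed-timeout mode is an assumption, since the docstring only states the default for heartbeat_extends_timeout=True.

```python
from typing import Optional


def resolve_hard_timeout(
    timeout: int,
    heartbeat_extends_timeout: bool,
    hard_timeout: Optional[int] = None,
) -> Optional[int]:
    """Hypothetical helper mirroring the documented default for hard_timeout."""
    if hard_timeout is not None:
        return hard_timeout  # an explicit value always wins
    if heartbeat_extends_timeout:
        return timeout * 5  # documented default when heartbeats extend the timeout
    return None  # assumption: no separate hard limit in fixed-timeout mode
```

With the default timeout of 120 seconds and heartbeat_extends_timeout=True, this rule gives a hard limit of 600 seconds.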
Default Callbacks Behavior: When use_default_callbacks=True (default), the following happens automatically:
- on_task_start: Adds task to 'started' list, logs "🚀 Starting task: {name}"
- on_task_complete: Stores result in 'results', adds to 'completed', logs "✅ {name} completed: {result}"
- on_task_error: Stores error in 'errors' (truncated to 500 chars), logs "❌ {name} failed: {error}"
- on_retry: Tracks retry attempts in 'retries', logs "🔄 Retry #{n} for {name}"
- on_timeout: Adds to 'timed_out', stores timeout error, logs "⏱️ {name} timed out"
- on_heartbeat: Silent by default (only logs in DEBUG mode)
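To make the list above concrete, here is a rough sketch of what two of the default callbacks do to shared_state. It does not import sunholo: Ctx is a stand-in for CallbackContext with only the fields needed here, and make_shared_state is a hypothetical helper that builds the default structure from the Args section. Logging is omitted.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


def make_shared_state() -> Dict[str, Any]:
    """Build the default structure listed in the Args section."""
    return {"results": {}, "errors": {}, "completed": [],
            "started": [], "retries": [], "timed_out": []}


@dataclass
class Ctx:
    """Stand-in for CallbackContext with only the fields used here."""
    task_name: str
    shared_state: Dict[str, Any] = field(default_factory=make_shared_state)
    result: Any = None
    error: Optional[Exception] = None


async def on_task_complete(ctx: Ctx) -> None:
    # Store the result and record the task as completed.
    ctx.shared_state["results"][ctx.task_name] = ctx.result
    ctx.shared_state["completed"].append(ctx.task_name)


async def on_task_error(ctx: Ctx) -> None:
    # Error messages are truncated to 500 characters, as described above.
    ctx.shared_state["errors"][ctx.task_name] = str(ctx.error)[:500]


async def demo() -> Dict[str, Any]:
    state = make_shared_state()
    await on_task_complete(Ctx("fetch", state, result="ok"))
    await on_task_error(Ctx("parse", state, error=ValueError("x" * 600)))
    return state
```

Running asyncio.run(demo()) leaves "fetch" under results and completed, and a 500-character message for "parse" under errors.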
Examples:
Simplest usage - everything automatic
>>> runner = AsyncTaskRunner()
>>> runner.add_task(fetch_data, "api_endpoint")
>>> results = await runner.get_aggregated_results()
>>> print(results['results'])  # {'fetch_data': 'data from api'}
Custom task names for better clarity
>>> runner = AsyncTaskRunner()
>>> runner.add_task(fetch_data, "user_api", task_name="fetch_user_data")
>>> runner.add_task(fetch_data, "posts_api", task_name="fetch_posts")
>>> results = await runner.get_aggregated_results()
>>> print(results['results']['fetch_user_data'])  # User data
Silent mode - no console output but still collects results
>>> runner = AsyncTaskRunner(verbose=False)
Override just one callback, keep rest as defaults
>>> runner = AsyncTaskRunner(
...     callbacks={'on_task_complete': my_custom_complete_handler}
... )
Full manual control - no default callbacks
>>> runner = AsyncTaskRunner(use_default_callbacks=False)
- _execute_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any
- Executes the given task function and returns its result.
Args:
func (Callable): The callable to execute.
*args: Positional arguments to pass to the callable.
**kwargs: Keyword arguments to pass to the callable.
Returns: Any: The result of the task.
- _execute_task_with_heartbeat_timeout(self, func: Callable[..., Any], name: str, last_heartbeat: dict, timeout: int, hard_timeout: int, *args: Any, **kwargs: Any) -> Any
- Execute task with heartbeat-extendable timeout and hard timeout limit.
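One way to picture a heartbeat-extendable timeout with a hard limit is a polling loop that checks two clocks: time since the last heartbeat and time since the start. The sketch below is an illustration under assumptions, not the method's actual body: the "time" key in last_heartbeat, the poll parameter, and the helper names are all hypothetical.

```python
import asyncio
import time
from typing import Any, Awaitable, Dict


async def run_with_heartbeat_timeout(
    coro: Awaitable[Any],
    last_heartbeat: Dict[str, float],
    timeout: float,
    hard_timeout: float,
    poll: float = 0.01,
) -> Any:
    """Wait for coro; time out when heartbeats go stale or the hard limit passes."""
    task = asyncio.ensure_future(coro)
    start = time.monotonic()
    while True:
        done, _ = await asyncio.wait({task}, timeout=poll)
        if done:
            return task.result()  # re-raises if the task itself failed
        now = time.monotonic()
        stale = now - last_heartbeat["time"] > timeout
        over_hard_limit = now - start > hard_timeout
        if stale or over_hard_limit:
            task.cancel()
            await asyncio.gather(task, return_exceptions=True)
            raise asyncio.TimeoutError(
                "hard timeout exceeded" if over_hard_limit else "no recent heartbeat"
            )


async def slow_job() -> str:
    await asyncio.sleep(0.4)
    return "done"


async def beat(last_heartbeat: Dict[str, float], interval: float = 0.02) -> None:
    # Keeps refreshing the shared timestamp, standing in for a heartbeat sender.
    while True:
        last_heartbeat["time"] = time.monotonic()
        await asyncio.sleep(interval)


async def demo(with_heartbeat: bool, hard_timeout: float) -> str:
    last_heartbeat = {"time": time.monotonic()}
    beater = asyncio.ensure_future(beat(last_heartbeat)) if with_heartbeat else None
    try:
        return await run_with_heartbeat_timeout(
            slow_job(), last_heartbeat, timeout=0.2, hard_timeout=hard_timeout
        )
    except asyncio.TimeoutError as exc:
        return f"timeout: {exc}"
    finally:
        if beater:
            beater.cancel()
```

In this sketch a 0.4 second job survives a 0.2 second timeout as long as heartbeats keep arriving, but is still cancelled once the hard limit passes.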
- _execute_task_with_timeout(self, func: Callable[..., Any], name: str, last_heartbeat: dict, timeout: int, heartbeat_extends: bool, hard_timeout: int, *args: Any, **kwargs: Any) -> Any
- Execute task with either fixed timeout or heartbeat-extendable timeout.
- _monitor_tasks(self, task_infos, queue)
- Monitors the tasks and heartbeats, and sends a sentinel to the queue when done.
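The sentinel pattern mentioned here is a common way to end a queue-driven stream. The following sketch shows the general idea with plain asyncio. The message keys ("type", "func_name", "result", "error"), the _SENTINEL object, and the function names are assumptions made for illustration, not the library's internals.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List

_SENTINEL = object()  # hypothetical marker meaning "no more messages"


async def _worker(name: str, func: Callable[[], Awaitable[Any]], queue: asyncio.Queue) -> None:
    """Run one task and report its outcome on the queue."""
    try:
        result = await func()
        await queue.put({"type": "task_complete", "func_name": name, "result": result})
    except Exception as exc:
        await queue.put({"type": "task_error", "func_name": name, "error": exc})


async def _monitor(workers: List[asyncio.Future], queue: asyncio.Queue) -> None:
    """Wait for every worker, then tell the consumer to stop."""
    await asyncio.gather(*workers, return_exceptions=True)
    await queue.put(_SENTINEL)


async def stream_messages(
    tasks: Dict[str, Callable[[], Awaitable[Any]]]
) -> AsyncIterator[dict]:
    queue: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.ensure_future(_worker(n, f, queue)) for n, f in tasks.items()]
    monitor = asyncio.ensure_future(_monitor(workers, queue))
    while True:
        message = await queue.get()
        if message is _SENTINEL:
            break  # all workers have finished and reported
        yield message
    await monitor


async def demo() -> List[str]:
    async def fast() -> int:
        return 1

    async def broken() -> int:
        raise ValueError("boom")

    tasks = {"fast": fast, "broken": broken}
    return sorted([f"{m['func_name']}:{m['type']}" async for m in stream_messages(tasks)])
```

Because the sentinel is queued only after every worker has finished, the consumer never exits before the last result has been delivered.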
- _process_message_with_callbacks(self, message: Dict[str, Any]) -> Optional[sunholo.invoke.async_task_runner.CallbackContext]
- Process a message and invoke appropriate callbacks.
- _run_with_retries_and_timeout(self, name: str, func: Callable[..., Any], args: tuple, kwargs: dict, config: Optional[sunholo.invoke.async_task_runner.TaskConfig], queue: asyncio.queues.Queue, completion_event: asyncio.locks.Event, last_heartbeat: dict) -> None
- No docstring available.
- _send_heartbeat(self, func_name: str, config: Optional[sunholo.invoke.async_task_runner.TaskConfig], completion_event: asyncio.locks.Event, queue: asyncio.queues.Queue, last_heartbeat: dict, interval: int = 2)
- Sends periodic heartbeat updates to indicate the task is still in progress. Updates last_heartbeat time if heartbeat_extends_timeout is enabled.
Args:
func_name (str): The name of the task function.
config (Optional[TaskConfig]): Per-task configuration.
completion_event (asyncio.Event): Event to signal when the task is completed.
queue (asyncio.Queue): The queue to send heartbeat messages to.
last_heartbeat (dict): Mutable dict containing the last heartbeat time.
interval (int): How frequently to send heartbeat messages (in seconds).
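A heartbeat loop of this kind can be sketched with asyncio.Event and asyncio.wait_for. This is an illustrative version under assumptions: the message fields, the "time" key, and the extends_timeout flag (standing in for whatever the per-task config decides) are hypothetical and may differ from the real method.

```python
import asyncio
import time
from typing import Dict, Tuple


async def send_heartbeat(
    func_name: str,
    completion_event: asyncio.Event,
    queue: asyncio.Queue,
    last_heartbeat: Dict[str, float],
    interval: float = 2,
    extends_timeout: bool = True,
) -> None:
    """Emit a heartbeat message every interval seconds until the task completes."""
    start = time.monotonic()
    while not completion_event.is_set():
        try:
            # Sleep for one interval, but wake immediately if the task finishes.
            await asyncio.wait_for(completion_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            now = time.monotonic()
            if extends_timeout:
                last_heartbeat["time"] = now  # lets a timeout checker see recent activity
            await queue.put(
                {"type": "heartbeat", "func_name": func_name, "elapsed_time": now - start}
            )


async def demo() -> Tuple[int, float]:
    queue: asyncio.Queue = asyncio.Queue()
    done = asyncio.Event()
    last_heartbeat = {"time": 0.0}
    sender = asyncio.ensure_future(
        send_heartbeat("job", done, queue, last_heartbeat, interval=0.05)
    )
    await asyncio.sleep(0.3)  # stands in for the real task
    done.set()
    await sender
    return queue.qsize(), last_heartbeat["time"]
```

Waiting on the event with a timeout, rather than calling asyncio.sleep, means the sender stops promptly when the task completes instead of finishing a full interval first.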
- _setup_callbacks(self, user_callbacks: Optional[Dict[str, Callable]], use_defaults: bool) -> Dict[str, Callable]
- Setup callbacks, using defaults if requested and filling in any missing callbacks.
- add_task(self, func: Callable[..., Any], *args: Any, task_config: Optional[sunholo.invoke.async_task_runner.TaskConfig] = None, task_name: Optional[str] = None, **kwargs: Any)
- Adds a task to the list of tasks to be executed, with optional per-task configuration.
Automatically ensures task names are unique by appending a suffix if needed.
Args:
func: The function to be executed.
*args: Positional arguments for the function.
task_config: Optional per-task configuration for timeout, retry, and callbacks.
task_name: Optional custom name for the task. If not provided, uses func.__name__.
**kwargs: Keyword arguments for the function.
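Name de-duplication matters because results are keyed by task name, so adding the same function twice would otherwise overwrite one result. The docstring does not specify the suffix format, so the numeric "_1", "_2" scheme below is an assumption, and unique_task_name is a hypothetical helper.

```python
from typing import Iterable


def unique_task_name(requested: str, existing: Iterable[str]) -> str:
    """Return requested, or requested plus a numeric suffix if it is already taken."""
    taken = set(existing)
    if requested not in taken:
        return requested
    counter = 1
    while f"{requested}_{counter}" in taken:
        counter += 1
    return f"{requested}_{counter}"
```

When the exact key matters for later lookups, passing an explicit task_name avoids relying on whatever suffix the runner generates.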
- get_aggregated_results(self) -> Dict[str, Any]
- Run all tasks with callbacks and return the shared_state with aggregated results.
This is a convenience method that runs all tasks and returns the populated shared_state. When using default callbacks, the returned dict will contain:
- results: Dict[str, Any] with task results keyed by task name
- errors: Dict[str, str] with error messages for failed tasks
- completed: List[str] of completed task names
- started: List[str] of started task names
- retries: List[str] of retry attempt records
- timed_out: List[str] of timed out task names
Returns: Dict containing the shared_state with all task results and metadata
Example:
>>> runner = AsyncTaskRunner()
>>> runner.add_task(fetch_data, "api", task_name="api_fetch")
>>> runner.add_task(process_data, "raw_data", task_name="data_processing")
>>> results = await runner.get_aggregated_results()
>>> print(results['results']['api_fetch'])  # Access specific result
>>> if results['errors']:  # Check for any errors
...     print(f"Errors occurred: {results['errors']}")
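Once the aggregated dict is in hand, a small helper can split it into successes and failures. The sketch below works on a hand-written dict shaped like the documented default shared_state. The sample values are illustrative only, not real output, and summarize is a hypothetical helper.

```python
from typing import Any, Dict, List


def summarize(state: Dict[str, Any]) -> Dict[str, List[str]]:
    """Group task names by outcome using the documented shared_state keys."""
    failed = sorted(set(state["errors"]) | set(state["timed_out"]))
    succeeded = sorted(state["results"])
    return {"succeeded": succeeded, "failed": failed}


# Hand-written sample in the documented shape (not produced by a real run).
sample_state = {
    "results": {"api_fetch": {"status": "ok"}},
    "errors": {"data_processing": "task timed out"},
    "completed": ["api_fetch"],
    "started": ["api_fetch", "data_processing"],
    "retries": [],
    "timed_out": ["data_processing"],
}

summary = summarize(sample_state)
```

Taking the union of errors and timed_out avoids counting a task twice, since the default on_timeout callback records a timed-out task in both places.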
- run_async_as_completed(self) -> AsyncGenerator[Dict[str, Any], NoneType]
- Runs all tasks concurrently and yields results as they complete, while periodically sending heartbeat messages.
This is the low-level API that yields raw messages. For a higher-level API with automatic callback processing, use run_async_with_callbacks().
- run_async_with_callbacks(self) -> AsyncGenerator[sunholo.invoke.async_task_runner.CallbackContext, NoneType]
- Runs all tasks and automatically processes messages through callbacks. Yields CallbackContext after each callback invocation for monitoring.
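The relationship between raw messages and callbacks can be sketched as a dispatch loop that builds a context, awaits the matching callback, and yields the context. Everything below is an assumption for illustration: Context is a stand-in that copies the field names from the documented CallbackContext signature, and the message keys and the "on_" + type lookup are hypothetical.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Callable, Dict, List, Optional, Tuple


@dataclass
class Context:
    """Stand-in with the same field names as the documented CallbackContext."""
    task_name: str
    elapsed_time: float = 0
    task_metadata: Dict[str, Any] = field(default_factory=dict)
    shared_state: Dict[str, Any] = field(default_factory=dict)
    result: Any = None
    error: Optional[Exception] = None
    retry_attempt: int = 0
    message_type: str = ""


async def run_with_callbacks(
    messages: List[Dict[str, Any]],
    callbacks: Dict[str, Callable],
    shared_state: Dict[str, Any],
) -> AsyncIterator[Context]:
    for message in messages:
        callback = callbacks.get("on_" + message["type"])
        if callback is None:
            continue  # no handler registered for this message type
        context = Context(
            task_name=message["func_name"],
            shared_state=shared_state,
            result=message.get("result"),
            error=message.get("error"),
            message_type=message["type"],
        )
        await callback(context)
        yield context  # lets the caller observe each processed event


async def demo() -> Tuple[List[str], Dict[str, Any]]:
    state: Dict[str, Any] = {"results": {}, "started": []}

    async def on_task_start(ctx: Context) -> None:
        ctx.shared_state["started"].append(ctx.task_name)

    async def on_task_complete(ctx: Context) -> None:
        ctx.shared_state["results"][ctx.task_name] = ctx.result

    messages = [
        {"type": "task_start", "func_name": "fetch"},
        {"type": "heartbeat", "func_name": "fetch"},
        {"type": "task_complete", "func_name": "fetch", "result": 42},
    ]
    callbacks = {"on_task_start": on_task_start, "on_task_complete": on_task_complete}
    seen = [c.message_type async for c in run_with_callbacks(messages, callbacks, state)]
    return seen, state
```

In this sketch the heartbeat message is skipped because no on_heartbeat handler is registered, so only two contexts are yielded.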
CallbackContext
Context passed to callbacks with task information and shared state.
- __eq__(self, other)
- Return self==value.
- __init__(self, task_name: str, elapsed_time: float = 0, task_metadata: Dict[str, Any] = <factory>, shared_state: Dict[str, Any] = <factory>, result: Any = None, error: Exception = None, retry_attempt: int = 0, message_type: str = '') -> None
- Initialize self. See help(type(self)) for accurate signature.
- __repr__(self)
- Return repr(self).
TaskConfig
Per-task configuration for timeout, retry, and callbacks.
- __eq__(self, other)
- Return self==value.
- __init__(self, timeout: Optional[int] = None, retry_enabled: Optional[bool] = None, retry_kwargs: Optional[dict] = None, heartbeat_extends_timeout: Optional[bool] = None, hard_timeout: Optional[int] = None, callbacks: Optional[Dict[str, Callable]] = None, metadata: Dict[str, Any] = <factory>) -> None
- Initialize self. See help(type(self)) for accurate signature.
- __repr__(self)
- Return repr(self).
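Every TaskConfig field except metadata defaults to None, which suggests that an unset field falls back to the runner-wide setting. That fallback is an assumption here, as the source does not spell it out. Under that assumption, resolving an effective value could look like the sketch below, where PerTask is a cut-down stand-in for TaskConfig and effective is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional, TypeVar

T = TypeVar("T")


@dataclass
class PerTask:
    """Cut-down stand-in for TaskConfig with two of its documented fields."""
    timeout: Optional[int] = None
    retry_enabled: Optional[bool] = None


def effective(value: Optional[T], fallback: T) -> T:
    """Use the per-task value unless it is None, in which case use the global one."""
    return fallback if value is None else value


# Global settings as they might be passed to the runner's constructor.
global_timeout, global_retry = 120, True

quick = PerTask(timeout=30, retry_enabled=False)
inherit = PerTask()

quick_settings = (effective(quick.timeout, global_timeout),
                  effective(quick.retry_enabled, global_retry))
inherit_settings = (effective(inherit.timeout, global_timeout),
                    effective(inherit.retry_enabled, global_retry))
```

The explicit "is None" check matters: writing value or fallback would silently turn a per-task retry_enabled=False back into the global True.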
