async_class.py
Source: src/sunholo/invoke/async_class.py
Classes
AsyncTaskRunner
No docstring available.
- __init__(self, retry_enabled: bool = False, retry_kwargs: dict = None, timeout: int = 120, max_concurrency: int = 20, heartbeat_extends_timeout: bool = False, hard_timeout: int = None)
- Initialize AsyncTaskRunner with configurable timeout behavior.
Args:
  retry_enabled: Whether to enable retries.
  retry_kwargs: Retry configuration.
  timeout: Base timeout for tasks (seconds).
  max_concurrency: Maximum number of concurrent tasks.
  heartbeat_extends_timeout: If True, heartbeats reset the timeout timer.
  hard_timeout: Maximum absolute timeout regardless of heartbeats (seconds). If None, defaults to timeout * 5 when heartbeat_extends_timeout=True.
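The hard_timeout default described above can be sketched as a small helper. This is an illustration of the documented rule only, not code from the module: the function name resolve_hard_timeout is hypothetical, and returning None when heartbeats do not extend the timeout is an assumption, since the docstring does not cover that case.

```python
from typing import Optional


def resolve_hard_timeout(
    timeout: int,
    heartbeat_extends_timeout: bool,
    hard_timeout: Optional[int] = None,
) -> Optional[int]:
    """Hypothetical helper mirroring the documented hard_timeout default."""
    if hard_timeout is not None:
        # An explicit value is used as given.
        return hard_timeout
    if heartbeat_extends_timeout:
        # Documented default: five times the base timeout.
        return timeout * 5
    # Assumption: no hard limit applies when heartbeats do not extend the timeout.
    return None
```

With the default timeout of 120 seconds and heartbeat_extends_timeout=True, this rule gives a hard limit of 600 seconds.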
- _execute_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any
- Executes the given task function and returns its result.
Args:
  func (Callable): The callable to execute.
  *args: Positional arguments to pass to the callable.
  **kwargs: Keyword arguments to pass to the callable.
Returns:
  Any: The result of the task.
- _execute_task_with_heartbeat_timeout(self, func: Callable[..., Any], name: str, last_heartbeat: dict, *args: Any, **kwargs: Any) -> Any
- Execute task with heartbeat-extendable timeout and hard timeout limit.
- _execute_task_with_timeout(self, func: Callable[..., Any], name: str, last_heartbeat: dict, *args: Any, **kwargs: Any) -> Any
- Execute task with either fixed timeout or heartbeat-extendable timeout.
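A heartbeat-extendable timeout with a hard limit, as the two entries above describe, might look roughly like the following. This is a standalone sketch and not the module's implementation: the function name, the "time" key in last_heartbeat, the polling approach, and the error messages are all assumptions made for illustration.

```python
import asyncio
import contextlib
import time
from typing import Any, Awaitable, Dict


async def run_with_heartbeat_timeout(
    awaitable: Awaitable[Any],
    last_heartbeat: Dict[str, float],  # assumed shape: {"time": <monotonic seconds>}
    timeout: float,
    hard_timeout: float,
    poll_interval: float = 0.05,
) -> Any:
    """Hypothetical sketch: fail when heartbeats stop or the hard limit passes."""
    task = asyncio.ensure_future(awaitable)
    started = time.monotonic()
    try:
        while True:
            # Wait briefly for the task, then re-check both deadlines.
            done, _ = await asyncio.wait({task}, timeout=poll_interval)
            if done:
                return task.result()
            now = time.monotonic()
            if now - started >= hard_timeout:
                raise asyncio.TimeoutError("hard timeout exceeded")
            if now - last_heartbeat["time"] >= timeout:
                raise asyncio.TimeoutError("no heartbeat within timeout")
    finally:
        # Make sure the underlying task does not outlive a timeout.
        if not task.done():
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task
```

The soft deadline moves forward whenever another coroutine updates last_heartbeat["time"], while the hard deadline is measured from the start and never moves.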
- _monitor_tasks(self, task_infos, queue)
- Monitors the tasks and heartbeats, and sends a sentinel to the queue when done.
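The sentinel pattern mentioned above can be illustrated with a short standalone sketch. The names SENTINEL, monitor_tasks, and drain are hypothetical, and the module's own sentinel value and queue message format are not documented here.

```python
import asyncio
from typing import Any, Awaitable, Iterable, List

SENTINEL = object()  # hypothetical end-of-stream marker


async def monitor_tasks(tasks: Iterable[Awaitable[Any]], queue: asyncio.Queue) -> None:
    """Wait for every task to finish, then tell the consumer to stop."""
    await asyncio.gather(*tasks, return_exceptions=True)
    await queue.put(SENTINEL)


async def drain(queue: asyncio.Queue) -> List[Any]:
    """Collect queue messages until the sentinel arrives."""
    messages: List[Any] = []
    while True:
        message = await queue.get()
        if message is SENTINEL:
            return messages
        messages.append(message)
```

Because the sentinel is enqueued only after all tasks have finished, every message a task put on the queue is read before the consumer stops.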
- _run_with_retries_and_timeout(self, name: str, func: Callable[..., Any], args: tuple, kwargs: dict, queue: asyncio.queues.Queue, completion_event: asyncio.locks.Event, last_heartbeat: dict) -> None
- No docstring available.
- _send_heartbeat(self, func_name: str, completion_event: asyncio.locks.Event, queue: asyncio.queues.Queue, last_heartbeat: dict, interval: int = 2)
- Sends periodic heartbeat updates to indicate the task is still in progress. Updates last_heartbeat time if heartbeat_extends_timeout is enabled.
Args:
  func_name (str): The name of the task function.
  completion_event (asyncio.Event): Event to signal when the task is completed.
  queue (asyncio.Queue): The queue to send heartbeat messages to.
  last_heartbeat (dict): Mutable dict containing the last heartbeat time.
  interval (int): How frequently to send heartbeat messages (in seconds).
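A heartbeat loop of this kind could be sketched as follows. This is a standalone illustration, not the method's actual body: the message dictionary shape, the "time" key, and the extends_timeout parameter (standing in for the runner's heartbeat_extends_timeout setting) are assumptions.

```python
import asyncio
import time
from typing import Dict


async def send_heartbeat(
    func_name: str,
    completion_event: asyncio.Event,
    queue: asyncio.Queue,
    last_heartbeat: Dict[str, float],
    interval: float = 2.0,
    extends_timeout: bool = True,  # stand-in for heartbeat_extends_timeout
) -> None:
    """Hypothetical sketch: emit heartbeats until the task signals completion."""
    while not completion_event.is_set():
        # Assumed message shape; the real format is not documented here.
        await queue.put({"type": "heartbeat", "name": func_name})
        if extends_timeout:
            last_heartbeat["time"] = time.monotonic()
        try:
            # Sleep for one interval, but wake early if the task completes.
            await asyncio.wait_for(completion_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
```

Waiting on the completion event instead of sleeping unconditionally lets the loop exit as soon as the task finishes, without waiting out the current interval.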
- add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any)
- Adds a task to the list of tasks to be executed, supporting both positional and keyword arguments.
Args:
  func: The function to be executed.
  *args: Positional arguments for the function.
  **kwargs: Keyword arguments for the function.
- run_async_as_completed(self) -> AsyncGenerator[Dict[str, Any], None]
- Runs all tasks concurrently and yields results as they complete, while periodically sending heartbeat messages.
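The add_task / run_async_as_completed flow can be illustrated with a minimal stand-in class. MiniRunner below is hypothetical and is not AsyncTaskRunner: it omits retries, timeouts, and heartbeats. The result dictionary shape and the use of a semaphore for max_concurrency are assumptions.

```python
import asyncio
from typing import Any, AsyncGenerator, Callable, Dict, List, Tuple


class MiniRunner:
    """Hypothetical stand-in showing the queue-tasks-then-stream-results flow."""

    def __init__(self, max_concurrency: int = 20) -> None:
        self.tasks: List[Tuple[str, Callable[..., Any], tuple, dict]] = []
        self.max_concurrency = max_concurrency

    def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        # Store the callable with its arguments; nothing runs yet.
        self.tasks.append((func.__name__, func, args, kwargs))

    async def run_as_completed(self) -> AsyncGenerator[Dict[str, Any], None]:
        semaphore = asyncio.Semaphore(self.max_concurrency)

        async def guarded(name: str, func: Callable[..., Any], args: tuple, kwargs: dict) -> Dict[str, Any]:
            # Assumption: concurrency is capped with a semaphore.
            async with semaphore:
                return {"name": name, "result": await func(*args, **kwargs)}

        pending = [asyncio.ensure_future(guarded(*task)) for task in self.tasks]
        # Yield each result as soon as its task finishes, not in submission order.
        for finished in asyncio.as_completed(pending):
            yield await finished
```

A caller would queue coroutine functions with add_task and then consume results with `async for message in runner.run_as_completed(): ...`, receiving the fastest task's result first.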