
async_class.py

Source: src/sunholo/invoke/async_class.py

Classes

AsyncTaskRunner

No docstring available.

  • __init__(self, retry_enabled: bool = False, retry_kwargs: dict = None, timeout: int = 120, max_concurrency: int = 20, heartbeat_extends_timeout: bool = False, hard_timeout: int = None)
    • Initialize AsyncTaskRunner with configurable timeout behavior.

Args:
    retry_enabled: Whether to enable retries.
    retry_kwargs: Retry configuration.
    timeout: Base timeout for tasks (seconds).
    max_concurrency: Maximum number of concurrent tasks.
    heartbeat_extends_timeout: If True, heartbeats reset the timeout timer.
    hard_timeout: Maximum absolute timeout regardless of heartbeats (seconds). If None, defaults to timeout * 5 when heartbeat_extends_timeout=True.
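The default rule for hard_timeout can be read as a small piece of arithmetic. The sketch below is only an illustration of that documented rule: `resolve_hard_timeout` is a hypothetical helper, not part of sunholo, and returning None when heartbeats do not extend the timeout is an assumption, since the docstring does not describe that case.

```python
from typing import Optional


def resolve_hard_timeout(
    timeout: int = 120,
    heartbeat_extends_timeout: bool = False,
    hard_timeout: Optional[int] = None,
) -> Optional[int]:
    """Hypothetical helper mirroring the documented hard_timeout default."""
    if hard_timeout is not None:
        # An explicitly supplied value is used as given.
        return hard_timeout
    if heartbeat_extends_timeout:
        # Documented default: five times the base timeout.
        return timeout * 5
    # Assumption: with a fixed timeout there is no separate hard limit.
    return None
```

With the default timeout of 120 seconds and heartbeat_extends_timeout=True, this gives a hard limit of 600 seconds.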

  • _execute_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any
    • Executes the given task function and returns its result.

Args:
    func (Callable): The callable to execute.
    *args: Positional arguments to pass to the callable.
    **kwargs: Keyword arguments to pass to the callable.

Returns:
    Any: The result of the task.
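The docstring does not say how the callable is invoked. As a rough sketch only, one common way to execute an arbitrary callable from async code is shown below. It assumes coroutine functions are awaited directly and ordinary functions are run in a worker thread; `execute_task` is a hypothetical stand-in and may differ from what sunholo actually does.

```python
import asyncio
import inspect
from typing import Any, Callable


async def execute_task(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Hypothetical stand-in: run `func` and return its result."""
    if inspect.iscoroutinefunction(func):
        # Coroutine functions are awaited on the event loop.
        return await func(*args, **kwargs)
    # Assumption: blocking callables are moved to a thread (Python 3.9+)
    # so they do not stall other tasks.
    return await asyncio.to_thread(func, *args, **kwargs)
```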

  • _execute_task_with_heartbeat_timeout(self, func: Callable[..., Any], name: str, last_heartbeat: dict, *args: Any, **kwargs: Any) -> Any
    • Execute task with heartbeat-extendable timeout and hard timeout limit.

  • _execute_task_with_timeout(self, func: Callable[..., Any], name: str, last_heartbeat: dict, *args: Any, **kwargs: Any) -> Any
    • Execute task with either fixed timeout or heartbeat-extendable timeout.

  • _monitor_tasks(self, task_infos, queue)
    • Monitors the tasks and heartbeats, and sends a sentinel to the queue when done.

  • _run_with_retries_and_timeout(self, name: str, func: Callable[..., Any], args: tuple, kwargs: dict, queue: asyncio.queues.Queue, completion_event: asyncio.locks.Event, last_heartbeat: dict) -> None
    • No docstring available.

  • _send_heartbeat(self, func_name: str, completion_event: asyncio.locks.Event, queue: asyncio.queues.Queue, last_heartbeat: dict, interval: int = 2)
    • Sends periodic heartbeat updates to indicate the task is still in progress. Updates last_heartbeat time if heartbeat_extends_timeout is enabled.

Args:
    func_name (str): The name of the task function.
    completion_event (asyncio.Event): Event to signal when the task is completed.
    queue (asyncio.Queue): The queue to send heartbeat messages to.
    last_heartbeat (dict): Mutable dict containing the last heartbeat time.
    interval (int): How frequently to send heartbeat messages (in seconds).
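To make the interaction between heartbeats, the base timeout, and the hard timeout concrete, here is a minimal asyncio-only sketch of the general technique. It is not the sunholo implementation: the function names, the "time" key in last_heartbeat, and the polling loop are all assumptions made for illustration, and the queue messages are left out.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict


async def send_heartbeat(
    completion_event: asyncio.Event,
    last_heartbeat: Dict[str, float],
    interval: float = 2.0,
) -> None:
    """Refresh last_heartbeat["time"] every `interval` seconds until done."""
    while not completion_event.is_set():
        await asyncio.sleep(interval)
        last_heartbeat["time"] = time.monotonic()


async def run_with_heartbeat_timeout(
    func: Callable[[], Awaitable[Any]],
    timeout: float,
    hard_timeout: float,
    interval: float = 2.0,
) -> Any:
    """Run func() under a heartbeat-extendable timeout capped by hard_timeout."""
    completion_event = asyncio.Event()
    start = time.monotonic()
    last_heartbeat = {"time": start}  # hypothetical key name
    task = asyncio.ensure_future(func())
    beat = asyncio.create_task(
        send_heartbeat(completion_event, last_heartbeat, interval)
    )
    try:
        while True:
            now = time.monotonic()
            # Soft deadline moves forward with every heartbeat.
            soft_left = last_heartbeat["time"] + timeout - now
            # Hard deadline is fixed from the start of the task.
            hard_left = start + hard_timeout - now
            remaining = min(soft_left, hard_left)
            if remaining <= 0:
                task.cancel()
                raise asyncio.TimeoutError("task exceeded its timeout")
            # Wake up at least once per interval to re-read the heartbeat.
            done, _ = await asyncio.wait({task}, timeout=min(remaining, interval))
            if done:
                return task.result()
    finally:
        completion_event.set()
        beat.cancel()
```

In this sketch a task that runs longer than timeout still completes as long as heartbeats keep arriving, and it is cancelled only once hard_timeout has elapsed.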

  • add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any)
    • Adds a task to the list of tasks to be executed, supporting both positional and keyword arguments.

Args:
    func: The function to be executed.
    *args: Positional arguments for the function.
    **kwargs: Keyword arguments for the function.

  • run_async_as_completed(self) -> AsyncGenerator[Dict[str, Any], NoneType]
    • Runs all tasks concurrently and yields results as they complete, while periodically sending heartbeat messages.
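The add_task / run_async_as_completed pattern, together with the sentinel mentioned for _monitor_tasks, can be sketched with plain asyncio as follows. `MiniRunner` is a hypothetical, simplified stand-in and not the sunholo class: it handles only coroutine functions, omits heartbeats, retries, and timeouts, and the message keys ("type", "func_name", "result", "error") are assumptions, since this page does not document the shape of the yielded dicts.

```python
import asyncio
from typing import Any, AsyncGenerator, Callable, Dict, List, Tuple

_SENTINEL = object()  # marks "all tasks finished" on the queue


class MiniRunner:
    """Hypothetical stand-in illustrating the pattern; not the sunholo API."""

    def __init__(self, max_concurrency: int = 20) -> None:
        self.tasks: List[Tuple[str, Callable[..., Any], tuple, dict]] = []
        self.max_concurrency = max_concurrency

    def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        # Store the callable with its arguments until the run starts.
        self.tasks.append((func.__name__, func, args, kwargs))

    async def run_async_as_completed(self) -> AsyncGenerator[Dict[str, Any], None]:
        queue: asyncio.Queue = asyncio.Queue()
        semaphore = asyncio.Semaphore(self.max_concurrency)

        async def worker(name: str, func: Callable[..., Any], args: tuple, kwargs: dict) -> None:
            async with semaphore:  # cap the number of tasks running at once
                try:
                    result = await func(*args, **kwargs)
                    await queue.put({"type": "task_complete", "func_name": name, "result": result})
                except Exception as exc:
                    await queue.put({"type": "task_error", "func_name": name, "error": exc})

        async def monitor(workers: List[asyncio.Task]) -> None:
            # Once every worker is done, tell the consumer to stop.
            await asyncio.gather(*workers)
            await queue.put(_SENTINEL)

        workers = [asyncio.create_task(worker(*t)) for t in self.tasks]
        monitor_task = asyncio.create_task(monitor(workers))
        try:
            while True:
                message = await queue.get()
                if message is _SENTINEL:
                    break
                yield message
        finally:
            # No-ops after a normal run; cleans up if the consumer stops early.
            monitor_task.cancel()
            for w in workers:
                w.cancel()


async def collect(runner: MiniRunner) -> List[Dict[str, Any]]:
    """Usage example: gather every yielded message into a list."""
    return [message async for message in runner.run_async_as_completed()]
```

Results arrive in completion order rather than in the order the tasks were added, which is the point of consuming them from a queue.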

Copyright © Holosun ApS 2025