async_class.py
Source: sunholo/invoke/async_class.py
Classes
AsyncTaskRunner
No docstring available.
-
__init__(self, retry_enabled=False, retry_kwargs=None, timeout=120)
- Initialize self. See help(type(self)) for accurate signature.
-
_execute_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any
- Executes the given task function and returns its result.
Args:
  func (Callable): The callable to execute.
  *args: Positional arguments to pass to the callable.
  **kwargs: Keyword arguments to pass to the callable.
Returns:
  Any: The result of the task.
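The entry above does not say how different kinds of callables are handled. The following is a minimal sketch, not the library's code, assuming that coroutine functions are awaited directly and that blocking functions are moved to a worker thread. The helper name `execute_task` and the two sample functions are hypothetical.

```python
import asyncio
import inspect
from typing import Any, Callable


async def execute_task(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Hypothetical stand-in for _execute_task: run func and return its result."""
    if inspect.iscoroutinefunction(func):
        # Coroutine functions are awaited on the running event loop.
        return await func(*args, **kwargs)
    # Assumption: blocking callables run in a worker thread so the loop stays free.
    return await asyncio.to_thread(func, *args, **kwargs)


def add(a: int, b: int) -> int:
    """Sample blocking task."""
    return a + b


async def slow_double(x: int) -> int:
    """Sample coroutine task."""
    await asyncio.sleep(0.01)
    return x * 2


if __name__ == "__main__":
    print(asyncio.run(execute_task(add, 2, b=3)))
    print(asyncio.run(execute_task(slow_double, 4)))
```

Both positional and keyword arguments are forwarded unchanged, which matches the `*args` and `**kwargs` description in the docstring.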
-
_monitor_tasks(self, task_infos, queue)
- Monitors the tasks and heartbeats, and sends a sentinel to the queue when done.
-
_run_with_retries_and_timeout(self, name: str, func: Callable[..., Any], args: tuple, kwargs: dict, queue: asyncio.queues.Queue, completion_event: asyncio.locks.Event) -> None
- No docstring available.
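Since this method has no docstring, its behavior can only be inferred from its name and signature: it appears to run one named task under a timeout, optionally retry it, report the outcome on the queue, and set the completion event. A minimal standard-library sketch under those assumptions follows. The `max_attempts` parameter and the message dictionary keys are hypothetical, and the library may forward `retry_kwargs` to a dedicated retry mechanism instead of using a plain loop.

```python
import asyncio
from typing import Any, Callable


async def run_with_retries_and_timeout(
    name: str,
    func: Callable[..., Any],
    args: tuple,
    kwargs: dict,
    queue: asyncio.Queue,
    completion_event: asyncio.Event,
    timeout: float = 120,
    max_attempts: int = 3,  # hypothetical; not a parameter in the source
) -> None:
    """Hypothetical stand-in: run an async callable with a timeout and retries."""
    try:
        for attempt in range(1, max_attempts + 1):
            try:
                # wait_for cancels the call and raises a timeout error after `timeout` seconds.
                result = await asyncio.wait_for(func(*args, **kwargs), timeout)
                await queue.put({"type": "task_complete", "func_name": name, "result": result})
                return
            except Exception as exc:  # timeouts are caught here as well
                if attempt == max_attempts:
                    # Out of attempts: report the last error instead of raising.
                    await queue.put({"type": "task_error", "func_name": name, "error": repr(exc)})
    finally:
        # Always signal completion so a heartbeat loop waiting on the event can stop.
        completion_event.set()


calls = {"n": 0}


async def flaky() -> str:
    """Sample task that fails twice and then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient failure")
    return "ok"


async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    done = asyncio.Event()
    await run_with_retries_and_timeout("flaky", flaky, (), {}, queue, done, timeout=1)
    return queue.get_nowait(), done.is_set()
```

Setting the event in a `finally` block means the heartbeat for this task stops whether the task succeeds, fails, or is cancelled.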
-
_send_heartbeat(self, func_name: str, completion_event: asyncio.locks.Event, queue: asyncio.queues.Queue, interval: int = 2)
- Sends periodic heartbeat updates to indicate the task is still in progress.
Args:
  func_name (str): The name of the task function.
  completion_event (asyncio.Event): Event to signal when the task is completed.
  queue (asyncio.Queue): The queue to send heartbeat messages to.
  interval (int): How frequently to send heartbeat messages (in seconds).
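The heartbeat described above can be sketched as a loop that waits on the completion event with a timeout equal to the interval. This is an illustration under assumptions rather than the library's implementation: the function name `send_heartbeat`, the message keys, and the `elapsed` field are hypothetical.

```python
import asyncio


async def send_heartbeat(
    func_name: str,
    completion_event: asyncio.Event,
    queue: asyncio.Queue,
    interval: float = 2,
) -> None:
    """Hypothetical stand-in: emit a heartbeat every `interval` seconds until done."""
    elapsed = 0.0
    while not completion_event.is_set():
        try:
            # Sleep for one interval, but wake immediately if the task finishes.
            await asyncio.wait_for(completion_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            # The interval passed without completion, so report progress.
            elapsed += interval
            await queue.put({"type": "heartbeat", "func_name": func_name, "elapsed": elapsed})


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    done = asyncio.Event()
    beat = asyncio.create_task(send_heartbeat("slow_job", done, queue, interval=0.05))
    await asyncio.sleep(0.18)  # pretend the task is working
    done.set()
    await beat
    return [queue.get_nowait() for _ in range(queue.qsize())]
```

Waiting on the event instead of calling `asyncio.sleep(interval)` lets the loop exit as soon as the task completes, without finishing the current interval first.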
-
add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any)
- Adds a task to the list of tasks to be executed, supporting both positional and keyword arguments.
Args:
  func: The function to be executed.
  *args: Positional arguments for the function.
  **kwargs: Keyword arguments for the function.
-
run_async_as_completed(self) -> AsyncGenerator[Dict[str, Any], NoneType]
- Runs all tasks concurrently and yields results as they complete, while periodically sending heartbeat messages.
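Going by the signatures on this page, a caller would register work with `add_task` and then consume results with `async for message in runner.run_async_as_completed()`. The overall pattern (workers and heartbeats writing to one queue, a monitor adding a sentinel when everything has finished, and an async generator draining the queue) can be sketched with the standard library alone. Everything below is a hypothetical stand-in, not the library's code: the function names, the `tasks` mapping, and the message keys are assumptions.

```python
import asyncio
from typing import Any, AsyncGenerator, Awaitable, Callable, Dict

_DONE = object()  # sentinel marking the end of the message stream


async def _worker(name: str, func: Callable[[], Awaitable[Any]],
                  queue: asyncio.Queue, done: asyncio.Event) -> None:
    try:
        result = await func()
        await queue.put({"type": "task_complete", "func_name": name, "result": result})
    finally:
        done.set()  # stops the matching heartbeat loop


async def _heartbeat(name: str, queue: asyncio.Queue,
                     done: asyncio.Event, interval: float) -> None:
    while not done.is_set():
        try:
            await asyncio.wait_for(done.wait(), timeout=interval)
        except asyncio.TimeoutError:
            await queue.put({"type": "heartbeat", "func_name": name})


async def run_as_completed(
    tasks: Dict[str, Callable[[], Awaitable[Any]]], interval: float = 2.0
) -> AsyncGenerator[Dict[str, Any], None]:
    queue: asyncio.Queue = asyncio.Queue()
    jobs = []
    for name, func in tasks.items():
        done = asyncio.Event()
        jobs.append(asyncio.create_task(_worker(name, func, queue, done)))
        jobs.append(asyncio.create_task(_heartbeat(name, queue, done, interval)))

    async def _monitor() -> None:
        # Wait for every worker and heartbeat, then close the stream.
        await asyncio.gather(*jobs, return_exceptions=True)
        await queue.put(_DONE)

    monitor = asyncio.create_task(_monitor())
    while True:
        message = await queue.get()
        if message is _DONE:
            break
        yield message
    await monitor


async def fast() -> str:
    await asyncio.sleep(0.01)
    return "fast"


async def slow() -> str:
    await asyncio.sleep(0.12)
    return "slow"


async def demo() -> list:
    return [m async for m in run_as_completed({"fast": fast, "slow": slow}, interval=0.05)]
```

Because all messages pass through a single queue, results arrive in completion order rather than registration order, and heartbeats for slower tasks are interleaved between them.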