async_class.py
Source: sunholo/invoke/async_class.py
Classes
AsyncTaskRunner
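No docstring available. Judging by its methods, the class collects callables, runs them concurrently with optional retries, and reports results and heartbeats through an asyncio queue.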
-
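__init__(self, retry_enabled=False, retry_kwargs=None)
- Initializes the runner. The source has no docstring for this method; judging by the parameter names, retry_enabled switches retries on or off and retry_kwargs supplies optional retry settings.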
-
_execute_task(self, func: Callable[..., Any], *args: Any) -> Any
- Executes the given task function and returns its result.
Args:
  func (Callable): The callable to execute.
  *args: Arguments to pass to the callable.
Returns:
  Any: The result of the task.
-
_monitor_tasks(self, task_infos, queue)
- Monitors the tasks and heartbeats, and sends a sentinel to the queue when done.
-
_run_with_retries(self, name: str, func: Callable[..., Any], *args: Any, queue: asyncio.Queue, completion_event: asyncio.Event) -> None
- Executes a task with optional retries and sends completion or error messages to the queue.
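The retry-and-report behaviour described above can be sketched with plain asyncio. This is a minimal illustration, not the sunholo implementation: the function name run_with_retries, the attempts parameter, and the message dictionaries are all assumptions made for the example.

```python
import asyncio
from typing import Any, Callable


async def run_with_retries(
    name: str,
    func: Callable[..., Any],
    *args: Any,
    queue: asyncio.Queue,
    completion_event: asyncio.Event,
    attempts: int = 3,  # hypothetical parameter, not part of the documented API
) -> None:
    """Call an async `func` up to `attempts` times and report the outcome on `queue`."""
    last_error = None
    try:
        for _ in range(attempts):
            try:
                result = await func(*args)
            except Exception as exc:
                last_error = exc  # remember the failure and try again
                continue
            # Hypothetical message shape for a successful task.
            await queue.put({"type": "task_complete", "name": name, "result": result})
            return
        # Every attempt failed: report the last error instead of raising.
        await queue.put({"type": "task_error", "name": name, "error": str(last_error)})
    finally:
        # Signal completion either way so a heartbeat loop can stop.
        completion_event.set()


async def demo():
    calls = {"n": 0}

    async def flaky(x: int) -> int:
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("transient failure")
        return x * 2

    queue: asyncio.Queue = asyncio.Queue()
    done = asyncio.Event()
    await run_with_retries("flaky", flaky, 21, queue=queue, completion_event=done)
    return await queue.get(), done.is_set()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Setting the event in a finally block means a consumer waiting on it is released whether the task succeeded, failed, or was cancelled.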
-
_send_heartbeat(self, func_name: str, completion_event: asyncio.Event, queue: asyncio.Queue, interval: int = 2)
- Sends periodic heartbeat updates to indicate the task is still in progress.
Args:
  func_name (str): The name of the task function.
  completion_event (asyncio.Event): Event that signals when the task has completed.
  queue (asyncio.Queue): The queue to send heartbeat messages to.
  interval (int): How often to send heartbeat messages, in seconds.
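A heartbeat loop of this kind can be sketched as follows. It is an illustration under assumptions, not the library's code: the message fields ("type", "name", "elapsed") are hypothetical, and the sketch accepts a float interval so the demo runs quickly.

```python
import asyncio
import time


async def send_heartbeat(
    func_name: str,
    completion_event: asyncio.Event,
    queue: asyncio.Queue,
    interval: float = 2,
) -> None:
    """Put a heartbeat message on `queue` every `interval` seconds until the event is set."""
    start = time.monotonic()
    while not completion_event.is_set():
        try:
            # Wait for completion, but give up after `interval` seconds.
            await asyncio.wait_for(completion_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            # Still running: emit a heartbeat (hypothetical message shape).
            await queue.put(
                {
                    "type": "heartbeat",
                    "name": func_name,
                    "elapsed": round(time.monotonic() - start, 2),
                }
            )


async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    done = asyncio.Event()
    beat = asyncio.create_task(send_heartbeat("slow_task", done, queue, interval=0.05))
    await asyncio.sleep(0.2)  # stand-in for a long-running task
    done.set()
    await beat
    messages = []
    while not queue.empty():
        messages.append(queue.get_nowait())
    return messages


if __name__ == "__main__":
    for message in asyncio.run(demo()):
        print(message)
```

Waiting on the event with a timeout, rather than sleeping for a fixed interval, lets the loop exit as soon as the task finishes instead of after one more full interval.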
-
add_task(self, func: Callable[..., Any], *args: Any)
- Adds a task to the list of tasks to be executed.
-
run_async_as_completed(self) -> AsyncGenerator[Dict[str, Any], None]
- Runs all tasks concurrently and yields results as they complete, while periodically sending heartbeat messages.
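The combination of a monitor that sends a sentinel and a generator that yields results as they arrive can be sketched as below. This is a simplified stand-in under stated assumptions: worker, monitor, run_as_completed, and the message shape are hypothetical names for the example, heartbeats are left out for brevity, and the real class takes its tasks from add_task rather than a list of (name, delay) pairs.

```python
import asyncio
from typing import Any, AsyncGenerator, Dict, List, Tuple

_SENTINEL = object()  # marks the end of the message stream


async def worker(name: str, delay: float, queue: asyncio.Queue) -> None:
    """Simulated task: sleep, then report completion on the queue."""
    await asyncio.sleep(delay)
    await queue.put({"type": "task_complete", "name": name, "result": delay})


async def monitor(workers: List[asyncio.Task], queue: asyncio.Queue) -> None:
    """Wait for every worker, then tell the consumer that nothing more is coming."""
    await asyncio.gather(*workers)
    await queue.put(_SENTINEL)


async def run_as_completed(
    jobs: List[Tuple[str, float]],
) -> AsyncGenerator[Dict[str, Any], None]:
    """Start all jobs concurrently and yield their messages in completion order."""
    queue: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(n, d, queue)) for n, d in jobs]
    monitor_task = asyncio.create_task(monitor(workers, queue))
    while True:
        message = await queue.get()
        if message is _SENTINEL:
            break
        yield message
    await monitor_task


async def demo() -> List[str]:
    jobs = [("slow", 0.1), ("fast", 0.01)]
    return [m["name"] async for m in run_as_completed(jobs)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because every producer writes to one queue, the consumer needs only a single loop, and the sentinel gives it an unambiguous stopping point without polling task states.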