async_class.py

Source: sunholo/invoke/async_class.py

Classes

AsyncTaskRunner

No docstring available.

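  • __init__(self, retry_enabled=False, retry_kwargs=None)

    • Initializes the runner. No docstring is available; per the signature, retries are disabled by default (retry_enabled=False) and retry_kwargs defaults to None.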
  • _execute_task(self, func: Callable[..., Any], *args: Any) -> Any

    • Executes the given task function and returns its result.

      Args:
        func (Callable): The callable to execute.
        *args: Arguments to pass to the callable.

      Returns:
        Any: The result of the task.

  • _monitor_tasks(self, task_infos, queue)

    • Monitors the tasks and heartbeats, and sends a sentinel to the queue when done.
  • _run_with_retries(self, name: str, func: Callable[..., Any], *args: Any, queue: asyncio.Queue, completion_event: asyncio.Event) -> None

    • Executes a task with optional retries and sends completion or error messages to the queue.
  • _send_heartbeat(self, func_name: str, completion_event: asyncio.Event, queue: asyncio.Queue, interval: int = 2)

    • Sends periodic heartbeat updates to indicate the task is still in progress.

      Args:
        func_name (str): The name of the task function.
        completion_event (asyncio.Event): Event that signals when the task has completed.
        queue (asyncio.Queue): The queue to send heartbeat messages to.
        interval (int): How often to send heartbeat messages, in seconds.

  • add_task(self, func: Callable[..., Any], *args: Any)

    • Adds a task to the list of tasks to be executed.
  • run_async_as_completed(self) -> AsyncGenerator[Dict[str, Any], None]

    • Runs all tasks concurrently and yields results as they complete, while periodically sending heartbeat messages.
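
The methods above describe a common asyncio pattern: each task runs alongside a heartbeat coroutine, both write to a shared queue, and a monitor puts a sentinel on the queue once everything has finished. The following is a minimal standalone sketch of that pattern, not the sunholo implementation. The function names (run_as_completed, _heartbeat, _run_task), the message keys ("type", "name", "result", "error") and the _SENTINEL object are all assumptions made for illustration, and retries are left out.

```python
import asyncio
from typing import Any, AsyncGenerator, Awaitable, Callable, Dict, List, Tuple

# Hypothetical marker object: tells the consumer that every task has finished.
_SENTINEL = object()

TaskSpec = Tuple[Callable[..., Awaitable[Any]], Tuple[Any, ...]]


async def _heartbeat(name: str, done: asyncio.Event,
                     queue: asyncio.Queue, interval: float) -> None:
    """Queue a heartbeat every `interval` seconds until `done` is set."""
    while not done.is_set():
        try:
            # Returns early if the task completes before the interval elapses.
            await asyncio.wait_for(done.wait(), timeout=interval)
        except asyncio.TimeoutError:
            await queue.put({"type": "heartbeat", "name": name})


async def _run_task(name: str, func: Callable[..., Awaitable[Any]],
                    args: Tuple[Any, ...], queue: asyncio.Queue,
                    done: asyncio.Event) -> None:
    """Run one coroutine function and queue its result or its error."""
    try:
        result = await func(*args)
        await queue.put({"type": "task_complete", "name": name, "result": result})
    except Exception as exc:
        await queue.put({"type": "task_error", "name": name, "error": str(exc)})
    finally:
        done.set()  # stops the matching heartbeat coroutine


async def run_as_completed(
    tasks: List[TaskSpec], interval: float = 2.0
) -> AsyncGenerator[Dict[str, Any], None]:
    """Yield heartbeat, result and error messages as they arrive."""
    queue: asyncio.Queue = asyncio.Queue()
    workers = []
    for func, args in tasks:
        done = asyncio.Event()
        name = func.__name__
        workers.append(asyncio.create_task(_run_task(name, func, args, queue, done)))
        workers.append(asyncio.create_task(_heartbeat(name, done, queue, interval)))

    async def _monitor() -> None:
        # Wait for all tasks and heartbeats, then signal the end of the stream.
        await asyncio.gather(*workers)
        await queue.put(_SENTINEL)

    monitor = asyncio.create_task(_monitor())
    while True:
        message = await queue.get()
        if message is _SENTINEL:
            break
        yield message
    await monitor
```

A caller would consume the generator with async for and dispatch on the "type" key of each message. With AsyncTaskRunner itself, the signatures suggest the equivalent flow is to register callables with add_task and then iterate over run_async_as_completed(), though the exact message format it yields is not documented on this page.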

Copyright © Holosun ApS 2024