Skip to main content

task_manager.py

Source: src/sunholo/a2a/task_manager.py

Classes

A2ATaskManager

Manages A2A tasks and their lifecycle.

  • init(self, stream_interpreter: Callable, vac_interpreter: Optional[Callable] = None)
    • Initialize the task manager.

Args: stream_interpreter: Function for streaming VAC interactions vac_interpreter: Function for static VAC interactions

  • _convert_chat_history(self, a2a_history: List[Dict[str, Any]]) -> List[Dict[str, str]]
    • Convert A2A chat history format to VAC format.

Args: a2a_history: A2A format chat history

Returns: VAC format chat history

  • _notify_subscribers(self, task_id: str, task_data: Dict[str, Any])

    • Notify all subscribers of a task update.
  • _parse_skill_name(self, skill_name: str) -> tuple[str, str]

    • Parse skill name to extract VAC name and operation.

Expected format: "vac_{operation}_{vac_name}"

Args: skill_name: The skill name to parse

Returns: Tuple of (vac_name, operation)

  • _process_memory_search_task(self, task: sunholo.a2a.task_manager.A2ATask, vac_name: str)

    • Process a memory search task.
  • _process_query_task(self, task: sunholo.a2a.task_manager.A2ATask, vac_name: str)

    • Process a static query task.
  • _process_stream_task(self, task: sunholo.a2a.task_manager.A2ATask, vac_name: str)

    • Process a streaming task.
  • _process_task(self, task: sunholo.a2a.task_manager.A2ATask)

    • Process a task by invoking the appropriate VAC functionality.

Args: task: The task to process

  • cancel_task(self, task_id: str) -> bool
    • Cancel a task.

Args: task_id: ID of the task to cancel

Returns: True if task was canceled, False if not found or already completed

  • cleanup_completed_tasks(self, max_age_hours: int = 24)
    • Clean up completed tasks older than specified age.

Args: max_age_hours: Maximum age in hours for completed tasks

  • create_task(self, skill_name: str, input_data: Dict[str, Any], client_metadata: Optional[Dict[str, Any]] = None) -> sunholo.a2a.task_manager.A2ATask
    • Create a new A2A task.

Args: skill_name: Name of the skill to invoke input_data: Input parameters for the task client_metadata: Optional client metadata

Returns: Created A2ATask instance

  • get_task(self, task_id: str) -> Optional[sunholo.a2a.task_manager.A2ATask]

    • Get a task by ID.
  • subscribe_to_task(self, task_id: str)

    • Subscribe to task updates via async generator.

Args: task_id: ID of the task to subscribe to

Yields: Task update dictionaries

A2ATask

Represents an A2A task with its state and data.

  • init(self, task_id: str, skill_name: str, input_data: Dict[str, Any], client_metadata: Optional[Dict[str, Any]] = None)
    • Initialize an A2A task.

Args: task_id: Unique task identifier skill_name: Name of the skill being invoked input_data: Input parameters for the task client_metadata: Optional metadata from the client

  • add_artifact(self, name: str, content: Any, artifact_type: str = 'text')

    • Add an artifact to the task.
  • add_message(self, role: str, content: str, message_type: str = 'text')

    • Add a message to the task.
  • to_dict(self) -> Dict[str, Any]

    • Convert task to dictionary format for A2A responses.
  • update_progress(self, progress: float)

    • Update task progress (0.0 to 1.0).
  • update_state(self, new_state: sunholo.a2a.task_manager.TaskState, error_message: str = None)

    • Update the task state.

TaskState

A2A Task states as defined in the protocol.

Sunholo Multivac

Get in touch to see if we can help with your GenAI project.

Contact us

Other Links

Sunholo Multivac - GenAIOps

Copyright ©

Holosun ApS 2025