task_manager.py
Source: src/sunholo/a2a/task_manager.py
Classes
A2ATaskManager
Manages A2A tasks and their lifecycle.
- __init__(self, stream_interpreter: Callable, vac_interpreter: Optional[Callable] = None)
- Initialize the task manager.
Args: stream_interpreter: Function for streaming VAC interactions; vac_interpreter: Function for static VAC interactions
- _convert_chat_history(self, a2a_history: List[Dict[str, Any]]) -> List[Dict[str, str]]
- Convert A2A chat history format to VAC format.
Args: a2a_history: A2A format chat history
Returns: VAC format chat history
- _notify_subscribers(self, task_id: str, task_data: Dict[str, Any])
- Notify all subscribers of a task update.
- _parse_skill_name(self, skill_name: str) -> tuple[str, str]
- Parse skill name to extract VAC name and operation.
Expected format: "vac_{operation}_{vac_name}"
Args: skill_name: The skill name to parse
Returns: Tuple of (vac_name, operation)
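The "vac_{operation}_{vac_name}" format above can be illustrated with a small standalone parser. This is a sketch, not the sunholo implementation: the function name `parse_skill_name`, the `KNOWN_OPERATIONS` tuple, and the error behaviour are all assumptions. Matching against a fixed list of operations is one way to handle operations and VAC names that themselves contain underscores.

```python
from typing import Tuple

# Assumption: this list of operations is illustrative and is not taken
# from the sunholo source.
KNOWN_OPERATIONS = ("memory_search", "query", "stream")


def parse_skill_name(skill_name: str) -> Tuple[str, str]:
    """Split 'vac_{operation}_{vac_name}' into (vac_name, operation)."""
    prefix = "vac_"
    if not skill_name.startswith(prefix):
        raise ValueError(f"Skill name must start with 'vac_': {skill_name!r}")

    remainder = skill_name[len(prefix):]
    for operation in KNOWN_OPERATIONS:
        op_prefix = operation + "_"
        # Require a non-empty VAC name after the operation.
        if remainder.startswith(op_prefix) and len(remainder) > len(op_prefix):
            return remainder[len(op_prefix):], operation

    raise ValueError(f"Unrecognised operation in skill name: {skill_name!r}")
```

Note the return order: the VAC name comes first and the operation second, even though the skill name lists them the other way round.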
- _process_memory_search_task(self, task: sunholo.a2a.task_manager.A2ATask, vac_name: str)
- Process a memory search task.
- _process_query_task(self, task: sunholo.a2a.task_manager.A2ATask, vac_name: str)
- Process a static query task.
- _process_stream_task(self, task: sunholo.a2a.task_manager.A2ATask, vac_name: str)
- Process a streaming task.
- _process_task(self, task: sunholo.a2a.task_manager.A2ATask)
- Process a task by invoking the appropriate VAC functionality.
Args: task: The task to process
- cancel_task(self, task_id: str) -> bool
- Cancel a task.
Args: task_id: ID of the task to cancel
Returns: True if task was canceled, False if not found or already completed
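The return contract of cancel_task can be sketched on a plain dictionary of tasks. The state names and the helper `cancel_task_sketch` are hypothetical, and treating every terminal state as "already completed" is an assumption about the intended behaviour.

```python
from typing import Dict

# Assumption: illustrative terminal state names, not read from the library.
TERMINAL_STATES = {"completed", "failed", "canceled"}


def cancel_task_sketch(tasks: Dict[str, Dict[str, str]], task_id: str) -> bool:
    """Return True if the task was canceled, False if missing or finished."""
    task = tasks.get(task_id)
    if task is None or task["state"] in TERMINAL_STATES:
        return False
    task["state"] = "canceled"
    return True
```

A caller can use the boolean to tell a successful cancellation apart from an unknown or already finished task without inspecting the task itself.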
- cleanup_completed_tasks(self, max_age_hours: int = 24)
- Clean up completed tasks older than specified age.
Args: max_age_hours: Maximum age in hours for completed tasks
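Age-based cleanup of this kind usually compares a completion timestamp against a cutoff. The sketch below assumes each completed task has a timezone-aware completion time. The mapping `completed_at`, the `now` parameter, and the function name are hypothetical and only serve the illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


def cleanup_completed(
    completed_at: Dict[str, datetime],
    max_age_hours: int = 24,
    now: Optional[datetime] = None,
) -> Dict[str, datetime]:
    """Keep only the tasks completed within the last max_age_hours."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(hours=max_age_hours)
    # Tasks completed before the cutoff are dropped.
    return {task_id: ts for task_id, ts in completed_at.items() if ts >= cutoff}
```

Passing `now` explicitly keeps the function deterministic in tests.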
- create_task(self, skill_name: str, input_data: Dict[str, Any], client_metadata: Optional[Dict[str, Any]] = None) -> sunholo.a2a.task_manager.A2ATask
- Create a new A2A task.
Args: skill_name: Name of the skill to invoke; input_data: Input parameters for the task; client_metadata: Optional client metadata
Returns: Created A2ATask instance
- get_task(self, task_id: str) -> Optional[sunholo.a2a.task_manager.A2ATask]
- Get a task by ID.
- subscribe_to_task(self, task_id: str)
- Subscribe to task updates via async generator.
Args: task_id: ID of the task to subscribe to
Yields: Task update dictionaries
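Subscribing through an async generator is typically consumed with `async for`. The standalone sketch below pairs a notify method with a subscribe generator using one `asyncio.Queue` per subscriber. The class `TaskUpdates`, the update shape `{"state": ...}`, and the rule that a terminal state ends the stream are assumptions, not the sunholo implementation.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


class TaskUpdates:
    """Hypothetical stand-in showing the subscribe/notify pattern."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[asyncio.Queue]] = {}

    async def notify(self, task_id: str, task_data: Dict[str, Any]) -> None:
        # Fan the update out to every queue registered for this task.
        for queue in self._subscribers.get(task_id, []):
            await queue.put(task_data)

    async def subscribe(self, task_id: str) -> AsyncIterator[Dict[str, Any]]:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.setdefault(task_id, []).append(queue)
        try:
            while True:
                update = await queue.get()
                yield update
                # Assumption: a terminal state closes the stream.
                if update.get("state") in {"completed", "failed", "canceled"}:
                    break
        finally:
            self._subscribers[task_id].remove(queue)


async def demo() -> List[Dict[str, Any]]:
    updates = TaskUpdates()
    received: List[Dict[str, Any]] = []

    async def consume() -> None:
        async for update in updates.subscribe("task-1"):
            received.append(update)

    consumer = asyncio.create_task(consume())
    await asyncio.sleep(0)  # let the consumer register its queue first
    await updates.notify("task-1", {"state": "working"})
    await updates.notify("task-1", {"state": "completed"})
    await consumer
    return received
```

Running `asyncio.run(demo())` collects the two updates in the order they were sent.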
A2ATask
Represents an A2A task with its state and data.
- __init__(self, task_id: str, skill_name: str, input_data: Dict[str, Any], client_metadata: Optional[Dict[str, Any]] = None)
- Initialize an A2A task.
Args: task_id: Unique task identifier; skill_name: Name of the skill being invoked; input_data: Input parameters for the task; client_metadata: Optional metadata from the client
- add_artifact(self, name: str, content: Any, artifact_type: str = 'text')
- Add an artifact to the task.
- add_message(self, role: str, content: str, message_type: str = 'text')
- Add a message to the task.
- to_dict(self) -> Dict[str, Any]
- Convert task to dictionary format for A2A responses.
- update_progress(self, progress: float)
- Update task progress (0.0 to 1.0).
- update_state(self, new_state: sunholo.a2a.task_manager.TaskState, error_message: str = None)
- Update the task state.
TaskState
A2A Task states as defined in the protocol.
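The members of TaskState are not listed here. As a rough illustration, a state enum of this kind might look like the sketch below. The member names and string values are assumptions based on commonly published A2A state names and may not match the class in sunholo. `SketchTaskState` and `is_terminal` are hypothetical names.

```python
from enum import Enum


class SketchTaskState(str, Enum):
    # Assumption: values mirror A2A protocol state names; check the
    # actual TaskState class before relying on any of them.
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    CANCELED = "canceled"
    FAILED = "failed"
    UNKNOWN = "unknown"


# States after which no further updates are expected (an assumption).
TERMINAL_STATES = {
    SketchTaskState.COMPLETED,
    SketchTaskState.CANCELED,
    SketchTaskState.FAILED,
}


def is_terminal(state: SketchTaskState) -> bool:
    """Return True when the task has finished, successfully or not."""
    return state in TERMINAL_STATES
```

Subclassing `str` lets the members serialise directly as their string values when a task is converted to a dictionary.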