orchestrator.py
Source: src/sunholo/tools/orchestrator.py
Classes
ToolOrchestrator
Coordinates async execution of multiple tools.
Manages a registry of tool handlers and executes them concurrently using AsyncTaskRunner, aggregating results as they complete.
Args: context: Execution context for timeout configuration ("ui", "email", etc.). max_concurrent: Maximum concurrent tool executions.
-
init(self, context: 'str' = 'ui', max_concurrent: 'int' = 10)
- Initialize self. See help(type(self)) for accurate signature.
-
list_tools(self) -> 'List[Dict[str, str]]'
- List all registered tools.
Returns: List of dicts with "id" and "capability" keys.
- merge_tool_configs(self, tools: 'List[str]', user_configs: 'Dict[str, Any] | None' = None) -> 'Dict[str, Dict[str, Any]]'
- Merge user configs with default tool configs.
For each requested tool, merges:
- Registered default_args
- User-provided configs (overrides defaults)
Args: tools: List of tool IDs to configure. user_configs: User-provided per-tool configs.
Returns: Dict mapping tool_id -> merged config dict.
- register(self, tool_id: 'str', handler: 'ToolHandler', capability: 'str' = '', default_args: 'Dict[str, Any] | None' = None) -> 'None'
- Register a tool handler.
Args: tool_id: Unique tool identifier. handler: Async callable that executes the tool. capability: Human-readable capability description. default_args: Default arguments for the handler.
- run(self, tools: 'List[str]', question: 'str' = '', tool_configs: 'Dict[str, Any] | None' = None, common_args: 'Dict[str, Any] | None' = None) -> 'Dict[str, Any]'
- Execute multiple tools concurrently and aggregate results.
Args: tools: List of tool IDs to execute. question: The user question/query to pass to tools. tool_configs: Per-tool configuration overrides. common_args: Args passed to all tool handlers.
Returns: Dict with:
- "results": Dict mapping tool_id -> result
- "errors": Dict mapping tool_id -> error message
- "completed": List of successfully completed tool IDs
- run_streaming(self, tools: 'List[str]', question: 'str' = '', tool_configs: 'Dict[str, Any] | None' = None, common_args: 'Dict[str, Any] | None' = None) -> 'AsyncGenerator[Dict[str, Any], None]'
- Execute tools and yield results as they complete.
Same as run() but yields individual results as an async generator.
Yields: Dicts with "type" ("result" or "error"), "tool_id", and payload.
ToolEntry
Registration entry for a tool in the orchestrator.
Args: handler: Async callable that executes the tool. capability: Human-readable capability description. default_args: Default arguments passed to the handler.
- init(self, handler: 'ToolHandler', capability: 'str' = '', default_args: 'Dict[str, Any] | None' = None)
- Initialize self. See help(type(self)) for accurate signature.
