content_buffer.py
Source: sunholo/streaming/content_buffer.py
Classes
BufferStreamingStdOutCallbackHandler
A callback handler for streaming LLM output to a content buffer.
This class handles the streaming of output from a large language model (LLM), processes tokens from the model output, and writes them to a ContentBuffer. It supports handling different types of tokens and keeps track of code blocks and questions.
Attributes:
  content_buffer (ContentBuffer): The buffer to which content is streamed.
  tokens (str): Tokens that indicate the end of a statement, for buffer flushing.
  buffer (str): Temporary storage for accumulating streamed tokens.
  stream_finished (threading.Event): Signals when the streaming is finished.
  in_code_block (bool): Indicates whether the current context is a code block.
  in_question_block (bool): Indicates whether the current context is a question block.
  question_buffer (str): Stores the accumulated questions.
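The flow described above can be sketched with standard-library stand-ins. `SketchBuffer` and `SketchHandler` are hypothetical names, not the sunholo classes, and the callback signatures are simplified. The sketch assumes a flush happens whenever the accumulated text ends with one of the `tokens` characters, and it leaves out code-block and question tracking.

```python
import threading


class SketchBuffer:
    """Hypothetical stand-in for ContentBuffer: accumulates text in a string."""

    def __init__(self) -> None:
        self.content = ""

    def write(self, text: str) -> None:
        self.content += text


class SketchHandler:
    """Hypothetical stand-in for the handler; not the sunholo implementation."""

    def __init__(self, content_buffer: SketchBuffer, tokens: str = ".?!\n") -> None:
        self.content_buffer = content_buffer
        self.tokens = tokens
        self.buffer = ""
        self.stream_finished = threading.Event()

    def on_llm_new_token(self, token: str) -> None:
        # Accumulate the token, then flush if the text now ends a statement.
        self.buffer += token
        if self.buffer and self.buffer[-1] in self.tokens:
            self.content_buffer.write(self.buffer)
            self.buffer = ""

    def on_llm_end(self) -> None:
        # Flush whatever is left and signal that streaming is finished.
        if self.buffer:
            self.content_buffer.write(self.buffer)
            self.buffer = ""
        self.stream_finished.set()


sketch_buffer = SketchBuffer()
handler = SketchHandler(sketch_buffer)
for tok in ["Hello", " world", ".", " Trailing"]:
    handler.on_llm_new_token(tok)

# Only the completed sentence has reached the content buffer at this point.
flushed_before_end = sketch_buffer.content
handler.on_llm_end()
```

A consumer thread could call `handler.stream_finished.wait()` and then read the content buffer once the event is set.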
- __init__(self, content_buffer: sunholo.streaming.content_buffer.ContentBuffer, tokens: str = '.?!\n', *args, **kwargs)
- Initializes a new BufferStreamingStdOutCallbackHandler instance.
Args: content_buffer (ContentBuffer): The buffer to which content will be written. tokens (str): Tokens that indicate the end of a statement (default: '.?!\n'). *args: Additional positional arguments. **kwargs: Additional keyword arguments.
Sets up the callback handler with the given content buffer and tokens. Initializes tracking variables for code blocks, buffer content, and the finished signal.
- _is_heartbeat_token(self, token: str) -> bool
- Detects if the token is a heartbeat message.
- _process_buffer(self)
- Processes the buffer content and writes to the content buffer.
If the buffer ends with a numbered list pattern or specified tokens, the buffer is flushed to the content buffer. Otherwise, the buffer is left intact for further accumulation.
- _strip_heartbeat_markers(self, token: str) -> str
- Removes the [[HEARTBEAT]] markers from the token.
- on_agent_action(self, action: 'AgentAction', **kwargs: 'Any') -> 'Any'
- Run on agent action.
Args: action (AgentAction): The agent action. **kwargs (Any): Additional keyword arguments.
- on_agent_finish(self, finish: 'AgentFinish', **kwargs: 'Any') -> 'None'
- Run on the agent end.
Args: finish (AgentFinish): The agent finish. **kwargs (Any): Additional keyword arguments.
- on_chain_end(self, outputs: 'Dict[str, Any]', **kwargs: 'Any') -> 'None'
- Run when a chain ends running.
Args: outputs (Dict[str, Any]): The outputs of the chain. **kwargs (Any): Additional keyword arguments.
- on_chain_error(self, error: 'BaseException', **kwargs: 'Any') -> 'None'
- Run when chain errors.
Args: error (BaseException): The error that occurred. **kwargs (Any): Additional keyword arguments.
- on_chain_start(self, serialized: 'Dict[str, Any]', inputs: 'Dict[str, Any]', **kwargs: 'Any') -> 'None'
- Run when a chain starts running.
Args: serialized (Dict[str, Any]): The serialized chain. inputs (Dict[str, Any]): The inputs to the chain. **kwargs (Any): Additional keyword arguments.
- on_chat_model_start(self, serialized: 'Dict[str, Any]', messages: 'List[List[BaseMessage]]', **kwargs: 'Any') -> 'None'
- Run when LLM starts running.
Args: serialized (Dict[str, Any]): The serialized LLM. messages (List[List[BaseMessage]]): The messages to run. **kwargs (Any): Additional keyword arguments.
- on_custom_event(self, name: 'str', data: 'Any', *, run_id: 'UUID', tags: 'Optional[List[str]]' = None, metadata: 'Optional[Dict[str, Any]]' = None, **kwargs: 'Any') -> 'Any'
- Override to define a handler for a custom event.
Args: name: The name of the custom event. data: The data for the custom event. Format will match the format specified by the user. run_id: The ID of the run. tags: The tags associated with the custom event (includes inherited tags). metadata: The metadata associated with the custom event (includes inherited metadata).
Added in version 0.2.15.
- on_llm_end(self, response: langchain_core.outputs.llm_result.LLMResult, **kwargs: Any) -> None
- Handles the end of LLM streaming.
Args: response (LLMResult): The result returned by the LLM. **kwargs: Additional keyword arguments.
Writes any remaining buffer content to the content buffer, and sets a signal indicating that the streaming has finished.
- on_llm_error(self, error: 'BaseException', **kwargs: 'Any') -> 'None'
- Run when LLM errors.
Args: error (BaseException): The error that occurred. **kwargs (Any): Additional keyword arguments.
- on_llm_new_token(self, token: str, **kwargs: Any) -> None
- Processes a new token from the LLM output.
Args: token (str): The new token generated by the LLM. **kwargs: Additional keyword arguments.
Accumulates the token in the buffer and processes it based on the current context. The buffer content is written to the content buffer when appropriate tokens or patterns are detected.
- on_llm_start(self, serialized: 'Dict[str, Any]', prompts: 'List[str]', **kwargs: 'Any') -> 'None'
- Run when LLM starts running.
Args: serialized (Dict[str, Any]): The serialized LLM. prompts (List[str]): The prompts to run. **kwargs (Any): Additional keyword arguments.
- on_retriever_end(self, documents: 'Sequence[Document]', *, run_id: 'UUID', parent_run_id: 'Optional[UUID]' = None, **kwargs: 'Any') -> 'Any'
- Run when Retriever ends running.
Args: documents (Sequence[Document]): The documents retrieved. run_id (UUID): The run ID. This is the ID of the current run. parent_run_id (UUID): The parent run ID. This is the ID of the parent run. kwargs (Any): Additional keyword arguments.
- on_retriever_error(self, error: 'BaseException', *, run_id: 'UUID', parent_run_id: 'Optional[UUID]' = None, **kwargs: 'Any') -> 'Any'
- Run when Retriever errors.
Args: error (BaseException): The error that occurred. run_id (UUID): The run ID. This is the ID of the current run. parent_run_id (UUID): The parent run ID. This is the ID of the parent run. kwargs (Any): Additional keyword arguments.
- on_retriever_start(self, serialized: 'Dict[str, Any]', query: 'str', *, run_id: 'UUID', parent_run_id: 'Optional[UUID]' = None, tags: 'Optional[List[str]]' = None, metadata: 'Optional[Dict[str, Any]]' = None, **kwargs: 'Any') -> 'Any'
- Run when the Retriever starts running.
Args: serialized (Dict[str, Any]): The serialized Retriever. query (str): The query. run_id (UUID): The run ID. This is the ID of the current run. parent_run_id (UUID): The parent run ID. This is the ID of the parent run. tags (Optional[List[str]]): The tags. metadata (Optional[Dict[str, Any]]): The metadata. kwargs (Any): Additional keyword arguments.
- on_retry(self, retry_state: 'RetryCallState', *, run_id: 'UUID', parent_run_id: 'Optional[UUID]' = None, **kwargs: 'Any') -> 'Any'
- Run on a retry event.
Args: retry_state (RetryCallState): The retry state. run_id (UUID): The run ID. This is the ID of the current run. parent_run_id (UUID): The parent run ID. This is the ID of the parent run. kwargs (Any): Additional keyword arguments.
- on_text(self, text: 'str', **kwargs: 'Any') -> 'None'
- Run on an arbitrary text.
Args: text (str): The text to print. **kwargs (Any): Additional keyword arguments.
- on_tool_end(self, output: 'Any', **kwargs: 'Any') -> 'None'
- Run when tool ends running.
Args: output (Any): The output of the tool. **kwargs (Any): Additional keyword arguments.
- on_tool_error(self, error: 'BaseException', **kwargs: 'Any') -> 'None'
- Run when tool errors.
Args: error (BaseException): The error that occurred. **kwargs (Any): Additional keyword arguments.
- on_tool_start(self, serialized: 'Dict[str, Any]', input_str: 'str', **kwargs: 'Any') -> 'None'
- Run when the tool starts running.
Args: serialized (Dict[str, Any]): The serialized tool. input_str (str): The input string. **kwargs (Any): Additional keyword arguments.
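The flush rule described for `_process_buffer` (flush when the buffer ends with a numbered list pattern or one of the specified tokens) might look roughly like the sketch below. `should_flush`, `drain`, and the regular expression are assumptions made for illustration; the pattern the library actually uses is not given in this reference and may differ.

```python
import re

# Assumed pattern: a newline, a list number, a dot and one whitespace character
# at the very end of the buffer, e.g. "\n2. ". The real pattern may differ.
NUMBERED_ITEM_AT_END = re.compile(r"\n\d+\.\s\Z")


def should_flush(buffer: str, tokens: str = ".?!\n") -> bool:
    """Return True if the buffered text should be written out now."""
    if not buffer:
        return False
    if NUMBERED_ITEM_AT_END.search(buffer):
        return True
    # Otherwise flush only when the last character is an end-of-statement token.
    return buffer[-1] in tokens


def drain(chunks, tokens: str = ".?!\n"):
    """Feed chunks through the rule; return flushed pieces and the leftover."""
    flushed, pending = [], ""
    for chunk in chunks:
        pending += chunk
        if should_flush(pending, tokens):
            flushed.append(pending)
            pending = ""
    return flushed, pending


pieces, leftover = drain(["Is it", " done?", " Not", " yet"])
```

Here the leftover text stays in the buffer for further accumulation, which matches the behaviour described for `_process_buffer`; in the handler, `on_llm_end` is what writes any such remainder.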
BufferStreamingStdOutCallbackHandlerAsync
An async callback handler for streaming LLM output to a content buffer.
This class handles the streaming of output from a large language model (LLM), processes tokens from the model output, and writes them to a ContentBuffer. It supports handling different types of tokens and keeps track of code blocks and questions.
Attributes:
  content_buffer (ContentBuffer): The buffer to which content is streamed.
  tokens (str): Tokens that indicate the end of a statement, for buffer flushing.
  buffer (str): Temporary storage for accumulating streamed tokens.
  stream_finished (asyncio.Event): Signals when the streaming is finished.
  in_code_block (bool): Indicates whether the current context is a code block.
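As a rough illustration of the async variant, the sketch below uses an `asyncio.Event` for `stream_finished` and awaits the buffer writes. `AsyncSketchBuffer`, `AsyncSketchHandler`, and `run_sketch` are hypothetical stand-ins, the callback signatures are simplified, and the flush rule (flush when the text ends with one of the `tokens` characters) is an assumption.

```python
import asyncio


class AsyncSketchBuffer:
    """Hypothetical stand-in for ContentBuffer's async interface."""

    def __init__(self) -> None:
        self.content = ""

    async def async_write(self, text: str) -> None:
        self.content += text


class AsyncSketchHandler:
    """Hypothetical stand-in for the async handler; not the sunholo class."""

    def __init__(self, content_buffer: AsyncSketchBuffer, tokens: str = ".?!\n") -> None:
        self.content_buffer = content_buffer
        self.tokens = tokens
        self.buffer = ""
        self.stream_finished = asyncio.Event()

    async def async_on_llm_new_token(self, token: str) -> None:
        # Accumulate, then flush when the text ends with an end-of-statement token.
        self.buffer += token
        if self.buffer and self.buffer[-1] in self.tokens:
            await self.content_buffer.async_write(self.buffer)
            self.buffer = ""

    async def async_on_llm_end(self) -> None:
        # Write any remainder and signal completion to waiting consumers.
        if self.buffer:
            await self.content_buffer.async_write(self.buffer)
            self.buffer = ""
        self.stream_finished.set()


async def run_sketch() -> str:
    buf = AsyncSketchBuffer()
    handler = AsyncSketchHandler(buf)  # created inside the running event loop

    async def produce() -> None:
        for tok in ["One.", " Two", "!", " tail"]:
            await handler.async_on_llm_new_token(tok)
        await handler.async_on_llm_end()

    producer = asyncio.create_task(produce())
    await handler.stream_finished.wait()  # consumer side: wait for the signal
    await producer
    return buf.content


async_result = asyncio.run(run_sketch())
```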
- __init__(self, content_buffer: sunholo.streaming.content_buffer.ContentBuffer, tokens: str = '.?!\n', *args, **kwargs)
- Initializes a new BufferStreamingStdOutCallbackHandlerAsync instance.
Args: content_buffer (ContentBuffer): The buffer to which content will be written. tokens (str): Tokens that indicate the end of a statement (default: '.?!\n'). *args: Additional positional arguments. **kwargs: Additional keyword arguments.
Sets up the callback handler with the given content buffer and tokens. Initializes tracking variables for code blocks, buffer content, and the finished signal.
- _async_process_buffer(self)
- Asynchronously processes the buffer content and writes to the content buffer.
If the buffer ends with a numbered list pattern or specified tokens, the buffer is flushed to the content buffer. Otherwise, the buffer is left intact for further accumulation.
- _is_heartbeat_token(self, token: str) -> bool
- Detects if the token is a heartbeat message.
- _strip_heartbeat_markers(self, token: str) -> str
- Removes the [[HEARTBEAT]] markers from the token.
- async_on_llm_end(self, response: langchain_core.outputs.llm_result.LLMResult, **kwargs: Any) -> None
- Asynchronously handles the end of LLM streaming.
Args: response (LLMResult): The result returned by the LLM. **kwargs: Additional keyword arguments.
- async_on_llm_new_token(self, token: str, **kwargs: Any) -> None
- No docstring available.
- on_agent_action(self, action: 'AgentAction', **kwargs: 'Any') -> 'Any'
- Run on agent action.
Args: action (AgentAction): The agent action. **kwargs (Any): Additional keyword arguments.
- on_agent_finish(self, finish: 'AgentFinish', **kwargs: 'Any') -> 'None'
- Run on the agent end.
Args: finish (AgentFinish): The agent finish. **kwargs (Any): Additional keyword arguments.
- on_chain_end(self, outputs: 'Dict[str, Any]', **kwargs: 'Any') -> 'None'
- Run when a chain ends running.
Args: outputs (Dict[str, Any]): The outputs of the chain. **kwargs (Any): Additional keyword arguments.
- on_chain_error(self, error: 'BaseException', **kwargs: 'Any') -> 'None'
- Run when chain errors.
Args: error (BaseException): The error that occurred. **kwargs (Any): Additional keyword arguments.
- on_chain_start(self, serialized: 'Dict[str, Any]', inputs: 'Dict[str, Any]', **kwargs: 'Any') -> 'None'
- Run when a chain starts running.
Args: serialized (Dict[str, Any]): The serialized chain. inputs (Dict[str, Any]): The inputs to the chain. **kwargs (Any): Additional keyword arguments.
- on_chat_model_start(self, serialized: 'Dict[str, Any]', messages: 'List[List[BaseMessage]]', **kwargs: 'Any') -> 'None'
- Run when LLM starts running.
Args: serialized (Dict[str, Any]): The serialized LLM. messages (List[List[BaseMessage]]): The messages to run. **kwargs (Any): Additional keyword arguments.
- on_custom_event(self, name: 'str', data: 'Any', *, run_id: 'UUID', tags: 'Optional[List[str]]' = None, metadata: 'Optional[Dict[str, Any]]' = None, **kwargs: 'Any') -> 'Any'
- Override to define a handler for a custom event.
Args: name: The name of the custom event. data: The data for the custom event. Format will match the format specified by the user. run_id: The ID of the run. tags: The tags associated with the custom event (includes inherited tags). metadata: The metadata associated with the custom event (includes inherited metadata).
Added in version 0.2.15.
- on_llm_end(self, response: 'LLMResult', **kwargs: 'Any') -> 'None'
- Run when LLM ends running.
Args: response (LLMResult): The response from the LLM. **kwargs (Any): Additional keyword arguments.
- on_llm_error(self, error: 'BaseException', **kwargs: 'Any') -> 'None'
- Run when LLM errors.
Args: error (BaseException): The error that occurred. **kwargs (Any): Additional keyword arguments.
- on_llm_new_token(self, token: 'str', **kwargs: 'Any') -> 'None'
- Run on new LLM token. Only available when streaming is enabled.
Args: token (str): The new token. **kwargs (Any): Additional keyword arguments.
- on_llm_start(self, serialized: 'Dict[str, Any]', prompts: 'List[str]', **kwargs: 'Any') -> 'None'
- Run when LLM starts running.
Args: serialized (Dict[str, Any]): The serialized LLM. prompts (List[str]): The prompts to run. **kwargs (Any): Additional keyword arguments.
- on_retriever_end(self, documents: 'Sequence[Document]', *, run_id: 'UUID', parent_run_id: 'Optional[UUID]' = None, **kwargs: 'Any') -> 'Any'
- Run when Retriever ends running.
Args: documents (Sequence[Document]): The documents retrieved. run_id (UUID): The run ID. This is the ID of the current run. parent_run_id (UUID): The parent run ID. This is the ID of the parent run. kwargs (Any): Additional keyword arguments.
- on_retriever_error(self, error: 'BaseException', *, run_id: 'UUID', parent_run_id: 'Optional[UUID]' = None, **kwargs: 'Any') -> 'Any'
- Run when Retriever errors.
Args: error (BaseException): The error that occurred. run_id (UUID): The run ID. This is the ID of the current run. parent_run_id (UUID): The parent run ID. This is the ID of the parent run. kwargs (Any): Additional keyword arguments.
- on_retriever_start(self, serialized: 'Dict[str, Any]', query: 'str', *, run_id: 'UUID', parent_run_id: 'Optional[UUID]' = None, tags: 'Optional[List[str]]' = None, metadata: 'Optional[Dict[str, Any]]' = None, **kwargs: 'Any') -> 'Any'
- Run when the Retriever starts running.
Args: serialized (Dict[str, Any]): The serialized Retriever. query (str): The query. run_id (UUID): The run ID. This is the ID of the current run. parent_run_id (UUID): The parent run ID. This is the ID of the parent run. tags (Optional[List[str]]): The tags. metadata (Optional[Dict[str, Any]]): The metadata. kwargs (Any): Additional keyword arguments.
- on_retry(self, retry_state: 'RetryCallState', *, run_id: 'UUID', parent_run_id: 'Optional[UUID]' = None, **kwargs: 'Any') -> 'Any'
- Run on a retry event.
Args: retry_state (RetryCallState): The retry state. run_id (UUID): The run ID. This is the ID of the current run. parent_run_id (UUID): The parent run ID. This is the ID of the parent run. kwargs (Any): Additional keyword arguments.
- on_text(self, text: 'str', **kwargs: 'Any') -> 'None'
- Run on an arbitrary text.
Args: text (str): The text to print. **kwargs (Any): Additional keyword arguments.
- on_tool_end(self, output: 'Any', **kwargs: 'Any') -> 'None'
- Run when tool ends running.
Args: output (Any): The output of the tool. **kwargs (Any): Additional keyword arguments.
- on_tool_error(self, error: 'BaseException', **kwargs: 'Any') -> 'None'
- Run when tool errors.
Args: error (BaseException): The error that occurred. **kwargs (Any): Additional keyword arguments.
- on_tool_start(self, serialized: 'Dict[str, Any]', input_str: 'str', **kwargs: 'Any') -> 'None'
- Run when the tool starts running.
Args: serialized (Dict[str, Any]): The serialized tool. input_str (str): The input string. **kwargs (Any): Additional keyword arguments.
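The heartbeat helpers `_is_heartbeat_token` and `_strip_heartbeat_markers` are described only briefly, so the following is a guess at their general shape rather than the library's code. The function names without the leading underscore, the "contains the marker" detection rule, and the use of a single `[[HEARTBEAT]]` literal for all markers are assumptions.

```python
HEARTBEAT_MARKER = "[[HEARTBEAT]]"  # literal taken from the method description


def is_heartbeat_token(token: str) -> bool:
    """Assumed rule: a token counts as a heartbeat if it contains the marker."""
    return HEARTBEAT_MARKER in token


def strip_heartbeat_markers(token: str) -> str:
    """Remove every occurrence of the marker and keep the remaining text."""
    return token.replace(HEARTBEAT_MARKER, "")


wrapped = "[[HEARTBEAT]]still working[[HEARTBEAT]]"
cleaned = strip_heartbeat_markers(wrapped)
```

Under these assumptions a keep-alive message can be recognised and cleaned before any of its text is treated as model output.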
ContentBuffer
A buffer class for storing and managing textual content.
This class provides methods to write text to the buffer, read the entire buffer content, and clear the buffer content. The buffer can be used to collect text output for further processing or inspection.
Attributes: content (str): Stores the textual content of the buffer.
- __init__(self)
- Initializes a new ContentBuffer instance.
The content buffer starts with an empty string, and logging is initialized to indicate that the buffer has been created.
- async_clear(self)
- Asynchronously clears the content buffer.
Empties the buffer content, resetting it to an empty string.
- async_read(self) -> str
- Asynchronously reads the entire content from the buffer.
Returns: str: The content of the buffer.
- async_write(self, text: str)
- Asynchronously writes text to the content buffer.
Args: text (str): The text to be added to the buffer.
Adds the given text to the existing content of the buffer.
- clear(self)
- Clears the content buffer.
Empties the buffer content, resetting it to an empty string.
- read(self) -> str
- Reads the entire content from the buffer.
Returns: str: The content of the buffer.
Provides the entire content stored in the buffer.
- write(self, text: str)
- Writes text to the content buffer.
Args: text (str): The text to be added to the buffer.
Adds the given text to the existing content of the buffer.
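The documented behaviour of `ContentBuffer` (append on write, return everything on read, reset to an empty string on clear) can be mirrored in a few lines. `SketchContentBuffer` is a hypothetical stand-in that only reproduces the behaviour stated above; it omits the logging mentioned for `__init__` and makes no claim about how the real async methods are implemented.

```python
import asyncio


class SketchContentBuffer:
    """Hypothetical stand-in mirroring the documented ContentBuffer methods."""

    def __init__(self) -> None:
        self.content = ""  # the buffer starts empty

    def write(self, text: str) -> None:
        self.content += text  # append to the existing content

    def read(self) -> str:
        return self.content  # the entire content, not just the latest write

    def clear(self) -> None:
        self.content = ""

    async def async_write(self, text: str) -> None:
        self.content += text

    async def async_read(self) -> str:
        return self.content

    async def async_clear(self) -> None:
        self.content = ""


buf = SketchContentBuffer()
buf.write("Hello, ")
buf.write("world.")
sync_snapshot = buf.read()
buf.clear()


async def async_demo() -> str:
    await buf.async_write("streamed ")
    await buf.async_write("text")
    snapshot = await buf.async_read()
    await buf.async_clear()
    return snapshot


async_snapshot = asyncio.run(async_demo())
```

Because `read` returns the whole accumulated string, a consumer that wants only new text would need to track how much it has already seen or clear the buffer after each read.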