content_buffer.py
Source: sunholo/streaming/content_buffer.py
Classes
BufferStreamingStdOutCallbackHandler
A callback handler for streaming LLM output to a content buffer.
This class handles the streaming of output from a large language model (LLM), processes tokens from the model output, and writes them to a ContentBuffer. It supports handling different types of tokens and keeps track of code blocks and questions.
Attributes: content_buffer (ContentBuffer): The buffer to which content is streamed. tokens (str): Tokens that indicate the end of a statement, for buffer flushing. buffer (str): Temporary storage for accumulating streamed tokens. stream_finished (threading.Event): Signals when the streaming is finished. in_code_block (bool): Indicates whether the current context is a code block. in_question_block (bool): Indicates whether the current context is a question block. question_buffer (str): Stores the accumulated questions.
- __init__(self, content_buffer: sunholo.streaming.content_buffer.ContentBuffer, tokens: str = '.?!\n', *args, **kwargs)
- Initializes a new BufferStreamingStdOutCallbackHandler instance.
Args: content_buffer (ContentBuffer): The buffer to which content will be written. tokens (str): Characters that indicate the end of a statement (default: ".?!\n"). *args: Additional positional arguments. **kwargs: Additional keyword arguments.
Sets up the callback handler with the given content buffer and tokens. Initializes tracking variables for code blocks, buffer content, and the finished signal.
- _is_heartbeat_token(self, token: str) -> bool
- Detects if the token is a heartbeat message.
- _process_buffer(self)
- Processes the buffer content and writes to the content buffer.
If the buffer ends with a numbered list pattern or specified tokens, the buffer is flushed to the content buffer. Otherwise, the buffer is left intact for further accumulation.
- _strip_heartbeat_markers(self, token: str) -> str
- Removes the [[HEARTBEAT]] markers from the token.
- on_llm_end(self, response, **kwargs: Any) -> None
- Handles the end of LLM streaming.
Args: response: The result returned by the LLM. **kwargs: Additional keyword arguments.
Writes any remaining buffer content to the content buffer, and sets a signal indicating that the streaming has finished.
- on_llm_new_token(self, token: str, **kwargs: Any) -> None
- Processes a new token from the LLM output.
Args: token (str): The new token generated by the LLM. **kwargs: Additional keyword arguments.
Accumulates the token in the buffer and processes it based on the current context. The buffer content is written to the content buffer when appropriate tokens or patterns are detected.
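The accumulate-then-flush behaviour described for on_llm_new_token, _process_buffer and on_llm_end can be illustrated with a small stand-in. This is a minimal sketch, not the sunholo implementation: the class name SketchHandler, the flushed list (which stands in for calls to ContentBuffer.write) and the regular expression for the numbered list pattern are all assumptions made for illustration.

```python
import re

# Assumed pattern for "buffer ends with a numbered list marker", e.g. "\n2. ".
# The pattern used by the real handler is not given in this reference.
NUMBERED_LIST_TAIL = re.compile(r"\n\d+\.\s$")


class SketchHandler:
    """Illustrative stand-in for the documented handler (hypothetical)."""

    def __init__(self, tokens: str = ".?!\n"):
        self.tokens = tokens
        self.buffer = ""    # text accumulated since the last flush
        self.flushed = []   # stands in for ContentBuffer.write calls

    def on_new_token(self, token: str) -> None:
        # Accumulate first, then decide whether the buffer is ready to flush.
        self.buffer += token
        ends_statement = self.buffer.endswith(tuple(self.tokens))
        ends_list_marker = NUMBERED_LIST_TAIL.search(self.buffer) is not None
        if ends_statement or ends_list_marker:
            self.flushed.append(self.buffer)
            self.buffer = ""

    def on_end(self) -> None:
        # Write whatever is left when the stream finishes.
        if self.buffer:
            self.flushed.append(self.buffer)
            self.buffer = ""


if __name__ == "__main__":
    handler = SketchHandler()
    for tok in ["Hello", " world", ".", " How", " are", " you", "?", " Fine"]:
        handler.on_new_token(tok)
    handler.on_end()
    print(handler.flushed)
```

In this sketch the trailing " Fine" is only written by on_end, because it does not end with any character in tokens. That is the case the final flush in on_llm_end exists to cover.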
BufferStreamingStdOutCallbackHandlerAsync
An async callback handler for streaming LLM output to a content buffer.
This class handles the streaming of output from a large language model (LLM), processes tokens from the model output, and writes them to a ContentBuffer. It supports handling different types of tokens and keeps track of code blocks and questions.
Attributes: content_buffer (ContentBuffer): The buffer to which content is streamed. tokens (str): Tokens that indicate the end of a statement, for buffer flushing. buffer (str): Temporary storage for accumulating streamed tokens. stream_finished (asyncio.Event): Signals when the streaming is finished. in_code_block (bool): Indicates whether the current context is a code block.
- __init__(self, content_buffer: sunholo.streaming.content_buffer.ContentBuffer, tokens: str = '.?!\n', *args, **kwargs)
- Initializes a new BufferStreamingStdOutCallbackHandlerAsync instance.
Args: content_buffer (ContentBuffer): The buffer to which content will be written. tokens (str): Characters that indicate the end of a statement (default: ".?!\n"). *args: Additional positional arguments. **kwargs: Additional keyword arguments.
Sets up the callback handler with the given content buffer and tokens. Initializes tracking variables for code blocks, buffer content, and the finished signal.
- _async_process_buffer(self)
- Asynchronously processes the buffer content and writes to the content buffer.
If the buffer ends with a numbered list pattern or specified tokens, the buffer is flushed to the content buffer. Otherwise, the buffer is left intact for further accumulation.
- _is_heartbeat_token(self, token: str) -> bool
- Detects if the token is a heartbeat message.
- _strip_heartbeat_markers(self, token: str) -> str
- Removes the [[HEARTBEAT]] markers from the token.
- async_on_llm_end(self, response, **kwargs: Any) -> None
- Asynchronously handles the end of LLM streaming.
Args: response: The result returned by the LLM. **kwargs: Additional keyword arguments.
- async_on_llm_new_token(self, token: str, **kwargs: Any) -> None
- No docstring available.
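The async variant, with its asyncio.Event finished signal and heartbeat handling, might be used along the following lines. This is a sketch under stated assumptions rather than the library's code: AsyncSketchHandler, its written attribute (standing in for the ContentBuffer) and the helper functions are hypothetical, and it assumes a heartbeat is marked only by the literal text [[HEARTBEAT]], which this reference does not confirm.

```python
import asyncio

HEARTBEAT = "[[HEARTBEAT]]"  # marker named in this reference; its exact framing is assumed


def is_heartbeat(token: str) -> bool:
    """Hypothetical check: a token counts as a heartbeat if it contains the marker."""
    return HEARTBEAT in token


def strip_heartbeat(token: str) -> str:
    """Hypothetical cleanup: drop the marker and keep any other text."""
    return token.replace(HEARTBEAT, "")


class AsyncSketchHandler:
    """Illustrative async stand-in for the documented handler (hypothetical)."""

    def __init__(self, tokens: str = ".?!\n"):
        self.tokens = tokens
        self.buffer = ""
        self.written = ""  # stands in for the ContentBuffer content
        self.stream_finished = asyncio.Event()

    async def on_new_token(self, token: str) -> None:
        if is_heartbeat(token):
            token = strip_heartbeat(token)
        if not token:
            return  # nothing left after removing the marker
        self.buffer += token
        if self.buffer.endswith(tuple(self.tokens)):
            self.written += self.buffer
            self.buffer = ""

    async def on_end(self) -> None:
        # Flush the remainder, then signal any waiting consumer.
        self.written += self.buffer
        self.buffer = ""
        self.stream_finished.set()


async def main() -> str:
    handler = AsyncSketchHandler()  # created inside the running event loop

    async def produce() -> None:
        for tok in ["Hi", "[[HEARTBEAT]]", " there", ".", " Bye"]:
            await handler.on_new_token(tok)
        await handler.on_end()

    task = asyncio.create_task(produce())
    await handler.stream_finished.wait()  # consumer blocks until the stream ends
    await task
    return handler.written


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Waiting on stream_finished lets a consumer read the buffer only once the final flush has happened, instead of polling for new content.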
ContentBuffer
A buffer class for storing and managing textual content.
This class provides methods to write text to the buffer, read the entire buffer content, and clear the buffer content. The buffer can be used to collect text output for further processing or inspection.
Attributes: content (str): Stores the textual content of the buffer.
- __init__(self)
- Initializes a new ContentBuffer instance.
The content buffer starts with an empty string, and logging is initialized to indicate that the buffer has been created.
- async_clear(self)
- Asynchronously clears the content buffer.
Empties the buffer content, resetting it to an empty string.
- async_read(self) -> str
- Asynchronously reads the entire content from the buffer.
Returns: str: The content of the buffer.
- async_write(self, text: str)
- Asynchronously writes text to the content buffer.
Args: text (str): The text to be added to the buffer.
Adds the given text to the existing content of the buffer.
- clear(self)
- Clears the content buffer.
Empties the buffer content, resetting it to an empty string.
- read(self) -> str
- Reads the entire content from the buffer.
Returns: str: The content of the buffer.
Provides the entire content stored in the buffer.
- write(self, text: str)
- Writes text to the content buffer.
Args: text (str): The text to be added to the buffer.
Adds the given text to the existing content of the buffer.
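The write, read and clear contract documented for ContentBuffer, together with its async counterparts, can be sketched with a stand-in class. SketchContentBuffer is hypothetical and mirrors only the behaviour described in this reference. Whether the real class logs, locks or does anything else internally is not covered here.

```python
import asyncio


class SketchContentBuffer:
    """Illustrative stand-in mirroring the documented ContentBuffer methods."""

    def __init__(self):
        self.content = ""  # starts as an empty string

    def write(self, text: str) -> None:
        self.content += text  # append to the existing content

    def read(self) -> str:
        return self.content  # return the entire content

    def clear(self) -> None:
        self.content = ""  # reset to an empty string

    async def async_write(self, text: str) -> None:
        self.content += text

    async def async_read(self) -> str:
        return self.content

    async def async_clear(self) -> None:
        self.content = ""


async def demo() -> str:
    buf = SketchContentBuffer()
    await buf.async_write("streamed ")
    await buf.async_write("text")
    return await buf.async_read()


if __name__ == "__main__":
    buf = SketchContentBuffer()
    buf.write("Hello")
    buf.write(", world.")
    print(buf.read())
    buf.clear()
    print(repr(buf.read()))
    print(asyncio.run(demo()))
```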