content_buffer.py

Source: sunholo/streaming/content_buffer.py

Classes

BufferStreamingStdOutCallbackHandler

A callback handler for streaming LLM output to a content buffer.

This class handles the streaming of output from a large language model (LLM), processes tokens from the model output, and writes them to a ContentBuffer. It supports handling different types of tokens and keeps track of code blocks and questions.

Attributes:

  • content_buffer (ContentBuffer): The buffer to which content is streamed.
  • tokens (str): Tokens that indicate the end of a statement, for buffer flushing.
  • buffer (str): Temporary storage for accumulating streamed tokens.
  • stream_finished (threading.Event): Signals when the streaming is finished.
  • in_code_block (bool): Indicates whether the current context is a code block.
  • in_question_block (bool): Indicates whether the current context is a question block.
  • question_buffer (str): Stores the accumulated questions.
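
A minimal usage sketch: the buffer and handler come from this module, while the streaming model they attach to is a placeholder for whichever LangChain LLM is used (assumed to accept a `callbacks` list).

```python
from sunholo.streaming.content_buffer import (
    BufferStreamingStdOutCallbackHandler,
    ContentBuffer,
)

# Buffer that will collect the streamed text.
content_buffer = ContentBuffer()

# Handler that flushes completed statements into the buffer.
# `tokens` defaults to '.?!\n', i.e. flush on sentence endings and newlines.
handler = BufferStreamingStdOutCallbackHandler(content_buffer=content_buffer)

# The handler would then be passed to a streaming LangChain model, e.g.
# llm = SomeChatModel(streaming=True, callbacks=[handler])  # placeholder model
```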

  • __init__(self, content_buffer: sunholo.streaming.content_buffer.ContentBuffer, tokens: str = '.?!\n', *args, **kwargs)

    • Initializes a new BufferStreamingStdOutCallbackHandler instance.

    Args:

      • content_buffer (ContentBuffer): The buffer to which content will be written.
      • tokens (str): Tokens that indicate the end of a statement (default: '.?!\n').
      • *args: Additional positional arguments.
      • **kwargs: Additional keyword arguments.

    Sets up the callback handler with the given content buffer and tokens. Initializes tracking variables for code blocks, buffer content, and the finished signal.

  • _process_buffer(self)

    • Processes the buffer content and writes to the content buffer.

If the buffer ends with a numbered list pattern or specified tokens, the buffer is flushed to the content buffer. Otherwise, the buffer is left intact for further accumulation.
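
A rough sketch of that flush rule, for illustration only; the numbered-list pattern below is an assumption, not the module's actual check:

```python
import re

def should_flush(buffer: str, tokens: str = ".?!\n") -> bool:
    # Flush if the buffer ends with a numbered-list item such as "\n1. "
    # (assumed pattern), or with one of the statement-ending tokens.
    ends_with_numbered_item = re.search(r"\n\d+\.\s*$", buffer) is not None
    ends_with_token = buffer.endswith(tuple(tokens))
    return ends_with_numbered_item or ends_with_token
```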

  • on_agent_action(self, action: 'AgentAction', **kwargs: 'Any') -> 'Any'
    • Run on agent action.

Args: action (AgentAction): The agent action. **kwargs (Any): Additional keyword arguments.

  • on_agent_finish(self, finish: 'AgentFinish', **kwargs: 'Any') -> 'None'
    • Run when the agent finishes.

Args: finish (AgentFinish): The agent finish. **kwargs (Any): Additional keyword arguments.

  • on_chain_end(self, outputs: 'Dict[str, Any]', **kwargs: 'Any') -> 'None'
    • Run when a chain ends running.

Args: outputs (Dict[str, Any]): The outputs of the chain. **kwargs (Any): Additional keyword arguments.

  • on_chain_error(self, error: 'BaseException', **kwargs: 'Any') -> 'None'
    • Run when chain errors.

Args: error (BaseException): The error that occurred. **kwargs (Any): Additional keyword arguments.

  • on_chain_start(self, serialized: 'Dict[str, Any]', inputs: 'Dict[str, Any]', **kwargs: 'Any') -> 'None'
    • Run when a chain starts running.

Args: serialized (Dict[str, Any]): The serialized chain. inputs (Dict[str, Any]): The inputs to the chain. **kwargs (Any): Additional keyword arguments.

  • on_chat_model_start(self, serialized: 'Dict[str, Any]', messages: 'List[List[BaseMessage]]', **kwargs: 'Any') -> 'None'
    • Run when LLM starts running.

Args: serialized (Dict[str, Any]): The serialized LLM. messages (List[List[BaseMessage]]): The messages to run. **kwargs (Any): Additional keyword arguments.

  • on_llm_end(self, response: langchain_core.outputs.llm_result.LLMResult, **kwargs: Any) -> None
    • Handles the end of LLM streaming.

Args: response (LLMResult): The result returned by the LLM. **kwargs: Additional keyword arguments.

Writes any remaining buffer content to the content buffer, and sets a signal indicating that the streaming has finished.
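
Because stream_finished is a threading.Event, a consumer can block until this method fires before reading the final content. A sketch, assuming the handler was attached to a streaming LLM call running elsewhere:

```python
from sunholo.streaming.content_buffer import (
    BufferStreamingStdOutCallbackHandler,
    ContentBuffer,
)

content_buffer = ContentBuffer()
handler = BufferStreamingStdOutCallbackHandler(content_buffer=content_buffer)

# ... a streaming LLM call runs elsewhere with callbacks=[handler] ...

# Wait (with a timeout) for on_llm_end to set the event, then read the
# accumulated content.
if handler.stream_finished.wait(timeout=60):
    final_text = content_buffer.read()
```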

  • on_llm_error(self, error: 'BaseException', **kwargs: 'Any') -> 'None'
    • Run when LLM errors.

Args: error (BaseException): The error that occurred. **kwargs (Any): Additional keyword arguments.

  • on_llm_new_token(self, token: str, **kwargs: Any) -> None
    • Processes a new token from the LLM output.

Args: token (str): The new token generated by the LLM. **kwargs: Additional keyword arguments.

Accumulates the token in the buffer and processes it based on the current context. The buffer content is written to the content buffer when appropriate tokens or patterns are detected.
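
The handler can also be driven by hand, which makes the buffering behaviour visible; in real use a streaming LLM calls this method. The expected output below follows from the flushing behaviour described above:

```python
from sunholo.streaming.content_buffer import (
    BufferStreamingStdOutCallbackHandler,
    ContentBuffer,
)

buffer = ContentBuffer()
handler = BufferStreamingStdOutCallbackHandler(content_buffer=buffer)

# Tokens accumulate until a statement-ending token ('.') arrives, at which
# point the accumulated text should be flushed into the buffer.
for token in ["Hello", ", ", "world", "."]:
    handler.on_llm_new_token(token)

print(buffer.read())  # expected to contain "Hello, world."
```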

  • on_llm_start(self, serialized: 'Dict[str, Any]', prompts: 'List[str]', **kwargs: 'Any') -> 'None'
    • Run when LLM starts running.

Args: serialized (Dict[str, Any]): The serialized LLM. prompts (List[str]): The prompts to run. **kwargs (Any): Additional keyword arguments.

  • on_retriever_end(self, documents: 'Sequence[Document]', *, run_id: 'UUID', parent_run_id: 'Optional[UUID]' = None, **kwargs: 'Any') -> 'Any'
    • Run when the Retriever ends running.

  • on_retriever_error(self, error: 'BaseException', *, run_id: 'UUID', parent_run_id: 'Optional[UUID]' = None, **kwargs: 'Any') -> 'Any'
    • Run when the Retriever errors.

  • on_retriever_start(self, serialized: 'Dict[str, Any]', query: 'str', *, run_id: 'UUID', parent_run_id: 'Optional[UUID]' = None, tags: 'Optional[List[str]]' = None, metadata: 'Optional[Dict[str, Any]]' = None, **kwargs: 'Any') -> 'Any'
    • Run when the Retriever starts running.

Args:

  • serialized (Dict[str, Any]): The serialized Retriever.
  • query (str): The query.
  • run_id (UUID): The ID of the current run.
  • parent_run_id (UUID): The ID of the parent run.
  • tags (Optional[List[str]]): The tags.
  • metadata (Optional[Dict[str, Any]]): The metadata.
  • kwargs (Any): Additional keyword arguments.

  • on_retry(self, retry_state: 'RetryCallState', *, run_id: 'UUID', parent_run_id: 'Optional[UUID]' = None, **kwargs: 'Any') -> 'Any'
    • Run on a retry event.

Args:

  • retry_state (RetryCallState): The retry state.
  • run_id (UUID): The ID of the current run.
  • parent_run_id (UUID): The ID of the parent run.
  • kwargs (Any): Additional keyword arguments.

  • on_text(self, text: 'str', **kwargs: 'Any') -> 'None'
    • Run on arbitrary text.

Args: text (str): The text to print. **kwargs (Any): Additional keyword arguments.

  • on_tool_end(self, output: 'Any', **kwargs: 'Any') -> 'None'
    • Run when tool ends running.

Args: output (Any): The output of the tool. **kwargs (Any): Additional keyword arguments.

  • on_tool_error(self, error: 'BaseException', **kwargs: 'Any') -> 'None'
    • Run when tool errors.

Args: error (BaseException): The error that occurred. **kwargs (Any): Additional keyword arguments.

  • on_tool_start(self, serialized: 'Dict[str, Any]', input_str: 'str', **kwargs: 'Any') -> 'None'
    • Run when the tool starts running.

Args: serialized (Dict[str, Any]): The serialized tool. input_str (str): The input string. **kwargs (Any): Additional keyword arguments.

ContentBuffer

A buffer class for storing and managing textual content.

This class provides methods to write text to the buffer, read the entire buffer content, and clear the buffer content. The buffer can be used to collect text output for further processing or inspection.

Attributes: content (str): Stores the textual content of the buffer.

  • __init__(self)
    • Initializes a new ContentBuffer instance.

The content buffer starts with an empty string, and logging is initialized to indicate that the buffer has been created.

  • clear(self)
    • Clears the content buffer.

Empties the buffer content, resetting it to an empty string.

  • read(self) -> str
    • Reads the entire content from the buffer.

Returns: str: The content of the buffer.

Provides the entire content stored in the buffer.

  • write(self, text: str)
    • Writes text to the content buffer.

Args: text (str): The text to be added to the buffer.

Adds the given text to the existing content of the buffer.
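
A short example exercising write, read, and clear:

```python
from sunholo.streaming.content_buffer import ContentBuffer

buffer = ContentBuffer()
buffer.write("First sentence. ")
buffer.write("Second sentence.")

print(buffer.read())  # "First sentence. Second sentence."

buffer.clear()
print(buffer.read())  # "" (empty again)
```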
