streaming.py

Source: sunholo/streaming/streaming.py

Functions

start_streaming_chat_async(question, vector_name, qna_func, chat_history=[], wait_time=2, timeout=120, **kwargs)

Asynchronously initiates a chat session that streams responses back to the caller in real time. The function runs the chat interaction in a separate thread, while the main coroutine periodically yields chat responses as they become available.

Parameters:

  • question (str): The initial question to start the chat with.
  • vector_name (str): The vector configuration that determines the behavior of the AI in the chat.
  • qna_func (callable): The function that processes the chat and generates responses.
  • chat_history (list, optional): A list of previous messages in the chat session to maintain context.
  • wait_time (int, optional): The interval in seconds between checking for new chat responses.
  • timeout (int, optional): The maximum time in seconds to wait for a new response before timing out.
  • **kwargs: Additional keyword arguments that are passed to the qna_func.

Yields: str: Outputs from the chat function, which could include processing status messages, chat responses, or final results.

Example:

import asyncio
import time

async def example_usage():
    async for message in start_streaming_chat_async(
            question="Hello, what's the weather today?",
            vector_name="weatherBot",
            qna_func=fetch_weather_response,
            wait_time=1,
            timeout=30):
        print(message)

def fetch_weather_response(question, vector_name, chat_history, callback, **kwargs):
    # Example callback usage, assuming `callback` handles the response formatting.
    responses = ["It's sunny.", "Do you want to know tomorrow's weather?"]
    for response in responses:
        callback.handle_message(response)
        time.sleep(1)  # Simulate delay in response generation
    return {"status": "completed", "data": responses}

if __name__ == '__main__':
    asyncio.run(example_usage())

This example demonstrates setting up an asynchronous streaming chat with a simple Q&A function simulating weather responses. The start_streaming_chat_async function is used in a coroutine that prints messages as they are generated.

start_streaming_chat(question, vector_name, qna_func, chat_history=[], wait_time=2, timeout=120, **kwargs)

No docstring available.
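
No docstring is available, but the signature matches start_streaming_chat_async, so this is presumably the synchronous counterpart. A minimal usage sketch under that assumption, treating the function as a synchronous generator and reusing the hypothetical fetch_weather_response from the example above:

# A minimal sketch, assuming start_streaming_chat is a synchronous
# generator mirroring start_streaming_chat_async above.
for message in start_streaming_chat(
        question="Hello, what's the weather today?",
        vector_name="weatherBot",
        qna_func=fetch_weather_response,  # hypothetical function from the example above
        wait_time=1,
        timeout=30):
    print(message)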

generate_proxy_stream(stream_to_f, user_input, vector_name, chat_history, generate_f_output, **kwargs)

Generator function for proxying and processing streaming output from an underlying model.

This function serves as an adapter for streaming responses from different model agents. It handles the synchronous nature of streaming, parses streaming content, and adapts the output format based on the configured agent type.

Args:

  • stream_to_f: The underlying function that generates streaming responses (e.g., from a language model).
  • user_input (str): The user's input query or message.
  • vector_name (str): The name of the vector store being used.
  • chat_history (list): A list of previous chat messages for context.
  • generate_f_output: A function that processes the raw streaming output from stream_to_f.
  • **kwargs: Additional keyword arguments to pass to stream_to_f.

Yields: str: Parsed and processed chunks of text from the streaming output.

Raises: ValueError: If an invalid agent type is configured.

Examples:

def my_stream_to_function(user_input, vector_name, chat_history, stream=True, **kwargs):
    ...  # your custom streaming logic

def my_generate_f_output(output):
    ...  # your custom output processing

for output in generate_proxy_stream(
        my_stream_to_function,
        "Hello, how are you?",
        "my_vector_store",
        [],
        my_generate_f_output):
    print(output)  # Process each streaming output chunk

generate_proxy_stream_async(stream_to_f, user_input, vector_name, chat_history, generate_f_output, **kwargs)

Asynchronous generator function for proxying and processing streaming output from an underlying model.

This function serves as an adapter for streaming responses from different model agents. It handles the asynchronous nature of streaming, parses streaming content, and adapts the output format based on the configured agent type.

Args:

  • stream_to_f: The underlying asynchronous function that generates streaming responses (e.g., from a language model).
  • user_input (str): The user's input query or message.
  • vector_name (str): The name of the vector store being used.
  • chat_history (list): A list of previous chat messages for context.
  • generate_f_output: A function that processes the raw streaming output from stream_to_f.
  • **kwargs: Additional keyword arguments to pass to stream_to_f.

Yields: str: Parsed and processed chunks of text from the streaming output.

Raises: ValueError: If an invalid agent type is configured.

Examples:

async def my_stream_to_function(user_input, vector_name, chat_history, stream=True, **kwargs):
    ...  # your custom streaming logic

async def my_generate_f_output(output):
    ...  # your custom output processing

async for output in generate_proxy_stream_async(
        my_stream_to_function,
        "Hello, how are you?",
        "my_vector_store",
        [],
        my_generate_f_output):
    print(output)  # Process each streaming output chunk

process_streaming_content(streaming_content, generate_f_output, json_buffer, inside_json)

No docstring available.
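
No docstring is available here either. Judging only from the parameter names, the function appears to separate plain text from JSON fragments that may be split across stream chunks, threading json_buffer and inside_json through successive calls as state. The sketch below illustrates that buffering technique in isolation; it is an assumption about the intent, not the library's actual implementation, and the return convention is hypothetical:

import json

def process_streaming_content_sketch(streaming_content, generate_f_output,
                                     json_buffer, inside_json):
    # Hypothetical sketch: scan a stream chunk character by character,
    # accumulating any embedded JSON object in json_buffer and passing
    # plain text straight through. (json_buffer, inside_json) is state
    # carried between calls so objects split across chunks still parse.
    outputs = []
    for char in streaming_content:
        if not inside_json and char == '{':
            inside_json = True      # a JSON object starts mid-stream
            json_buffer = char
        elif inside_json:
            json_buffer += char
            try:
                parsed = json.loads(json_buffer)  # does the buffer parse yet?
                outputs.append(generate_f_output(parsed))
                json_buffer, inside_json = "", False
            except json.JSONDecodeError:
                pass                # still accumulating the object
        else:
            outputs.append(char)    # plain text passes straight through
    return outputs, json_buffer, inside_json

A caller would hold json_buffer and inside_json across chunks, passing the values returned for one chunk into the call for the next.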
