Skip to main content

vac_routes.py

Source: sunholo/agents/flask/vac_routes.py

Classes

VACRoutes

Usage Example:

from agents.flask import VACRoutes

app = Flask(__name__)

def stream_interpreter(question, vector_name, chat_history, **kwargs):
# Implement your streaming logic
...

def vac_interpreter(question, vector_name, chat_history, **kwargs):
# Implement your static VAC logic
...

vac_routes = VACRoutes(app, stream_interpreter, vac_interpreter)

if __name__ == "__main__":
app.run(debug=True)
  • init(self, app, stream_interpreter, vac_interpreter=None, additional_routes=None)

    • Initialize self. See help(type(self)) for accurate signature.
  • _async_generator_to_stream(async_gen_func)

    • Helper function to stream the async generator's values to the client.
  • check_authentication(self)

    • No docstring available.
  • create_langfuse_trace(self, request, vector_name, trace_id)

    • No docstring available.
  • handle_file_upload(self, file, vector_name)

    • No docstring available.
  • handle_openai_compatible_endpoint(self, vector_name=None)

    • No docstring available.
  • handle_options(self, **kwargs)

    • No docstring available.
  • handle_process_vac(self, vector_name)

    • No docstring available.
  • handle_stream_vac(self, vector_name)

    • No docstring available.
  • health(self)

    • No docstring available.
  • home(self)

    • No docstring available.
  • langfuse_eval_response(self, trace_id, eval_percent=0.01)

    • Sends an evaluation message based on a probability defined by eval_percent.

Args: eval_percent (float): The probability (0 to 1) of triggering the evaluation. trace_id (str): The trace identifier for the evaluation.

Returns: None

  • make_openai_response(self, user_message, vector_name, answer)

    • No docstring available.
  • openai_health_endpoint()

    • No docstring available.
  • prep_vac(self, request, vector_name)

    • No docstring available.
  • register_additional_routes(self)

    • Registers additional custom routes provided during initialization.

Example:

from flask import Flask, jsonify
from agents.flask import VACRoutes

app = Flask(__name__)

def stream_interpreter(question, vector_name, chat_history, **kwargs):
# Implement your streaming logic
...

def vac_interpreter(question, vector_name, chat_history, **kwargs):
# Implement your static VAC logic
...

def custom_handler():
return jsonify({"message": "Custom route!"})

custom_routes = [
{
"rule": "/custom",
"methods": ["GET"],
"handler": custom_handler
}
]

vac_routes = VACRoutes(app, stream_interpreter, vac_interpreter, additional_routes=custom_routes)

if __name__ == "__main__":
app.run(debug=True)
  • register_after_request(self, response)

    • No docstring available.
  • register_routes(self)

    • Registers all the VAC routes for the Flask application.
  • vac_interpreter_default(self, question: str, vector_name: str, chat_history=[], **kwargs)

    • No docstring available.
Sunholo Multivac

Get in touch to see if we can help with your GenAI project.

Contact us

Other Links

Sunholo Multivac - GenAIOps

Copyright ©

Holosun ApS 2024