vac_routes.py
Source: sunholo/agents/flask/vac_routes.py
Classes
VACRoutes
Usage Example:
from flask import Flask
from sunholo.agents.flask.vac_routes import VACRoutes

app = Flask(__name__)

def stream_interpreter(question, vector_name, chat_history, **kwargs):
    # Implement your streaming logic
    ...

def vac_interpreter(question, vector_name, chat_history, **kwargs):
    # Implement your static VAC logic
    ...

vac_routes = VACRoutes(app, stream_interpreter, vac_interpreter)

if __name__ == "__main__":
    app.run(debug=True)
-
__init__(self, app, stream_interpreter, vac_interpreter=None, additional_routes=None)
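- Initialize a VACRoutes instance for the given Flask app. `stream_interpreter` and the optional `vac_interpreter` are callables taking (question, vector_name, chat_history, **kwargs); `additional_routes` is an optional list of route dicts with "rule", "methods" and "handler" keys (see register_additional_routes).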
-
_async_generator_to_stream(async_gen_func)
- Helper function to stream the async generator's values to the client.
-
check_authentication(self)
- No docstring available.
-
create_langfuse_trace(self, request, vector_name, trace_id)
- No docstring available.
-
handle_file_upload(self, file, vector_name)
- No docstring available.
-
handle_openai_compatible_endpoint(self, vector_name=None)
- No docstring available.
-
handle_options(self, **kwargs)
- No docstring available.
-
handle_process_vac(self, vector_name)
- No docstring available.
-
handle_stream_vac(self, vector_name)
- No docstring available.
-
health(self)
- No docstring available.
-
home(self)
- No docstring available.
-
langfuse_eval_response(self, trace_id, eval_percent=0.01)
- Sends an evaluation message based on a probability defined by eval_percent.
Args:
    trace_id (str): The trace identifier for the evaluation.
    eval_percent (float): The probability (0 to 1) of triggering the evaluation. Defaults to 0.01.
Returns:
    None
-
make_openai_response(self, user_message, vector_name, answer)
- No docstring available.
-
openai_health_endpoint()
- No docstring available.
-
prep_vac(self, request, vector_name)
- No docstring available.
-
register_additional_routes(self)
- Registers additional custom routes provided during initialization.
Example:
from flask import Flask, jsonify
from sunholo.agents.flask.vac_routes import VACRoutes

app = Flask(__name__)

def stream_interpreter(question, vector_name, chat_history, **kwargs):
    # Implement your streaming logic
    ...

def vac_interpreter(question, vector_name, chat_history, **kwargs):
    # Implement your static VAC logic
    ...

def custom_handler():
    return jsonify({"message": "Custom route!"})

custom_routes = [
    {
        "rule": "/custom",
        "methods": ["GET"],
        "handler": custom_handler
    }
]

vac_routes = VACRoutes(app, stream_interpreter, vac_interpreter, additional_routes=custom_routes)

if __name__ == "__main__":
    app.run(debug=True)
-
register_after_request(self, response)
- No docstring available.
-
register_routes(self)
- Registers all the VAC routes for the Flask application.
-
vac_interpreter_default(self, question: str, vector_name: str, chat_history=[], **kwargs)
- No docstring available.