alloydb_client.py
Source: src/sunholo/database/alloydb_client.py
Classes
AlloyDBClient
A class to manage interactions with an AlloyDB instance.
Example Usage:
client = AlloyDBClient(
    project_id="your-project-id",
    region="your-region",
    cluster_name="your-cluster-name",
    instance_name="your-instance-name",
    user="your-db-user",
    password="your-db-password"
)
# Create a database
client.execute_sql("CREATE DATABASE my_database")
# Execute other SQL statements
client.execute_sql("CREATE TABLE my_table (id INT, name VARCHAR(50))")
- __init__(self, config: sunholo.utils.config_class.ConfigManager = None, project_id: str = None, region: str = None, cluster_name: str = None, instance_name: str = None, user: str = None, password: str = None, db='postgres')
- Initializes the AlloyDB client.
- project_id (str): GCP project ID where the AlloyDB instance resides.
- region (str): The region where the AlloyDB instance is located.
- cluster_name (str): The name of the AlloyDB cluster.
- instance_name (str): The name of the AlloyDB instance.
- user (str): The database user. If None, the default service account email is used.
- db (str): The name of the database (default: 'postgres').
- _and_or_ilike(sources, search_type='OR', operator='ILIKE')
- No docstring available.
- _build_instance_uri(self, project_id, region, cluster_name, instance_name)
- No docstring available.
- _create_engine(self)
- No docstring available.
- _create_engine_from_pg8000(self, user, password, db)
- No docstring available.
- _execute_sql_async_langchain(self, sql_statement)
- No docstring available.
- _execute_sql_async_pg8000(self, sql_statement, values=None)
- Executes a given SQL statement asynchronously with error handling.
Args:
- sql_statement (str): The SQL statement to execute.
- values (list, optional): Values for a parameterized query.
Returns: Result of SQL execution.
- _execute_sql_langchain(self, sql_statement)
- No docstring available.
- _execute_sql_pg8000(self, sql_statement, params=None)
- Executes a given SQL statement with error handling.
Args:
- sql_statement (str): The SQL statement to execute.
- params (dict or list, optional): Parameters for the SQL statement.
Returns: The result of the execution, if any.
- _flatten_dict(self, nested_dict, parent_key='', separator='.')
- Flatten a nested dictionary into a single-level dictionary with dot notation for keys.
Args:
- nested_dict (dict): The nested dictionary to flatten.
- parent_key (str): The parent key for the current recursion level.
- separator (str): The separator to use between key levels (default: '.').
Returns: dict: A flattened dictionary with special handling for lists
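The dot-notation flattening described for _flatten_dict can be illustrated with a minimal standalone sketch. The function name flatten_dict is hypothetical, and leaving lists unchanged is an assumption made for illustration; the method's actual special handling of lists is not documented on this page.

```python
def flatten_dict(nested_dict, parent_key="", separator="."):
    """Flatten nested dicts into one level, joining keys with `separator`."""
    flat = {}
    for key, value in nested_dict.items():
        # Build the full key path, e.g. "user" + "." + "name".
        full_key = f"{parent_key}{separator}{key}" if parent_key else str(key)
        if isinstance(value, dict):
            # Recurse into nested dictionaries, carrying the key path along.
            flat.update(flatten_dict(value, full_key, separator))
        else:
            # Assumption: lists and scalar values are stored unchanged.
            flat[full_key] = value
    return flat


example = {
    "user": {"name": "Ada", "address": {"city": "London"}},
    "tags": ["a", "b"],
}
flattened = flatten_dict(example)
```

Flat keys such as user.address.city map naturally onto column names, which is presumably why the class flattens data before writing it to a table.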
- _flatten_dict_for_schema(self, nested_dict, parent_key='', separator='.')
- Flatten a nested dictionary for schema creation.
Args:
- nested_dict (dict): The nested dictionary to flatten.
- parent_key (str): The parent key for the current recursion level.
- separator (str): The separator to use between key levels.
Returns: dict: A flattened dictionary
- _get_document_from_docstore(self, source: str, vector_name: str)
- No docstring available.
- _get_document_via_docid(self, source: str, vector_name: str, doc_id: str)
- No docstring available.
- _get_embedder(self)
- No docstring available.
- _get_sources_from_docstore(self, sources, vector_name, search_type='OR')
- Helper function to build the SQL query for fetching sources.
- _get_sql_type(self, value)
- Helper method to determine SQL type from a Python value.
Args: value: The value to determine the column type
Returns: str: SQL type
- _get_sql_type_safe(self, value)
- Enhanced version of _get_sql_type with better type detection. Handles placeholder values and common patterns.
Args: value: The value to determine the column type
Returns: str: SQL type
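To make the idea of inferring a SQL type from a Python value concrete, here is a small sketch. The function name guess_sql_type and the specific type mapping are assumptions for illustration only; the mapping actually used by _get_sql_type and _get_sql_type_safe is not shown on this page.

```python
from datetime import date, datetime


def guess_sql_type(value):
    """Return a PostgreSQL type name for a Python value (illustrative mapping)."""
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return "BOOLEAN"
    if isinstance(value, int):
        return "INTEGER"
    if isinstance(value, float):
        return "NUMERIC"
    # datetime must be tested before date because datetime subclasses date.
    if isinstance(value, datetime):
        return "TIMESTAMP"
    if isinstance(value, date):
        return "DATE"
    if isinstance(value, (dict, list)):
        return "JSONB"
    # Assumption: anything else, including None and str, falls back to TEXT.
    return "TEXT"
```

The order of the isinstance checks matters: testing int before bool would classify True as INTEGER.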
- _insert_single_row(self, table_name: str, data: dict, metadata: dict = None, primary_key_column: str = 'id')
- Inserts a single row of data into the specified table.
Args:
- table_name (str): Name of the table.
- data (dict): Data to write to the table.
- metadata (dict, optional): Additional metadata to include.
Returns: Result of SQL execution
- _list_sources_from_docstore(self, sources, vector_name, search_type='OR')
- Helper function to build the SQL query for listing sources.
- _similarity_search(self, query, source_filter: str = '', free_filter: str = None, vector_name: str = None)
- No docstring available.
- asimilarity_search(self, query: str, source_filter: str = '', free_filter: str = None, k: int = 5, vector_name: str = None)
- No docstring available.
- check_connection(self)
- Checks if the database connection is still valid.
Returns: bool: True if connection is valid, False otherwise
- check_row(self, table_name: str, primary_key_column: str, primary_key_value: str, columns: list = None, condition: str = None)
- Retrieves a row from the specified table based on the primary key.
Args:
- table_name (str): Name of the table to query.
- primary_key_column (str): Name of the primary key column (e.g., 'id').
- primary_key_value (str): Value of the primary key for the row to retrieve.
- columns (list, optional): List of column names to retrieve. If None, retrieves all columns.
- condition (str, optional): Additional condition for the WHERE clause.
Returns: The row data if found, None otherwise
- close(self)
- Properly close the database connection.
- create_database(self, database_name)
- No docstring available.
- create_docstore_table(self, vector_name: str, users)
- No docstring available.
- create_index(self, vectorstore=None)
- No docstring available.
- create_schema(self, schema_name='public')
- No docstring available.
- create_table_from_schema(self, table_name: str, schema_data: dict, users: list = None)
- Creates or ensures a table exists based on the structure of the provided schema data, with special handling for expandable lists.
Args:
- table_name (str): Name of the table to create.
- schema_data (dict): Data structure that matches the expected schema.
- users (list, optional): List of users to grant permissions to.
Returns: Result of SQL execution
- create_table_with_columns(self, table_name, column_definitions, if_not_exists=True, primary_key_column='id')
- Create a table with explicit column definitions.
Args:
- table_name (str): The name of the table to create.
- column_definitions (list): List of column definition dictionaries with keys:
  - name: Column name
  - type: PostgreSQL data type
  - nullable: Whether the column allows NULL (default True)
  - default: Default value expression (optional)
  - primary_key: Whether this is a primary key (default False)
- if_not_exists (bool): Whether to use the IF NOT EXISTS clause.
- primary_key_column (str): Default name of the primary key if none is specified in column_definitions.
Returns: Result of the execution
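The column definition keys listed for create_table_with_columns could be turned into a CREATE TABLE statement roughly as follows. This is a sketch under stated assumptions, not the library's implementation: the helper name build_create_table_sql is hypothetical, and the exact SQL the method generates (quoting, clause order, use of primary_key_column) is not documented here.

```python
def build_create_table_sql(table_name, column_definitions, if_not_exists=True):
    """Build a CREATE TABLE statement from column definition dictionaries."""
    column_clauses = []
    for col in column_definitions:
        parts = [f'"{col["name"]}"', col["type"]]
        if col.get("primary_key", False):
            parts.append("PRIMARY KEY")
        elif not col.get("nullable", True):
            # nullable defaults to True, so NOT NULL is added only on request.
            parts.append("NOT NULL")
        if "default" in col:
            parts.append(f"DEFAULT {col['default']}")
        column_clauses.append(" ".join(parts))

    exists_clause = "IF NOT EXISTS " if if_not_exists else ""
    columns_sql = ", ".join(column_clauses)
    # Note: identifiers are interpolated directly, so table and column names
    # must come from trusted input in this sketch.
    return f'CREATE TABLE {exists_clause}"{table_name}" ({columns_sql})'


columns = [
    {"name": "id", "type": "SERIAL", "primary_key": True},
    {"name": "title", "type": "TEXT", "nullable": False},
    {"name": "created", "type": "TIMESTAMP", "default": "NOW()"},
]
statement = build_create_table_sql("docs", columns)
```

With the real client, the equivalent call would presumably be client.create_table_with_columns("docs", columns), which executes the statement rather than returning it.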
- create_tables(self, vector_name, users)
- No docstring available.
- create_vectorstore_table(self, vector_name: str, users)
- No docstring available.
- delete_sources_from_alloydb(self, sources, vector_name)
- Deletes the given sources from both the vectorstore and the docstore.
- ensure_connected(self)
- Ensures the database connection is valid, attempting to reconnect if necessary.
Returns: bool: True if connection is valid or reconnection successful, False otherwise
- execute_sql(self, sql_statement)
- No docstring available.
- execute_sql_async(self, sql_statement)
- No docstring available.
- fetch_owners(self)
- No docstring available.
- get_document_from_docstore(self, source: str, vector_name)
- No docstring available.
- get_document_from_docstore_async(self, source: str, vector_name: str)
- No docstring available.
- get_sources_from_docstore(self, sources, vector_name, search_type='OR', just_source_name=False)
- Fetches sources from the docstore.
- get_sources_from_docstore_async(self, sources, vector_name, search_type='OR', just_source_name=False)
- Fetches sources from the docstore asynchronously.
- get_table_columns(self, table_name, schema='public')
- Fetch column information for an existing table.
Args:
- table_name (str): The table name to get columns for.
- schema (str): Database schema, defaults to "public".
Returns: List[dict]: List of column information dictionaries with keys:
- name: column name
- type: PostgreSQL data type
- is_nullable: whether the column allows NULL values
- default: default value if any
- get_vectorstore(self, vector_name: str = None)
- No docstring available.
- grant_schema_permissions(self, schema_name, users)
- No docstring available.
- grant_table_permissions(self, table_name, users)
- No docstring available.
- insert_rows_safely(self, table_name, rows, metadata=None, continue_on_error=False, primary_key_column='id')
- Insert multiple rows into a table with error handling for individual rows.
Args:
- table_name (str): The table to insert into.
- rows (list): List of dictionaries containing row data.
- metadata (dict, optional): Additional metadata to include in each row.
- continue_on_error (bool): Whether to continue if some rows fail.
- primary_key_column (str): The primary key in the table, default 'id'.
Returns: dict: { 'success': bool, 'total_rows': int, 'inserted_rows': int, 'failed_rows': int, 'errors': list of errors with row data }
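The per-row error handling and the shape of the returned summary can be sketched without a database. Everything here is illustrative: insert_rows_with_summary and fake_insert are hypothetical names, the insert_one callable stands in for the real single-row insert, and treating success as "no rows failed" is an assumption, since the page does not define it.

```python
def insert_rows_with_summary(rows, insert_one, continue_on_error=False):
    """Insert rows one by one and collect a summary of successes and failures."""
    result = {
        "success": True,
        "total_rows": len(rows),
        "inserted_rows": 0,
        "failed_rows": 0,
        "errors": [],
    }
    for index, row in enumerate(rows):
        try:
            insert_one(row)
            result["inserted_rows"] += 1
        except Exception as exc:
            result["failed_rows"] += 1
            result["errors"].append({"row": index, "data": row, "error": str(exc)})
            if not continue_on_error:
                # Stop at the first failure unless told to keep going.
                break
    # Assumption: the batch counts as successful only if no row failed.
    result["success"] = result["failed_rows"] == 0
    return result


def fake_insert(row):
    """Stand-in for a real insert: rejects rows that lack an 'id' key."""
    if "id" not in row:
        raise ValueError("missing id")


rows = [{"id": 1}, {"name": "no id"}, {"id": 3}]
lenient = insert_rows_with_summary(rows, fake_insert, continue_on_error=True)
strict = insert_rows_with_summary(rows, fake_insert, continue_on_error=False)
```

In the lenient run the third row is still attempted after the second fails; in the strict run processing stops at the failing row.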
- map_data_to_columns(self, data, column_info, case_sensitive=False)
- Map data dictionary to available table columns, handling case sensitivity.
Args:
- data (dict): Dictionary of data to map.
- column_info (list): List of column information dictionaries from get_table_columns.
- case_sensitive (bool): Whether to match column names case-sensitively.
Returns: dict: Filtered data dictionary with only columns that exist in the table
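A minimal sketch of the filtering described for map_data_to_columns is shown below. It assumes column_info has the shape documented for get_table_columns (dictionaries with a name key) and that, in case-insensitive mode, matched keys are renamed to the table's own spelling; that renaming is an assumption, as is the standalone function form.

```python
def map_data_to_columns(data, column_info, case_sensitive=False):
    """Keep only the entries of `data` whose keys match a table column."""
    if case_sensitive:
        names = {col["name"] for col in column_info}
        return {key: value for key, value in data.items() if key in names}

    # Map lowercased column names back to the spelling used by the table.
    lookup = {col["name"].lower(): col["name"] for col in column_info}
    return {
        lookup[key.lower()]: value
        for key, value in data.items()
        if key.lower() in lookup
    }


column_info = [
    {"name": "id", "type": "integer"},
    {"name": "Title", "type": "text"},
]
data = {"ID": 7, "title": "Report", "extra": "dropped"}

mapped = map_data_to_columns(data, column_info)
exact_only = map_data_to_columns(data, column_info, case_sensitive=True)
```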
- refresh_index(self, vectorstore=None)
- No docstring available.
- safe_convert_value(self, value, target_type)
- Safely convert a value to the target PostgreSQL type. Handles various formats and placeholder values.
Args:
- value: The value to convert.
- target_type (str): PostgreSQL data type name.
Returns: The converted value appropriate for the target type, or None if conversion fails
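The "convert or return None" behaviour described for safe_convert_value might look like the sketch below. The placeholder strings, the supported type names, and the accepted boolean spellings are all assumptions chosen for illustration; the real method may recognise a different set.

```python
# Assumption: these strings are treated as "no value" placeholders.
PLACEHOLDERS = {"", "n/a", "none", "null", "-"}


def safe_convert(value, target_type):
    """Convert `value` for a PostgreSQL column type, or return None on failure."""
    if value is None:
        return None
    if isinstance(value, str) and value.strip().lower() in PLACEHOLDERS:
        return None

    kind = target_type.upper()
    try:
        if kind in ("INTEGER", "BIGINT", "SMALLINT"):
            # Going through float lets strings like "42.0" convert cleanly.
            return int(float(value))
        if kind in ("NUMERIC", "REAL", "DOUBLE PRECISION"):
            return float(value)
        if kind == "BOOLEAN":
            if isinstance(value, str):
                return value.strip().lower() in ("true", "t", "yes", "1")
            return bool(value)
        # Fallback for text-like types.
        return str(value)
    except (TypeError, ValueError, OverflowError):
        # Conversion failed: signal "no usable value" instead of raising.
        return None
```

Returning None rather than raising lets a caller insert NULL for an unparseable field and carry on with the rest of the row.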
- similarity_search(self, query: str, source_filter: str = '', free_filter: str = None, k: int = 5, vector_name: str = None)
- No docstring available.
- update_row(self, table_name: str, primary_key_column: str, primary_key_value: str, update_data: dict, condition: str = None)
- Updates a row in the specified table based on the primary key.
Args:
- table_name (str): Name of the table to update.
- primary_key_column (str): Name of the primary key column (e.g., 'acdid').
- primary_key_value (str): Value of the primary key for the row to update.
- update_data (dict): Dictionary containing column names and values to update.
- condition (str, optional): Additional condition for the WHERE clause.
Returns: Result of SQL execution
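The arguments of update_row map onto an UPDATE statement in a fairly direct way. The sketch below builds such a statement with positional placeholders; the helper name build_update_sql, the %s placeholder style, and the way condition is appended are assumptions for illustration, not the SQL the library is known to generate.

```python
def build_update_sql(table_name, primary_key_column, primary_key_value,
                     update_data, condition=None):
    """Return (sql, params) for updating one row identified by its primary key."""
    columns = list(update_data)
    # One "column = %s" pair per updated column; values travel separately.
    set_clause = ", ".join(f'"{column}" = %s' for column in columns)
    sql = f'UPDATE "{table_name}" SET {set_clause} WHERE "{primary_key_column}" = %s'
    if condition:
        # Assumption: the extra condition is ANDed onto the primary key match.
        # It is raw SQL, so it must come from trusted input.
        sql += f" AND ({condition})"
    params = [update_data[column] for column in columns] + [primary_key_value]
    return sql, params


sql, params = build_update_sql(
    "docs", "id", "42", {"title": "New title", "status": "done"}
)
guarded_sql, _ = build_update_sql(
    "docs", "id", "42", {"status": "done"}, condition="status <> 'archived'"
)
```

Keeping the values in a separate parameter list, rather than formatting them into the string, avoids SQL injection through the data being written.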
- write_data_to_table(self, table_name: str, data: dict, metadata: dict = None)
- Writes data to the specified table, with special handling for expandable lists.
Args:
- table_name (str): Name of the table.
- data (dict): Data to write to the table.
- metadata (dict, optional): Additional metadata to include.
Returns: List of results from SQL executions