
alloydb_client.py

Source: src/sunholo/database/alloydb_client.py

Classes

AlloyDBClient

A class to manage interactions with an AlloyDB instance.

Example Usage:

```python
client = AlloyDBClient(
    project_id="your-project-id",
    region="your-region",
    cluster_name="your-cluster-name",
    instance_name="your-instance-name",
    user="your-db-user",
    password="your-db-password"
)

# Create a database
client.execute_sql("CREATE DATABASE my_database")

# Execute other SQL statements
client.execute_sql("CREATE TABLE my_table (id INT, name VARCHAR(50))")
```
  • __init__(self, config: sunholo.utils.config_class.ConfigManager = None, project_id: str = None, region: str = None, cluster_name: str = None, instance_name: str = None, user: str = None, password: str = None, db='postgres')
    • Initializes the AlloyDB client.
  • project_id (str): GCP project ID where the AlloyDB instance resides.
  • region (str): The region where the AlloyDB instance is located.
  • cluster_name (str): The name of the AlloyDB cluster.
  • instance_name (str): The name of the AlloyDB instance.
  • user (str): The database user. If None, the default service account email is used.
  • db (str): The name of the database. Defaults to 'postgres'.
  • _and_or_ilike(sources, search_type='OR', operator='ILIKE')

    • No docstring available.
  • _build_instance_uri(self, project_id, region, cluster_name, instance_name)

    • No docstring available.
  • _create_engine(self)

    • No docstring available.
  • _create_engine_from_pg8000(self, user, password, db)

    • No docstring available.
  • _execute_sql_async_langchain(self, sql_statement)

    • No docstring available.
  • _execute_sql_async_pg8000(self, sql_statement, values=None)

    • Executes a given SQL statement asynchronously with error handling.

Args:

  • sql_statement (str): The SQL statement to execute.
  • values (list, optional): Values for the parameterized query.

Returns: Result of SQL execution.

  • _execute_sql_langchain(self, sql_statement)

    • No docstring available.
  • _execute_sql_pg8000(self, sql_statement, params=None)

    • Executes a given SQL statement with error handling.

Args:

  • sql_statement (str): The SQL statement to execute.
  • params (dict or list, optional): Parameters for the SQL statement.

Returns: The result of the execution, if any.
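As a hedged illustration of parameterized execution (the placeholder style depends on the driver configuration; pg8000's default paramstyle uses `%s`), a call might look like:

```python
# Hypothetical usage sketch: pass values separately instead of string-formatting
# them into the SQL, so the driver handles quoting and escaping.
sql = "INSERT INTO my_table (id, name) VALUES (%s, %s)"
params = [1, "alice"]

# client._execute_sql_pg8000(sql, params)  # requires a live AlloyDB connection
```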

  • _flatten_dict(self, nested_dict, parent_key='', separator='.')
    • Flatten a nested dictionary into a single-level dictionary with dot notation for keys.

Args:

  • nested_dict (dict): The nested dictionary to flatten.
  • parent_key (str): The parent key for the current recursion level.
  • separator (str): The separator to use between key levels (default: '.').

Returns: dict: A flattened dictionary with special handling for lists.
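The flattening behaviour (ignoring the special list handling) can be sketched with a minimal standalone function; this is an illustration, not the library's actual code:

```python
def flatten(nested, parent_key="", sep="."):
    """Minimal sketch of dot-notation flattening (list handling omitted)."""
    flat = {}
    for key, value in nested.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, full_key, sep))  # recurse into nested dicts
        else:
            flat[full_key] = value
    return flat

flatten({"user": {"name": "alice", "address": {"city": "Aarhus"}}, "id": 1})
# → {"user.name": "alice", "user.address.city": "Aarhus", "id": 1}
```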

  • _flatten_dict_for_schema(self, nested_dict, parent_key='', separator='.')
    • Flatten a nested dictionary for schema creation.

Args:

  • nested_dict (dict): The nested dictionary to flatten.
  • parent_key (str): The parent key for the current recursion level.
  • separator (str): The separator to use between key levels.

Returns: dict: A flattened dictionary.

  • _get_document_from_docstore(self, source: str, vector_name: str)

    • No docstring available.
  • _get_document_via_docid(self, source: str, vector_name: str, doc_id: str)

    • No docstring available.
  • _get_embedder(self)

    • No docstring available.
  • _get_sources_from_docstore(self, sources, vector_name, search_type='OR')

    • Helper function to build the SQL query for fetching sources.
  • _get_sql_type(self, value)

    • Helper method to determine SQL type from a Python value.

Args:

  • value: The value used to determine the column type.

Returns: str: The SQL type.
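A sketch of what such a Python-to-PostgreSQL type mapping might look like; the actual mapping in `_get_sql_type` may use different type names or extra cases:

```python
def sql_type_for(value):
    """Illustrative mapping from a Python value to a PostgreSQL column type."""
    if isinstance(value, bool):      # check bool before int: bool is an int subclass
        return "BOOLEAN"
    if isinstance(value, int):
        return "INTEGER"
    if isinstance(value, float):
        return "DOUBLE PRECISION"
    if isinstance(value, (dict, list)):
        return "JSONB"               # nested structures stored as JSON
    return "TEXT"
```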

  • _get_sql_type_safe(self, value)
    • Enhanced version of _get_sql_type with better type detection. Handles placeholder values and common patterns.

Args:

  • value: The value used to determine the column type.

Returns: str: The SQL type.

  • _insert_single_row(self, table_name: str, data: dict, metadata: dict = None, primary_key_column: str = 'id')
    • Inserts a single row of data into the specified table.

Args:

  • table_name (str): Name of the table.
  • data (dict): Data to write to the table.
  • metadata (dict, optional): Additional metadata to include.

Returns: Result of SQL execution.

  • _list_sources_from_docstore(self, sources, vector_name, search_type='OR')

    • Helper function to build the SQL query for listing sources.
  • _similarity_search(self, query, source_filter: str = '', free_filter: str = None, vector_name: str = None)

    • No docstring available.
  • asimilarity_search(self, query: str, source_filter: str = '', free_filter: str = None, k: int = 5, vector_name: str = None)

    • No docstring available.
  • check_connection(self)

    • Checks if the database connection is still valid.

Returns: bool: True if connection is valid, False otherwise

  • check_row(self, table_name: str, primary_key_column: str, primary_key_value: str, columns: list = None, condition: str = None)
    • Retrieves a row from the specified table based on the primary key.

Args:

  • table_name (str): Name of the table to query.
  • primary_key_column (str): Name of the primary key column (e.g., 'id').
  • primary_key_value (str): Value of the primary key for the row to retrieve.
  • columns (list, optional): List of column names to retrieve. If None, retrieves all columns.
  • condition (str, optional): Additional condition for the WHERE clause.

Returns: The row data if found, None otherwise.

  • close(self)

    • Properly close the database connection.
  • create_database(self, database_name)

    • No docstring available.
  • create_docstore_table(self, vector_name: str, users)

    • No docstring available.
  • create_index(self, vectorstore=None)

    • No docstring available.
  • create_schema(self, schema_name='public')

    • No docstring available.
  • create_table_from_schema(self, table_name: str, schema_data: dict, users: list = None)

    • Creates or ensures a table exists based on the structure of the provided schema data, with special handling for expandable lists.

Args:

  • table_name (str): Name of the table to create.
  • schema_data (dict): Data structure that matches the expected schema.
  • users (list, optional): List of users to grant permissions to.

Returns: Result of SQL execution.

  • create_table_with_columns(self, table_name, column_definitions, if_not_exists=True, primary_key_column='id')
    • Create a table with explicit column definitions.

Args:

  • table_name (str): The name of the table to create.
  • column_definitions (list): List of column definition dictionaries with keys:
    • name: Column name.
    • type: PostgreSQL data type.
    • nullable: Whether the column allows NULL (default True).
    • default: Default value expression (optional).
    • primary_key: Whether this is a primary key (default False).
  • if_not_exists (bool): Whether to use an IF NOT EXISTS clause.
  • primary_key_column (str): Default name of the primary key if not specified in column_definitions.

Returns: Result of the execution.
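A `column_definitions` list matching the documented keys might look like this (table and column names here are purely illustrative):

```python
# Hypothetical column definitions; each dict uses the documented keys
# (name, type, nullable, default, primary_key).
column_definitions = [
    {"name": "id", "type": "SERIAL", "primary_key": True},
    {"name": "name", "type": "VARCHAR(50)", "nullable": False},
    {"name": "created_at", "type": "TIMESTAMP", "default": "NOW()"},
]

# client.create_table_with_columns("my_table", column_definitions)  # needs a connection
```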

  • create_tables(self, vector_name, users)

    • No docstring available.
  • create_vectorstore_table(self, vector_name: str, users)

    • No docstring available.
  • delete_sources_from_alloydb(self, sources, vector_name)

    • Deletes the given sources from both the vectorstore and the docstore.
  • ensure_connected(self)

    • Ensures the database connection is valid, attempting to reconnect if necessary.

Returns: bool: True if connection is valid or reconnection successful, False otherwise
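Together, `check_connection` and `ensure_connected` support a defensive execution pattern. A sketch of such a wrapper (a hypothetical helper, not part of the library):

```python
def run_with_reconnect(client, sql):
    """Verify the connection, reconnect if needed, then run the statement.
    Illustrative pattern built on the documented AlloyDBClient methods."""
    if not client.check_connection():
        if not client.ensure_connected():
            raise ConnectionError("could not reconnect to AlloyDB")
    return client.execute_sql(sql)
```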

  • execute_sql(self, sql_statement)

    • No docstring available.
  • execute_sql_async(self, sql_statement)

    • No docstring available.
  • fetch_owners(self)

    • No docstring available.
  • get_document_from_docstore(self, source: str, vector_name)

    • No docstring available.
  • get_document_from_docstore_async(self, source: str, vector_name: str)

    • No docstring available.
  • get_sources_from_docstore(self, sources, vector_name, search_type='OR', just_source_name=False)

    • Fetches sources from the docstore.
  • get_sources_from_docstore_async(self, sources, vector_name, search_type='OR', just_source_name=False)

    • Fetches sources from the docstore asynchronously.
  • get_table_columns(self, table_name, schema='public')

    • Fetch column information for an existing table.

Args:

  • table_name (str): The table name to get columns for.
  • schema (str): Database schema, defaults to 'public'.

Returns: List[dict]: List of column information dictionaries with keys:

  • name: column name
  • type: PostgreSQL data type
  • is_nullable: whether the column allows NULL values
  • default: default value if any
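Based on the keys listed above, a returned entry might look like the following (the values are illustrative, not real output):

```python
# Hypothetical return value from get_table_columns("my_table")
columns = [
    {"name": "id", "type": "integer", "is_nullable": False, "default": None},
    {"name": "name", "type": "character varying", "is_nullable": True, "default": None},
]
```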
  • get_vectorstore(self, vector_name: str = None)

    • No docstring available.
  • grant_schema_permissions(self, schema_name, users)

    • No docstring available.
  • grant_table_permissions(self, table_name, users)

    • No docstring available.
  • insert_rows_safely(self, table_name, rows, metadata=None, continue_on_error=False, primary_key_column='id')

    • Insert multiple rows into a table with error handling for individual rows.

Args:

  • table_name (str): The table to insert into.
  • rows (list): List of dictionaries containing row data.
  • metadata (dict, optional): Additional metadata to include in each row.
  • continue_on_error (bool): Whether to continue if some rows fail.
  • primary_key_column (str): The primary key of the table, default 'id'.

Returns: dict with keys 'success' (bool), 'total_rows' (int), 'inserted_rows' (int), 'failed_rows' (int), and 'errors' (a list of errors with row data).
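A caller might inspect the summary dict like this; the shape follows the docstring above, while the field values are illustrative:

```python
# Hypothetical result of insert_rows_safely with continue_on_error=True
result = {
    "success": False,
    "total_rows": 3,
    "inserted_rows": 2,
    "failed_rows": 1,
    "errors": [{"row": {"id": 7}, "error": "duplicate key value"}],
}

if not result["success"]:
    for err in result["errors"]:
        print(f"row {err['row']} failed: {err['error']}")
```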

  • map_data_to_columns(self, data, column_info, case_sensitive=False)
    • Map data dictionary to available table columns, handling case sensitivity.

Args:

  • data (dict): Dictionary of data to map.
  • column_info (list): List of column information dictionaries from get_table_columns.
  • case_sensitive (bool): Whether to match column names case-sensitively.

Returns: dict: Filtered data dictionary containing only columns that exist in the table.
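The case-insensitive mapping can be sketched with a small standalone function; an illustration only, not the library's actual implementation:

```python
def map_to_columns(data, column_info, case_sensitive=False):
    """Sketch: keep only keys that match a table column, preserving
    the table's own column names in the output."""
    lookup = {
        (c["name"] if case_sensitive else c["name"].lower()): c["name"]
        for c in column_info
    }
    mapped = {}
    for key, value in data.items():
        probe = key if case_sensitive else key.lower()
        if probe in lookup:
            mapped[lookup[probe]] = value  # use the table's column name
    return mapped

map_to_columns({"Name": "alice", "extra": 1}, [{"name": "name"}])
# → {"name": "alice"}
```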

  • refresh_index(self, vectorstore=None)

    • No docstring available.
  • safe_convert_value(self, value, target_type)

    • Safely convert a value to the target PostgreSQL type. Handles various formats and placeholder values.

Args:

  • value: The value to convert.
  • target_type (str): PostgreSQL data type name.

Returns: The converted value appropriate for the target type, or None if conversion fails.
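A sketch of tolerant conversion along these lines; the real `safe_convert_value` may recognise more formats and placeholder patterns:

```python
def safe_convert(value, target_type):
    """Illustrative conversion to a PostgreSQL type name; failures become None."""
    if value is None or value == "":
        return None
    try:
        if target_type in ("integer", "bigint", "smallint"):
            return int(float(value))          # accepts "42" and "42.0"
        if target_type in ("double precision", "numeric", "real"):
            return float(value)
        if target_type == "boolean":
            return str(value).strip().lower() in ("true", "t", "1", "yes")
        return str(value)
    except (ValueError, TypeError):
        return None                           # unparsable values become NULL
```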

  • similarity_search(self, query: str, source_filter: str = '', free_filter: str = None, k: int = 5, vector_name: str = None)

    • No docstring available.
  • update_row(self, table_name: str, primary_key_column: str, primary_key_value: str, update_data: dict, condition: str = None)

    • Updates a row in the specified table based on the primary key.

Args:

  • table_name (str): Name of the table to update.
  • primary_key_column (str): Name of the primary key column (e.g., 'acdid').
  • primary_key_value (str): Value of the primary key for the row to update.
  • update_data (dict): Dictionary containing column names and values to update.
  • condition (str, optional): Additional condition for the WHERE clause.

Returns: Result of SQL execution.

  • write_data_to_table(self, table_name: str, data: dict, metadata: dict = None)
    • Writes data to the specified table, with special handling for expandable lists.

Args:

  • table_name (str): Name of the table.
  • data (dict): Data to write to the table.
  • metadata (dict, optional): Additional metadata to include.

Returns: List of results from the SQL executions.
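A hedged usage sketch; the table and field names are illustrative, and the call itself requires a live AlloyDB connection:

```python
# Hypothetical row with a nested list, which the docstring says gets
# special "expandable list" handling.
row = {"id": 1, "name": "alice", "tags": ["genai", "alloydb"]}
metadata = {"ingest_batch": "2025-01-demo"}

# results = client.write_data_to_table("my_table", row, metadata=metadata)
```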
