Skip to main content

discovery_engine_client.py

Source: src/sunholo/discovery_engine/discovery_engine_client.py

Classes

DiscoveryEngineClient

Client for interacting with Google Cloud Discovery Engine.

Args: project_id (str): Your Google Cloud project ID. data_store_id (str): The ID of your Discovery Engine data store. location (str, optional): The location of the data store (default is 'eu').

Example:

client = DiscoveryEngineClient(project_id='your-project-id', data_store_id='your-data-store-id')

# Create a collection
collection_name = client.create_collection("my_new_collection")

# Perform a search
search_response = client.get_chunks("your query", "your_collection_id")

Parsing:

# Perform a search
search_response = client.get_chunks("your query", "your_collection_id")

# Iterate through the search results
for result in search_response.results:
# Get the document (which contains the chunks)
document = result.document

# Iterate through the chunks within the document
for chunk in document.chunks:
chunk_text = chunk.snippet # Extract the text content of the chunk
chunk_document_name = chunk.document_name # Get the name of the document the chunk belongs to

# Do something with the chunk_text and chunk_document_name (e.g., print, store, etc.)
print(f"Chunk Text: {chunk_text}")
print(f"Document Name: {chunk_document_name}")
  • init(self, data_store_id=None, engine_id=None, project_id=None, location='eu')

    • Initialize self. See help(type(self)) for accurate signature.
  • _create_unique_gsuri_docid(self, gcs_uri: str)

    • No docstring available.
  • _import_document_request(self, request) -> str

    • Handles the common logic for making an ImportDocumentsRequest, including retrying.

Args: request (discoveryengine.ImportDocumentsRequest): The prepared request object.

Returns: str: The operation name.

  • _search_data_store_path(self, data_store_id: str, collection_id: str = 'default_collection', serving_config: str = 'default_serving_config')

    • No docstring available.
  • async_get_chunks(self, query: str, num_previous_chunks: int = 3, num_next_chunks: int = 3, page_size: int = 10, parse_chunks_to_string: bool = True, serving_config: str = 'default_serving_config', data_store_ids: Optional[List[str]] = None, filter_str: str = None)

    • Asynchronously retrieves chunks or documents based on a query.

Args: query (str): The search query. num_previous_chunks (int, optional): Number of previous chunks to return for context (default is 3). num_next_chunks (int, optional): Number of next chunks to return for context (default is 3). page_size (int, optional): The maximum number of results to return per page (default is 10). parse_chunks_to_string: If True will put chunks in one big string, False will return object serving_config: The resource name of the Search serving config data_store_ids: If you want to search over many data stores, not just the one that was used to init the class. They should be of the format projects/{project}/locations/{location}/collections/{collection_id}/dataStores/{data_store_id}

Returns: discoveryengine.SearchResponse or str: The search response object or string of chunks.

  • async_get_documents(self, query: str, page_size: int = 10, parse_documents_to_string: bool = True, serving_config: str = 'default_serving_config', data_store_ids: Optional[List[str]] = None, filter_str: str = None, max_limit: int = None)
    • Asynchronously retrieves entire documents based on a query.

Args: query (str): The search query. page_size (int, optional): The maximum number of results to return per page (default is 10). parse_documents_to_string: If True will put documents in one big string, False will return object serving_config: The resource name of the Search serving config data_store_ids: If you want to search over many data stores, not just the one that was used to init the class. They should be of the format projects/{project}/locations/{location}/collections/{collection_id}/dataStores/{data_store_id}

Returns: discoveryengine.SearchResponse or str: The search response object or string of documents.

  • async_process_chunks(self, response)

    • No docstring available.
  • async_process_documents(self, response, max_limit: int = None)

    • Process a search response containing documents into a formatted string asynchronously.
  • async_search_by_objectId_and_or_date(self, query, objectId=None, date=None, **kwargs)

    • Searches and filters by objectId (exact match) and/or date asynchronously.

Args: query (str): The search query. objectId (str, optional): The exact objectId to filter by. date (str, optional): The literal_iso_8601_datetime_format date to filter by e.g. 2025-02-24T12:25:30.123Z **kwargs: Additional keyword arguments to pass to `async_search_with_filters`.

Returns: list: A list of search results.

  • async_search_engine(self, search_query: str, engine_id: str = None, serving_config_id: str = 'default_config', page_size: int = 10, return_snippet: bool = True, summary_result_count: int = 5, include_citations: bool = True, custom_prompt: Optional[str] = None, model_version: str = 'stable', query_expansion_level: 'discoveryengine.SearchRequest.QueryExpansionSpec.Condition' = <Condition.AUTO: 2>, spell_correction_mode: 'discoveryengine.SearchRequest.SpellCorrectionSpec.Mode' = <Mode.AUTO: 2>, filter_str: Optional[str] = None, boost_spec: Optional[ForwardRef('discoveryengine.SearchRequest.BoostSpec')] = None, params: Optional[Dict[str, Any]] = None, user_pseudo_id: Optional[str] = None, collection_id: str = 'default_collection')
    • Performs an asynchronous search against a specified Discovery Engine Search Engine.

Allows configuration for snippets, summaries, query expansion, spell correction, etc.

Args: (Same arguments as the synchronous search_engine method)

Returns: An SearchAsyncPager object to iterate through results asynchronously, or None if an error occurs or the async client is not available.

  • async_search_with_filters(self, query, filter_str=None, num_previous_chunks=3, num_next_chunks=3, page_size=10, parse_chunks_to_string=True, serving_config='default_serving_config', data_store_ids: Optional[List[str]] = None, content_search_spec_type='chunks', max_limit=None)
    • Searches with a generic filter string asynchronously.

Args: query (str): The search query. filter_str (str, optional): The filter string to apply (e.g., "source LIKE 'my_source' AND eventTime > TIMESTAMP('2024-01-01')"). #... other parameters from get_chunks

Returns: discoveryengine.SearchResponse or str: The search response object or string of chunks.

  • chunk_format(self, chunk)

    • No docstring available.
  • create_data_store(self, type='chunk', chunk_size: int = 500, collection: str = 'default_collection')

    • No docstring available.
  • create_data_store_chunk(self, chunk_size: int = 500, collection: str = 'default_collection') -> str

    • Creates a new data store with default configuration.

Args: chunk_size (int, optional): The size of the chunks to create for documents (default is 500).

Returns: str: The name of the long-running operation for data store creation.

  • create_engine(self, engine_id: str, data_store_ids: List[str], solution_type=None, search_tier=None, search_add_ons=None) -> str

    • You only need this if calling Data Store via Vertex Tools.
  • data_store_path(self, collection: str = 'default_collection')

    • No docstring available.
  • document_format(self, document)

    • Format a document for string output.
  • get_chunks(self, query: str, num_previous_chunks: int = 3, num_next_chunks: int = 3, page_size: int = 10, parse_chunks_to_string: bool = True, serving_config: str = 'default_serving_config', data_store_ids: Optional[List[str]] = None, filter_str: str = None)

    • Retrieves chunks or documents based on a query.

Args: query (str): The search query. num_previous_chunks (int, optional): Number of previous chunks to return for context (default is 3). num_next_chunks (int, optional): Number of next chunks to return for context (default is 3). page_size (int, optional): The maximum number of results to return per page (default is 10). parse_chunks_to_string: If True will put chunks in one big string, False will return object serving_config: The resource name of the Search serving config data_store_ids: If you want to search over many data stores, not just the one that was used to init the class. They should be of the format projects/{project}/locations/{location}/collections/{collection_id}/dataStores/{data_store_id}

Returns: discoveryengine.SearchResponse or str: The search response object or string of chunks.

Example:

search_response = client.get_chunks('your query', 'your_collection_id')
for result in search_response.results:
for chunk in result.document.chunks:
print(f"Chunk: &#123;chunk.snippet&#125;, document name: &#123;chunk.document_name&#125;")
  • get_documents(self, query: str, page_size: int = 10, parse_documents_to_string: bool = True, serving_config: str = 'default_serving_config', data_store_ids: Optional[List[str]] = None, filter_str: str = None, max_limit: int = None)
    • Retrieves entire documents based on a query.

Args: query (str): The search query. page_size (int, optional): The maximum number of results to return per page (default is 10). parse_documents_to_string: If True will put documents in one big string, False will return object serving_config: The resource name of the Search serving config data_store_ids: If you want to search over many data stores, not just the one that was used to init the class. They should be of the format projects/{project}/locations/{location}/collections/{collection_id}/dataStores/{data_store_id}

Returns: discoveryengine.SearchResponse or str: The search response object or string of documents.

Example:

search_response = client.get_documents('your query')
for result in search_response.results:
doc = result.document
print(f"Document: &#123;doc.name&#125;, Title: &#123;doc.derived_struct_data.get('title')&#125;")
  • get_mime_type(self, uri: str)

    • No docstring available.
  • import_document_with_metadata(self, gcs_uri: str, metadata: dict, branch='default_branch')

    • Imports a single document with metadata.

Args: gcs_uri: The GCS URI of the document to import. metadata: A dictionary containing the metadata for the document. branch: The branch to import the document into.

Returns: str: The operation name.

  • import_documents(self, gcs_uri: Optional[str] = None, data_schema='content', branch='default_branch', bigquery_dataset: Optional[str] = None, bigquery_table: Optional[str] = None, bigquery_project_id: Optional[str] = None) -> str
    • Args:
  • gcs_uri: Required. List of Cloud Storage URIs to input files. Each URI can be up to 2000 characters long. URIs can match the full object path (for example, gs://bucket/directory/object.json) or a pattern matching one or more files, such as gs://bucket/directory/*.json. A request can contain at most 100 files (or 100,000 files if data_schema is content). Each file can be up to 2 GB (or 100 MB if data_schema is content).
  • data_schema: Must be one of 'user_event', 'custom' or 'document' if using BigQuery. Default 'content' only for GCS. The schema to use when parsing the data from the source. Supported values for document imports: - document (default): One JSON Document per line. Each document must have a valid Document.id. - content: Unstructured data (e.g. PDF, HTML). Each file matched by input_uris becomes a document, with the ID set to the first 128 bits of SHA256(URI) encoded as a hex string. - custom: One custom data JSON per row in arbitrary format that conforms to the defined Schema of the data store. This can only be used by the GENERIC Data Store vertical. - csv: A CSV file with header conforming to the defined Schema of the data store. Each entry after the header is imported as a Document. This can only be used by the GENERIC Data Store vertical. Supported values for user event imports: - user_event (default): One JSON UserEvent per line.
  • import_documents_with_metadata(self, gcs_uri: str, data_schema='content', branch='default_branch')

    • Supply a JSONLD GCS location to import all the GS URIs within and their metadata
  • process_chunks(self, response)

    • No docstring available.
  • process_documents(self, response, max_limit: int = None)

    • Process a search response containing documents into a formatted string.
  • search_by_objectId_and_or_date(self, query, objectId=None, date=None, **kwargs)

    • Searches and filters by objectId (exact match) and/or date.

Args: query (str): The search query. objectId (str, optional): The exact objectId to filter by. date (str, optional): The literal_iso_8601_datetime_format date to filter by e.g. 2025-02-24T12:25:30.123Z **kwargs: Additional keyword arguments to pass to `search_with_filters`.

Returns: list: A list of search results.

  • search_engine(self, search_query: str, engine_id: str = None, serving_config_id: str = 'default_config', page_size: int = 10, return_snippet: bool = True, summary_result_count: int = 5, include_citations: bool = True, custom_prompt: Optional[str] = None, model_version: str = 'stable', query_expansion_level: 'discoveryengine.SearchRequest.QueryExpansionSpec.Condition' = <Condition.AUTO: 2>, spell_correction_mode: 'discoveryengine.SearchRequest.SpellCorrectionSpec.Mode' = <Mode.AUTO: 2>, filter_str: Optional[str] = None, boost_spec: Optional[ForwardRef('discoveryengine.SearchRequest.BoostSpec')] = None, params: Optional[Dict[str, Any]] = None, user_pseudo_id: Optional[str] = None, collection_id: str = 'default_collection')

    • Performs a search against a specified Discovery Engine Search Engine.

    Allows configuration for snippets, summaries, query expansion, spell correction, etc.

    Args: search_query: The user's search query string. engine_id: The ID of the search engine to query or uses class engine_id it init with. serving_config_id: The ID of the specific serving config for the engine. page_size: Maximum number of results per page. return_snippet: Whether to request snippets in the results. summary_result_count: Number of results to use for generating a summary. Set to 0 to disable summaries. include_citations: Whether summaries should include citations. custom_prompt: A custom preamble text to guide the summary generation model. model_version: The version of the summary generation model (e.g., "stable"). query_expansion_level: Level of query expansion to apply (AUTO, DISABLED). spell_correction_mode: Mode for spell correction (AUTO, SUGGEST). filter_str: An optional filter string to apply to the search. boost_spec: Optional boost specification object. params: Optional dictionary of custom parameters. user_pseudo_id: Optional unique identifier for the user/session. custom_fine_tuning_spec: Optional spec to use a fine-tuned model. collection_id: The collection ID associated with the engine.

    Returns: A SearchPager object to iterate through results, or None if an error occurs.

    Example: client = DiscoveryEngineClient( project_id=PROJECT_ID, data_store_id=DATA_STORE_ID, location=LOCATION )

    --- Example: Searching an Engine ---

    search_query_engine = "tell me about search engines" log.info(f" --- Searching Engine: {ENGINE_ID} ---") engine_pager = client.search_engine( search_query=search_query_engine, engine_id=ENGINE_ID, summary_result_count=3 # Request a summary for 3 results )

    if engine_pager: results_found = False

    Iterate through pages to get summary/results

    for page in engine_pager.pages: results_found = True if page.summary: print(f" Search Summary: {page.summary.summary_text} ")

    Citations are part of the summary object if requested

    if page.summary.summary_with_metadata: print("Summary Metadata/Citations:") for citation in page.summary.summary_with_metadata.citations: print(f" - Citation Source: {citation.sources}")

    Access references etc. if needed

    print("Results on this page:") for result in page.results: print(f" ID: {result.document.id}") print(f" Name: {result.document.name}")

    Access snippet if available in result.document.derived_struct_data['snippets']

    Access other document fields as needed (struct_data, etc.)

    print(f" Raw Result: {result}") # For detailed inspection

    print("-" * 10)

    if not results_found: print("No results found for the engine search.") else: print(f"Engine search failed for query: '{search_query_engine}'")

  • search_with_filters(self, query, filter_str=None, num_previous_chunks=3, num_next_chunks=3, page_size=10, parse_chunks_to_string=True, serving_config='default_serving_config', data_store_ids: Optional[List[str]] = None, content_search_spec_type='chunks', max_limit=None)

    • Searches with a generic filter string.

Args: query (str): The search query. filter_str (str, optional): The filter string to apply (e.g., "source LIKE 'my_source' AND eventTime > TIMESTAMP('2024-01-01')"). #... other parameters from get_chunks

Returns: discoveryengine.SearchResponse or str: The search response object or string of chunks.

Sunholo Multivac

Get in touch to see if we can help with your GenAI project.

Contact us

Other Links

Sunholo Multivac - GenAIOps

Copyright ©

Holosun ApS 2025