rag-system/rag_system/vectordb/azure_search.py

140 lines
4.5 KiB
Python

import os
from typing import Tuple
from langchain_community.vectorstores.azuresearch import AzureSearch
from langchain_openai import AzureOpenAIEmbeddings, OpenAIEmbeddings
from dotenv import load_dotenv
from uuid import uuid4
# Merge settings from a local .env file (if present) into the process environment.
load_dotenv()

# Every setting the Azure OpenAI and Azure AI Search clients below depend on.
required_env_vars = [
    "AZURE_DEPLOYMENT",
    "AZURE_OPENAI_API_VERSION",
    "AZURE_ENDPOINT",
    "AZURE_OPENAI_API_KEY",
    "VECTOR_STORE_ADDRESS",
    "VECTOR_STORE_PASSWORD",
    "INDEX_NAME",
    "RETRY_TOTAL",
]

# A variable counts as missing when it is unset or set to an empty string.
missing_vars = [
    name for name in required_env_vars if os.environ.get(name) in (None, "")
]
if missing_vars:
    # Fail at import time with the full list so the configuration can be fixed in one pass.
    raise ValueError(
        "Missing required environment variables: " + ", ".join(missing_vars)
    )
# Embedding client for an Azure-hosted OpenAI deployment; every connection
# setting is read from the environment.
embeddings: AzureOpenAIEmbeddings = AzureOpenAIEmbeddings(
    api_key=os.environ.get("AZURE_OPENAI_API_KEY"),
    azure_endpoint=os.environ.get("AZURE_ENDPOINT"),
    azure_deployment=os.environ.get("AZURE_DEPLOYMENT"),
    openai_api_version=os.environ.get("AZURE_OPENAI_API_VERSION"),
)
# Azure AI Search vector store shared by the helper functions in this module.
# Additional properties for the Azure client can be specified; see
# https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/core/azure-core/README.md#configurations
vector_store: AzureSearch = AzureSearch(
    azure_search_endpoint=os.getenv("VECTOR_STORE_ADDRESS"),
    azure_search_key=os.getenv("VECTOR_STORE_PASSWORD"),
    index_name=os.getenv("INDEX_NAME"),
    embedding_function=embeddings.embed_query,
    # Configure max retries for the Azure client. Environment variables are
    # always strings, while azure-core documents retry_total as an integer, so
    # convert it here; a non-numeric RETRY_TOTAL fails fast with ValueError at
    # import time instead of surfacing later inside the retry logic.
    additional_search_client_options={
        "retry_total": int(os.environ["RETRY_TOTAL"]),
    },
)
def get_document_id(document):
    """
    Get the document ID from the document object.

    The ID stored under ``metadata["id"]`` takes precedence; the object's own
    ``id`` attribute is used as a fallback. A value of ``None`` in either place
    is treated as "no ID" so callers never receive ``None`` as an identifier.

    Args:
        document: Any object that may expose a ``metadata`` container and/or
            an ``id`` attribute.

    Returns:
        The first non-``None`` ID found.

    Raises:
        ValueError: If neither location holds a usable ID.
    """
    # getattr with a default also covers objects whose metadata is explicitly
    # None, which would otherwise fail on the membership test.
    metadata = getattr(document, "metadata", None)
    if metadata is not None and "id" in metadata and metadata["id"] is not None:
        return metadata["id"]

    document_id = getattr(document, "id", None)
    if document_id is not None:
        return document_id

    raise ValueError("Document does not have a valid ID.")
def delete_all_documents():
    """
    Delete all documents from the AzureSearch vector store.

    Documents are fetched in batches of 10 through ``retrieve`` with an empty
    query and deleted by ID until a fetch comes back empty. Any failure is
    printed rather than raised, so this function always returns ``None``.
    """
    batch_size = 10
    try:
        while True:
            docs_to_delete = retrieve("", batch_size)
            if not docs_to_delete:
                # Nothing left in the index: stop without issuing an empty delete.
                break

            deleted = vector_store.delete(
                list(map(get_document_id, docs_to_delete))
            )
            if deleted is False:
                # The store explicitly reported failure. Retrying would fetch
                # the same documents again and loop forever, so abort instead.
                raise RuntimeError(
                    "Vector store reported that the batch was not deleted."
                )
        print("All documents deleted successfully.")
    except Exception as e:
        # Deliberate best-effort: report the problem instead of propagating it.
        print(f"Error deleting documents: {str(e)}")
def add_documents(documents):
    """
    Add documents to the AzureSearch vector store.

    Args:
        documents: The documents to pass to ``vector_store.add_documents``.

    Errors raised while adding are printed rather than propagated, and nothing
    is returned.
    """
    try:
        vector_store.add_documents(documents)
    except Exception as exc:
        print(f"Error adding document to vector store: {exc!s}")
def retrieve(query_text, n_results=1):
    """
    Run a similarity search against the AzureSearch vector store.

    Args:
        query_text: The text to search for.
        n_results: Number of results to request (passed as ``k``); defaults to 1.

    Returns:
        Whatever ``vector_store.similarity_search`` returns for the query.
    """
    search_kwargs = {
        "query": query_text,
        "k": n_results,
        "search_type": "similarity",
    }
    return vector_store.similarity_search(**search_kwargs)
# def add_document_to_vector_store(document):
# """
# Add a document to the AzureSearch vector store.
# Args:
# vector_store: The initialized AzureSearch vector store instance.
# document: A dictionary or object representing the document to be added.
# Example format:
# {
# "id": "unique_document_id",
# "content": "The text content of the document",
# "metadata": {
# "source": "source_url",
# "created": "2025-03-04T14:14:40.421666",
# "modified": "2025-03-04T14:14:40.421666"
# }
# }
# """
# try:
# # Add the document to the vector store
# vector_store.add_documents([document])
# print(f"Document with ID {document['id']} added successfully.")
# except Exception as e:
# print(f"Error adding document to vector store: {str(e)}")
# add_document_to_vector_store("https://api.python.langchain.com/en/latest/langchain_api_reference.html",None)
# Example document to add
# doc = Document(
# page_content="This is the content of the document.For testing IVA demo integration ",
# metadata= {
# "source": "https://example.com/source",
# "created": "2025-03-04T14:14:40.421666",
# "modified": "2025-03-04T14:14:40.421666"
# }
# )
# Add the document to the vector store
# add_document_to_vector_store( doc)
# result = retrieve("iva",1)