diff --git a/api/core/rag/datasource/vdb/lindorm/lindorm_vector.py b/api/core/rag/datasource/vdb/lindorm/lindorm_vector.py
index b2fbfced0a..643ac2df4e 100644
--- a/api/core/rag/datasource/vdb/lindorm/lindorm_vector.py
+++ b/api/core/rag/datasource/vdb/lindorm/lindorm_vector.py
@@ -4,7 +4,8 @@ import logging
 import time
 from typing import Any, Optional
 
-from opensearchpy import OpenSearch
+from opensearchpy import OpenSearch, helpers
+from opensearchpy.helpers import BulkIndexError
 from pydantic import BaseModel, model_validator
 from tenacity import retry, stop_after_attempt, wait_exponential
 
@@ -135,14 +136,14 @@ class LindormVectorStore(BaseVector):
                 actions.append(action_header)
                 actions.append(action_values)
 
-            logger.info(f"Processing batch {batch_num + 1}/{num_batches} (documents {start_idx + 1} to {end_idx})")
+            logger.debug(f"Processing batch {batch_num + 1}/{num_batches} (documents {start_idx + 1} to {end_idx})")
 
             try:
                 _bulk_with_retry(actions)
-                logger.info(f"Successfully processed batch {batch_num + 1}")
+                logger.debug(f"Successfully processed batch {batch_num + 1}")
                 # simple latency to avoid too many requests in a short time
                 if batch_num < num_batches - 1:
-                    time.sleep(1)
+                    time.sleep(0.5)
 
             except Exception:
                 logger.exception(f"Failed to process batch {batch_num + 1}")
@@ -166,18 +167,56 @@ class LindormVectorStore(BaseVector):
             self.delete_by_ids(ids)
 
     def delete_by_ids(self, ids: list[str]) -> None:
-        params = {}
-        if self._using_ugc:
-            params["routing"] = self._routing
+        """Delete documents by their IDs through the bulk helper.
+
+        Returns immediately when ``ids`` is empty. If the collection does not
+        exist, a warning is logged and nothing is deleted. IDs that are not
+        present in the index are skipped with a warning. Per-document failures
+        reported by the bulk call are logged and not re-raised.
+
+        Args:
+            ids: List of document IDs to delete.
+        """
+        if not ids:
+            return
+
+        params = {"routing": self._routing} if self._using_ugc else {}
+
+        # 1. First check if collection exists
+        if not self._client.indices.exists(index=self._collection_name):
+            logger.warning(f"Collection {self._collection_name} does not exist")
+            return
+
+        # 2. Batch process deletions
+        actions = []
         for id in ids:
             if self._client.exists(index=self._collection_name, id=id, params=params):
-                params = {}
-                if self._using_ugc:
-                    params["routing"] = self._routing
-                self._client.delete(index=self._collection_name, id=id, params=params)
+                actions.append(
+                    {
+                        "_op_type": "delete",
+                        "_index": self._collection_name,
+                        "_id": id,
+                        **params,  # Include routing if using UGC
+                    }
+                )
             else:
                 logger.warning(f"DELETE BY ID: ID {id} does not exist in the index.")
 
+        # 3. Perform bulk deletion if there are valid documents to delete
+        if actions:
+            try:
+                helpers.bulk(self._client, actions)
+            except BulkIndexError as e:
+                for error in e.errors:
+                    delete_error = error.get("delete", {})
+                    status = delete_error.get("status")
+                    doc_id = delete_error.get("_id")
+
+                    if status == 404:
+                        logger.warning(f"Document not found for deletion: {doc_id}")
+                    else:
+                        logger.exception(f"Error deleting document: {error}")
+
     def delete(self) -> None:
         if self._using_ugc:
             routing_filter_query = {
@@ -213,7 +252,7 @@ class LindormVectorStore(BaseVector):
         document_ids_filter = kwargs.get("document_ids_filter")
         filters = []
         if document_ids_filter:
-            filters.append({"terms": {"metadata.document_id": document_ids_filter}})
+            filters.append({"terms": {"metadata.document_id.keyword": document_ids_filter}})
         query = default_vector_search_query(query_vector=query_vector, k=top_k, filters=filters, **kwargs)
 
         try:
@@ -256,7 +295,7 @@ class LindormVectorStore(BaseVector):
         filters = kwargs.get("filter", [])
         document_ids_filter = kwargs.get("document_ids_filter")
         if document_ids_filter:
-            filters.append({"terms": {"metadata.document_id": document_ids_filter}})
+            filters.append({"terms": {"metadata.document_id.keyword": document_ids_filter}})
         routing = self._routing
         full_text_query = default_text_search_query(
             query_text=query,
@@ -270,6 +309,7 @@ class LindormVectorStore(BaseVector):
             routing=routing,
             routing_field=self._routing_field,
         )
+
         response = self._client.search(index=self._collection_name, body=full_text_query)
         docs = []
         for hit in response["hits"]["hits"]:
@@ -479,7 +519,7 @@ def default_vector_search_query(
     **kwargs,
 ) -> dict:
     if filters is not None:
-        filter_type = "post_filter" if filter_type is None else filter_type
+        filter_type = "pre_filter" if filter_type is None else filter_type
         if not isinstance(filters, list):
             raise RuntimeError(f"unexpected filter with {type(filters)}")
     final_ext: dict[str, Any] = {"lvector": {}}