diff --git a/api/settings.py b/api/settings.py index 24dc31e17..30db67a51 100644 --- a/api/settings.py +++ b/api/settings.py @@ -19,6 +19,7 @@ from enum import IntEnum, Enum import json import rag.utils.es_conn import rag.utils.infinity_conn +import rag.utils.opensearch_coon import rag.utils from rag.nlp import search @@ -132,11 +133,14 @@ def init_settings(): global DOC_ENGINE, docStoreConn, retrievaler, kg_retrievaler DOC_ENGINE = os.environ.get('DOC_ENGINE', "elasticsearch") + # DOC_ENGINE = os.environ.get('DOC_ENGINE', "opensearch") lower_case_doc_engine = DOC_ENGINE.lower() if lower_case_doc_engine == "elasticsearch": docStoreConn = rag.utils.es_conn.ESConnection() elif lower_case_doc_engine == "infinity": docStoreConn = rag.utils.infinity_conn.InfinityConnection() + elif lower_case_doc_engine == "opensearch": + docStoreConn = rag.utils.opensearch_coon.OSConnection() else: raise Exception(f"Not supported doc engine: {DOC_ENGINE}") diff --git a/conf/os_mapping.json b/conf/os_mapping.json new file mode 100644 index 000000000..a8663e069 --- /dev/null +++ b/conf/os_mapping.json @@ -0,0 +1,213 @@ +{ + "settings": { + "index": { + "number_of_shards": 2, + "number_of_replicas": 0, + "refresh_interval": "1000ms", + "knn": true, + "similarity": { + "scripted_sim": { + "type": "scripted", + "script": { + "source": "double idf = Math.log(1+(field.docCount-term.docFreq+0.5)/(term.docFreq + 0.5))/Math.log(1+((field.docCount-0.5)/1.5)); return query.boost * idf * Math.min(doc.freq, 1);" + } + } + } + } + }, + "mappings": { + "properties": { + "lat_lon": { + "type": "geo_point", + "store": "true" + } + }, + "date_detection": "true", + "dynamic_templates": [ + { + "int": { + "match": "*_int", + "mapping": { + "type": "integer", + "store": "true" + } + } + }, + { + "ulong": { + "match": "*_ulong", + "mapping": { + "type": "unsigned_long", + "store": "true" + } + } + }, + { + "long": { + "match": "*_long", + "mapping": { + "type": "long", + "store": "true" + } + } + }, + { + "short": { + "match": "*_short", + "mapping": { + "type": "short", + "store": "true" + } + } + }, + { + "numeric": { + "match": "*_flt", + "mapping": { + "type": "float", + "store": true + } + } + }, + { + "tks": { + "match": "*_tks", + "mapping": { + "type": "text", + "similarity": "scripted_sim", + "analyzer": "whitespace", + "store": true + } + } + }, + { + "ltks": { + "match": "*_ltks", + "mapping": { + "type": "text", + "analyzer": "whitespace", + "store": true + } + } + }, + { + "kwd": { + "match_pattern": "regex", + "match": "^(.*_(kwd|id|ids|uid|uids)|uid)$", + "mapping": { + "type": "keyword", + "similarity": "boolean", + "store": true + } + } + }, + { + "dt": { + "match_pattern": "regex", + "match": "^.*(_dt|_time|_at)$", + "mapping": { + "type": "date", + "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||yyyy-MM-dd_HH:mm:ss", + "store": true + } + } + }, + { + "nested": { + "match": "*_nst", + "mapping": { + "type": "nested" + } + } + }, + { + "object": { + "match": "*_obj", + "mapping": { + "type": "object", + "dynamic": "true" + } + } + }, + { + "string": { + "match_pattern": "regex", + "match": "^.*_(with_weight|list)$", + "mapping": { + "type": "text", + "index": "false", + "store": true + } + } + }, + { + "rank_feature": { + "match": "*_fea", + "mapping": { + "type": "rank_feature" + } + } + }, + { + "rank_features": { + "match": "*_feas", + "mapping": { + "type": "rank_features" + } + } + }, + { + "knn_vector": { + "match": "*_512_vec", + "mapping": { + "type": "knn_vector", + "index": true, + "space_type": 
"cosinesimil", + "dimension": 512 + } + } + }, + { + "knn_vector": { + "match": "*_768_vec", + "mapping": { + "type": "knn_vector", + "index": true, + "space_type": "cosinesimil", + "dimension": 768 + } + } + }, + { + "knn_vector": { + "match": "*_1024_vec", + "mapping": { + "type": "knn_vector", + "index": true, + "space_type": "cosinesimil", + "dimension": 1024 + } + } + }, + { + "knn_vector": { + "match": "*_1536_vec", + "mapping": { + "type": "knn_vector", + "index": true, + "space_type": "cosinesimil", + "dimension": 1536 + } + } + }, + { + "binary": { + "match": "*_bin", + "mapping": { + "type": "binary" + } + } + } + ] + } +} \ No newline at end of file diff --git a/conf/service_conf.yaml b/conf/service_conf.yaml index e0ed3de86..8f249d26c 100644 --- a/conf/service_conf.yaml +++ b/conf/service_conf.yaml @@ -17,6 +17,10 @@ es: hosts: 'http://localhost:1200' username: 'elastic' password: 'infini_rag_flow' +os: + hosts: 'http://localhost:1201' + username: 'admin' + password: 'infini_rag_flow_OS_01' infinity: uri: 'localhost:23817' db_name: 'default_db' diff --git a/docker/.env b/docker/.env index fcec532c6..b6eeb0bff 100644 --- a/docker/.env +++ b/docker/.env @@ -2,8 +2,10 @@ # Available options: # - `elasticsearch` (default) # - `infinity` (https://github.com/infiniflow/infinity) +# - `opensearch` (https://github.com/opensearch-project/OpenSearch) DOC_ENGINE=${DOC_ENGINE:-elasticsearch} + # ------------------------------ # docker env var for specifying vector db type at startup # (based on the vector db type, the corresponding docker @@ -24,6 +26,16 @@ ES_PORT=1200 # The password for Elasticsearch. ELASTIC_PASSWORD=infini_rag_flow +# the hostname where OpenSearch service is exposed, set it not the same as elasticsearch +OS_PORT=1201 + +# The hostname where the OpenSearch service is exposed +OS_HOST=opensearch01 + +# The password for OpenSearch. +# At least one uppercase letter, one lowercase letter, one digit, and one special character +OPENSEARCH_PASSWORD=infini_rag_flow_OS_01 + # The port used to expose the Kibana service to the host machine, # allowing EXTERNAL access to the service running inside the Docker container. 
KIBANA_PORT=6601 diff --git a/docker/docker-compose-base.yml b/docker/docker-compose-base.yml index d9a39fdfc..66a87d8b7 100644 --- a/docker/docker-compose-base.yml +++ b/docker/docker-compose-base.yml @@ -35,6 +35,44 @@ services: - ragflow restart: on-failure + opensearch01: + container_name: ragflow-opensearch-01 + profiles: + - opensearch + image: hub.icert.top/opensearchproject/opensearch:2.19.1 + volumes: + - osdata01:/usr/share/opensearch/data + ports: + - ${OS_PORT}:9201 + env_file: .env + environment: + - node.name=opensearch01 + - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD} + - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_PASSWORD} + - bootstrap.memory_lock=false + - discovery.type=single-node + - plugins.security.disabled=false + - plugins.security.ssl.http.enabled=false + - plugins.security.ssl.transport.enabled=true + - cluster.routing.allocation.disk.watermark.low=5gb + - cluster.routing.allocation.disk.watermark.high=3gb + - cluster.routing.allocation.disk.watermark.flood_stage=2gb + - TZ=${TIMEZONE} + - http.port=9201 + mem_limit: ${MEM_LIMIT} + ulimits: + memlock: + soft: -1 + hard: -1 + healthcheck: + test: ["CMD-SHELL", "curl http://localhost:9201"] + interval: 10s + timeout: 10s + retries: 120 + networks: + - ragflow + restart: on-failure + infinity: container_name: ragflow-infinity profiles: @@ -133,6 +171,8 @@ services: volumes: esdata01: driver: local + osdata01: + driver: local infinity_data: driver: local mysql_data: diff --git a/docker/service_conf.yaml.template b/docker/service_conf.yaml.template index 20d347e52..aaf8fa708 100644 --- a/docker/service_conf.yaml.template +++ b/docker/service_conf.yaml.template @@ -17,6 +17,10 @@ es: hosts: 'http://${ES_HOST:-es01}:9200' username: '${ES_USER:-elastic}' password: '${ELASTIC_PASSWORD:-infini_rag_flow}' +os: + hosts: 'http://${OS_HOST:-opensearch01}:9201' + username: '${OS_USER:-admin}' + password: '${OPENSEARCH_PASSWORD:-infini_rag_flow_OS_01}' infinity: uri: '${INFINITY_HOST:-infinity}:23817' db_name: 'default_db' diff --git a/pyproject.toml b/pyproject.toml index 12eebe80d..20746edaf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -127,6 +127,7 @@ dependencies = [ "langfuse>=2.60.0", "debugpy>=1.8.13", "mcp>=1.6.0", + "opensearch-py==2.7.1" ] [project.optional-dependencies] diff --git a/rag/settings.py b/rag/settings.py index 7fa497c16..2dfaea627 100644 --- a/rag/settings.py +++ b/rag/settings.py @@ -31,10 +31,13 @@ AZURE = {} S3 = {} MINIO = {} OSS = {} +OS = {} # Initialize the selected configuration data based on environment variables to solve the problem of initialization errors due to lack of configuration if DOC_ENGINE == 'elasticsearch': ES = get_base_config("es", {}) +elif DOC_ENGINE == 'opensearch': + OS = get_base_config("os", {}) elif DOC_ENGINE == 'infinity': INFINITY = get_base_config("infinity", {"uri": "infinity:23817"}) diff --git a/rag/utils/opensearch_coon.py b/rag/utils/opensearch_coon.py new file mode 100644 index 000000000..8bbde7e07 --- /dev/null +++ b/rag/utils/opensearch_coon.py @@ -0,0 +1,558 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import re +import json +import time +import os + +import copy +from opensearchpy import OpenSearch, NotFoundError +from opensearchpy import UpdateByQuery, Q, Search, Index +from opensearchpy import ConnectionTimeout +from rag import settings +from rag.settings import TAG_FLD, PAGERANK_FLD +from rag.utils import singleton +from api.utils.file_utils import get_project_base_directory +from rag.utils.doc_store_conn import DocStoreConnection, MatchExpr, OrderByExpr, MatchTextExpr, MatchDenseExpr, \ + FusionExpr +from rag.nlp import is_english, rag_tokenizer + +ATTEMPT_TIME = 2 + +logger = logging.getLogger('ragflow.opensearch_conn') + + +@singleton +class OSConnection(DocStoreConnection): + def __init__(self): + self.info = {} + logger.info(f"Use OpenSearch {settings.OS['hosts']} as the doc engine.") + for _ in range(ATTEMPT_TIME): + try: + self.os = OpenSearch( + settings.OS["hosts"].split(","), + http_auth=(settings.OS["username"], settings.OS[ + "password"]) if "username" in settings.OS and "password" in settings.OS else None, + verify_certs=False, + timeout=600 + ) + if self.os: + self.info = self.os.info() + break + except Exception as e: + logger.warning(f"{str(e)}. Waiting OpenSearch {settings.OS['hosts']} to be healthy.") + time.sleep(5) + if not self.os.ping(): + msg = f"OpenSearch {settings.OS['hosts']} is unhealthy in 120s." + logger.error(msg) + raise Exception(msg) + v = self.info.get("version", {"number": "2.18.0"}) + v = v["number"].split(".")[0] + if int(v) < 2: + msg = f"OpenSearch version must be greater than or equal to 2, current version: {v}" + logger.error(msg) + raise Exception(msg) + fp_mapping = os.path.join(get_project_base_directory(), "conf", "os_mapping.json") + if not os.path.exists(fp_mapping): + msg = f"OpenSearch mapping file not found at {fp_mapping}" + logger.error(msg) + raise Exception(msg) + self.mapping = json.load(open(fp_mapping, "r")) + logger.info(f"OpenSearch {settings.OS['hosts']} is healthy.") + + """ + Database operations + """ + + def dbType(self) -> str: + return "opensearch" + + def health(self) -> dict: + health_dict = dict(self.os.cluster.health()) + health_dict["type"] = "opensearch" + return health_dict + + """ + Table operations + """ + + def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int): + if self.indexExist(indexName, knowledgebaseId): + return True + try: + from opensearchpy.client import IndicesClient + return IndicesClient(self.os).create(index=indexName, + body=self.mapping) + except Exception: + logger.exception("OSConnection.createIndex error %s" % (indexName)) + + def deleteIdx(self, indexName: str, knowledgebaseId: str): + if len(knowledgebaseId) > 0: + # The index need to be alive after any kb deletion since all kb under this tenant are in one index. 
+ return + try: + self.os.indices.delete(index=indexName, allow_no_indices=True) + except NotFoundError: + pass + except Exception: + logger.exception("OSConnection.deleteIdx error %s" % (indexName)) + + def indexExist(self, indexName: str, knowledgebaseId: str = None) -> bool: + s = Index(indexName, self.os) + for i in range(ATTEMPT_TIME): + try: + return s.exists() + except Exception as e: + logger.exception("OSConnection.indexExist got exception") + if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0: + continue + break + return False + + """ + CRUD operations + """ + + def search( + self, selectFields: list[str], + highlightFields: list[str], + condition: dict, + matchExprs: list[MatchExpr], + orderBy: OrderByExpr, + offset: int, + limit: int, + indexNames: str | list[str], + knowledgebaseIds: list[str], + aggFields: list[str] = [], + rank_feature: dict | None = None + ): + """ + Refers to https://github.com/opensearch-project/opensearch-py/blob/main/guides/dsl.md + """ + use_knn = False + if isinstance(indexNames, str): + indexNames = indexNames.split(",") + assert isinstance(indexNames, list) and len(indexNames) > 0 + assert "_id" not in condition + + bqry = Q("bool", must=[]) + condition["kb_id"] = knowledgebaseIds + for k, v in condition.items(): + if k == "available_int": + if v == 0: + bqry.filter.append(Q("range", available_int={"lt": 1})) + else: + bqry.filter.append( + Q("bool", must_not=Q("range", available_int={"lt": 1}))) + continue + if not v: + continue + if isinstance(v, list): + bqry.filter.append(Q("terms", **{k: v})) + elif isinstance(v, str) or isinstance(v, int): + bqry.filter.append(Q("term", **{k: v})) + else: + raise Exception( + f"Condition `{str(k)}={str(v)}` value type is {str(type(v))}, expected to be int, str or list.") + + s = Search() + vector_similarity_weight = 0.5 + for m in matchExprs: + if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params: + assert len(matchExprs) == 3 and isinstance(matchExprs[0], MatchTextExpr) and isinstance(matchExprs[1], + MatchDenseExpr) and isinstance( + matchExprs[2], FusionExpr) + weights = m.fusion_params["weights"] + vector_similarity_weight = float(weights.split(",")[1]) + knn_query = {} + for m in matchExprs: + if isinstance(m, MatchTextExpr): + minimum_should_match = m.extra_options.get("minimum_should_match", 0.0) + if isinstance(minimum_should_match, float): + minimum_should_match = str(int(minimum_should_match * 100)) + "%" + bqry.must.append(Q("query_string", fields=m.fields, + type="best_fields", query=m.matching_text, + minimum_should_match=minimum_should_match, + boost=1)) + bqry.boost = 1.0 - vector_similarity_weight + + # Elasticsearch has the encapsulation of KNN_search in python sdk + # while the Python SDK for OpenSearch does not provide encapsulation for KNN_search, + # the following codes implement KNN_search in OpenSearch using DSL + # Besides, Opensearch's DSL for KNN_search query syntax differs from that in Elasticsearch, I also made some adaptions for it + elif isinstance(m, MatchDenseExpr): + assert (bqry is not None) + similarity = 0.0 + if "similarity" in m.extra_options: + similarity = m.extra_options["similarity"] + use_knn = True + vector_column_name = m.vector_column_name + knn_query[vector_column_name] = {} + knn_query[vector_column_name]["vector"] = list(m.embedding_data) + knn_query[vector_column_name]["k"] = m.topn + knn_query[vector_column_name]["filter"] = bqry.to_dict() + knn_query[vector_column_name]["boost"] = similarity + + if bqry and 
rank_feature: + for fld, sc in rank_feature.items(): + if fld != PAGERANK_FLD: + fld = f"{TAG_FLD}.{fld}" + bqry.should.append(Q("rank_feature", field=fld, linear={}, boost=sc)) + + if bqry: + s = s.query(bqry) + for field in highlightFields: + s = s.highlight(field) + + if orderBy: + orders = list() + for field, order in orderBy.fields: + order = "asc" if order == 0 else "desc" + if field in ["page_num_int", "top_int"]: + order_info = {"order": order, "unmapped_type": "float", + "mode": "avg", "numeric_type": "double"} + elif field.endswith("_int") or field.endswith("_flt"): + order_info = {"order": order, "unmapped_type": "float"} + else: + order_info = {"order": order, "unmapped_type": "text"} + orders.append({field: order_info}) + s = s.sort(*orders) + + for fld in aggFields: + s.aggs.bucket(f'aggs_{fld}', 'terms', field=fld, size=1000000) + + if limit > 0: + s = s[offset:offset + limit] + q = s.to_dict() + logger.debug(f"OSConnection.search {str(indexNames)} query: " + json.dumps(q)) + + if use_knn: + del q["query"] + q["query"] = {"knn" : knn_query} + + for i in range(ATTEMPT_TIME): + try: + res = self.os.search(index=indexNames, + body=q, + timeout=600, + # search_type="dfs_query_then_fetch", + track_total_hits=True, + _source=True) + if str(res.get("timed_out", "")).lower() == "true": + raise Exception("OpenSearch Timeout.") + logger.debug(f"OSConnection.search {str(indexNames)} res: " + str(res)) + return res + except Exception as e: + logger.exception(f"OSConnection.search {str(indexNames)} query: " + str(q)) + if str(e).find("Timeout") > 0: + continue + raise e + logger.error("OSConnection.search timeout for 3 times!") + raise Exception("OSConnection.search timeout.") + + def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None: + for i in range(ATTEMPT_TIME): + try: + res = self.os.get(index=(indexName), + id=chunkId, source=True, ) + if str(res.get("timed_out", "")).lower() == "true": + raise Exception("Es Timeout.") + chunk = res["_source"] + chunk["id"] = chunkId + return chunk + except NotFoundError: + return None + except Exception as e: + logger.exception(f"OSConnection.get({chunkId}) got exception") + if str(e).find("Timeout") > 0: + continue + raise e + logger.error("OSConnection.get timeout for 3 times!") + raise Exception("OSConnection.get timeout.") + + def insert(self, documents: list[dict], indexName: str, knowledgebaseId: str = None) -> list[str]: + # Refers to https://opensearch.org/docs/latest/api-reference/document-apis/bulk/ + operations = [] + for d in documents: + assert "_id" not in d + assert "id" in d + d_copy = copy.deepcopy(d) + meta_id = d_copy.pop("id", "") + operations.append( + {"index": {"_index": indexName, "_id": meta_id}}) + operations.append(d_copy) + + res = [] + for _ in range(ATTEMPT_TIME): + try: + res = [] + r = self.os.bulk(index=(indexName), body=operations, + refresh=False, timeout=60) + if re.search(r"False", str(r["errors"]), re.IGNORECASE): + return res + + for item in r["items"]: + for action in ["create", "delete", "index", "update"]: + if action in item and "error" in item[action]: + res.append(str(item[action]["_id"]) + ":" + str(item[action]["error"])) + return res + except Exception as e: + res.append(str(e)) + logger.warning("OSConnection.insert got exception: " + str(e)) + res = [] + if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE): + res.append(str(e)) + time.sleep(3) + continue + return res + + def update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> 
bool: + doc = copy.deepcopy(newValue) + doc.pop("id", None) + if "id" in condition and isinstance(condition["id"], str): + # update specific single document + chunkId = condition["id"] + for i in range(ATTEMPT_TIME): + try: + self.os.update(index=indexName, id=chunkId, doc=doc) + return True + except Exception as e: + logger.exception( + f"OSConnection.update(index={indexName}, id={id}, doc={json.dumps(condition, ensure_ascii=False)}) got exception") + if re.search(r"(timeout|connection)", str(e).lower()): + continue + break + return False + + # update unspecific maybe-multiple documents + bqry = Q("bool") + for k, v in condition.items(): + if not isinstance(k, str) or not v: + continue + if k == "exists": + bqry.filter.append(Q("exists", field=v)) + continue + if isinstance(v, list): + bqry.filter.append(Q("terms", **{k: v})) + elif isinstance(v, str) or isinstance(v, int): + bqry.filter.append(Q("term", **{k: v})) + else: + raise Exception( + f"Condition `{str(k)}={str(v)}` value type is {str(type(v))}, expected to be int, str or list.") + scripts = [] + params = {} + for k, v in newValue.items(): + if k == "remove": + if isinstance(v, str): + scripts.append(f"ctx._source.remove('{v}');") + if isinstance(v, dict): + for kk, vv in v.items(): + scripts.append(f"int i=ctx._source.{kk}.indexOf(params.p_{kk});ctx._source.{kk}.remove(i);") + params[f"p_{kk}"] = vv + continue + if k == "add": + if isinstance(v, dict): + for kk, vv in v.items(): + scripts.append(f"ctx._source.{kk}.add(params.pp_{kk});") + params[f"pp_{kk}"] = vv.strip() + continue + if (not isinstance(k, str) or not v) and k != "available_int": + continue + if isinstance(v, str): + v = re.sub(r"(['\n\r]|\\.)", " ", v) + params[f"pp_{k}"] = v + scripts.append(f"ctx._source.{k}=params.pp_{k};") + elif isinstance(v, int) or isinstance(v, float): + scripts.append(f"ctx._source.{k}={v};") + elif isinstance(v, list): + scripts.append(f"ctx._source.{k}=params.pp_{k};") + params[f"pp_{k}"] = json.dumps(v, ensure_ascii=False) + else: + raise Exception( + f"newValue `{str(k)}={str(v)}` value type is {str(type(v))}, expected to be int, str.") + ubq = UpdateByQuery( + index=indexName).using( + self.os).query(bqry) + ubq = ubq.script(source="".join(scripts), params=params) + ubq = ubq.params(refresh=True) + ubq = ubq.params(slices=5) + ubq = ubq.params(conflicts="proceed") + + for _ in range(ATTEMPT_TIME): + try: + _ = ubq.execute() + return True + except Exception as e: + logger.error("OSConnection.update got exception: " + str(e) + "\n".join(scripts)) + if re.search(r"(timeout|connection|conflict)", str(e).lower()): + continue + break + return False + + def delete(self, condition: dict, indexName: str, knowledgebaseId: str) -> int: + qry = None + assert "_id" not in condition + if "id" in condition: + chunk_ids = condition["id"] + if not isinstance(chunk_ids, list): + chunk_ids = [chunk_ids] + qry = Q("ids", values=chunk_ids) + else: + qry = Q("bool") + for k, v in condition.items(): + if k == "exists": + qry.filter.append(Q("exists", field=v)) + + elif k == "must_not": + if isinstance(v, dict): + for kk, vv in v.items(): + if kk == "exists": + qry.must_not.append(Q("exists", field=vv)) + + elif isinstance(v, list): + qry.must.append(Q("terms", **{k: v})) + elif isinstance(v, str) or isinstance(v, int): + qry.must.append(Q("term", **{k: v})) + else: + raise Exception("Condition value must be int, str or list.") + logger.debug("OSConnection.delete query: " + json.dumps(qry.to_dict())) + for _ in range(ATTEMPT_TIME): + try: + 
#print(Search().query(qry).to_dict(), flush=True) + res = self.os.delete_by_query( + index=indexName, + body=Search().query(qry).to_dict(), + refresh=True) + return res["deleted"] + except Exception as e: + logger.warning("OSConnection.delete got exception: " + str(e)) + if re.search(r"(timeout|connection)", str(e).lower()): + time.sleep(3) + continue + if re.search(r"(not_found)", str(e), re.IGNORECASE): + return 0 + return 0 + + """ + Helper functions for search result + """ + + def getTotal(self, res): + if isinstance(res["hits"]["total"], type({})): + return res["hits"]["total"]["value"] + return res["hits"]["total"] + + def getChunkIds(self, res): + return [d["_id"] for d in res["hits"]["hits"]] + + def __getSource(self, res): + rr = [] + for d in res["hits"]["hits"]: + d["_source"]["id"] = d["_id"] + d["_source"]["_score"] = d["_score"] + rr.append(d["_source"]) + return rr + + def getFields(self, res, fields: list[str]) -> dict[str, dict]: + res_fields = {} + if not fields: + return {} + for d in self.__getSource(res): + m = {n: d.get(n) for n in fields if d.get(n) is not None} + for n, v in m.items(): + if isinstance(v, list): + m[n] = v + continue + if not isinstance(v, str): + m[n] = str(m[n]) + # if n.find("tks") > 0: + # m[n] = rmSpace(m[n]) + + if m: + res_fields[d["id"]] = m + return res_fields + + def getHighlight(self, res, keywords: list[str], fieldnm: str): + ans = {} + for d in res["hits"]["hits"]: + hlts = d.get("highlight") + if not hlts: + continue + txt = "...".join([a for a in list(hlts.items())[0][1]]) + if not is_english(txt.split()): + ans[d["_id"]] = txt + continue + + txt = d["_source"][fieldnm] + txt = re.sub(r"[\r\n]", " ", txt, flags=re.IGNORECASE | re.MULTILINE) + txts = [] + for t in re.split(r"[.?!;\n]", txt): + for w in keywords: + t = re.sub(r"(^|[ .?/'\"\(\)!,:;-])(%s)([ .?/'\"\(\)!,:;-])" % re.escape(w), r"\1\2\3", t, + flags=re.IGNORECASE | re.MULTILINE) + if not re.search(r"[^<>]+", t, flags=re.IGNORECASE | re.MULTILINE): + continue + txts.append(t) + ans[d["_id"]] = "...".join(txts) if txts else "...".join([a for a in list(hlts.items())[0][1]]) + + return ans + + def getAggregation(self, res, fieldnm: str): + agg_field = "aggs_" + fieldnm + if "aggregations" not in res or agg_field not in res["aggregations"]: + return list() + bkts = res["aggregations"][agg_field]["buckets"] + return [(b["key"], b["doc_count"]) for b in bkts] + + """ + SQL + """ + + def sql(self, sql: str, fetch_size: int, format: str): + logger.debug(f"OSConnection.sql get sql: {sql}") + sql = re.sub(r"[ `]+", " ", sql) + sql = sql.replace("%", "") + replaces = [] + for r in re.finditer(r" ([a-z_]+_l?tks)( like | ?= ?)'([^']+)'", sql): + fld, v = r.group(1), r.group(3) + match = " MATCH({}, '{}', 'operator=OR;minimum_should_match=30%') ".format( + fld, rag_tokenizer.fine_grained_tokenize(rag_tokenizer.tokenize(v))) + replaces.append( + ("{}{}'{}'".format( + r.group(1), + r.group(2), + r.group(3)), + match)) + + for p, r in replaces: + sql = sql.replace(p, r, 1) + logger.debug(f"OSConnection.sql to os: {sql}") + + for i in range(ATTEMPT_TIME): + try: + res = self.os.sql.query(body={"query": sql, "fetch_size": fetch_size}, format=format, + request_timeout="2s") + return res + except ConnectionTimeout: + logger.exception("OSConnection.sql timeout") + continue + except Exception: + logger.exception("OSConnection.sql got exception") + return None + logger.error("OSConnection.sql timeout for 3 times!") + return None diff --git a/uv.lock b/uv.lock index e11d21e02..426ae7863 100644 --- 
a/uv.lock +++ b/uv.lock @@ -1324,6 +1324,14 @@ wheels = [ { url = "https://mirrors.aliyun.com/pypi/packages/c1/8b/5fe2cc11fee489817272089c4203e679c63b570a5aaeb18d852ae3cbba6a/et_xmlfile-2.0.0-py3-none-any.whl", hash = "sha256:7a91720bc756843502c3b7504c77b8fe44217c85c537d85037f0f536151b2caa" }, ] +[[package]] +name = "events" +version = "0.5" +source = { registry = "https://mirrors.aliyun.com/pypi/simple" } +wheels = [ + { url = "https://mirrors.aliyun.com/pypi/packages/25/ed/e47dec0626edd468c84c04d97769e7ab4ea6457b7f54dcb3f72b17fcd876/Events-0.5-py3-none-any.whl", hash = "sha256:a7286af378ba3e46640ac9825156c93bdba7502174dd696090fdfcd4d80a1abd" }, +] + [[package]] name = "exceptiongroup" version = "1.2.2" @@ -3652,6 +3660,22 @@ wheels = [ { url = "https://mirrors.aliyun.com/pypi/packages/c0/da/977ded879c29cbd04de313843e76868e6e13408a94ed6b987245dc7c8506/openpyxl-3.1.5-py2.py3-none-any.whl", hash = "sha256:5282c12b107bffeef825f4617dc029afaf41d0ea60823bbb665ef3079dc79de2" }, ] +[[package]] +name = "opensearch-py" +version = "2.7.1" +source = { registry = "https://mirrors.aliyun.com/pypi/simple" } +dependencies = [ + { name = "certifi" }, + { name = "events" }, + { name = "python-dateutil" }, + { name = "requests" }, + { name = "urllib3" }, +] +sdist = { url = "https://mirrors.aliyun.com/pypi/packages/c4/ca/5be52de5c69ecd327c16f3fc0dba82b7ffda5bbd0c0e215bdf23a4d12b12/opensearch_py-2.7.1.tar.gz", hash = "sha256:67ab76e9373669bc71da417096df59827c08369ac3795d5438c9a8be21cbd759" } +wheels = [ + { url = "https://mirrors.aliyun.com/pypi/packages/80/8f/db678ae203d761922a73920215ea53a79faf3bb1ec6aa9511f809c8e234c/opensearch_py-2.7.1-py3-none-any.whl", hash = "sha256:5417650eba98a1c7648e502207cebf3a12beab623ffe0ebbf55f9b1b4b6e44e9" }, +] + [[package]] name = "orjson" version = "3.10.15" @@ -4842,6 +4866,7 @@ dependencies = [ { name = "opencv-python" }, { name = "opencv-python-headless" }, { name = "openpyxl" }, + { name = "opensearch-py" }, { name = "ormsgpack" }, { name = "pandas" }, { name = "pdfplumber" }, @@ -4978,6 +5003,7 @@ requires-dist = [ { name = "opencv-python", specifier = "==4.10.0.84" }, { name = "opencv-python-headless", specifier = "==4.10.0.84" }, { name = "openpyxl", specifier = ">=3.1.0,<4.0.0" }, + { name = "opensearch-py", specifier = "==2.7.1" }, { name = "ormsgpack", specifier = "==1.5.0" }, { name = "pandas", specifier = ">=2.2.0,<3.0.0" }, { name = "pdfplumber", specifier = "==0.10.4" },
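
A minimal smoke-test sketch for the new OSConnection, under these assumptions: DOC_ENGINE is exported before rag.settings is first imported, the `os:` block of conf/service_conf.yaml points at a reachable OpenSearch node, and the index name below is a placeholder.

import os

# Select OpenSearch before rag.settings loads, so settings.OS is populated from the `os:` config block (assumed behavior).
os.environ["DOC_ENGINE"] = "opensearch"

from rag.utils.opensearch_coon import OSConnection

conn = OSConnection()          # @singleton: pings the cluster and loads conf/os_mapping.json
print(conn.dbType())           # -> "opensearch"
print(conn.health())           # cluster health dict plus {"type": "opensearch"}

idx = "ragflow_demo_index"     # placeholder index name
if not conn.indexExist(idx):
    conn.createIdx(idx, knowledgebaseId="", vectorSize=1024)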