diff --git a/rag/utils/es_conn.py b/rag/utils/es_conn.py index c7c66513c..9de87de25 100644 --- a/rag/utils/es_conn.py +++ b/rag/utils/es_conn.py @@ -16,13 +16,15 @@ from rag.utils.doc_store_conn import DocStoreConnection, MatchExpr, OrderByExpr, FusionExpr from rag.nlp import is_english, rag_tokenizer +ATTEMPT_TIME = 2 + @singleton class ESConnection(DocStoreConnection): def __init__(self): self.info = {} logging.info(f"Use Elasticsearch {settings.ES['hosts']} as the doc engine.") - for _ in range(24): + for _ in range(ATTEMPT_TIME): try: self.es = Elasticsearch( settings.ES["hosts"].split(","), @@ -92,7 +94,7 @@ class ESConnection(DocStoreConnection): def indexExist(self, indexName: str, knowledgebaseId: str) -> bool: s = Index(indexName, self.es) - for i in range(3): + for i in range(ATTEMPT_TIME): try: return s.exists() except Exception as e: @@ -144,9 +146,9 @@ class ESConnection(DocStoreConnection): if "minimum_should_match" in m.extra_options: minimum_should_match = str(int(m.extra_options["minimum_should_match"] * 100)) + "%" bqry.must.append(Q("query_string", fields=m.fields, - type="best_fields", query=m.matching_text, - minimum_should_match=minimum_should_match, - boost=1)) + type="best_fields", query=m.matching_text, + minimum_should_match=minimum_should_match, + boost=1)) bqry.boost = 1.0 - vector_similarity_weight elif isinstance(m, MatchDenseExpr): @@ -180,7 +182,7 @@ class ESConnection(DocStoreConnection): q = s.to_dict() logging.debug(f"ESConnection.search {str(indexNames)} query: " + json.dumps(q)) - for i in range(3): + for i in range(ATTEMPT_TIME): try: res = self.es.search(index=indexNames, body=q, @@ -201,7 +203,7 @@ class ESConnection(DocStoreConnection): raise Exception("ESConnection.search timeout.") def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None: - for i in range(3): + for i in range(ATTEMPT_TIME): try: res = self.es.get(index=(indexName), id=chunkId, source=True, ) @@ -233,7 +235,7 @@ class ESConnection(DocStoreConnection): operations.append(d_copy) res = [] - for _ in range(100): + for _ in range(ATTEMPT_TIME): try: r = self.es.bulk(index=(indexName), operations=operations, refresh=False, timeout="600s") @@ -258,7 +260,7 @@ class ESConnection(DocStoreConnection): if "id" in condition and isinstance(condition["id"], str): # update specific single document chunkId = condition["id"] - for i in range(3): + for i in range(ATTEMPT_TIME): try: self.es.update(index=indexName, id=chunkId, doc=doc) return True @@ -326,7 +328,7 @@ class ESConnection(DocStoreConnection): else: raise Exception("Condition value must be int, str or list.") logging.debug("ESConnection.delete query: " + json.dumps(qry.to_dict())) - for _ in range(10): + for _ in range(ATTEMPT_TIME): try: res = self.es.delete_by_query( index=indexName, @@ -437,7 +439,7 @@ class ESConnection(DocStoreConnection): sql = sql.replace(p, r, 1) logging.debug(f"ESConnection.sql to es: {sql}") - for i in range(3): + for i in range(ATTEMPT_TIME): try: res = self.es.sql.query(body={"query": sql, "fetch_size": fetch_size}, format=format, request_timeout="2s")