diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py
index 249ac2d73..14caf7c22 100644
--- a/api/apps/kb_app.py
+++ b/api/apps/kb_app.py
@@ -107,6 +107,7 @@ def update():
             settings.docStoreConn.update({"kb_id": kb.id}, {"pagerank_fea": req["pagerank"]},
                                          search.index_name(kb.tenant_id), kb.id)
         else:
+            # Elasticsearch requires pagerank_fea be non-zero!
             settings.docStoreConn.update({"exist": "pagerank_fea"}, {"remove": "pagerank_fea"},
                                          search.index_name(kb.tenant_id), kb.id)
diff --git a/rag/utils/infinity_conn.py b/rag/utils/infinity_conn.py
index 2cbfede59..7c56a107a 100644
--- a/rag/utils/infinity_conn.py
+++ b/rag/utils/infinity_conn.py
@@ -46,13 +46,14 @@ def equivalent_condition_to_str(condition: dict) -> str|None:
             cond.append(f"{k}='{v}'")
         else:
             cond.append(f"{k}={str(v)}")
-    return " AND ".join(cond) if cond else None
+    return " AND ".join(cond) if cond else "1=1"
 
 
 def concat_dataframes(df_list: list[pl.DataFrame], selectFields: list[str]) -> pl.DataFrame:
     """
     Concatenate multiple dataframes into one.
     """
+    df_list = [df for df in df_list if not df.is_empty()]
     if df_list:
         return pl.concat(df_list)
     schema = dict()
@@ -246,8 +247,9 @@ class InfinityConnection(DocStoreConnection):
         db_instance = inf_conn.get_database(self.dbName)
         df_list = list()
         table_list = list()
-        if "id" not in selectFields:
-            selectFields.append("id")
+        for essential_field in ["id", "score()", "pagerank_fea"]:
+            if essential_field not in selectFields:
+                selectFields.append(essential_field)
 
         # Prepare expressions common to all tables
         filter_cond = None
@@ -331,10 +333,13 @@ class InfinityConnection(DocStoreConnection):
             kb_res, extra_result = builder.option({"total_hits_count": True}).to_pl()
             if extra_result:
                 total_hits_count += int(extra_result["total_hits_count"])
+            logger.debug(f"INFINITY search table: {str(table_name)}, result: {str(kb_res)}")
             df_list.append(kb_res)
         self.connPool.release_conn(inf_conn)
         res = concat_dataframes(df_list, selectFields)
-        logger.debug(f"INFINITY search tables: {str(table_list)}, result: {str(res)}")
+        res = res.sort(pl.col("SCORE") + pl.col("pagerank_fea"), descending=True, maintain_order=True)
+        res = res.limit(limit)
+        logger.debug(f"INFINITY search final result: {str(res)}")
         return res, total_hits_count
 
     def get(
@@ -350,12 +355,10 @@ class InfinityConnection(DocStoreConnection):
             table_list.append(table_name)
             table_instance = db_instance.get_table(table_name)
             kb_res, _ = table_instance.output(["*"]).filter(f"id = '{chunkId}'").to_pl()
-            if len(kb_res) != 0 and kb_res.shape[0] > 0:
-                df_list.append(kb_res)
-
+            logger.debug(f"INFINITY get table: {str(table_list)}, result: {str(kb_res)}")
+            df_list.append(kb_res)
         self.connPool.release_conn(inf_conn)
         res = concat_dataframes(df_list, ["id"])
-        logger.debug(f"INFINITY get tables: {str(table_list)}, result: {str(res)}")
         res_fields = self.getFields(res, res.columns)
         return res_fields.get(chunkId, None)
@@ -421,8 +424,10 @@ class InfinityConnection(DocStoreConnection):
         db_instance = inf_conn.get_database(self.dbName)
         table_name = f"{indexName}_{knowledgebaseId}"
         table_instance = db_instance.get_table(table_name)
+        if "exist" in condition:
+            del condition["exist"]
         filter = equivalent_condition_to_str(condition)
-        for k, v in newValue.items():
+        for k, v in list(newValue.items()):
             if k.endswith("_kwd") and isinstance(v, list):
                 newValue[k] = " ".join(v)
             elif k == 'kb_id':
@@ -435,6 +440,9 @@
             elif k in ["page_num_int", "top_int"]:
                 assert isinstance(v, list)
                 newValue[k] = "_".join(f"{num:08x}" for num in v)
+            elif k == "remove" and v in ["pagerank_fea"]:
+                del newValue[k]
+                newValue[v] = 0
         logger.debug(f"INFINITY update table {table_name}, filter {filter}, newValue {newValue}.")
         table_instance.update(filter, newValue)
         self.connPool.release_conn(inf_conn)
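
The search() change above always requests two extra output columns from Infinity, score() (materialized as the SCORE column) and pagerank_fea, then merges the per-table frames and re-ranks them by SCORE + pagerank_fea before truncating to the requested limit. A minimal polars sketch of that merge-and-rerank step; the helper name and the sample data are illustrative, not from the PR:

```python
import polars as pl


def rerank_with_pagerank(df_list: list[pl.DataFrame], limit: int) -> pl.DataFrame:
    # Drop empty frames first, mirroring the concat_dataframes() change,
    # so pl.concat() never sees frames with mismatched empty schemas.
    df_list = [df for df in df_list if not df.is_empty()]
    if not df_list:
        return pl.DataFrame()
    res = pl.concat(df_list)
    # "SCORE" is the score() output column and "pagerank_fea" the per-KB boost;
    # both are now always selected by search(), so the sort expression is safe.
    res = res.sort(pl.col("SCORE") + pl.col("pagerank_fea"),
                   descending=True, maintain_order=True)
    return res.limit(limit)


# Example: the chunk carrying a pagerank boost outranks the higher raw score.
hits = pl.DataFrame({"id": ["a", "b"], "SCORE": [0.9, 0.7], "pagerank_fea": [0, 10]})
print(rerank_with_pagerank([hits], limit=2))
```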