diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py
index a30016b3b..7a653a6b4 100644
--- a/api/db/services/document_service.py
+++ b/api/db/services/document_service.py
@@ -37,6 +37,7 @@ from rag.nlp import rag_tokenizer, search
 from rag.settings import get_svr_queue_name
 from rag.utils.redis_conn import REDIS_CONN
 from rag.utils.storage_factory import STORAGE_IMPL
+from rag.utils.doc_store_conn import OrderByExpr
 
 
 class DocumentService(CommonService):
@@ -111,14 +112,18 @@ class DocumentService(CommonService):
         cls.clear_chunk_num(doc.id)
         try:
             settings.docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), doc.kb_id)
-            settings.docStoreConn.update({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"], "source_id": doc.id},
-                                         {"remove": {"source_id": doc.id}},
-                                         search.index_name(tenant_id), doc.kb_id)
-            settings.docStoreConn.update({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["graph"]},
-                                         {"removed_kwd": "Y"},
-                                         search.index_name(tenant_id), doc.kb_id)
-            settings.docStoreConn.delete({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"], "must_not": {"exists": "source_id"}},
-                                         search.index_name(tenant_id), doc.kb_id)
+            graph_source = settings.docStoreConn.getFields(
+                settings.docStoreConn.search(["source_id"], [], {"kb_id": doc.kb_id, "knowledge_graph_kwd": ["graph"]}, [], OrderByExpr(), 0, 1, search.index_name(tenant_id), [doc.kb_id]), ["source_id"]
+            )
+            if len(graph_source) > 0 and doc.id in list(graph_source.values())[0]["source_id"]:
+                settings.docStoreConn.update({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"], "source_id": doc.id},
+                                             {"remove": {"source_id": doc.id}},
+                                             search.index_name(tenant_id), doc.kb_id)
+                settings.docStoreConn.update({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["graph"]},
+                                             {"removed_kwd": "Y"},
+                                             search.index_name(tenant_id), doc.kb_id)
+                settings.docStoreConn.delete({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"], "must_not": {"exists": "source_id"}},
+                                             search.index_name(tenant_id), doc.kb_id)
         except Exception:
             pass
         return cls.delete_by_id(doc.id)
diff --git a/graphrag/general/index.py b/graphrag/general/index.py
index 1e300f55f..06aef6b48 100644
--- a/graphrag/general/index.py
+++ b/graphrag/general/index.py
@@ -204,7 +204,7 @@ async def merge_subgraph(
 ):
     start = trio.current_time()
     change = GraphChange()
-    old_graph = await get_graph(tenant_id, kb_id)
+    old_graph = await get_graph(tenant_id, kb_id, subgraph.graph["source_id"])
     if old_graph is not None:
         logging.info("Merge with an exiting graph...................")
         tidy_graph(old_graph, callback)
diff --git a/graphrag/utils.py b/graphrag/utils.py
index 13472e414..151b5aab7 100644
--- a/graphrag/utils.py
+++ b/graphrag/utils.py
@@ -406,32 +406,33 @@ async def get_graph_doc_ids(tenant_id, kb_id) -> list[str]:
     return doc_ids
 
 
-async def get_graph(tenant_id, kb_id):
+async def get_graph(tenant_id, kb_id, exclude_rebuild=None):
     conds = {
-        "fields": ["content_with_weight", "source_id"],
-        "removed_kwd": "N",
+        "fields": ["content_with_weight", "removed_kwd", "source_id"],
         "size": 1,
         "knowledge_graph_kwd": ["graph"]
     }
     res = await trio.to_thread.run_sync(lambda: settings.retrievaler.search(conds, search.index_name(tenant_id), [kb_id]))
-    if res.total == 0:
-        return None
-    for id in res.ids:
-        try:
-            g = json_graph.node_link_graph(json.loads(res.field[id]["content_with_weight"]), edges="edges")
-            if "source_id" not in g.graph:
-                g.graph["source_id"] = res.field[id]["source_id"]
-            return g
-        except Exception:
-            continue
-    result = await rebuild_graph(tenant_id, kb_id)
+    if not res.total == 0:
+        for id in res.ids:
+            try:
+                if res.field[id]["removed_kwd"] == "N":
+                    g = json_graph.node_link_graph(json.loads(res.field[id]["content_with_weight"]), edges="edges")
+                    if "source_id" not in g.graph:
+                        g.graph["source_id"] = res.field[id]["source_id"]
+                else:
+                    g = await rebuild_graph(tenant_id, kb_id, exclude_rebuild)
+                return g
+            except Exception:
+                continue
+    result = None
     return result
 
 
 async def set_graph(tenant_id: str, kb_id: str, embd_mdl, graph: nx.Graph, change: GraphChange, callback):
     start = trio.current_time()
-    await trio.to_thread.run_sync(lambda: settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph"]}, search.index_name(tenant_id), kb_id))
+    await trio.to_thread.run_sync(lambda: settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph"]}, search.index_name(tenant_id), kb_id))
 
     if change.removed_nodes:
         await trio.to_thread.run_sync(lambda: settings.docStoreConn.delete({"knowledge_graph_kwd": ["entity"], "entity_kwd": sorted(change.removed_nodes)}, search.index_name(tenant_id), kb_id))
@@ -454,6 +455,23 @@ async def set_graph(tenant_id: str, kb_id: str, embd_mdl, graph: nx.Graph, chang
             "available_int": 0,
             "removed_kwd": "N"
         }]
+
+    # generate updated subgraphs
+    for source in graph.graph["source_id"]:
+        subgraph = graph.subgraph([n for n in graph.nodes if source in graph.nodes[n]["source_id"]]).copy()
+        subgraph.graph["source_id"] = [source]
+        for n in subgraph.nodes:
+            subgraph.nodes[n]["source_id"] = [source]
+        chunks.append({
+            "id": get_uuid(),
+            "content_with_weight": json.dumps(nx.node_link_data(subgraph, edges="edges"), ensure_ascii=False),
+            "knowledge_graph_kwd": "subgraph",
+            "kb_id": kb_id,
+            "source_id": [source],
+            "available_int": 0,
+            "removed_kwd": "N"
+        })
+
     async with trio.open_nursery() as nursery:
         for node in change.added_updated_nodes:
             node_attrs = graph.nodes[node]
@@ -554,48 +572,45 @@ def flat_uniq_list(arr, key):
     return list(set(res))
 
 
-async def rebuild_graph(tenant_id, kb_id):
+async def rebuild_graph(tenant_id, kb_id, exclude_rebuild=None):
     graph = nx.Graph()
-    src_ids = set()
-    flds = ["entity_kwd", "from_entity_kwd", "to_entity_kwd", "knowledge_graph_kwd", "content_with_weight", "source_id"]
+    flds = ["knowledge_graph_kwd", "content_with_weight", "source_id"]
     bs = 256
     for i in range(0, 1024*bs, bs):
         es_res = await trio.to_thread.run_sync(lambda: settings.docStoreConn.search(flds, [],
-                                                                                    {"kb_id": kb_id, "knowledge_graph_kwd": ["entity"]},
+                                                                                    {"kb_id": kb_id, "knowledge_graph_kwd": ["subgraph"]},
                                                                                     [],
                                                                                     OrderByExpr(),
                                                                                     i, bs, search.index_name(tenant_id), [kb_id]
                                                                                     ))
-        tot = settings.docStoreConn.getTotal(es_res)
-        if tot == 0:
+        # tot = settings.docStoreConn.getTotal(es_res)
+        es_res = settings.docStoreConn.getFields(es_res, flds)
+
+        if len(es_res) == 0:
             break
 
-        es_res = settings.docStoreConn.getFields(es_res, flds)
         for id, d in es_res.items():
-            assert d["knowledge_graph_kwd"] == "relation"
-            src_ids.update(d.get("source_id", []))
-            attrs = json.load(d["content_with_weight"])
-            graph.add_node(d["entity_kwd"], **attrs)
+            assert d["knowledge_graph_kwd"] == "subgraph"
+            if isinstance(exclude_rebuild, list):
+                if sum([n in d["source_id"] for n in exclude_rebuild]):
+                    continue
+            elif exclude_rebuild in d["source_id"]:
+                continue
+
+            next_graph = json_graph.node_link_graph(json.loads(d["content_with_weight"]), edges="edges")
+            merged_graph = nx.compose(graph, next_graph)
+            merged_source = {
+                n: graph.nodes[n]["source_id"] + next_graph.nodes[n]["source_id"]
+                for n in graph.nodes & next_graph.nodes
+            }
+            nx.set_node_attributes(merged_graph, merged_source, "source_id")
+            if "source_id" in graph.graph:
+                merged_graph.graph["source_id"] = graph.graph["source_id"] + next_graph.graph["source_id"]
+            else:
+                merged_graph.graph["source_id"] = next_graph.graph["source_id"]
+            graph = merged_graph
 
-    for i in range(0, 1024*bs, bs):
-        es_res = await trio.to_thread.run_sync(lambda: settings.docStoreConn.search(flds, [],
-                                                                                    {"kb_id": kb_id, "knowledge_graph_kwd": ["relation"]},
-                                                                                    [],
-                                                                                    OrderByExpr(),
-                                                                                    i, bs, search.index_name(tenant_id), [kb_id]
-                                                                                    ))
-        tot = settings.docStoreConn.getTotal(es_res)
-        if tot == 0:
-            return None
-
-        es_res = settings.docStoreConn.getFields(es_res, flds)
-        for id, d in es_res.items():
-            assert d["knowledge_graph_kwd"] == "relation"
-            src_ids.update(d.get("source_id", []))
-            if graph.has_node(d["from_entity_kwd"]) and graph.has_node(d["to_entity_kwd"]):
-                attrs = json.load(d["content_with_weight"])
-                graph.add_edge(d["from_entity_kwd"], d["to_entity_kwd"], **attrs)
-
-    src_ids = sorted(src_ids)
-    graph.graph["source_id"] = src_ids
+    if len(graph.nodes) == 0:
+        return None
+    graph.graph["source_id"] = sorted(graph.graph["source_id"])
     return graph
diff --git a/rag/utils/infinity_conn.py b/rag/utils/infinity_conn.py
index 0abc1903d..f3f9be3a0 100644
--- a/rag/utils/infinity_conn.py
+++ b/rag/utils/infinity_conn.py
@@ -42,6 +42,11 @@ from rag.utils.doc_store_conn import (
 
 logger = logging.getLogger('ragflow.infinity_conn')
 
+def field_keyword(field_name: str):
+    # The "docnm_kwd" field is always a string, not list.
+    if field_name == "source_id" or (field_name.endswith("_kwd") and field_name != "docnm_kwd" and field_name != "knowledge_graph_kwd"):
+        return True
+    return False
 
 def equivalent_condition_to_str(condition: dict, table_instance=None) -> str | None:
     assert "_id" not in condition
@@ -64,7 +69,20 @@ def equivalent_condition_to_str(condition: dict, table_instance=None) -> str | N
     for k, v in condition.items():
         if not isinstance(k, str) or k in ["kb_id"] or not v:
             continue
-        if isinstance(v, list):
+        if field_keyword(k):
+            if isinstance(v, list):
+                inCond = list()
+                for item in v:
+                    if isinstance(item, str):
+                        item = item.replace("'","''")
+                    inCond.append(f"filter_fulltext('{k}', '{item}')")
+                if inCond:
+                    strInCond = " or ".join(inCond)
+                    strInCond = f"({strInCond})"
+                    cond.append(strInCond)
+            else:
+                cond.append(f"filter_fulltext('{k}', '{v}')")
+        elif isinstance(v, list):
             inCond = list()
             for item in v:
                 if isinstance(item, str):
@@ -118,7 +136,7 @@
         logger.info(f"Use Infinity {infinity_uri} as the doc engine.")
         for _ in range(24):
             try:
-                connPool = ConnectionPool(infinity_uri)
+                connPool = ConnectionPool(infinity_uri, max_size=32)
                 inf_conn = connPool.get_conn()
                 res = inf_conn.show_current_node()
                 if res.error_code == ErrorCode.OK and res.server_status in ["started", "alive"]:
@@ -173,12 +191,6 @@
             ConflictType.Ignore,
         )
 
-    def field_keyword(self, field_name: str):
-        # The "docnm_kwd" field is always a string, not list.
-        if field_name == "source_id" or (field_name.endswith("_kwd") and field_name != "docnm_kwd" and field_name != "knowledge_graph_kwd"):
-            return True
-        return False
-
     """
     Database operations
     """
@@ -487,7 +499,7 @@
             assert "_id" not in d
             assert "id" in d
             for k, v in d.items():
-                if self.field_keyword(k):
+                if field_keyword(k):
                     if isinstance(v, list):
                         d[k] = "###".join(v)
                     else:
@@ -534,9 +546,15 @@
         table_instance = db_instance.get_table(table_name)
         #if "exists" in condition:
         #    del condition["exists"]
+
+        clmns = {}
+        if table_instance:
+            for n, ty, de, _ in table_instance.show_columns().rows():
+                clmns[n] = (ty, de)
         filter = equivalent_condition_to_str(condition, table_instance)
+        removeValue = {}
         for k, v in list(newValue.items()):
-            if self.field_keyword(k):
+            if field_keyword(k):
                 if isinstance(v, list):
                     newValue[k] = "###".join(v)
                 else:
@@ -554,13 +572,42 @@
                 assert isinstance(v, list)
                 newValue[k] = "_".join(f"{num:08x}" for num in v)
             elif k == "remove":
-                del newValue[k]
-                if v in [PAGERANK_FLD]:
-                    newValue[v] = 0
+                if isinstance(v, str):
+                    assert v in clmns, f"'{v}' should be in '{clmns}'."
+                    ty, de = clmns[v]
+                    if ty.lower().find("cha"):
+                        if not de:
+                            de = ""
+                        newValue[v] = de
+                else:
+                    for kk, vv in v.items():
+                        removeValue[kk] = vv
+                del newValue[k]
             else:
                 newValue[k] = v
+
+        remove_opt = {} # "[k,new_value]": [id_to_update, ...]
+        if removeValue:
+            col_to_remove = list(removeValue.keys())
+            row_to_opt = table_instance.output(col_to_remove + ['id']).filter(filter).to_df()
+            logger.debug(f"INFINITY search table {str(table_name)}, filter {filter}, result: {str(row_to_opt[0])}")
+            row_to_opt = self.getFields(row_to_opt, col_to_remove)
+            for id, old_v in row_to_opt.items():
+                for k, remove_v in removeValue.items():
+                    if remove_v in old_v[k]:
+                        new_v = old_v[k].copy()
+                        new_v.remove(remove_v)
+                        kv_key = json.dumps([k, new_v])
+                        if kv_key not in remove_opt:
+                            remove_opt[kv_key] = [id]
+                        else:
+                            remove_opt[kv_key].append(id)
 
         logger.debug(f"INFINITY update table {table_name}, filter {filter}, newValue {newValue}.")
+        for update_kv, ids in remove_opt.items():
+            k, v = json.loads(update_kv)
+            table_instance.update(filter + " AND id in ({0})".format(",".join([f"'{id}'" for id in ids])), {k:"###".join(v)})
+
         table_instance.update(filter, newValue)
         self.connPool.release_conn(inf_conn)
         return True
@@ -613,7 +660,7 @@
 
         for column in res2.columns:
             k = column.lower()
-            if self.field_keyword(k):
+            if field_keyword(k):
                 res2[column] = res2[column].apply(lambda v:[kwd for kwd in v.split("###") if kwd])
             elif k == "position_int":
                 def to_position_int(v):
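Note on the graphrag/utils.py changes: the patch stores one serialized subgraph per source document in set_graph(), and rebuild_graph() recomposes them with nx.compose(), unioning the per-node source_id lists by hand. Below is a minimal, self-contained sketch of that split/recompose round trip. The helper names (split_into_subgraphs, recompose) are illustrative only, not RAGFlow APIs, and it assumes networkx >= 3.4 for the edges="edges" keyword used in the patch.

```python
import json

import networkx as nx
from networkx.readwrite import json_graph


def split_into_subgraphs(graph: nx.Graph) -> dict[str, str]:
    # One serialized subgraph per source document, as set_graph() now stores.
    serialized = {}
    for source in graph.graph["source_id"]:
        sub = graph.subgraph([n for n in graph.nodes if source in graph.nodes[n]["source_id"]]).copy()
        sub.graph["source_id"] = [source]
        for n in sub.nodes:
            sub.nodes[n]["source_id"] = [source]
        serialized[source] = json.dumps(json_graph.node_link_data(sub, edges="edges"))
    return serialized


def recompose(serialized: dict[str, str], exclude: list[str] | None = None) -> nx.Graph | None:
    # Merge the stored subgraphs back together, as rebuild_graph() now does.
    graph = nx.Graph()
    graph.graph["source_id"] = []
    for source, payload in serialized.items():
        if exclude and source in exclude:
            continue  # counterpart of the exclude_rebuild filter
        nxt = json_graph.node_link_graph(json.loads(payload), edges="edges")
        merged = nx.compose(graph, nxt)
        # nx.compose keeps the second graph's attributes on shared nodes,
        # so the per-node source_id lists must be unioned by hand.
        merged_source = {
            n: graph.nodes[n]["source_id"] + nxt.nodes[n]["source_id"]
            for n in graph.nodes & nxt.nodes
        }
        nx.set_node_attributes(merged, merged_source, "source_id")
        merged.graph["source_id"] = graph.graph["source_id"] + nxt.graph["source_id"]
        graph = merged
    if len(graph.nodes) == 0:
        return None
    graph.graph["source_id"] = sorted(graph.graph["source_id"])
    return graph
```

A node shared by several documents is duplicated into each document's subgraph at split time; recomposition merges the duplicates back into one node whose source_id list covers all contributing documents, which is what lets merge_subgraph() pass subgraph.graph["source_id"] as exclude_rebuild to rebuild everything except the document being re-ingested.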
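Note on the InfinityConnection.update() changes: Infinity stores keyword lists as a single "###"-joined string, so removing one element means reading the rows back, computing each row's post-removal list, and grouping rows by that resulting value so one UPDATE statement covers each group (the remove_opt dict). A pure-Python illustration of just that grouping, with made-up sample data:

```python
import json

# id -> current value of the keyword column, as read back from the table
rows = {
    "chunk-1": {"source_id": ["doc-a", "doc-b"]},
    "chunk-2": {"source_id": ["doc-a", "doc-b"]},
    "chunk-3": {"source_id": ["doc-a", "doc-c"]},
}
remove_value = {"source_id": "doc-a"}  # shape of {"remove": {"source_id": doc.id}}

remove_opt: dict[str, list[str]] = {}  # "[column, new_list]" -> ids with that result
for row_id, old in rows.items():
    for col, victim in remove_value.items():
        if victim in old[col]:
            new_list = [v for v in old[col] if v != victim]
            key = json.dumps([col, new_list])  # lists are unhashable, so encode the pair
            remove_opt.setdefault(key, []).append(row_id)

for key, ids in remove_opt.items():
    col, new_list = json.loads(key)
    # One UPDATE per distinct post-removal value, conceptually:
    #   UPDATE tbl SET source_id = 'doc-b' WHERE id IN ('chunk-1', 'chunk-2')
    print(col, "###".join(new_list), ids)
```

Grouping by the resulting value keeps the number of UPDATE statements bounded by the number of distinct outcomes rather than the number of matched rows.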