Mirror of https://git.mirrors.martin98.com/https://github.com/infiniflow/ragflow.git (synced 2025-06-04 11:24:00 +08:00)

Fix: whole knowledge graph lost after removing any document in the knowledge base (#7151)
### What problem does this PR solve?

When you remove any document from a knowledge base that uses a knowledge graph, the graph's `removed_kwd` is set to "Y". However, in `graphrag.utils.get_graph`, the `rebuild_graph` step is then skipped and `None` is returned directly while `removed_kwd == "Y"`, so the residual part of the graph is abandoned (even though the old entity data still exists in the doc store). Besides, with an Infinity doc store the removal of the document id from the graph components' `source_id` is not actually applied when a document is deleted, which can produce a wrong graph after a rebuild.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

commit ab27609a64
parent 538a408608

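The gist of the `get_graph` change, as a minimal self-contained sketch (plain dicts stand in for the stored graph chunk and a stub stands in for the rebuild; this is a simplification, not the exact code in the diff below):

```python
# Sketch of the corrected decision: old behaviour returned None as soon as the
# stored graph was flagged removed; new behaviour falls back to rebuilding it
# from the per-document subgraphs instead.
def resolve_graph(stored_chunk, rebuild):
    if stored_chunk and stored_chunk["removed_kwd"] == "N":
        return stored_chunk["graph"]
    return rebuild()

assert resolve_graph({"removed_kwd": "N", "graph": "stored"}, lambda: "rebuilt") == "stored"
assert resolve_graph({"removed_kwd": "Y", "graph": "stale"}, lambda: "rebuilt") == "rebuilt"
```
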
@@ -37,6 +37,7 @@ from rag.nlp import rag_tokenizer, search
 from rag.settings import get_svr_queue_name
 from rag.utils.redis_conn import REDIS_CONN
 from rag.utils.storage_factory import STORAGE_IMPL
+from rag.utils.doc_store_conn import OrderByExpr


 class DocumentService(CommonService):

@@ -111,14 +112,18 @@ class DocumentService(CommonService):
         cls.clear_chunk_num(doc.id)
         try:
             settings.docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), doc.kb_id)
-            settings.docStoreConn.update({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"], "source_id": doc.id},
-                                         {"remove": {"source_id": doc.id}},
-                                         search.index_name(tenant_id), doc.kb_id)
-            settings.docStoreConn.update({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["graph"]},
-                                         {"removed_kwd": "Y"},
-                                         search.index_name(tenant_id), doc.kb_id)
-            settings.docStoreConn.delete({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"], "must_not": {"exists": "source_id"}},
-                                         search.index_name(tenant_id), doc.kb_id)
+            graph_source = settings.docStoreConn.getFields(
+                settings.docStoreConn.search(["source_id"], [], {"kb_id": doc.kb_id, "knowledge_graph_kwd": ["graph"]}, [], OrderByExpr(), 0, 1, search.index_name(tenant_id), [doc.kb_id]), ["source_id"]
+            )
+            if len(graph_source) > 0 and doc.id in list(graph_source.values())[0]["source_id"]:
+                settings.docStoreConn.update({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"], "source_id": doc.id},
+                                             {"remove": {"source_id": doc.id}},
+                                             search.index_name(tenant_id), doc.kb_id)
+                settings.docStoreConn.update({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["graph"]},
+                                             {"removed_kwd": "Y"},
+                                             search.index_name(tenant_id), doc.kb_id)
+                settings.docStoreConn.delete({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"], "must_not": {"exists": "source_id"}},
+                                             search.index_name(tenant_id), doc.kb_id)
         except Exception:
             pass
         return cls.delete_by_id(doc.id)

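For context, the `{"remove": {"source_id": doc.id}}` payload used above is expected to drop a single value from a stored list field rather than overwrite it. A minimal illustration of that semantics with plain Python dicts (made-up data, not the RAGFlow doc-store API):

```python
# Illustration only: what a {"remove": {"source_id": <doc id>}} update is meant
# to do to a graph-component chunk.
chunk = {"id": "c1", "knowledge_graph_kwd": "entity", "source_id": ["doc_a", "doc_b"]}

def apply_remove(doc: dict, remove_spec: dict) -> dict:
    # Drop the given value from each listed field, keeping the other entries.
    for field, value in remove_spec.items():
        doc[field] = [v for v in doc.get(field, []) if v != value]
    return doc

apply_remove(chunk, {"source_id": "doc_a"})
assert chunk["source_id"] == ["doc_b"]
```
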
@@ -204,7 +204,7 @@ async def merge_subgraph(
 ):
     start = trio.current_time()
     change = GraphChange()
-    old_graph = await get_graph(tenant_id, kb_id)
+    old_graph = await get_graph(tenant_id, kb_id, subgraph.graph["source_id"])
     if old_graph is not None:
         logging.info("Merge with an exiting graph...................")
         tidy_graph(old_graph, callback)

@@ -406,32 +406,33 @@ async def get_graph_doc_ids(tenant_id, kb_id) -> list[str]:
     return doc_ids


-async def get_graph(tenant_id, kb_id):
+async def get_graph(tenant_id, kb_id, exclude_rebuild=None):
     conds = {
-        "fields": ["content_with_weight", "source_id"],
-        "removed_kwd": "N",
+        "fields": ["content_with_weight", "removed_kwd", "source_id"],
         "size": 1,
         "knowledge_graph_kwd": ["graph"]
     }
     res = await trio.to_thread.run_sync(lambda: settings.retrievaler.search(conds, search.index_name(tenant_id), [kb_id]))
-    if res.total == 0:
-        return None
-    for id in res.ids:
-        try:
-            g = json_graph.node_link_graph(json.loads(res.field[id]["content_with_weight"]), edges="edges")
-            if "source_id" not in g.graph:
-                g.graph["source_id"] = res.field[id]["source_id"]
-            return g
-        except Exception:
-            continue
-    result = await rebuild_graph(tenant_id, kb_id)
+    if not res.total == 0:
+        for id in res.ids:
+            try:
+                if res.field[id]["removed_kwd"] == "N":
+                    g = json_graph.node_link_graph(json.loads(res.field[id]["content_with_weight"]), edges="edges")
+                    if "source_id" not in g.graph:
+                        g.graph["source_id"] = res.field[id]["source_id"]
+                else:
+                    g = await rebuild_graph(tenant_id, kb_id, exclude_rebuild)
+                return g
+            except Exception:
+                continue
+    result = None
     return result


 async def set_graph(tenant_id: str, kb_id: str, embd_mdl, graph: nx.Graph, change: GraphChange, callback):
     start = trio.current_time()

-    await trio.to_thread.run_sync(lambda: settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph"]}, search.index_name(tenant_id), kb_id))
+    await trio.to_thread.run_sync(lambda: settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph"]}, search.index_name(tenant_id), kb_id))

     if change.removed_nodes:
         await trio.to_thread.run_sync(lambda: settings.docStoreConn.delete({"knowledge_graph_kwd": ["entity"], "entity_kwd": sorted(change.removed_nodes)}, search.index_name(tenant_id), kb_id))

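The new `exclude_rebuild` argument accepts either a single document id or a list of ids; it is passed through to `rebuild_graph` further down. A small sketch of the matching rule applied to each stored subgraph chunk (assuming `source_id` is a list of document ids, as above):

```python
# Sketch of the per-chunk exclusion test used during a rebuild.
def excluded(chunk_source_ids, exclude_rebuild):
    if exclude_rebuild is None:
        return False
    if isinstance(exclude_rebuild, list):
        return any(doc_id in chunk_source_ids for doc_id in exclude_rebuild)
    return exclude_rebuild in chunk_source_ids

assert excluded(["d1", "d2"], ["d2"]) is True
assert excluded(["d1", "d2"], "d3") is False
assert excluded(["d1"], None) is False
```
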
@@ -454,6 +455,23 @@ async def set_graph(tenant_id: str, kb_id: str, embd_mdl, graph: nx.Graph, change: GraphChange, callback):
         "available_int": 0,
         "removed_kwd": "N"
     }]
+
+    # generate updated subgraphs
+    for source in graph.graph["source_id"]:
+        subgraph = graph.subgraph([n for n in graph.nodes if source in graph.nodes[n]["source_id"]]).copy()
+        subgraph.graph["source_id"] = [source]
+        for n in subgraph.nodes:
+            subgraph.nodes[n]["source_id"] = [source]
+        chunks.append({
+            "id": get_uuid(),
+            "content_with_weight": json.dumps(nx.node_link_data(subgraph, edges="edges"), ensure_ascii=False),
+            "knowledge_graph_kwd": "subgraph",
+            "kb_id": kb_id,
+            "source_id": [source],
+            "available_int": 0,
+            "removed_kwd": "N"
+        })
+
     async with trio.open_nursery() as nursery:
         for node in change.added_updated_nodes:
             node_attrs = graph.nodes[node]

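`set_graph` now persists one `subgraph` chunk per source document in addition to the whole graph. A rough standalone NetworkX illustration of the split step above (made-up node and document names):

```python
import networkx as nx

# Toy graph whose nodes remember which documents they came from.
g = nx.Graph(source_id=["doc_a", "doc_b"])
g.add_node("Alice", source_id=["doc_a"])
g.add_node("Bob", source_id=["doc_a", "doc_b"])
g.add_node("Carol", source_id=["doc_b"])
g.add_edge("Alice", "Bob")
g.add_edge("Bob", "Carol")

# One per-document subgraph, as the loop above builds before serializing them.
for source in g.graph["source_id"]:
    sub = g.subgraph([n for n in g.nodes if source in g.nodes[n]["source_id"]]).copy()
    sub.graph["source_id"] = [source]
    print(source, sorted(sub.nodes))
# doc_a ['Alice', 'Bob']
# doc_b ['Bob', 'Carol']
```
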
@@ -554,48 +572,45 @@ def flat_uniq_list(arr, key):
     return list(set(res))


-async def rebuild_graph(tenant_id, kb_id):
+async def rebuild_graph(tenant_id, kb_id, exclude_rebuild=None):
     graph = nx.Graph()
-    src_ids = set()
-    flds = ["entity_kwd", "from_entity_kwd", "to_entity_kwd", "knowledge_graph_kwd", "content_with_weight", "source_id"]
+    flds = ["knowledge_graph_kwd", "content_with_weight", "source_id"]
     bs = 256
     for i in range(0, 1024*bs, bs):
         es_res = await trio.to_thread.run_sync(lambda: settings.docStoreConn.search(flds, [],
-                                               {"kb_id": kb_id, "knowledge_graph_kwd": ["entity"]},
+                                               {"kb_id": kb_id, "knowledge_graph_kwd": ["subgraph"]},
                                                [],
                                                OrderByExpr(),
                                                i, bs, search.index_name(tenant_id), [kb_id]
                                                ))
-        tot = settings.docStoreConn.getTotal(es_res)
-        if tot == 0:
-            break
-
-        es_res = settings.docStoreConn.getFields(es_res, flds)
-        for id, d in es_res.items():
-            assert d["knowledge_graph_kwd"] == "relation"
-            src_ids.update(d.get("source_id", []))
-            attrs = json.load(d["content_with_weight"])
-            graph.add_node(d["entity_kwd"], **attrs)
-
-    for i in range(0, 1024*bs, bs):
-        es_res = await trio.to_thread.run_sync(lambda: settings.docStoreConn.search(flds, [],
-                                               {"kb_id": kb_id, "knowledge_graph_kwd": ["relation"]},
-                                               [],
-                                               OrderByExpr(),
-                                               i, bs, search.index_name(tenant_id), [kb_id]
-                                               ))
-        tot = settings.docStoreConn.getTotal(es_res)
-        if tot == 0:
-            return None
-
-        es_res = settings.docStoreConn.getFields(es_res, flds)
-        for id, d in es_res.items():
-            assert d["knowledge_graph_kwd"] == "relation"
-            src_ids.update(d.get("source_id", []))
-            if graph.has_node(d["from_entity_kwd"]) and graph.has_node(d["to_entity_kwd"]):
-                attrs = json.load(d["content_with_weight"])
-                graph.add_edge(d["from_entity_kwd"], d["to_entity_kwd"], **attrs)
-
-    src_ids = sorted(src_ids)
-    graph.graph["source_id"] = src_ids
+        # tot = settings.docStoreConn.getTotal(es_res)
+        es_res = settings.docStoreConn.getFields(es_res, flds)
+
+        if len(es_res) == 0:
+            break
+
+        for id, d in es_res.items():
+            assert d["knowledge_graph_kwd"] == "subgraph"
+            if isinstance(exclude_rebuild, list):
+                if sum([n in d["source_id"] for n in exclude_rebuild]):
+                    continue
+            elif exclude_rebuild in d["source_id"]:
+                continue
+
+            next_graph = json_graph.node_link_graph(json.loads(d["content_with_weight"]), edges="edges")
+            merged_graph = nx.compose(graph, next_graph)
+            merged_source = {
+                n: graph.nodes[n]["source_id"] + next_graph.nodes[n]["source_id"]
+                for n in graph.nodes & next_graph.nodes
+            }
+            nx.set_node_attributes(merged_graph, merged_source, "source_id")
+            if "source_id" in graph.graph:
+                merged_graph.graph["source_id"] = graph.graph["source_id"] + next_graph.graph["source_id"]
+            else:
+                merged_graph.graph["source_id"] = next_graph.graph["source_id"]
+            graph = merged_graph
+
+    if len(graph.nodes) == 0:
+        return None
+    graph.graph["source_id"] = sorted(graph.graph["source_id"])
     return graph

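The rebuilt graph is now assembled by composing the stored per-document subgraphs. `nx.compose` lets attributes from the second graph win on shared nodes, which is why the loop above re-merges the `source_id` lists by hand; a minimal demonstration with toy data:

```python
import networkx as nx

a = nx.Graph(source_id=["doc_a"])
a.add_node("Bob", source_id=["doc_a"])
b = nx.Graph(source_id=["doc_b"])
b.add_node("Bob", source_id=["doc_b"])

merged = nx.compose(a, b)  # Bob's source_id would now be ["doc_b"] only
shared = {n: a.nodes[n]["source_id"] + b.nodes[n]["source_id"] for n in a.nodes & b.nodes}
nx.set_node_attributes(merged, shared, "source_id")
merged.graph["source_id"] = a.graph["source_id"] + b.graph["source_id"]

assert merged.nodes["Bob"]["source_id"] == ["doc_a", "doc_b"]
assert merged.graph["source_id"] == ["doc_a", "doc_b"]
```
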
@@ -42,6 +42,11 @@ from rag.utils.doc_store_conn import (

 logger = logging.getLogger('ragflow.infinity_conn')

+def field_keyword(field_name: str):
+    # The "docnm_kwd" field is always a string, not list.
+    if field_name == "source_id" or (field_name.endswith("_kwd") and field_name != "docnm_kwd" and field_name != "knowledge_graph_kwd"):
+        return True
+    return False

 def equivalent_condition_to_str(condition: dict, table_instance=None) -> str | None:
     assert "_id" not in condition

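`field_keyword` is now a module-level helper (and treats `source_id` as a keyword-list field), so the filter builder and the connection methods agree on which fields are stored as `###`-joined strings in Infinity (see the `"###".join` / `split("###")` changes further down). The assumed round-trip, in isolation:

```python
# Sketch of the storage convention for keyword-list fields in Infinity.
def pack(values: list[str]) -> str:
    return "###".join(values)

def unpack(cell: str) -> list[str]:
    return [kwd for kwd in cell.split("###") if kwd]

assert unpack(pack(["doc_a", "doc_b"])) == ["doc_a", "doc_b"]
assert unpack("") == []
```
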
@@ -64,7 +69,20 @@ def equivalent_condition_to_str(condition: dict, table_instance=None) -> str | None:
     for k, v in condition.items():
         if not isinstance(k, str) or k in ["kb_id"] or not v:
             continue
-        if isinstance(v, list):
+        if field_keyword(k):
+            if isinstance(v, list):
+                inCond = list()
+                for item in v:
+                    if isinstance(item, str):
+                        item = item.replace("'","''")
+                    inCond.append(f"filter_fulltext('{k}', '{item}')")
+                if inCond:
+                    strInCond = " or ".join(inCond)
+                    strInCond = f"({strInCond})"
+                    cond.append(strInCond)
+            else:
+                cond.append(f"filter_fulltext('{k}', '{v}')")
+        elif isinstance(v, list):
             inCond = list()
             for item in v:
                 if isinstance(item, str):

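With `source_id` handled as a keyword field, equality conditions on it are now expressed through `filter_fulltext` rather than plain `IN` clauses. A sketch of the filter string the new branch builds (the escaping mirrors the `item.replace("'","''")` above; this is an isolated illustration, not the library code):

```python
# Sketch of the filter expression produced for a keyword field with a list value.
def keyword_filter(field: str, values: list[str]) -> str | None:
    parts = []
    for v in values:
        v = v.replace("'", "''")  # escape single quotes for the filter string
        parts.append(f"filter_fulltext('{field}', '{v}')")
    return "(" + " or ".join(parts) + ")" if parts else None

print(keyword_filter("source_id", ["doc_a", "doc_b"]))
# (filter_fulltext('source_id', 'doc_a') or filter_fulltext('source_id', 'doc_b'))
```
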
@@ -118,7 +136,7 @@ class InfinityConnection(DocStoreConnection):
         logger.info(f"Use Infinity {infinity_uri} as the doc engine.")
         for _ in range(24):
             try:
-                connPool = ConnectionPool(infinity_uri)
+                connPool = ConnectionPool(infinity_uri, max_size=32)
                 inf_conn = connPool.get_conn()
                 res = inf_conn.show_current_node()
                 if res.error_code == ErrorCode.OK and res.server_status in ["started", "alive"]:

@@ -173,12 +191,6 @@ class InfinityConnection(DocStoreConnection):
             ConflictType.Ignore,
         )

-    def field_keyword(self, field_name: str):
-        # The "docnm_kwd" field is always a string, not list.
-        if field_name == "source_id" or (field_name.endswith("_kwd") and field_name != "docnm_kwd" and field_name != "knowledge_graph_kwd"):
-            return True
-        return False
-
     """
     Database operations
     """

@@ -487,7 +499,7 @@ class InfinityConnection(DocStoreConnection):
             assert "_id" not in d
             assert "id" in d
             for k, v in d.items():
-                if self.field_keyword(k):
+                if field_keyword(k):
                     if isinstance(v, list):
                         d[k] = "###".join(v)
                     else:

@@ -534,9 +546,15 @@ class InfinityConnection(DocStoreConnection):
         table_instance = db_instance.get_table(table_name)
         #if "exists" in condition:
         # del condition["exists"]
+
+        clmns = {}
+        if table_instance:
+            for n, ty, de, _ in table_instance.show_columns().rows():
+                clmns[n] = (ty, de)
         filter = equivalent_condition_to_str(condition, table_instance)
+        removeValue = {}
         for k, v in list(newValue.items()):
-            if self.field_keyword(k):
+            if field_keyword(k):
                 if isinstance(v, list):
                     newValue[k] = "###".join(v)
                 else:

@@ -554,13 +572,42 @@ class InfinityConnection(DocStoreConnection):
                     assert isinstance(v, list)
                     newValue[k] = "_".join(f"{num:08x}" for num in v)
             elif k == "remove":
-                del newValue[k]
-                if v in [PAGERANK_FLD]:
-                    newValue[v] = 0
+                if isinstance(v, str):
+                    assert v in clmns, f"'{v}' should be in '{clmns}'."
+                    ty, de = clmns[v]
+                    if ty.lower().find("cha"):
+                        if not de:
+                            de = ""
+                        newValue[v] = de
+                else:
+                    for kk, vv in v.items():
+                        removeValue[kk] = vv
+                del newValue[k]
             else:
                 newValue[k] = v

+        remove_opt = {} # "[k,new_value]": [id_to_update, ...]
+        if removeValue:
+            col_to_remove = list(removeValue.keys())
+            row_to_opt = table_instance.output(col_to_remove + ['id']).filter(filter).to_df()
+            logger.debug(f"INFINITY search table {str(table_name)}, filter {filter}, result: {str(row_to_opt[0])}")
+            row_to_opt = self.getFields(row_to_opt, col_to_remove)
+            for id, old_v in row_to_opt.items():
+                for k, remove_v in removeValue.items():
+                    if remove_v in old_v[k]:
+                        new_v = old_v[k].copy()
+                        new_v.remove(remove_v)
+                        kv_key = json.dumps([k, new_v])
+                        if kv_key not in remove_opt:
+                            remove_opt[kv_key] = [id]
+                        else:
+                            remove_opt[kv_key].append(id)
+
         logger.debug(f"INFINITY update table {table_name}, filter {filter}, newValue {newValue}.")
+        for update_kv, ids in remove_opt.items():
+            k, v = json.loads(update_kv)
+            table_instance.update(filter + " AND id in ({0})".format(",".join([f"'{id}'" for id in ids])), {k:"###".join(v)})
+
         table_instance.update(filter, newValue)
         self.connPool.release_conn(inf_conn)
         return True

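The new `remove` handling in `update()` works in two passes: fetch the affected rows, strip the value client-side, then group rows by the resulting list so a single UPDATE can be issued per distinct new value. The grouping step in isolation (made-up row data, plain Python):

```python
import json

rows = {"r1": {"source_id": ["d1", "d2"]},
        "r2": {"source_id": ["d1", "d2"]},
        "r3": {"source_id": ["d1", "d3"]}}
remove_value = {"source_id": "d1"}

remove_opt = {}  # json([field, new_list]) -> ids that can share one UPDATE
for row_id, old in rows.items():
    for field, victim in remove_value.items():
        if victim in old[field]:
            new_list = [v for v in old[field] if v != victim]
            remove_opt.setdefault(json.dumps([field, new_list]), []).append(row_id)

assert remove_opt == {json.dumps(["source_id", ["d2"]]): ["r1", "r2"],
                      json.dumps(["source_id", ["d3"]]): ["r3"]}
```
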
@@ -613,7 +660,7 @@ class InfinityConnection(DocStoreConnection):

         for column in res2.columns:
             k = column.lower()
-            if self.field_keyword(k):
+            if field_keyword(k):
                 res2[column] = res2[column].apply(lambda v:[kwd for kwd in v.split("###") if kwd])
             elif k == "position_int":
                 def to_position_int(v):