Fix set_graph on non-existing edge (#6777)

### What problem does this PR solve?

Fix a `KeyError` in `set_graph` when `change.added_updated_edges` records an edge that no longer exists in the graph (e.g. when both endpoints were involved in node merging): look the edge attributes up with `graph.get_edge_data()` and skip missing edges. Also replace the busy-wait loop around `graphrag_task_lock` with a new `spin_acquire()` helper, shorten the lock timeout from 3600s to 1200s, and re-acquire the lock before the entity-resolution and community-extraction phases.
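
For context, a minimal networkx sketch (illustrative, not code from this repo) of the failure mode: subscripting `graph.edges` with a pair that is not in the graph raises `KeyError`, while `get_edge_data()` returns `None`, which the patched loop can simply skip.

```python
# Illustration only: networkx edge-lookup behaviour this fix relies on.
import networkx as nx

g = nx.Graph()
g.add_edge("a", "b", weight=1.0)

print(g.get_edge_data("a", "b"))   # {'weight': 1.0}
print(g.get_edge_data("a", "c"))   # None -> the new code skips this edge

try:
    g.edges["a", "c"]              # old lookup style used by set_graph
except KeyError:
    print("edge ('a', 'c') does not exist")
```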

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
Zhichang Yu 2025-04-03 11:09:04 +08:00 committed by GitHub
parent 5b5558300a
commit fdc410e743
3 changed files with 19 additions and 7 deletions


@@ -72,12 +72,9 @@ async def run_graphrag(
     if not subgraph:
         return

-    graphrag_task_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=doc_id, timeout=3600)
-    while True:
-        if graphrag_task_lock.acquire():
-            break
-        callback(msg=f"merge_subgraph {doc_id} is waiting graphrag_task_lock")
-        await trio.sleep(20)
+    graphrag_task_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=doc_id, timeout=1200)
+    await graphrag_task_lock.spin_acquire()
+    callback(msg=f"run_graphrag {doc_id} graphrag_task_lock acquired")

     try:
         subgraph_nodes = set(subgraph.nodes())
@@ -95,6 +92,8 @@ async def run_graphrag(
             return

         if with_resolution:
+            await graphrag_task_lock.spin_acquire()
+            callback(msg=f"run_graphrag {doc_id} graphrag_task_lock acquired")
             await resolve_entities(
                 new_graph,
                 subgraph_nodes,
@@ -106,6 +105,8 @@ async def run_graphrag(
                 callback,
             )
         if with_community:
+            await graphrag_task_lock.spin_acquire()
+            callback(msg=f"run_graphrag {doc_id} graphrag_task_lock acquired")
             await extract_community(
                 new_graph,
                 tenant_id,


@@ -459,7 +459,10 @@ async def set_graph(tenant_id: str, kb_id: str, embd_mdl, graph: nx.Graph, chang
             node_attrs = graph.nodes[node]
             nursery.start_soon(lambda: graph_node_to_chunk(kb_id, embd_mdl, node, node_attrs, chunks))
         for from_node, to_node in change.added_updated_edges:
-            edge_attrs = graph.edges[from_node, to_node]
+            edge_attrs = graph.get_edge_data(from_node, to_node)
+            if not edge_attrs:
+                # added_updated_edges could record a non-existing edge if both from_node and to_node participate in nodes merging.
+                continue
             nursery.start_soon(lambda: graph_edge_to_chunk(kb_id, embd_mdl, from_node, to_node, edge_attrs, chunks))
     now = trio.current_time()
     if callback:


@@ -22,6 +22,7 @@ import valkey as redis
 from rag import settings
 from rag.utils import singleton
 from valkey.lock import Lock
+import trio

 class RedisMsg:
     def __init__(self, consumer, queue_name, group_name, msg_id, message):
@@ -317,5 +318,12 @@ class RedisDistributedLock:
         REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value)
         return self.lock.acquire(token=self.lock_value)

+    async def spin_acquire(self):
+        REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value)
+        while True:
+            if self.lock.acquire(token=self.lock_value):
+                break
+            await trio.sleep(10)
+
     def release(self):
         REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value)
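
For reference, a hedged sketch of the intended call pattern for the new `spin_acquire()`: block until the Redis lock is held (polling every 10 seconds), run the protected graphrag phase, and always release. The helper name `with_graphrag_lock`, the `work` callable, and the `rag.utils.redis_conn` import path are assumptions for illustration, not part of this diff.

```python
# Hedged usage sketch; with_graphrag_lock and work are illustrative names.
import trio

from rag.utils.redis_conn import RedisDistributedLock  # assumed import path

async def with_graphrag_lock(kb_id: str, doc_id: str, work):
    lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=doc_id, timeout=1200)
    await lock.spin_acquire()   # loops on lock.acquire(token=...), sleeping 10s between attempts
    try:
        return await work()     # e.g. entity resolution or community extraction
    finally:
        lock.release()          # delete_if_equal only clears the lock if we still hold it
```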