diff --git a/graphrag/general/index.py b/graphrag/general/index.py index 6335bdf3..d4f5705f 100644 --- a/graphrag/general/index.py +++ b/graphrag/general/index.py @@ -72,12 +72,9 @@ async def run_graphrag( if not subgraph: return - graphrag_task_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=doc_id, timeout=3600) - while True: - if graphrag_task_lock.acquire(): - break - callback(msg=f"merge_subgraph {doc_id} is waiting graphrag_task_lock") - await trio.sleep(20) + graphrag_task_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=doc_id, timeout=1200) + await graphrag_task_lock.spin_acquire() + callback(msg=f"run_graphrag {doc_id} graphrag_task_lock acquired") try: subgraph_nodes = set(subgraph.nodes()) @@ -95,6 +92,8 @@ async def run_graphrag( return if with_resolution: + await graphrag_task_lock.spin_acquire() + callback(msg=f"run_graphrag {doc_id} graphrag_task_lock acquired") await resolve_entities( new_graph, subgraph_nodes, @@ -106,6 +105,8 @@ async def run_graphrag( callback, ) if with_community: + await graphrag_task_lock.spin_acquire() + callback(msg=f"run_graphrag {doc_id} graphrag_task_lock acquired") await extract_community( new_graph, tenant_id, diff --git a/graphrag/utils.py b/graphrag/utils.py index 79063871..13e62ec0 100644 --- a/graphrag/utils.py +++ b/graphrag/utils.py @@ -459,7 +459,10 @@ async def set_graph(tenant_id: str, kb_id: str, embd_mdl, graph: nx.Graph, chang node_attrs = graph.nodes[node] nursery.start_soon(lambda: graph_node_to_chunk(kb_id, embd_mdl, node, node_attrs, chunks)) for from_node, to_node in change.added_updated_edges: - edge_attrs = graph.edges[from_node, to_node] + edge_attrs = graph.get_edge_data(from_node, to_node) + if not edge_attrs: + # added_updated_edges could record a non-existing edge if both from_node and to_node participate in nodes merging. + continue nursery.start_soon(lambda: graph_edge_to_chunk(kb_id, embd_mdl, from_node, to_node, edge_attrs, chunks)) now = trio.current_time() if callback: diff --git a/rag/utils/redis_conn.py b/rag/utils/redis_conn.py index bbc22eca..08f889f5 100644 --- a/rag/utils/redis_conn.py +++ b/rag/utils/redis_conn.py @@ -22,6 +22,7 @@ import valkey as redis from rag import settings from rag.utils import singleton from valkey.lock import Lock +import trio class RedisMsg: def __init__(self, consumer, queue_name, group_name, msg_id, message): @@ -317,5 +318,12 @@ class RedisDistributedLock: REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value) return self.lock.acquire(token=self.lock_value) + async def spin_acquire(self): + REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value) + while True: + if self.lock.acquire(token=self.lock_value): + break + await trio.sleep(10) + def release(self): REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value)