From 959793e83c60f48d79a5338c23b45696e62fbc2c Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Tue, 27 May 2025 11:16:29 +0800 Subject: [PATCH] Fix: task limiter issue. (#7873) ### What problem does this PR solve? #7869 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- rag/svr/task_executor.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 1067488aa..1c21274a4 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -103,6 +103,7 @@ MAX_CONCURRENT_MINIO = int(os.environ.get('MAX_CONCURRENT_MINIO', '10')) task_limiter = trio.CapacityLimiter(MAX_CONCURRENT_TASKS) chunk_limiter = trio.CapacityLimiter(MAX_CONCURRENT_CHUNK_BUILDERS) minio_limiter = trio.CapacityLimiter(MAX_CONCURRENT_MINIO) +kg_limiter = trio.CapacityLimiter(2) WORKER_HEARTBEAT_TIMEOUT = int(os.environ.get('WORKER_HEARTBEAT_TIMEOUT', '120')) stop_event = threading.Event() @@ -539,8 +540,6 @@ async def do_handle_task(task): chunks, token_count = await run_raptor(task, chat_model, embedding_model, vector_size, progress_callback) # Either using graphrag or Standard chunking methods elif task.get("task_type", "") == "graphrag": - global task_limiter - task_limiter = trio.CapacityLimiter(2) if not task_parser_config.get("graphrag", {}).get("use_graphrag", False): return graphrag_conf = task["kb_parser_config"].get("graphrag", {}) @@ -548,9 +547,9 @@ async def do_handle_task(task): chat_model = LLMBundle(task_tenant_id, LLMType.CHAT, llm_name=task_llm_id, lang=task_language) with_resolution = graphrag_conf.get("resolution", False) with_community = graphrag_conf.get("community", False) - await run_graphrag(task, task_language, with_resolution, with_community, chat_model, embedding_model, progress_callback) + async with kg_limiter: + await run_graphrag(task, task_language, with_resolution, with_community, chat_model, embedding_model, progress_callback) progress_callback(prog=1.0, msg="Knowledge Graph done ({:.2f}s)".format(timer() - start_ts)) - task_limiter = trio.CapacityLimiter(MAX_CONCURRENT_TASKS) return else: # Standard chunking methods