From 5d21cc3660b8d2f7140cf58d6bd3c406b846be00 Mon Sep 17 00:00:00 2001 From: S0b3Rr <94901095+S0b3Rr@users.noreply.github.com> Date: Mon, 19 May 2025 10:25:56 +0800 Subject: [PATCH] fix: Fix the problem that concurrent execution limit in task executor fails and causes OOM (issue#7580) (#7700) ### What problem does this PR solve? ## Cause of the bug: During the execution process, due to improper use of trio CapacityLimiter, the configuration parameter MAX_CONCURRENT_TASKS is invalid, causing the executor to take out a large number of tasks from the Redis queue at one time. This behavior will cause the task executor to occupy too much memory and be killed by the OS when a large number of tasks exist at the same time. As a result, all executing tasks are suspended. ## Fix: Added the task_manager method to the entry of /rag/svr/task_executor.py to make CapacityLimiter effective. Deleted the invalid async with statement. ## Fix result: After testing, the task executor execution meets expectations, that is: concurrent execution of up to $MAX_CONCURRENT_TASKS tasks. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) - [ ] New Feature (non-breaking change which adds functionality) - [ ] Documentation Update - [ ] Refactoring - [ ] Performance Improvement - [ ] Other (please describe): --- rag/svr/task_executor.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 51de6950b..6448c4e81 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -713,7 +713,10 @@ def recover_pending_tasks(): redis_lock.release() stop_event.wait(60) - +async def task_manager(): + global task_limiter + async with task_limiter: + await handle_task() async def main(): @@ -742,8 +745,8 @@ async def main(): async with trio.open_nursery() as nursery: nursery.start_soon(report_status) while not stop_event.is_set(): - async with task_limiter: - nursery.start_soon(handle_task) + nursery.start_soon(task_manager) + await trio.sleep(0.1) logging.error("BUG!!! You should not reach here!!!") if __name__ == "__main__":