From ffd2f61dd940d64974e457a6327cecb7bb602922 Mon Sep 17 00:00:00 2001 From: takatost Date: Thu, 19 Sep 2024 15:34:56 +0800 Subject: [PATCH] fix: thread_pool submit count in parallel workflow not releasing (#8549) --- .../workflow/graph_engine/graph_engine.py | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 1db9b690ab..57e4f716fd 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -61,6 +61,9 @@ class GraphEngineThreadPool(ThreadPoolExecutor): return super().submit(fn, *args, **kwargs) + def task_done_callback(self, future): + self.submit_count -= 1 + def check_is_full(self) -> None: print(f"submit_count: {self.submit_count}, max_submit_count: {self.max_submit_count}") if self.submit_count > self.max_submit_count: @@ -426,20 +429,22 @@ class GraphEngine: ): continue - futures.append( - self.thread_pool.submit( - self._run_parallel_node, - **{ - "flask_app": current_app._get_current_object(), # type: ignore[attr-defined] - "q": q, - "parallel_id": parallel_id, - "parallel_start_node_id": edge.target_node_id, - "parent_parallel_id": in_parallel_id, - "parent_parallel_start_node_id": parallel_start_node_id, - }, - ) + future = self.thread_pool.submit( + self._run_parallel_node, + **{ + "flask_app": current_app._get_current_object(), # type: ignore[attr-defined] + "q": q, + "parallel_id": parallel_id, + "parallel_start_node_id": edge.target_node_id, + "parent_parallel_id": in_parallel_id, + "parent_parallel_start_node_id": parallel_start_node_id, + }, ) + future.add_done_callback(self.thread_pool.task_done_callback) + + futures.append(future) + succeeded_count = 0 while True: try: