diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 1db9b690ab..57e4f716fd 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -61,6 +61,9 @@ class GraphEngineThreadPool(ThreadPoolExecutor): return super().submit(fn, *args, **kwargs) + def task_done_callback(self, future): + self.submit_count -= 1 + def check_is_full(self) -> None: print(f"submit_count: {self.submit_count}, max_submit_count: {self.max_submit_count}") if self.submit_count > self.max_submit_count: @@ -426,20 +429,22 @@ class GraphEngine: ): continue - futures.append( - self.thread_pool.submit( - self._run_parallel_node, - **{ - "flask_app": current_app._get_current_object(), # type: ignore[attr-defined] - "q": q, - "parallel_id": parallel_id, - "parallel_start_node_id": edge.target_node_id, - "parent_parallel_id": in_parallel_id, - "parent_parallel_start_node_id": parallel_start_node_id, - }, - ) + future = self.thread_pool.submit( + self._run_parallel_node, + **{ + "flask_app": current_app._get_current_object(), # type: ignore[attr-defined] + "q": q, + "parallel_id": parallel_id, + "parallel_start_node_id": edge.target_node_id, + "parent_parallel_id": in_parallel_id, + "parent_parallel_start_node_id": parallel_start_node_id, + }, ) + future.add_done_callback(self.thread_pool.task_done_callback) + + futures.append(future) + succeeded_count = 0 while True: try: