diff --git a/api/.env.example b/api/.env.example index 1ff6b3be8b..74f83aa06c 100644 --- a/api/.env.example +++ b/api/.env.example @@ -433,3 +433,5 @@ RESET_PASSWORD_TOKEN_EXPIRY_MINUTES=5 CREATE_TIDB_SERVICE_JOB_ENABLED=false +# Maximum number of submitted thread count in a ThreadPool for parallel node execution +MAX_SUBMIT_COUNT=100 diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index f1cb3efda7..e79401bdfd 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -439,6 +439,17 @@ class WorkflowConfig(BaseSettings): ) +class WorkflowNodeExecutionConfig(BaseSettings): + """ + Configuration for workflow node execution + """ + + MAX_SUBMIT_COUNT: PositiveInt = Field( + description="Maximum number of submitted thread count in a ThreadPool for parallel node execution", + default=100, + ) + + class AuthConfig(BaseSettings): """ Configuration for authentication and OAuth @@ -775,6 +786,7 @@ class FeatureConfig( ToolConfig, UpdateConfig, WorkflowConfig, + WorkflowNodeExecutionConfig, WorkspaceConfig, LoginConfig, # hosted services config diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index e03d4a7194..034b4bd399 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -9,6 +9,7 @@ from typing import Any, Optional, cast from flask import Flask, current_app +from configs import dify_config from core.app.apps.base_app_queue_manager import GenerateTaskStoppedError from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult @@ -52,7 +53,12 @@ logger = logging.getLogger(__name__) class GraphEngineThreadPool(ThreadPoolExecutor): def __init__( - self, max_workers=None, thread_name_prefix="", initializer=None, initargs=(), max_submit_count=100 + self, + max_workers=None, + thread_name_prefix="", + initializer=None, + initargs=(), + max_submit_count=dify_config.MAX_SUBMIT_COUNT, ) -> None: super().__init__(max_workers, thread_name_prefix, initializer, initargs) self.max_submit_count = max_submit_count @@ -92,7 +98,7 @@ class GraphEngine: max_execution_time: int, thread_pool_id: Optional[str] = None, ) -> None: - thread_pool_max_submit_count = 100 + thread_pool_max_submit_count = dify_config.MAX_SUBMIT_COUNT thread_pool_max_workers = 10 # init thread pool diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index cbabe7a3c5..d935228c16 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -163,7 +163,9 @@ class IterationNode(BaseNode[IterationNodeData]): if self.node_data.is_parallel: futures: list[Future] = [] q: Queue = Queue() - thread_pool = GraphEngineThreadPool(max_workers=self.node_data.parallel_nums, max_submit_count=100) + thread_pool = GraphEngineThreadPool( + max_workers=self.node_data.parallel_nums, max_submit_count=dify_config.MAX_SUBMIT_COUNT + ) for index, item in enumerate(iterator_list_value): future: Future = thread_pool.submit( self._run_single_iter_parallel,