From 3b45b87d02304b5d29f514fa976b45a34794c698 Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Mon, 26 May 2025 14:22:15 +0800 Subject: [PATCH] fix(api): Fix input variable handling for `Start` node. --- .../workflow_draft_variable_service.py | 70 +++++-------------- 1 file changed, 18 insertions(+), 52 deletions(-) diff --git a/api/services/workflow_draft_variable_service.py b/api/services/workflow_draft_variable_service.py index a37efd9000..504d693742 100644 --- a/api/services/workflow_draft_variable_service.py +++ b/api/services/workflow_draft_variable_service.py @@ -96,29 +96,25 @@ class WorkflowDraftVariableService: if not draft_variables: return - # We may use SQLAlchemy ORM operation here. However, considering the fact that: + # Although we could use SQLAlchemy ORM operations here, we choose not to for several reasons: # - # 1. The variable saving process writes multiple rows into one table (`workflow_draft_variables`). - # Use batch insertion may increase performance dramatically. - # 2. If we use ORM operation, we need to either: + # 1. The variable saving process involves writing multiple rows to the + # `workflow_draft_variables` table. Batch insertion significantly improves performance. + # 2. Using the ORM would require either: # - # a. Check the existence for each variable before insertion. - # b. Try insertion first, then do update if insertion fails due to unique index violation. + # a. Checking for the existence of each variable before insertion, + # resulting in 2n SQL statements for n variables and potential concurrency issues. + # b. Attempting insertion first, then updating if a unique index violation occurs, + # which still results in n to 2n SQL statements. # - # Neither of the above is satisfactory. + # Both approaches are inefficient and suboptimal. + # 3. We do not need to retrieve the results of the SQL execution or populate ORM + # model instances with the returned values. + # 4. Batch insertion with `ON CONFLICT DO UPDATE` allows us to insert or update all + # variables in a single SQL statement, avoiding the issues above. # - # - For implementation "a", we need to issue `2n` sqls for `n` variables in output. - # Besides, it's still suffer from concurrency issues. - # - For implementation "b", we need to issue `n` - `2n` sqls (depending on the existence of - # specific variable), which is lesser than plan "a" but still far from ideal. - # - # 3. We do not need the value of SQL execution, nor do we need populate those values back to ORM model - # instances. - # 4. Batch insertion can be combined with `ON CONFLICT DO UPDATE`, allows us to insert or update - # all variables in one SQL statement, and avoid all problems above. - # - # Given reasons above, we use query builder instead of using ORM layer, - # and rely on dialect specific insert operations. + # For these reasons, we use the SQLAlchemy query builder and rely on dialect-specific + # insert operations instead of the ORM layer. if node_type == NodeType.CODE: # Clear existing variable for code node. self._session.query(WorkflowDraftVariable).filter( @@ -267,26 +263,6 @@ def should_save_output_variables_for_draft( return True -# def should_save_output_variables_for_draft(invoke_from: InvokeFrom, node_exec: WorkflowNodeExecution) -> bool: -# # Only save output variables for debugging execution of workflow. -# if invoke_from != InvokeFrom.DEBUGGER: -# return False -# exec_metadata = node_exec.execution_metadata_dict -# if exec_metadata is None: -# # No execution metadata, assume the node is not in loop or iteration. -# return True -# -# # Currently we do not save output variables for nodes inside loop or iteration. -# loop_id = exec_metadata.get(NodeRunMetadataKey.LOOP_ID) -# if loop_id is not None: -# return False -# iteration_id = exec_metadata.get(NodeRunMetadataKey.ITERATION_ID) -# if iteration_id is not None: -# return False -# return True -# - - class _DraftVariableBuilder: _app_id: str _draft_vars: list[WorkflowDraftVariable] @@ -334,7 +310,8 @@ class _DraftVariableBuilder: original_node_id = node_id for name, value in output.items(): value_seg = variable_factory.build_segment(value) - if is_dummy_output_variable(name): + node_id, name = self._normalize_variable_for_start_node(node_id, name) + if node_id != SYSTEM_VARIABLE_NODE_ID: self._draft_vars.append( WorkflowDraftVariable.new_node_variable( app_id=self._app_id, @@ -356,20 +333,9 @@ class _DraftVariableBuilder: ) @staticmethod - def _normalize_variable_for_start_node(node_type: NodeType, node_id: str, name: str): - if node_type != NodeType.START: - return node_id, name - - # TODO(QuantumGhost): need special handling for dummy output variable in - # `Start` node. + def _normalize_variable_for_start_node(node_id: str, name: str) -> tuple[str, str]: if not name.startswith(f"{SYSTEM_VARIABLE_NODE_ID}."): return node_id, name - _logger.debug( - "Normalizing variable: node_type=%s, node_id=%s, name=%s", - node_type, - node_id, - name, - ) node_id, name_ = name.split(".", maxsplit=1) return node_id, name_