From 2980e31ddf536f1b73138ec457e1be1a7d1f9007 Mon Sep 17 00:00:00 2001 From: takatost Date: Tue, 13 Aug 2024 17:11:19 +0800 Subject: [PATCH] fix issues when merging from main --- .../task_pipeline/workflow_cycle_manage.py | 1 - .../workflow/nodes/iterable_node_mixin.py | 14 ------------ api/core/workflow/nodes/node_mapping.py | 4 +++- .../nodes/variable_assigner/__init__.py | 12 +++++----- api/services/workflow_service.py | 22 ------------------- 5 files changed, 9 insertions(+), 44 deletions(-) delete mode 100644 api/core/workflow/nodes/iterable_node_mixin.py diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index ce7653e884..ad3aa85b7f 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -43,7 +43,6 @@ from models.workflow import ( WorkflowRunStatus, WorkflowRunTriggeredFrom, ) -from services.workflow_service import WorkflowService class WorkflowCycleManage: diff --git a/api/core/workflow/nodes/iterable_node_mixin.py b/api/core/workflow/nodes/iterable_node_mixin.py deleted file mode 100644 index e45ba7df60..0000000000 --- a/api/core/workflow/nodes/iterable_node_mixin.py +++ /dev/null @@ -1,14 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Any - -from core.workflow.utils.condition.entities import Condition - - -class IterableNodeMixin(ABC): - @classmethod - @abstractmethod - def get_conditions(cls, node_config: dict[str, Any]) -> list[Condition]: - """ - Get conditions. - """ - raise NotImplementedError diff --git a/api/core/workflow/nodes/node_mapping.py b/api/core/workflow/nodes/node_mapping.py index df1eb98989..87e4d4be18 100644 --- a/api/core/workflow/nodes/node_mapping.py +++ b/api/core/workflow/nodes/node_mapping.py @@ -13,6 +13,7 @@ from core.workflow.nodes.start.start_node import StartNode from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode from core.workflow.nodes.tool.tool_node import ToolNode from core.workflow.nodes.variable_aggregator.variable_aggregator_node import VariableAggregatorNode +from core.workflow.nodes.variable_assigner import VariableAssignerNode node_classes = { NodeType.START: StartNode, @@ -29,5 +30,6 @@ node_classes = { NodeType.VARIABLE_AGGREGATOR: VariableAggregatorNode, NodeType.VARIABLE_ASSIGNER: VariableAggregatorNode, # original name of VARIABLE_AGGREGATOR NodeType.ITERATION: IterationNode, - NodeType.PARAMETER_EXTRACTOR: ParameterExtractorNode + NodeType.PARAMETER_EXTRACTOR: ParameterExtractorNode, + NodeType.CONVERSATION_VARIABLE_ASSIGNER: VariableAssignerNode, } diff --git a/api/core/workflow/nodes/variable_assigner/__init__.py b/api/core/workflow/nodes/variable_assigner/__init__.py index 552cc367f2..63fe6cb0b8 100644 --- a/api/core/workflow/nodes/variable_assigner/__init__.py +++ b/api/core/workflow/nodes/variable_assigner/__init__.py @@ -36,23 +36,23 @@ class VariableAssignerNode(BaseNode): _node_data_cls: type[BaseNodeData] = VariableAssignerData _node_type: NodeType = NodeType.CONVERSATION_VARIABLE_ASSIGNER - def _run(self, variable_pool: VariablePool) -> NodeRunResult: + def _run(self) -> NodeRunResult: data = cast(VariableAssignerData, self.node_data) # Should be String, Number, Object, ArrayString, ArrayNumber, ArrayObject - original_variable = variable_pool.get(data.assigned_variable_selector) + original_variable = self.graph_runtime_state.variable_pool.get(data.assigned_variable_selector) if not isinstance(original_variable, Variable): raise VariableAssignerNodeError('assigned variable not found') match data.write_mode: case WriteMode.OVER_WRITE: - income_value = variable_pool.get(data.input_variable_selector) + income_value = self.graph_runtime_state.variable_pool.get(data.input_variable_selector) if not income_value: raise VariableAssignerNodeError('input value not found') updated_variable = original_variable.model_copy(update={'value': income_value.value}) case WriteMode.APPEND: - income_value = variable_pool.get(data.input_variable_selector) + income_value = self.graph_runtime_state.variable_pool.get(data.input_variable_selector) if not income_value: raise VariableAssignerNodeError('input value not found') updated_value = original_variable.value + [income_value.value] @@ -66,11 +66,11 @@ class VariableAssignerNode(BaseNode): raise VariableAssignerNodeError(f'unsupported write mode: {data.write_mode}') # Over write the variable. - variable_pool.add(data.assigned_variable_selector, updated_variable) + self.graph_runtime_state.variable_pool.add(data.assigned_variable_selector, updated_variable) # Update conversation variable. # TODO: Find a better way to use the database. - conversation_id = variable_pool.get(['sys', 'conversation_id']) + conversation_id = self.graph_runtime_state.variable_pool.get(['sys', 'conversation_id']) if not conversation_id: raise VariableAssignerNodeError('conversation_id not found') update_conversation_variable(conversation_id=conversation_id.text, variable=updated_variable) diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 42ae0f2cd3..98b84cb98a 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -336,25 +336,3 @@ class WorkflowService: ) else: raise ValueError(f"Invalid app mode: {app_model.mode}") - - @classmethod - def get_elapsed_time(cls, workflow_run_id: str) -> float: - """ - Get elapsed time - """ - elapsed_time = 0.0 - - # fetch workflow node execution by workflow_run_id - workflow_nodes = ( - db.session.query(WorkflowNodeExecution) - .filter(WorkflowNodeExecution.workflow_run_id == workflow_run_id) - .order_by(WorkflowNodeExecution.created_at.asc()) - .all() - ) - if not workflow_nodes: - return elapsed_time - - for node in workflow_nodes: - elapsed_time += node.elapsed_time - - return elapsed_time