diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index 6df25b20e3..8f8e429f0c 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -147,12 +147,12 @@ class WorkflowResponseConverter: ) -> Optional[NodeStartStreamResponse]: if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}: return None - if not workflow_node_execution.workflow_run_id: + if not workflow_node_execution.workflow_execution_id: return None response = NodeStartStreamResponse( task_id=task_id, - workflow_run_id=workflow_node_execution.workflow_run_id, + workflow_run_id=workflow_node_execution.workflow_execution_id, data=NodeStartStreamResponse.Data( id=workflow_node_execution.id, node_id=workflow_node_execution.node_id, @@ -197,14 +197,14 @@ class WorkflowResponseConverter: ) -> Optional[NodeFinishStreamResponse]: if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}: return None - if not workflow_node_execution.workflow_run_id: + if not workflow_node_execution.workflow_execution_id: return None if not workflow_node_execution.finished_at: return None return NodeFinishStreamResponse( task_id=task_id, - workflow_run_id=workflow_node_execution.workflow_run_id, + workflow_run_id=workflow_node_execution.workflow_execution_id, data=NodeFinishStreamResponse.Data( id=workflow_node_execution.id, node_id=workflow_node_execution.node_id, @@ -240,14 +240,14 @@ class WorkflowResponseConverter: ) -> Optional[Union[NodeRetryStreamResponse, NodeFinishStreamResponse]]: if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}: return None - if not workflow_node_execution.workflow_run_id: + if not workflow_node_execution.workflow_execution_id: return None if not workflow_node_execution.finished_at: return None return NodeRetryStreamResponse( task_id=task_id, - workflow_run_id=workflow_node_execution.workflow_run_id, + workflow_run_id=workflow_node_execution.workflow_execution_id, data=NodeRetryStreamResponse.Data( id=workflow_node_execution.id, node_id=workflow_node_execution.node_id, diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 613bd8e8fc..facc24b4ca 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -30,7 +30,7 @@ from core.app.entities.queue_entities import ( QueueWorkflowSucceededEvent, ) from core.workflow.entities.variable_pool import VariablePool -from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey from core.workflow.graph_engine.entities.event import ( AgentLogEvent, GraphEngineEvent, @@ -295,7 +295,7 @@ class WorkflowBasedAppRunner(AppRunner): inputs: Mapping[str, Any] | None = {} process_data: Mapping[str, Any] | None = {} outputs: Mapping[str, Any] | None = {} - execution_metadata: Mapping[NodeRunMetadataKey, Any] | None = {} + execution_metadata: Mapping[WorkflowNodeExecutionMetadataKey, Any] | None = {} if node_run_result: inputs = node_run_result.inputs process_data = node_run_result.process_data diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index e4ff123134..bc8573013a 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -7,7 +7,7 @@ from pydantic import BaseModel from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk from core.workflow.entities.node_entities import AgentNodeStrategyInit -from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.nodes import NodeType from core.workflow.nodes.base import BaseNodeData @@ -413,7 +413,7 @@ class QueueNodeSucceededEvent(AppQueueEvent): inputs: Optional[Mapping[str, Any]] = None process_data: Optional[Mapping[str, Any]] = None outputs: Optional[Mapping[str, Any]] = None - execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None + execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None error: Optional[str] = None """single iteration duration map""" @@ -447,7 +447,7 @@ class QueueNodeRetryEvent(QueueNodeStartedEvent): inputs: Optional[Mapping[str, Any]] = None process_data: Optional[Mapping[str, Any]] = None outputs: Optional[Mapping[str, Any]] = None - execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None + execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None error: str retry_index: int # retry index @@ -481,7 +481,7 @@ class QueueNodeInIterationFailedEvent(AppQueueEvent): inputs: Optional[Mapping[str, Any]] = None process_data: Optional[Mapping[str, Any]] = None outputs: Optional[Mapping[str, Any]] = None - execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None + execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None error: str @@ -514,7 +514,7 @@ class QueueNodeInLoopFailedEvent(AppQueueEvent): inputs: Optional[Mapping[str, Any]] = None process_data: Optional[Mapping[str, Any]] = None outputs: Optional[Mapping[str, Any]] = None - execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None + execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None error: str @@ -547,7 +547,7 @@ class QueueNodeExceptionEvent(AppQueueEvent): inputs: Optional[Mapping[str, Any]] = None process_data: Optional[Mapping[str, Any]] = None outputs: Optional[Mapping[str, Any]] = None - execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None + execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None error: str @@ -580,7 +580,7 @@ class QueueNodeFailedEvent(AppQueueEvent): inputs: Optional[Mapping[str, Any]] = None process_data: Optional[Mapping[str, Any]] = None outputs: Optional[Mapping[str, Any]] = None - execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None + execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None error: str diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 39b530cdfe..70748b085d 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -7,7 +7,7 @@ from pydantic import BaseModel, ConfigDict from core.model_runtime.entities.llm_entities import LLMResult from core.model_runtime.utils.encoders import jsonable_encoder from core.workflow.entities.node_entities import AgentNodeStrategyInit -from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus class TaskState(BaseModel): @@ -305,7 +305,7 @@ class NodeFinishStreamResponse(StreamResponse): status: str error: Optional[str] = None elapsed_time: float - execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None + execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None created_at: int finished_at: int files: Optional[Sequence[Mapping[str, Any]]] = [] @@ -374,7 +374,7 @@ class NodeRetryStreamResponse(StreamResponse): status: str error: Optional[str] = None elapsed_time: float - execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None + execution_metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None created_at: int finished_at: int files: Optional[Sequence[Mapping[str, Any]]] = [] diff --git a/api/core/ops/langsmith_trace/langsmith_trace.py b/api/core/ops/langsmith_trace/langsmith_trace.py index 43b866e1af..e4183ea1e2 100644 --- a/api/core/ops/langsmith_trace/langsmith_trace.py +++ b/api/core/ops/langsmith_trace/langsmith_trace.py @@ -28,7 +28,7 @@ from core.ops.langsmith_trace.entities.langsmith_trace_entity import ( ) from core.ops.utils import filter_none_values, generate_dotted_order from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository -from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey from core.workflow.nodes.enums import NodeType from extensions.ext_database import db from models import Account, App, EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom @@ -185,7 +185,7 @@ class LangSmithDataTrace(BaseTraceInstance): finished_at = created_at + timedelta(seconds=elapsed_time) execution_metadata = node_execution.metadata if node_execution.metadata else {} - node_total_tokens = execution_metadata.get(NodeRunMetadataKey.TOTAL_TOKENS) or 0 + node_total_tokens = execution_metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS) or 0 metadata = {str(key): value for key, value in execution_metadata.items()} metadata.update( { diff --git a/api/core/ops/opik_trace/opik_trace.py b/api/core/ops/opik_trace/opik_trace.py index 7d68dca831..f7a4464267 100644 --- a/api/core/ops/opik_trace/opik_trace.py +++ b/api/core/ops/opik_trace/opik_trace.py @@ -22,7 +22,7 @@ from core.ops.entities.trace_entity import ( WorkflowTraceInfo, ) from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository -from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey from core.workflow.nodes.enums import NodeType from extensions.ext_database import db from models import Account, App, EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom @@ -246,7 +246,7 @@ class OpikDataTrace(BaseTraceInstance): parent_span_id = trace_info.workflow_app_log_id or trace_info.workflow_run_id if not total_tokens: - total_tokens = execution_metadata.get(NodeRunMetadataKey.TOTAL_TOKENS) or 0 + total_tokens = execution_metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS) or 0 span_data = { "trace_id": opik_trace_id, diff --git a/api/core/ops/weave_trace/weave_trace.py b/api/core/ops/weave_trace/weave_trace.py index d1bd97176e..b12380be47 100644 --- a/api/core/ops/weave_trace/weave_trace.py +++ b/api/core/ops/weave_trace/weave_trace.py @@ -23,7 +23,7 @@ from core.ops.entities.trace_entity import ( ) from core.ops.weave_trace.entities.weave_trace_entity import WeaveTraceModel from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository -from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey from core.workflow.nodes.enums import NodeType from extensions.ext_database import db from models import Account, App, EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom @@ -179,7 +179,7 @@ class WeaveDataTrace(BaseTraceInstance): finished_at = created_at + timedelta(seconds=elapsed_time) execution_metadata = node_execution.metadata if node_execution.metadata else {} - node_total_tokens = execution_metadata.get(NodeRunMetadataKey.TOTAL_TOKENS) or 0 + node_total_tokens = execution_metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS) or 0 attributes = {str(k): v for k, v in execution_metadata.items()} attributes.update( { diff --git a/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py b/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py index ee4465db5d..e8a84d58ad 100644 --- a/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py +++ b/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py @@ -14,7 +14,7 @@ from sqlalchemy.orm import sessionmaker from core.model_runtime.utils.encoders import jsonable_encoder from core.workflow.entities.workflow_node_execution import ( NodeExecution, - NodeRunMetadataKey, + WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus, ) from core.workflow.nodes.enums import NodeType @@ -102,7 +102,7 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository) inputs = db_model.inputs_dict process_data = db_model.process_data_dict outputs = db_model.outputs_dict - metadata = {NodeRunMetadataKey(k): v for k, v in db_model.execution_metadata_dict.items()} + metadata = {WorkflowNodeExecutionMetadataKey(k): v for k, v in db_model.execution_metadata_dict.items()} # Convert status to domain enum status = WorkflowNodeExecutionStatus(db_model.status) @@ -111,7 +111,7 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository) id=db_model.id, node_execution_id=db_model.node_execution_id, workflow_id=db_model.workflow_id, - workflow_run_id=db_model.workflow_run_id, + workflow_execution_id=db_model.workflow_run_id, index=db_model.index, predecessor_node_id=db_model.predecessor_node_id, node_id=db_model.node_id, @@ -153,7 +153,7 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository) db_model.app_id = self._app_id db_model.workflow_id = domain_model.workflow_id db_model.triggered_from = self._triggered_from - db_model.workflow_run_id = domain_model.workflow_run_id + db_model.workflow_run_id = domain_model.workflow_execution_id db_model.index = domain_model.index db_model.predecessor_node_id = domain_model.predecessor_node_id db_model.node_execution_id = domain_model.node_execution_id diff --git a/api/core/workflow/entities/node_entities.py b/api/core/workflow/entities/node_entities.py index 6d01028ffc..687ec8e47c 100644 --- a/api/core/workflow/entities/node_entities.py +++ b/api/core/workflow/entities/node_entities.py @@ -4,7 +4,7 @@ from typing import Any, Optional from pydantic import BaseModel from core.model_runtime.entities.llm_entities import LLMUsage -from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus class NodeRunResult(BaseModel): @@ -17,7 +17,7 @@ class NodeRunResult(BaseModel): inputs: Optional[Mapping[str, Any]] = None # node inputs process_data: Optional[Mapping[str, Any]] = None # process data outputs: Optional[Mapping[str, Any]] = None # node outputs - metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None # node metadata + metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None # node metadata llm_usage: Optional[LLMUsage] = None # llm usage edge_source_handle: Optional[str] = None # source handle id of node with multiple branches diff --git a/api/core/workflow/entities/workflow_node_execution.py b/api/core/workflow/entities/workflow_node_execution.py index dccb6b1539..8e19fea43f 100644 --- a/api/core/workflow/entities/workflow_node_execution.py +++ b/api/core/workflow/entities/workflow_node_execution.py @@ -16,7 +16,7 @@ from pydantic import BaseModel, Field from core.workflow.nodes.enums import NodeType -class NodeRunMetadataKey(StrEnum): +class WorkflowNodeExecutionMetadataKey(StrEnum): """ Node Run Metadata Key. """ @@ -70,7 +70,7 @@ class NodeExecution(BaseModel): id: str # Unique identifier for this execution record node_execution_id: Optional[str] = None # Optional secondary ID for cross-referencing workflow_id: str # ID of the workflow this node belongs to - workflow_run_id: Optional[str] = None # ID of the specific workflow run (null for single-step debugging) + workflow_execution_id: Optional[str] = None # ID of the specific workflow run (null for single-step debugging) # Execution positioning and flow index: int # Sequence number for ordering in trace visualization @@ -90,7 +90,7 @@ class NodeExecution(BaseModel): elapsed_time: float = Field(default=0.0) # Time taken for execution in seconds # Additional metadata - metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None # Execution metadata (tokens, cost, etc.) + metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None # Execution metadata (tokens, cost, etc.) # Timing information created_at: datetime # When execution started @@ -101,7 +101,7 @@ class NodeExecution(BaseModel): inputs: Optional[Mapping[str, Any]] = None, process_data: Optional[Mapping[str, Any]] = None, outputs: Optional[Mapping[str, Any]] = None, - metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None, + metadata: Optional[Mapping[WorkflowNodeExecutionMetadataKey, Any]] = None, ) -> None: """ Update the model from mappings. diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 0d71a70971..3eb99fde81 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -16,7 +16,7 @@ from core.app.apps.base_app_queue_manager import GenerateTaskStoppedError from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.node_entities import AgentNodeStrategyInit, NodeRunResult from core.workflow.entities.variable_pool import VariablePool, VariableValue -from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from core.workflow.graph_engine.condition_handlers.condition_manager import ConditionManager from core.workflow.graph_engine.entities.event import ( BaseAgentEvent, @@ -760,10 +760,12 @@ class GraphEngine: and node_instance.node_data.error_strategy is ErrorStrategy.FAIL_BRANCH ): run_result.edge_source_handle = FailBranchSourceHandle.SUCCESS - if run_result.metadata and run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS): + if run_result.metadata and run_result.metadata.get( + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS + ): # plus state total_tokens self.graph_runtime_state.total_tokens += int( - run_result.metadata.get(NodeRunMetadataKey.TOTAL_TOKENS) # type: ignore[arg-type] + run_result.metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS) # type: ignore[arg-type] ) if run_result.llm_usage: @@ -786,13 +788,17 @@ class GraphEngine: if parallel_id and parallel_start_node_id: metadata_dict = dict(run_result.metadata) - metadata_dict[NodeRunMetadataKey.PARALLEL_ID] = parallel_id - metadata_dict[NodeRunMetadataKey.PARALLEL_START_NODE_ID] = parallel_start_node_id + metadata_dict[WorkflowNodeExecutionMetadataKey.PARALLEL_ID] = parallel_id + metadata_dict[WorkflowNodeExecutionMetadataKey.PARALLEL_START_NODE_ID] = ( + parallel_start_node_id + ) if parent_parallel_id and parent_parallel_start_node_id: - metadata_dict[NodeRunMetadataKey.PARENT_PARALLEL_ID] = parent_parallel_id - metadata_dict[NodeRunMetadataKey.PARENT_PARALLEL_START_NODE_ID] = ( - parent_parallel_start_node_id + metadata_dict[WorkflowNodeExecutionMetadataKey.PARENT_PARALLEL_ID] = ( + parent_parallel_id ) + metadata_dict[ + WorkflowNodeExecutionMetadataKey.PARENT_PARALLEL_START_NODE_ID + ] = parent_parallel_start_node_id run_result.metadata = metadata_dict yield NodeRunSucceededEvent( @@ -924,7 +930,7 @@ class GraphEngine: "error": error_result.error, "inputs": error_result.inputs, "metadata": { - NodeRunMetadataKey.ERROR_STRATEGY: node_instance.node_data.error_strategy, + WorkflowNodeExecutionMetadataKey.ERROR_STRATEGY: node_instance.node_data.error_strategy, }, } diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index 7d22a78895..2592823540 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -15,7 +15,7 @@ from core.workflow.entities.node_entities import ( NodeRunResult, ) from core.workflow.entities.variable_pool import VariablePool -from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from core.workflow.graph_engine.entities.event import ( BaseGraphEvent, BaseNodeEvent, @@ -248,8 +248,8 @@ class IterationNode(BaseNode[IterationNodeData]): status=WorkflowNodeExecutionStatus.SUCCEEDED, outputs={"output": outputs}, metadata={ - NodeRunMetadataKey.ITERATION_DURATION_MAP: iter_run_map, - NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens, + WorkflowNodeExecutionMetadataKey.ITERATION_DURATION_MAP: iter_run_map, + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens, }, ) ) @@ -360,16 +360,16 @@ class IterationNode(BaseNode[IterationNodeData]): event.parallel_mode_run_id = parallel_mode_run_id iter_metadata = { - NodeRunMetadataKey.ITERATION_ID: self.node_id, - NodeRunMetadataKey.ITERATION_INDEX: iter_run_index, + WorkflowNodeExecutionMetadataKey.ITERATION_ID: self.node_id, + WorkflowNodeExecutionMetadataKey.ITERATION_INDEX: iter_run_index, } if parallel_mode_run_id: # for parallel, the specific branch ID is more important than the sequential index - iter_metadata[NodeRunMetadataKey.PARALLEL_MODE_RUN_ID] = parallel_mode_run_id + iter_metadata[WorkflowNodeExecutionMetadataKey.PARALLEL_MODE_RUN_ID] = parallel_mode_run_id if event.route_node_state.node_run_result: current_metadata = event.route_node_state.node_run_result.metadata or {} - if NodeRunMetadataKey.ITERATION_ID not in current_metadata: + if WorkflowNodeExecutionMetadataKey.ITERATION_ID not in current_metadata: event.route_node_state.node_run_result.metadata = {**current_metadata, **iter_metadata} return event diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py index ceda0287fd..38e4f7af01 100644 --- a/api/core/workflow/nodes/llm/node.py +++ b/api/core/workflow/nodes/llm/node.py @@ -56,7 +56,7 @@ from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_entities import VariableSelector from core.workflow.entities.variable_pool import VariablePool -from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.event import InNodeEvent from core.workflow.nodes.base import BaseNode @@ -267,9 +267,9 @@ class LLMNode(BaseNode[LLMNodeData]): process_data=process_data, outputs=outputs, metadata={ - NodeRunMetadataKey.TOTAL_TOKENS: usage.total_tokens, - NodeRunMetadataKey.TOTAL_PRICE: usage.total_price, - NodeRunMetadataKey.CURRENCY: usage.currency, + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens, + WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price, + WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency, }, llm_usage=usage, ) diff --git a/api/core/workflow/nodes/loop/loop_node.py b/api/core/workflow/nodes/loop/loop_node.py index eef63c5a92..fafa205386 100644 --- a/api/core/workflow/nodes/loop/loop_node.py +++ b/api/core/workflow/nodes/loop/loop_node.py @@ -16,7 +16,7 @@ from core.variables import ( StringSegment, ) from core.workflow.entities.node_entities import NodeRunResult -from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from core.workflow.graph_engine.entities.event import ( BaseGraphEvent, BaseNodeEvent, @@ -187,10 +187,10 @@ class LoopNode(BaseNode[LoopNodeData]): outputs=self.node_data.outputs, steps=loop_count, metadata={ - NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens, + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens, "completed_reason": "loop_break" if check_break_result else "loop_completed", - NodeRunMetadataKey.LOOP_DURATION_MAP: loop_duration_map, - NodeRunMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map, + WorkflowNodeExecutionMetadataKey.LOOP_DURATION_MAP: loop_duration_map, + WorkflowNodeExecutionMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map, }, ) @@ -198,9 +198,9 @@ class LoopNode(BaseNode[LoopNodeData]): run_result=NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, metadata={ - NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens, - NodeRunMetadataKey.LOOP_DURATION_MAP: loop_duration_map, - NodeRunMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map, + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens, + WorkflowNodeExecutionMetadataKey.LOOP_DURATION_MAP: loop_duration_map, + WorkflowNodeExecutionMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map, }, outputs=self.node_data.outputs, inputs=inputs, @@ -221,8 +221,8 @@ class LoopNode(BaseNode[LoopNodeData]): metadata={ "total_tokens": graph_engine.graph_runtime_state.total_tokens, "completed_reason": "error", - NodeRunMetadataKey.LOOP_DURATION_MAP: loop_duration_map, - NodeRunMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map, + WorkflowNodeExecutionMetadataKey.LOOP_DURATION_MAP: loop_duration_map, + WorkflowNodeExecutionMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map, }, error=str(e), ) @@ -232,9 +232,9 @@ class LoopNode(BaseNode[LoopNodeData]): status=WorkflowNodeExecutionStatus.FAILED, error=str(e), metadata={ - NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens, - NodeRunMetadataKey.LOOP_DURATION_MAP: loop_duration_map, - NodeRunMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map, + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens, + WorkflowNodeExecutionMetadataKey.LOOP_DURATION_MAP: loop_duration_map, + WorkflowNodeExecutionMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map, }, ) ) @@ -322,7 +322,9 @@ class LoopNode(BaseNode[LoopNodeData]): inputs=inputs, steps=current_index, metadata={ - NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens, + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: ( + graph_engine.graph_runtime_state.total_tokens + ), "completed_reason": "error", }, error=event.error, @@ -331,7 +333,11 @@ class LoopNode(BaseNode[LoopNodeData]): run_result=NodeRunResult( status=WorkflowNodeExecutionStatus.FAILED, error=event.error, - metadata={NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens}, + metadata={ + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: ( + graph_engine.graph_runtime_state.total_tokens + ) + }, ) ) return {"check_break_result": True} @@ -347,7 +353,7 @@ class LoopNode(BaseNode[LoopNodeData]): inputs=inputs, steps=current_index, metadata={ - NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens, + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens, "completed_reason": "error", }, error=event.error, @@ -356,7 +362,9 @@ class LoopNode(BaseNode[LoopNodeData]): run_result=NodeRunResult( status=WorkflowNodeExecutionStatus.FAILED, error=event.error, - metadata={NodeRunMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens}, + metadata={ + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: graph_engine.graph_runtime_state.total_tokens + }, ) ) return {"check_break_result": True} @@ -411,11 +419,11 @@ class LoopNode(BaseNode[LoopNodeData]): metadata = event.route_node_state.node_run_result.metadata if not metadata: metadata = {} - if NodeRunMetadataKey.LOOP_ID not in metadata: + if WorkflowNodeExecutionMetadataKey.LOOP_ID not in metadata: metadata = { **metadata, - NodeRunMetadataKey.LOOP_ID: self.node_id, - NodeRunMetadataKey.LOOP_INDEX: iter_run_index, + WorkflowNodeExecutionMetadataKey.LOOP_ID: self.node_id, + WorkflowNodeExecutionMetadataKey.LOOP_INDEX: iter_run_index, } event.route_node_state.node_run_result.metadata = metadata return event diff --git a/api/core/workflow/nodes/parameter_extractor/parameter_extractor_node.py b/api/core/workflow/nodes/parameter_extractor/parameter_extractor_node.py index 244b15594e..3d13cf7ecd 100644 --- a/api/core/workflow/nodes/parameter_extractor/parameter_extractor_node.py +++ b/api/core/workflow/nodes/parameter_extractor/parameter_extractor_node.py @@ -27,7 +27,7 @@ from core.prompt.simple_prompt_transform import ModelMode from core.prompt.utils.prompt_message_util import PromptMessageUtil from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_pool import VariablePool -from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from core.workflow.nodes.enums import NodeType from core.workflow.nodes.llm import LLMNode, ModelConfig from core.workflow.utils import variable_template_parser @@ -244,9 +244,9 @@ class ParameterExtractorNode(LLMNode): process_data=process_data, outputs={"__is_success": 1 if not error else 0, "__reason": error, **result}, metadata={ - NodeRunMetadataKey.TOTAL_TOKENS: usage.total_tokens, - NodeRunMetadataKey.TOTAL_PRICE: usage.total_price, - NodeRunMetadataKey.CURRENCY: usage.currency, + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens, + WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price, + WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency, }, llm_usage=usage, ) diff --git a/api/core/workflow/nodes/question_classifier/question_classifier_node.py b/api/core/workflow/nodes/question_classifier/question_classifier_node.py index 47626e983d..e846b76280 100644 --- a/api/core/workflow/nodes/question_classifier/question_classifier_node.py +++ b/api/core/workflow/nodes/question_classifier/question_classifier_node.py @@ -11,7 +11,7 @@ from core.prompt.advanced_prompt_transform import AdvancedPromptTransform from core.prompt.simple_prompt_transform import ModelMode from core.prompt.utils.prompt_message_util import PromptMessageUtil from core.workflow.entities.node_entities import NodeRunResult -from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from core.workflow.nodes.enums import NodeType from core.workflow.nodes.event import ModelInvokeCompletedEvent from core.workflow.nodes.llm import ( @@ -142,9 +142,9 @@ class QuestionClassifierNode(LLMNode): outputs=outputs, edge_source_handle=category_id, metadata={ - NodeRunMetadataKey.TOTAL_TOKENS: usage.total_tokens, - NodeRunMetadataKey.TOTAL_PRICE: usage.total_price, - NodeRunMetadataKey.CURRENCY: usage.currency, + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens, + WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price, + WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency, }, llm_usage=usage, ) @@ -154,9 +154,9 @@ class QuestionClassifierNode(LLMNode): inputs=variables, error=str(e), metadata={ - NodeRunMetadataKey.TOTAL_TOKENS: usage.total_tokens, - NodeRunMetadataKey.TOTAL_PRICE: usage.total_price, - NodeRunMetadataKey.CURRENCY: usage.currency, + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: usage.total_tokens, + WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: usage.total_price, + WorkflowNodeExecutionMetadataKey.CURRENCY: usage.currency, }, llm_usage=usage, ) diff --git a/api/core/workflow/nodes/tool/tool_node.py b/api/core/workflow/nodes/tool/tool_node.py index 077e21ade4..aaecc7b989 100644 --- a/api/core/workflow/nodes/tool/tool_node.py +++ b/api/core/workflow/nodes/tool/tool_node.py @@ -16,7 +16,7 @@ from core.variables.segments import ArrayAnySegment from core.variables.variables import ArrayAnyVariable from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_pool import VariablePool -from core.workflow.entities.workflow_node_execution import NodeRunMetadataKey, WorkflowNodeExecutionStatus +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.event import AgentLogEvent from core.workflow.nodes.base import BaseNode @@ -70,7 +70,7 @@ class ToolNode(BaseNode[ToolNodeData]): run_result=NodeRunResult( status=WorkflowNodeExecutionStatus.FAILED, inputs={}, - metadata={NodeRunMetadataKey.TOOL_INFO: tool_info}, + metadata={WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info}, error=f"Failed to get tool runtime: {str(e)}", error_type=type(e).__name__, ) @@ -110,7 +110,7 @@ class ToolNode(BaseNode[ToolNodeData]): run_result=NodeRunResult( status=WorkflowNodeExecutionStatus.FAILED, inputs=parameters_for_log, - metadata={NodeRunMetadataKey.TOOL_INFO: tool_info}, + metadata={WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info}, error=f"Failed to invoke tool: {str(e)}", error_type=type(e).__name__, ) @@ -125,7 +125,7 @@ class ToolNode(BaseNode[ToolNodeData]): run_result=NodeRunResult( status=WorkflowNodeExecutionStatus.FAILED, inputs=parameters_for_log, - metadata={NodeRunMetadataKey.TOOL_INFO: tool_info}, + metadata={WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info}, error=f"Failed to transform tool message: {str(e)}", error_type=type(e).__name__, ) @@ -201,7 +201,7 @@ class ToolNode(BaseNode[ToolNodeData]): json: list[dict] = [] agent_logs: list[AgentLogEvent] = [] - agent_execution_metadata: Mapping[NodeRunMetadataKey, Any] = {} + agent_execution_metadata: Mapping[WorkflowNodeExecutionMetadataKey, Any] = {} variables: dict[str, Any] = {} @@ -274,7 +274,7 @@ class ToolNode(BaseNode[ToolNodeData]): agent_execution_metadata = { key: value for key, value in msg_metadata.items() - if key in NodeRunMetadataKey.__members__.values() + if key in WorkflowNodeExecutionMetadataKey.__members__.values() } json.append(message.message.json_object) elif message.type == ToolInvokeMessage.MessageType.LINK: @@ -366,8 +366,8 @@ class ToolNode(BaseNode[ToolNodeData]): outputs={"text": text, "files": files, "json": json, **variables}, metadata={ **agent_execution_metadata, - NodeRunMetadataKey.TOOL_INFO: tool_info, - NodeRunMetadataKey.AGENT_LOG: agent_logs, + WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info, + WorkflowNodeExecutionMetadataKey.AGENT_LOG: agent_logs, }, inputs=parameters_for_log, ) diff --git a/api/core/workflow/workflow_cycle_manager.py b/api/core/workflow/workflow_cycle_manager.py index 6e3c2f3f78..a6a5a04cc7 100644 --- a/api/core/workflow/workflow_cycle_manager.py +++ b/api/core/workflow/workflow_cycle_manager.py @@ -20,7 +20,7 @@ from core.ops.ops_trace_manager import TraceQueueManager, TraceTask from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType from core.workflow.entities.workflow_node_execution import ( NodeExecution, - NodeRunMetadataKey, + WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus, ) from core.workflow.enums import SystemVariableKey @@ -210,15 +210,15 @@ class WorkflowCycleManager: # Create a domain model created_at = datetime.now(UTC).replace(tzinfo=None) metadata = { - NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id, - NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id, - NodeRunMetadataKey.LOOP_ID: event.in_loop_id, + WorkflowNodeExecutionMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id, + WorkflowNodeExecutionMetadataKey.ITERATION_ID: event.in_iteration_id, + WorkflowNodeExecutionMetadataKey.LOOP_ID: event.in_loop_id, } domain_execution = NodeExecution( id=str(uuid4()), workflow_id=workflow_execution.workflow_id, - workflow_run_id=workflow_execution.id_, + workflow_execution_id=workflow_execution.id_, predecessor_node_id=event.predecessor_node_id, index=event.node_run_index, node_execution_id=event.node_execution_id, @@ -330,13 +330,13 @@ class WorkflowCycleManager: # Convert metadata keys to strings origin_metadata = { - NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id, - NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id, - NodeRunMetadataKey.LOOP_ID: event.in_loop_id, + WorkflowNodeExecutionMetadataKey.ITERATION_ID: event.in_iteration_id, + WorkflowNodeExecutionMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id, + WorkflowNodeExecutionMetadataKey.LOOP_ID: event.in_loop_id, } # Convert execution metadata keys to strings - execution_metadata_dict: dict[NodeRunMetadataKey, str | None] = {} + execution_metadata_dict: dict[WorkflowNodeExecutionMetadataKey, str | None] = {} if event.execution_metadata: for key, value in event.execution_metadata.items(): execution_metadata_dict[key] = value @@ -347,7 +347,7 @@ class WorkflowCycleManager: domain_execution = NodeExecution( id=str(uuid4()), workflow_id=workflow_execution.workflow_id, - workflow_run_id=workflow_execution.id_, + workflow_execution_id=workflow_execution.id_, predecessor_node_id=event.predecessor_node_id, node_execution_id=event.node_execution_id, node_id=event.node_id, diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py index 34c64121af..7535ec4866 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py @@ -4,7 +4,7 @@ import pytest from flask import Flask from core.app.entities.app_invoke_entities import InvokeFrom -from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult +from core.workflow.entities.node_entities import NodeRunResult, WorkflowNodeExecutionMetadataKey from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey @@ -202,9 +202,9 @@ def test_run_parallel_in_workflow(mock_close, mock_remove): process_data={}, outputs={}, metadata={ - NodeRunMetadataKey.TOTAL_TOKENS: 1, - NodeRunMetadataKey.TOTAL_PRICE: 1, - NodeRunMetadataKey.CURRENCY: "USD", + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 1, + WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: 1, + WorkflowNodeExecutionMetadataKey.CURRENCY: "USD", }, ) ) @@ -837,9 +837,9 @@ def test_condition_parallel_correct_output(mock_close, mock_remove, app): process_data={}, outputs={"class_name": "financial", "class_id": "1"}, metadata={ - NodeRunMetadataKey.TOTAL_TOKENS: 1, - NodeRunMetadataKey.TOTAL_PRICE: 1, - NodeRunMetadataKey.CURRENCY: "USD", + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 1, + WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: 1, + WorkflowNodeExecutionMetadataKey.CURRENCY: "USD", }, edge_source_handle="1", ) @@ -853,9 +853,9 @@ def test_condition_parallel_correct_output(mock_close, mock_remove, app): process_data={}, outputs={"result": "dify 123"}, metadata={ - NodeRunMetadataKey.TOTAL_TOKENS: 1, - NodeRunMetadataKey.TOTAL_PRICE: 1, - NodeRunMetadataKey.CURRENCY: "USD", + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 1, + WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: 1, + WorkflowNodeExecutionMetadataKey.CURRENCY: "USD", }, ) ) diff --git a/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py b/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py index c429ac7dd3..ff60d5974b 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_continue_on_error.py @@ -1,7 +1,7 @@ from unittest.mock import patch from core.app.entities.app_invoke_entities import InvokeFrom -from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult +from core.workflow.entities.node_entities import NodeRunResult, WorkflowNodeExecutionMetadataKey from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.graph_engine.entities.event import ( @@ -543,9 +543,9 @@ def test_stream_output_with_fail_branch_continue_on_error(): process_data={}, outputs={}, metadata={ - NodeRunMetadataKey.TOTAL_TOKENS: 1, - NodeRunMetadataKey.TOTAL_PRICE: 1, - NodeRunMetadataKey.CURRENCY: "USD", + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 1, + WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: 1, + WorkflowNodeExecutionMetadataKey.CURRENCY: "USD", }, ) ) diff --git a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py index a5574d309b..fc4a8f1844 100644 --- a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py +++ b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py @@ -15,7 +15,7 @@ from core.app.entities.queue_entities import ( from core.workflow.entities.workflow_execution import WorkflowExecution, WorkflowExecutionStatus, WorkflowType from core.workflow.entities.workflow_node_execution import ( NodeExecution, - NodeRunMetadataKey, + WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus, ) from core.workflow.enums import SystemVariableKey @@ -318,7 +318,7 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_execu # Verify the result assert result.workflow_id == workflow_execution.workflow_id - assert result.workflow_run_id == workflow_execution.id_ + assert result.workflow_execution_id == workflow_execution.id_ assert result.node_execution_id == event.node_execution_id assert result.node_id == event.node_id assert result.node_type == event.node_type @@ -368,7 +368,7 @@ def test_handle_workflow_node_execution_success(workflow_cycle_manager): event.inputs = {"input": "test input"} event.process_data = {"process": "test process"} event.outputs = {"output": "test output"} - event.execution_metadata = {NodeRunMetadataKey.TOTAL_TOKENS: 100} + event.execution_metadata = {WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 100} event.start_at = datetime.now(UTC).replace(tzinfo=None) # Create a real node execution @@ -377,7 +377,7 @@ def test_handle_workflow_node_execution_success(workflow_cycle_manager): id="test-node-execution-record-id", node_execution_id="test-node-execution-id", workflow_id="test-workflow-id", - workflow_run_id="test-workflow-run-id", + workflow_execution_id="test-workflow-run-id", index=1, node_id="test-node-id", node_type=NodeType.LLM, @@ -445,7 +445,7 @@ def test_handle_workflow_node_execution_failed(workflow_cycle_manager): event.inputs = {"input": "test input"} event.process_data = {"process": "test process"} event.outputs = {"output": "test output"} - event.execution_metadata = {NodeRunMetadataKey.TOTAL_TOKENS: 100} + event.execution_metadata = {WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 100} event.start_at = datetime.now(UTC).replace(tzinfo=None) event.error = "Test error message" @@ -455,7 +455,7 @@ def test_handle_workflow_node_execution_failed(workflow_cycle_manager): id="test-node-execution-record-id", node_execution_id="test-node-execution-id", workflow_id="test-workflow-id", - workflow_run_id="test-workflow-run-id", + workflow_execution_id="test-workflow-run-id", index=1, node_id="test-node-id", node_type=NodeType.LLM, diff --git a/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py b/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py index f3cdfd135b..93282d33b0 100644 --- a/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py +++ b/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py @@ -15,7 +15,7 @@ from core.model_runtime.utils.encoders import jsonable_encoder from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.workflow.entities.workflow_node_execution import ( NodeExecution, - NodeRunMetadataKey, + WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus, ) from core.workflow.nodes.enums import NodeType @@ -291,7 +291,7 @@ def test_to_db_model(repository): id="test-id", workflow_id="test-workflow-id", node_execution_id="test-node-execution-id", - workflow_run_id="test-workflow-run-id", + workflow_execution_id="test-workflow-run-id", index=1, predecessor_node_id="test-predecessor-id", node_id="test-node-id", @@ -303,7 +303,10 @@ def test_to_db_model(repository): status=WorkflowNodeExecutionStatus.RUNNING, error=None, elapsed_time=1.5, - metadata={NodeRunMetadataKey.TOTAL_TOKENS: 100, NodeRunMetadataKey.TOTAL_PRICE: Decimal("0.0")}, + metadata={ + WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: 100, + WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: Decimal("0.0"), + }, created_at=datetime.now(), finished_at=None, ) @@ -318,7 +321,7 @@ def test_to_db_model(repository): assert db_model.app_id == repository._app_id assert db_model.workflow_id == domain_model.workflow_id assert db_model.triggered_from == repository._triggered_from - assert db_model.workflow_run_id == domain_model.workflow_run_id + assert db_model.workflow_run_id == domain_model.workflow_execution_id assert db_model.index == domain_model.index assert db_model.predecessor_node_id == domain_model.predecessor_node_id assert db_model.node_execution_id == domain_model.node_execution_id @@ -346,7 +349,7 @@ def test_to_domain_model(repository): inputs_dict = {"input_key": "input_value"} process_data_dict = {"process_key": "process_value"} outputs_dict = {"output_key": "output_value"} - metadata_dict = {str(NodeRunMetadataKey.TOTAL_TOKENS): 100} + metadata_dict = {str(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS): 100} # Create a DB model using our custom subclass db_model = WorkflowNodeExecution() @@ -381,7 +384,7 @@ def test_to_domain_model(repository): assert isinstance(domain_model, NodeExecution) assert domain_model.id == db_model.id assert domain_model.workflow_id == db_model.workflow_id - assert domain_model.workflow_run_id == db_model.workflow_run_id + assert domain_model.workflow_execution_id == db_model.workflow_run_id assert domain_model.index == db_model.index assert domain_model.predecessor_node_id == db_model.predecessor_node_id assert domain_model.node_execution_id == db_model.node_execution_id