From 55503ce77182f4b9cbc017b8b54ea91002eeb9dc Mon Sep 17 00:00:00 2001 From: -LAN- Date: Tue, 27 May 2025 11:24:08 +0800 Subject: [PATCH] fix: persist workflow execution status on partial success and failure (#20264) Signed-off-by: -LAN- --- api/core/workflow/workflow_cycle_manager.py | 39 +++++++++++---------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/api/core/workflow/workflow_cycle_manager.py b/api/core/workflow/workflow_cycle_manager.py index 1918dd9f09..24e23af093 100644 --- a/api/core/workflow/workflow_cycle_manager.py +++ b/api/core/workflow/workflow_cycle_manager.py @@ -125,6 +125,7 @@ class WorkflowCycleManager: ) ) + self._workflow_execution_repository.save(workflow_execution) return workflow_execution def handle_workflow_run_partial_success( @@ -158,6 +159,7 @@ class WorkflowCycleManager: ) ) + self._workflow_execution_repository.save(execution) return execution def handle_workflow_run_failed( @@ -172,44 +174,45 @@ class WorkflowCycleManager: trace_manager: Optional[TraceQueueManager] = None, exceptions_count: int = 0, ) -> WorkflowExecution: - execution = self._get_workflow_execution_or_raise_error(workflow_run_id) + workflow_execution = self._get_workflow_execution_or_raise_error(workflow_run_id) - execution.status = WorkflowExecutionStatus(status.value) - execution.error_message = error_message - execution.total_tokens = total_tokens - execution.total_steps = total_steps - execution.finished_at = datetime.now(UTC).replace(tzinfo=None) - execution.exceptions_count = exceptions_count + workflow_execution.status = WorkflowExecutionStatus(status.value) + workflow_execution.error_message = error_message + workflow_execution.total_tokens = total_tokens + workflow_execution.total_steps = total_steps + workflow_execution.finished_at = datetime.now(UTC).replace(tzinfo=None) + workflow_execution.exceptions_count = exceptions_count # Use the instance repository to find running executions for a workflow run - running_domain_executions = self._workflow_node_execution_repository.get_running_executions( - workflow_run_id=execution.id + running_node_executions = self._workflow_node_execution_repository.get_running_executions( + workflow_run_id=workflow_execution.id ) # Update the domain models now = datetime.now(UTC).replace(tzinfo=None) - for domain_execution in running_domain_executions: - if domain_execution.node_execution_id: + for node_execution in running_node_executions: + if node_execution.node_execution_id: # Update the domain model - domain_execution.status = NodeExecutionStatus.FAILED - domain_execution.error = error_message - domain_execution.finished_at = now - domain_execution.elapsed_time = (now - domain_execution.created_at).total_seconds() + node_execution.status = NodeExecutionStatus.FAILED + node_execution.error = error_message + node_execution.finished_at = now + node_execution.elapsed_time = (now - node_execution.created_at).total_seconds() # Update the repository with the domain model - self._workflow_node_execution_repository.save(domain_execution) + self._workflow_node_execution_repository.save(node_execution) if trace_manager: trace_manager.add_trace_task( TraceTask( TraceTaskName.WORKFLOW_TRACE, - workflow_execution=execution, + workflow_execution=workflow_execution, conversation_id=conversation_id, user_id=trace_manager.user_id, ) ) - return execution + self._workflow_execution_repository.save(workflow_execution) + return workflow_execution def handle_node_execution_start( self,