From efd557568397545f8523e374517b1a21d2ddbc2d Mon Sep 17 00:00:00 2001 From: yihong Date: Sun, 15 Dec 2024 17:22:13 +0800 Subject: [PATCH] fix: _handle_workflow_run_partial_success args is wrong (#11562) Signed-off-by: yihong0618 --- .../apps/workflow/generate_task_pipeline.py | 66 +++++++++---------- 1 file changed, 30 insertions(+), 36 deletions(-) diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 8483fa91f8..bdece7e665 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -259,36 +259,36 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event) - response = self._workflow_node_start_to_stream_response( + node_start_response = self._workflow_node_start_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, ) - if response: - yield response + if node_start_response: + yield node_start_response elif isinstance(event, QueueNodeSucceededEvent): workflow_node_execution = self._handle_workflow_node_execution_success(event) - response = self._workflow_node_finish_to_stream_response( + node_success_response = self._workflow_node_finish_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, ) - if response: - yield response + if node_success_response: + yield node_success_response elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent | QueueNodeExceptionEvent): workflow_node_execution = self._handle_workflow_node_execution_failed(event) - response = self._workflow_node_finish_to_stream_response( + node_failed_response = self._workflow_node_finish_to_stream_response( event=event, task_id=self._application_generate_entity.task_id, workflow_node_execution=workflow_node_execution, ) - if response: - yield response + if node_failed_response: + yield node_failed_response elif isinstance(event, QueueParallelBranchRunStartedEvent): if not workflow_run: raise Exception("Workflow run not initialized.") @@ -377,20 +377,19 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa if not graph_runtime_state: raise Exception("Graph runtime state not initialized.") - handle_args = { - "workflow_run": workflow_run, - "start_at": graph_runtime_state.start_at, - "total_tokens": graph_runtime_state.total_tokens, - "total_steps": graph_runtime_state.node_run_steps, - "status": WorkflowRunStatus.FAILED + workflow_run = self._handle_workflow_run_failed( + workflow_run=workflow_run, + start_at=graph_runtime_state.start_at, + total_tokens=graph_runtime_state.total_tokens, + total_steps=graph_runtime_state.node_run_steps, + status=WorkflowRunStatus.FAILED if isinstance(event, QueueWorkflowFailedEvent) else WorkflowRunStatus.STOPPED, - "error": event.error if isinstance(event, QueueWorkflowFailedEvent) else event.get_stop_reason(), - "conversation_id": None, - "trace_manager": trace_manager, - "exceptions_count": event.exceptions_count if isinstance(event, QueueWorkflowFailedEvent) else 0, - } - workflow_run = self._handle_workflow_run_failed(**handle_args) + error=event.error if isinstance(event, QueueWorkflowFailedEvent) else event.get_stop_reason(), + conversation_id=None, + trace_manager=trace_manager, + exceptions_count=event.exceptions_count if isinstance(event, QueueWorkflowFailedEvent) else 0, + ) # save workflow app log self._save_workflow_app_log(workflow_run) @@ -404,21 +403,16 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa if not graph_runtime_state: raise Exception("Graph runtime state not initialized.") - handle_args = { - "workflow_run": workflow_run, - "start_at": graph_runtime_state.start_at, - "total_tokens": graph_runtime_state.total_tokens, - "total_steps": graph_runtime_state.node_run_steps, - "status": WorkflowRunStatus.FAILED - if isinstance(event, QueueWorkflowFailedEvent) - else WorkflowRunStatus.STOPPED, - "error": event.error if isinstance(event, QueueWorkflowFailedEvent) else event.get_stop_reason(), - "conversation_id": None, - "trace_manager": trace_manager, - "exceptions_count": event.exceptions_count, - } - workflow_run = self._handle_workflow_run_partial_success(**handle_args) - + workflow_run = self._handle_workflow_run_partial_success( + workflow_run=workflow_run, + start_at=graph_runtime_state.start_at, + total_tokens=graph_runtime_state.total_tokens, + total_steps=graph_runtime_state.node_run_steps, + outputs=event.outputs, + exceptions_count=event.exceptions_count, + conversation_id=None, + trace_manager=trace_manager, + ) # save workflow app log self._save_workflow_app_log(workflow_run)