From 7201b56a6d63c75a809a7df26e389b1be29fcb8a Mon Sep 17 00:00:00 2001 From: Joe <79627742+ZhouhaoJiang@users.noreply.github.com> Date: Fri, 9 Aug 2024 14:46:31 +0800 Subject: [PATCH] fix: workflow log run time error (#7130) --- .../task_pipeline/workflow_cycle_manage.py | 12 +++------- api/services/workflow_service.py | 22 +++++++++++++++++++ 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index b4859edbd9..69951e9371 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -40,6 +40,7 @@ from models.workflow import ( WorkflowRunStatus, WorkflowRunTriggeredFrom, ) +from services.workflow_service import WorkflowService class WorkflowCycleManage(WorkflowIterationCycleManage): @@ -97,7 +98,6 @@ class WorkflowCycleManage(WorkflowIterationCycleManage): def _workflow_run_success( self, workflow_run: WorkflowRun, - start_at: float, total_tokens: int, total_steps: int, outputs: Optional[str] = None, @@ -107,7 +107,6 @@ class WorkflowCycleManage(WorkflowIterationCycleManage): """ Workflow run success :param workflow_run: workflow run - :param start_at: start time :param total_tokens: total tokens :param total_steps: total steps :param outputs: outputs @@ -116,7 +115,7 @@ class WorkflowCycleManage(WorkflowIterationCycleManage): """ workflow_run.status = WorkflowRunStatus.SUCCEEDED.value workflow_run.outputs = outputs - workflow_run.elapsed_time = time.perf_counter() - start_at + workflow_run.elapsed_time = WorkflowService.get_elapsed_time(workflow_run_id=workflow_run.id) workflow_run.total_tokens = total_tokens workflow_run.total_steps = total_steps workflow_run.finished_at = datetime.now(timezone.utc).replace(tzinfo=None) @@ -139,7 +138,6 @@ class WorkflowCycleManage(WorkflowIterationCycleManage): def _workflow_run_failed( self, workflow_run: WorkflowRun, - start_at: float, total_tokens: int, total_steps: int, status: WorkflowRunStatus, @@ -150,7 +148,6 @@ class WorkflowCycleManage(WorkflowIterationCycleManage): """ Workflow run failed :param workflow_run: workflow run - :param start_at: start time :param total_tokens: total tokens :param total_steps: total steps :param status: status @@ -159,7 +156,7 @@ class WorkflowCycleManage(WorkflowIterationCycleManage): """ workflow_run.status = status.value workflow_run.error = error - workflow_run.elapsed_time = time.perf_counter() - start_at + workflow_run.elapsed_time = WorkflowService.get_elapsed_time(workflow_run_id=workflow_run.id) workflow_run.total_tokens = total_tokens workflow_run.total_steps = total_steps workflow_run.finished_at = datetime.now(timezone.utc).replace(tzinfo=None) @@ -542,7 +539,6 @@ class WorkflowCycleManage(WorkflowIterationCycleManage): if isinstance(event, QueueStopEvent): workflow_run = self._workflow_run_failed( workflow_run=workflow_run, - start_at=self._task_state.start_at, total_tokens=self._task_state.total_tokens, total_steps=self._task_state.total_steps, status=WorkflowRunStatus.STOPPED, @@ -565,7 +561,6 @@ class WorkflowCycleManage(WorkflowIterationCycleManage): elif isinstance(event, QueueWorkflowFailedEvent): workflow_run = self._workflow_run_failed( workflow_run=workflow_run, - start_at=self._task_state.start_at, total_tokens=self._task_state.total_tokens, total_steps=self._task_state.total_steps, status=WorkflowRunStatus.FAILED, @@ -583,7 +578,6 @@ class WorkflowCycleManage(WorkflowIterationCycleManage): workflow_run = self._workflow_run_success( workflow_run=workflow_run, - start_at=self._task_state.start_at, total_tokens=self._task_state.total_tokens, total_steps=self._task_state.total_steps, outputs=outputs, diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index d868255f96..38de538a19 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -319,3 +319,25 @@ class WorkflowService: ) else: raise ValueError(f"Invalid app mode: {app_model.mode}") + + @classmethod + def get_elapsed_time(cls, workflow_run_id: str) -> float: + """ + Get elapsed time + """ + elapsed_time = 0.0 + + # fetch workflow node execution by workflow_run_id + workflow_nodes = ( + db.session.query(WorkflowNodeExecution) + .filter(WorkflowNodeExecution.workflow_run_id == workflow_run_id) + .order_by(WorkflowNodeExecution.created_at.asc()) + .all() + ) + if not workflow_nodes: + return elapsed_time + + for node in workflow_nodes: + elapsed_time += node.elapsed_time + + return elapsed_time \ No newline at end of file