From 0f7ea8d5fa57aa2be5977a953f40cef0ebd8b50c Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Thu, 22 May 2025 17:29:34 +0800 Subject: [PATCH] feat(api): save output variables to draft variables while debugging workflow --- api/core/app/apps/advanced_chat/app_runner.py | 3 +++ api/core/app/apps/workflow/app_runner.py | 3 +++ api/core/app/apps/workflow_app_runner.py | 27 +++++++++++++++++++ 3 files changed, 33 insertions(+) diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index c83e06bf15..18e8310793 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -48,6 +48,9 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): self.message = message self._dialogue_count = dialogue_count + def _get_app_id(self) -> str: + return self.application_generate_entity.app_config.app_id + def run(self) -> None: app_config = self.application_generate_entity.app_config app_config = cast(AdvancedChatAppConfig, app_config) diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index b38ee18ac4..13cf4581ce 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -41,6 +41,9 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): self.queue_manager = queue_manager self.workflow_thread_pool_id = workflow_thread_pool_id + def _get_app_id(self) -> str: + return self.application_generate_entity.app_config.app_id + def run(self) -> None: """ Run application diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 0884fac4a9..1e6c92d59b 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -1,6 +1,8 @@ from collections.abc import Mapping from typing import Any, Optional, cast +from sqlalchemy.orm import Session + from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.base_app_runner import AppRunner from core.app.entities.queue_entities import ( @@ -66,12 +68,19 @@ from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_database import db from models.model import App from models.workflow import Workflow +from services.workflow_draft_variable_service import ( + WorkflowDraftVariableService, + should_save_output_variables_for_draft, +) class WorkflowBasedAppRunner(AppRunner): def __init__(self, queue_manager: AppQueueManager): self.queue_manager = queue_manager + def _get_app_id(self) -> str: + raise NotImplementedError("not implemented") + def _init_graph(self, graph_config: Mapping[str, Any]) -> Graph: """ Init graph @@ -376,6 +385,24 @@ class WorkflowBasedAppRunner(AppRunner): in_loop_id=event.in_loop_id, ) ) + + # FIXME(QuantumGhost): rely on private state of queue_manager is not ideal. + should_save = should_save_output_variables_for_draft( + self.queue_manager._invoke_from, + loop_id=event.in_loop_id, + iteration_id=event.in_iteration_id, + ) + if should_save and outputs is not None: + with Session(bind=db.engine) as session: + draft_var_srv = WorkflowDraftVariableService(session) + draft_var_srv.save_output_variables( + app_id=self._get_app_id(), + node_id=event.node_id, + node_type=event.node_type, + output=outputs, + ) + session.commit() + elif isinstance(event, NodeRunFailedEvent): self._publish_event( QueueNodeFailedEvent(