From f3fc3cf5302ee2cca46dad208a840f8c99c3b63d Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Thu, 22 May 2025 17:24:12 +0800 Subject: [PATCH] feat(api): Save output variables as draft variables in single stepping --- api/services/errors/app.py | 4 ++ api/services/workflow_service.py | 76 +++++++++++++++++++++++++++++--- 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/api/services/errors/app.py b/api/services/errors/app.py index 87e9e9247d..5d348c61be 100644 --- a/api/services/errors/app.py +++ b/api/services/errors/app.py @@ -4,3 +4,7 @@ class MoreLikeThisDisabledError(Exception): class WorkflowHashNotEqualError(Exception): pass + + +class IsDraftWorkflowError(Exception): + pass diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 3418fcb6fe..6d52be7033 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -1,18 +1,21 @@ import json -import time +import logging from collections.abc import Callable, Generator, Sequence from datetime import UTC, datetime +from inspect import isgenerator from typing import Any, Optional from uuid import uuid4 +import time from sqlalchemy import select from sqlalchemy.orm import Session from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager +from core.app.entities.app_invoke_entities import InvokeFrom from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.variables import Variable -from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus from core.workflow.errors import WorkflowNodeRunFailedError from core.workflow.graph_engine.entities.event import InNodeEvent @@ -35,10 +38,10 @@ from models.workflow import ( WorkflowNodeExecutionTriggeredFrom, WorkflowType, ) -from services.errors.app import WorkflowHashNotEqualError +from services.errors.app import IsDraftWorkflowError, WorkflowHashNotEqualError from services.workflow.workflow_converter import WorkflowConverter - from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError +from .workflow_draft_variable_service import WorkflowDraftVariableService, should_save_output_variables_for_draft class WorkflowService: @@ -89,6 +92,21 @@ class WorkflowService: # return draft workflow return workflow + def get_published_workflow_by_id(self, app_model: App, workflow_id: str) -> Optional[Workflow]: + # fetch published workflow by workflow_id + workflow = ( + db.session.query(Workflow) + .filter( + Workflow.tenant_id == app_model.tenant_id, + Workflow.app_id == app_model.id, + Workflow.id == workflow_id, + ) + .first() + ) + if workflow.version == Workflow.VERSION_DRAFT: + raise IsDraftWorkflowError(f"Workflow is draft version, id={workflow_id}") + return workflow + def get_published_workflow(self, app_model: App) -> Optional[Workflow]: """ Get published workflow @@ -227,7 +245,7 @@ class WorkflowService: tenant_id=app_model.tenant_id, app_id=app_model.id, type=draft_workflow.type, - version=str(datetime.now(UTC).replace(tzinfo=None)), + version=Workflow.version_from_datetime(datetime.now(UTC).replace(tzinfo=None)), graph=draft_workflow.graph, features=draft_workflow.features, created_by=account.id, @@ -291,8 +309,17 @@ class WorkflowService: if not draft_workflow: raise ValueError("Workflow not initialized") + # conv_vars = common_helpers.get_conversation_variables() + # run draft workflow node start_at = time.perf_counter() + with Session(bind=db.engine) as session: + # TODO(QunatumGhost): inject conversation variables + # to variable pool. + draft_var_srv = WorkflowDraftVariableService(session) + + conv_vars_list = draft_var_srv.list_conversation_variables(app_id=app_model.id) + conv_var_mapping = {v.name: v.get_value().value for v in conv_vars_list.variables} node_execution = self._handle_node_run_result( invoke_node_fn=lambda: WorkflowEntry.single_step_run( @@ -300,6 +327,7 @@ class WorkflowService: node_id=node_id, user_inputs=user_inputs, user_id=account.id, + conversation_variables=conv_var_mapping, ), start_at=start_at, node_id=node_id, @@ -319,6 +347,27 @@ class WorkflowService: # Convert node_execution to WorkflowNodeExecution after save workflow_node_execution = repository.to_db_model(node_execution) + output = workflow_node_execution.outputs_dict or {} + + exec_metadata = workflow_node_execution.execution_metadata_dict or {} + + should_save = should_save_output_variables_for_draft( + invoke_from=InvokeFrom.DEBUGGER, + loop_id=exec_metadata.get(NodeRunMetadataKey.LOOP_ID, None), + iteration_id=exec_metadata.get(NodeRunMetadataKey.ITERATION_ID, None), + ) + if not should_save: + return workflow_node_execution + # TODO(QuantumGhost): single step does not include loop_id or iteration_id in execution_metadata. + with Session(bind=db.engine) as session: + draft_var_srv = WorkflowDraftVariableService(session) + draft_var_srv.save_output_variables( + app_id=app_model.id, + node_id=workflow_node_execution.node_id, + node_type=NodeType(workflow_node_execution.node_type), + output=output, + ) + session.commit() return workflow_node_execution @@ -353,6 +402,7 @@ class WorkflowService: ) -> NodeExecution: try: node_instance, generator = invoke_node_fn() + generator = _inspect_generator(generator) node_run_result: NodeRunResult | None = None for event in generator: @@ -559,3 +609,19 @@ class WorkflowService: session.delete(workflow) return True + + +def _inspect_generator(gen: Generator[Any] | Any) -> Any: + if not isgenerator(gen): + return gen + + def wrapper(): + for item in gen: + logging.getLogger(__name__).info( + "received generator item, type=%s, value=%s", + type(item), + item, + ) + yield item + + return wrapper()