feat(api): Save output variables as draft variables in single stepping

This commit is contained in:
QuantumGhost 2025-05-22 17:24:12 +08:00
parent ecaf4b0e51
commit f3fc3cf530
2 changed files with 75 additions and 5 deletions

View File

@ -4,3 +4,7 @@ class MoreLikeThisDisabledError(Exception):
class WorkflowHashNotEqualError(Exception):
pass
class IsDraftWorkflowError(Exception):
pass

View File

@ -1,18 +1,21 @@
import json
import time
import logging
from collections.abc import Callable, Generator, Sequence
from datetime import UTC, datetime
from inspect import isgenerator
from typing import Any, Optional
from uuid import uuid4
import time
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.variables import Variable
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult
from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus
from core.workflow.errors import WorkflowNodeRunFailedError
from core.workflow.graph_engine.entities.event import InNodeEvent
@ -35,10 +38,10 @@ from models.workflow import (
WorkflowNodeExecutionTriggeredFrom,
WorkflowType,
)
from services.errors.app import WorkflowHashNotEqualError
from services.errors.app import IsDraftWorkflowError, WorkflowHashNotEqualError
from services.workflow.workflow_converter import WorkflowConverter
from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError
from .workflow_draft_variable_service import WorkflowDraftVariableService, should_save_output_variables_for_draft
class WorkflowService:
@ -89,6 +92,21 @@ class WorkflowService:
# return draft workflow
return workflow
def get_published_workflow_by_id(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
# fetch published workflow by workflow_id
workflow = (
db.session.query(Workflow)
.filter(
Workflow.tenant_id == app_model.tenant_id,
Workflow.app_id == app_model.id,
Workflow.id == workflow_id,
)
.first()
)
if workflow.version == Workflow.VERSION_DRAFT:
raise IsDraftWorkflowError(f"Workflow is draft version, id={workflow_id}")
return workflow
def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
"""
Get published workflow
@ -227,7 +245,7 @@ class WorkflowService:
tenant_id=app_model.tenant_id,
app_id=app_model.id,
type=draft_workflow.type,
version=str(datetime.now(UTC).replace(tzinfo=None)),
version=Workflow.version_from_datetime(datetime.now(UTC).replace(tzinfo=None)),
graph=draft_workflow.graph,
features=draft_workflow.features,
created_by=account.id,
@ -291,8 +309,17 @@ class WorkflowService:
if not draft_workflow:
raise ValueError("Workflow not initialized")
# conv_vars = common_helpers.get_conversation_variables()
# run draft workflow node
start_at = time.perf_counter()
with Session(bind=db.engine) as session:
# TODO(QunatumGhost): inject conversation variables
# to variable pool.
draft_var_srv = WorkflowDraftVariableService(session)
conv_vars_list = draft_var_srv.list_conversation_variables(app_id=app_model.id)
conv_var_mapping = {v.name: v.get_value().value for v in conv_vars_list.variables}
node_execution = self._handle_node_run_result(
invoke_node_fn=lambda: WorkflowEntry.single_step_run(
@ -300,6 +327,7 @@ class WorkflowService:
node_id=node_id,
user_inputs=user_inputs,
user_id=account.id,
conversation_variables=conv_var_mapping,
),
start_at=start_at,
node_id=node_id,
@ -319,6 +347,27 @@ class WorkflowService:
# Convert node_execution to WorkflowNodeExecution after save
workflow_node_execution = repository.to_db_model(node_execution)
output = workflow_node_execution.outputs_dict or {}
exec_metadata = workflow_node_execution.execution_metadata_dict or {}
should_save = should_save_output_variables_for_draft(
invoke_from=InvokeFrom.DEBUGGER,
loop_id=exec_metadata.get(NodeRunMetadataKey.LOOP_ID, None),
iteration_id=exec_metadata.get(NodeRunMetadataKey.ITERATION_ID, None),
)
if not should_save:
return workflow_node_execution
# TODO(QuantumGhost): single step does not include loop_id or iteration_id in execution_metadata.
with Session(bind=db.engine) as session:
draft_var_srv = WorkflowDraftVariableService(session)
draft_var_srv.save_output_variables(
app_id=app_model.id,
node_id=workflow_node_execution.node_id,
node_type=NodeType(workflow_node_execution.node_type),
output=output,
)
session.commit()
return workflow_node_execution
@ -353,6 +402,7 @@ class WorkflowService:
) -> NodeExecution:
try:
node_instance, generator = invoke_node_fn()
generator = _inspect_generator(generator)
node_run_result: NodeRunResult | None = None
for event in generator:
@ -559,3 +609,19 @@ class WorkflowService:
session.delete(workflow)
return True
def _inspect_generator(gen: Generator[Any] | Any) -> Any:
if not isgenerator(gen):
return gen
def wrapper():
for item in gen:
logging.getLogger(__name__).info(
"received generator item, type=%s, value=%s",
type(item),
item,
)
yield item
return wrapper()