feat(api): save output variables to draft variables while debugging workflow

This commit is contained in:
QuantumGhost 2025-05-22 17:29:34 +08:00
parent 25b4a96aed
commit 0f7ea8d5fa
3 changed files with 33 additions and 0 deletions

View File

@ -48,6 +48,9 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
self.message = message self.message = message
self._dialogue_count = dialogue_count self._dialogue_count = dialogue_count
def _get_app_id(self) -> str:
return self.application_generate_entity.app_config.app_id
def run(self) -> None: def run(self) -> None:
app_config = self.application_generate_entity.app_config app_config = self.application_generate_entity.app_config
app_config = cast(AdvancedChatAppConfig, app_config) app_config = cast(AdvancedChatAppConfig, app_config)

View File

@ -41,6 +41,9 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
self.queue_manager = queue_manager self.queue_manager = queue_manager
self.workflow_thread_pool_id = workflow_thread_pool_id self.workflow_thread_pool_id = workflow_thread_pool_id
def _get_app_id(self) -> str:
return self.application_generate_entity.app_config.app_id
def run(self) -> None: def run(self) -> None:
""" """
Run application Run application

View File

@ -1,6 +1,8 @@
from collections.abc import Mapping from collections.abc import Mapping
from typing import Any, Optional, cast from typing import Any, Optional, cast
from sqlalchemy.orm import Session
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.base_app_runner import AppRunner from core.app.apps.base_app_runner import AppRunner
from core.app.entities.queue_entities import ( from core.app.entities.queue_entities import (
@ -66,12 +68,19 @@ from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db from extensions.ext_database import db
from models.model import App from models.model import App
from models.workflow import Workflow from models.workflow import Workflow
from services.workflow_draft_variable_service import (
WorkflowDraftVariableService,
should_save_output_variables_for_draft,
)
class WorkflowBasedAppRunner(AppRunner): class WorkflowBasedAppRunner(AppRunner):
def __init__(self, queue_manager: AppQueueManager): def __init__(self, queue_manager: AppQueueManager):
self.queue_manager = queue_manager self.queue_manager = queue_manager
def _get_app_id(self) -> str:
raise NotImplementedError("not implemented")
def _init_graph(self, graph_config: Mapping[str, Any]) -> Graph: def _init_graph(self, graph_config: Mapping[str, Any]) -> Graph:
""" """
Init graph Init graph
@ -376,6 +385,24 @@ class WorkflowBasedAppRunner(AppRunner):
in_loop_id=event.in_loop_id, in_loop_id=event.in_loop_id,
) )
) )
# FIXME(QuantumGhost): rely on private state of queue_manager is not ideal.
should_save = should_save_output_variables_for_draft(
self.queue_manager._invoke_from,
loop_id=event.in_loop_id,
iteration_id=event.in_iteration_id,
)
if should_save and outputs is not None:
with Session(bind=db.engine) as session:
draft_var_srv = WorkflowDraftVariableService(session)
draft_var_srv.save_output_variables(
app_id=self._get_app_id(),
node_id=event.node_id,
node_type=event.node_type,
output=outputs,
)
session.commit()
elif isinstance(event, NodeRunFailedEvent): elif isinstance(event, NodeRunFailedEvent):
self._publish_event( self._publish_event(
QueueNodeFailedEvent( QueueNodeFailedEvent(