diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
index cd764d56a5..e43417668d 100644
--- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py
+++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
@@ -10,6 +10,7 @@ from sqlalchemy.orm import Session
 from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY_YIELD_CPU_TIME
 from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
+from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
 from core.app.entities.app_invoke_entities import (
     AdvancedChatAppGenerateEntity,
     InvokeFrom,
@@ -131,6 +132,10 @@ class AdvancedChatAppGenerateTaskPipeline:
             workflow_node_execution_repository=workflow_node_execution_repository,
         )
 
+        self._workflow_response_converter = WorkflowResponseConverter(
+            application_generate_entity=application_generate_entity,
+        )
+
         self._task_state = WorkflowTaskState()
         self._message_cycle_manager = MessageCycleManage(
             application_generate_entity=application_generate_entity, task_state=self._task_state
@@ -306,7 +311,7 @@ class AdvancedChatAppGenerateTaskPipeline:
                 if not message:
                     raise ValueError(f"Message not found: {self._message_id}")
                 message.workflow_run_id = workflow_execution.id
-                workflow_start_resp = self._workflow_cycle_manager.workflow_start_to_stream_response(
+                workflow_start_resp = self._workflow_response_converter.workflow_start_to_stream_response(
                     task_id=self._application_generate_entity.task_id,
                     workflow_execution=workflow_execution,
                 )
@@ -323,7 +328,7 @@ class AdvancedChatAppGenerateTaskPipeline:
                 workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried(
                     workflow_execution_id=self._workflow_run_id, event=event
                 )
-                node_retry_resp = self._workflow_cycle_manager.workflow_node_retry_to_stream_response(
+                node_retry_resp = self._workflow_response_converter.workflow_node_retry_to_stream_response(
                     event=event,
                     task_id=self._application_generate_entity.task_id,
                     workflow_node_execution=workflow_node_execution,
@@ -340,7 +345,7 @@ class AdvancedChatAppGenerateTaskPipeline:
                 workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start(
                     workflow_execution_id=self._workflow_run_id, event=event
                 )
-                node_start_resp = self._workflow_cycle_manager.workflow_node_start_to_stream_response(
+                node_start_resp = self._workflow_response_converter.workflow_node_start_to_stream_response(
                     event=event,
                     task_id=self._application_generate_entity.task_id,
                     workflow_node_execution=workflow_node_execution,
@@ -352,7 +357,7 @@ class AdvancedChatAppGenerateTaskPipeline:
                 # Record files if it's an answer node or end node
                 if event.node_type in [NodeType.ANSWER, NodeType.END]:
                     self._recorded_files.extend(
-                        self._workflow_cycle_manager.fetch_files_from_node_outputs(event.outputs or {})
+                        self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {})
                     )
 
                 with Session(db.engine, expire_on_commit=False) as session:
                     workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success(
                         event=event
                     )
 
-                node_finish_resp = self._workflow_cycle_manager.workflow_node_finish_to_stream_response(
+                node_finish_resp = self._workflow_response_converter.workflow_node_finish_to_stream_response(
                     event=event,
                     task_id=self._application_generate_entity.task_id,
                     workflow_node_execution=workflow_node_execution,
@@ -380,7 +385,7 @@ class AdvancedChatAppGenerateTaskPipeline:
                     event=event
                 )
 
-                node_finish_resp = self._workflow_cycle_manager.workflow_node_finish_to_stream_response(
+                node_finish_resp = self._workflow_response_converter.workflow_node_finish_to_stream_response(
                     event=event,
                     task_id=self._application_generate_entity.task_id,
                     workflow_node_execution=workflow_node_execution,
@@ -392,10 +397,12 @@ class AdvancedChatAppGenerateTaskPipeline:
                 if not self._workflow_run_id:
                     raise ValueError("workflow run not initialized.")
 
-                parallel_start_resp = self._workflow_cycle_manager.workflow_parallel_branch_start_to_stream_response(
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_execution_id=self._workflow_run_id,
-                    event=event,
+                parallel_start_resp = (
+                    self._workflow_response_converter.workflow_parallel_branch_start_to_stream_response(
+                        task_id=self._application_generate_entity.task_id,
+                        workflow_execution_id=self._workflow_run_id,
+                        event=event,
+                    )
                 )
 
                 yield parallel_start_resp
@@ -404,7 +411,7 @@ class AdvancedChatAppGenerateTaskPipeline:
                     raise ValueError("workflow run not initialized.")
 
                 parallel_finish_resp = (
-                    self._workflow_cycle_manager.workflow_parallel_branch_finished_to_stream_response(
+                    self._workflow_response_converter.workflow_parallel_branch_finished_to_stream_response(
                         task_id=self._application_generate_entity.task_id,
                         workflow_execution_id=self._workflow_run_id,
                         event=event,
@@ -416,7 +423,7 @@ class AdvancedChatAppGenerateTaskPipeline:
                 if not self._workflow_run_id:
                     raise ValueError("workflow run not initialized.")
 
-                iter_start_resp = self._workflow_cycle_manager.workflow_iteration_start_to_stream_response(
+                iter_start_resp = self._workflow_response_converter.workflow_iteration_start_to_stream_response(
                     task_id=self._application_generate_entity.task_id,
                     workflow_execution_id=self._workflow_run_id,
                     event=event,
@@ -427,7 +434,7 @@ class AdvancedChatAppGenerateTaskPipeline:
                 if not self._workflow_run_id:
                     raise ValueError("workflow run not initialized.")
 
-                iter_next_resp = self._workflow_cycle_manager.workflow_iteration_next_to_stream_response(
+                iter_next_resp = self._workflow_response_converter.workflow_iteration_next_to_stream_response(
                     task_id=self._application_generate_entity.task_id,
                     workflow_execution_id=self._workflow_run_id,
                     event=event,
@@ -438,7 +445,7 @@ class AdvancedChatAppGenerateTaskPipeline:
                 if not self._workflow_run_id:
                     raise ValueError("workflow run not initialized.")
 
-                iter_finish_resp = self._workflow_cycle_manager.workflow_iteration_completed_to_stream_response(
+                iter_finish_resp = self._workflow_response_converter.workflow_iteration_completed_to_stream_response(
                     task_id=self._application_generate_entity.task_id,
                     workflow_execution_id=self._workflow_run_id,
                     event=event,
@@ -449,7 +456,7 @@ class AdvancedChatAppGenerateTaskPipeline:
                 if not self._workflow_run_id:
                     raise ValueError("workflow run not initialized.")
 
-                loop_start_resp = self._workflow_cycle_manager.workflow_loop_start_to_stream_response(
+                loop_start_resp = self._workflow_response_converter.workflow_loop_start_to_stream_response(
                     task_id=self._application_generate_entity.task_id,
                     workflow_execution_id=self._workflow_run_id,
                     event=event,
@@ -460,7 +467,7 @@ class AdvancedChatAppGenerateTaskPipeline:
                 if not self._workflow_run_id:
                     raise ValueError("workflow run not initialized.")
 
-                loop_next_resp = self._workflow_cycle_manager.workflow_loop_next_to_stream_response(
+                loop_next_resp = self._workflow_response_converter.workflow_loop_next_to_stream_response(
                     task_id=self._application_generate_entity.task_id,
                     workflow_execution_id=self._workflow_run_id,
                     event=event,
@@ -471,7 +478,7 @@ class AdvancedChatAppGenerateTaskPipeline:
                 if not self._workflow_run_id:
                     raise ValueError("workflow run not initialized.")
ValueError("workflow run not initialized.") - loop_finish_resp = self._workflow_cycle_manager.workflow_loop_completed_to_stream_response( + loop_finish_resp = self._workflow_response_converter.workflow_loop_completed_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_execution_id=self._workflow_run_id, event=event, @@ -495,7 +502,7 @@ class AdvancedChatAppGenerateTaskPipeline: trace_manager=trace_manager, ) - workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -521,7 +528,7 @@ class AdvancedChatAppGenerateTaskPipeline: conversation_id=None, trace_manager=trace_manager, ) - workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -548,7 +555,7 @@ class AdvancedChatAppGenerateTaskPipeline: trace_manager=trace_manager, exceptions_count=event.exceptions_count, ) - workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -573,7 +580,7 @@ class AdvancedChatAppGenerateTaskPipeline: conversation_id=self._conversation_id, trace_manager=trace_manager, ) - workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response( + workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response( session=session, task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, @@ -657,7 +664,7 @@ class AdvancedChatAppGenerateTaskPipeline: yield self._message_end_to_stream_response() elif isinstance(event, QueueAgentLogEvent): - yield self._workflow_cycle_manager.handle_agent_log( + yield self._workflow_response_converter.handle_agent_log( task_id=self._application_generate_entity.task_id, event=event ) else: diff --git a/api/core/app/apps/common/__init__.py b/api/core/app/apps/common/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py new file mode 100644 index 0000000000..7669bf74bb --- /dev/null +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -0,0 +1,564 @@ +import time +from collections.abc import Mapping, Sequence +from datetime import UTC, datetime +from typing import Any, Optional, Union, cast + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity +from core.app.entities.queue_entities import ( + QueueAgentLogEvent, + QueueIterationCompletedEvent, + QueueIterationNextEvent, + QueueIterationStartEvent, + QueueLoopCompletedEvent, + QueueLoopNextEvent, + QueueLoopStartEvent, + QueueNodeExceptionEvent, + QueueNodeFailedEvent, + QueueNodeInIterationFailedEvent, + QueueNodeInLoopFailedEvent, + QueueNodeRetryEvent, + QueueNodeStartedEvent, + QueueNodeSucceededEvent, + QueueParallelBranchRunFailedEvent, + 
diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py
new file mode 100644
index 0000000000..7669bf74bb
--- /dev/null
+++ b/api/core/app/apps/common/workflow_response_converter.py
@@ -0,0 +1,564 @@
+import time
+from collections.abc import Mapping, Sequence
+from datetime import UTC, datetime
+from typing import Any, Optional, Union, cast
+
+from sqlalchemy import select
+from sqlalchemy.orm import Session
+
+from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity
+from core.app.entities.queue_entities import (
+    QueueAgentLogEvent,
+    QueueIterationCompletedEvent,
+    QueueIterationNextEvent,
+    QueueIterationStartEvent,
+    QueueLoopCompletedEvent,
+    QueueLoopNextEvent,
+    QueueLoopStartEvent,
+    QueueNodeExceptionEvent,
+    QueueNodeFailedEvent,
+    QueueNodeInIterationFailedEvent,
+    QueueNodeInLoopFailedEvent,
+    QueueNodeRetryEvent,
+    QueueNodeStartedEvent,
+    QueueNodeSucceededEvent,
+    QueueParallelBranchRunFailedEvent,
+    QueueParallelBranchRunStartedEvent,
+    QueueParallelBranchRunSucceededEvent,
+)
+from core.app.entities.task_entities import (
+    AgentLogStreamResponse,
+    IterationNodeCompletedStreamResponse,
+    IterationNodeNextStreamResponse,
+    IterationNodeStartStreamResponse,
+    LoopNodeCompletedStreamResponse,
+    LoopNodeNextStreamResponse,
+    LoopNodeStartStreamResponse,
+    NodeFinishStreamResponse,
+    NodeRetryStreamResponse,
+    NodeStartStreamResponse,
+    ParallelBranchFinishedStreamResponse,
+    ParallelBranchStartStreamResponse,
+    WorkflowFinishStreamResponse,
+    WorkflowStartStreamResponse,
+)
+from core.file import FILE_MODEL_IDENTITY, File
+from core.tools.tool_manager import ToolManager
+from core.workflow.entities.node_execution_entities import NodeExecution
+from core.workflow.entities.workflow_execution_entities import WorkflowExecution
+from core.workflow.nodes import NodeType
+from core.workflow.nodes.tool.entities import ToolNodeData
+from models import (
+    Account,
+    CreatorUserRole,
+    EndUser,
+    WorkflowNodeExecutionStatus,
+    WorkflowRun,
+)
+
+
+class WorkflowResponseConverter:
+    def __init__(
+        self,
+        *,
+        application_generate_entity: Union[AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity],
+    ) -> None:
+        self._application_generate_entity = application_generate_entity
+
+    def workflow_start_to_stream_response(
+        self,
+        *,
+        task_id: str,
+        workflow_execution: WorkflowExecution,
+    ) -> WorkflowStartStreamResponse:
+        return WorkflowStartStreamResponse(
+            task_id=task_id,
+            workflow_run_id=workflow_execution.id,
+            data=WorkflowStartStreamResponse.Data(
+                id=workflow_execution.id,
+                workflow_id=workflow_execution.workflow_id,
+                sequence_number=workflow_execution.sequence_number,
+                inputs=workflow_execution.inputs,
+                created_at=int(workflow_execution.started_at.timestamp()),
+            ),
+        )
+
+    def workflow_finish_to_stream_response(
+        self,
+        *,
+        session: Session,
+        task_id: str,
+        workflow_execution: WorkflowExecution,
+    ) -> WorkflowFinishStreamResponse:
+        created_by = None
+        workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id))
+        assert workflow_run is not None
+        if workflow_run.created_by_role == CreatorUserRole.ACCOUNT:
+            stmt = select(Account).where(Account.id == workflow_run.created_by)
+            account = session.scalar(stmt)
+            if account:
+                created_by = {
+                    "id": account.id,
+                    "name": account.name,
+                    "email": account.email,
+                }
+        elif workflow_run.created_by_role == CreatorUserRole.END_USER:
+            stmt = select(EndUser).where(EndUser.id == workflow_run.created_by)
+            end_user = session.scalar(stmt)
+            if end_user:
+                created_by = {
+                    "id": end_user.id,
+                    "user": end_user.session_id,
+                }
+        else:
+            raise NotImplementedError(f"unknown created_by_role: {workflow_run.created_by_role}")
+
+        # Handle the case where finished_at is None by using current time as default
+        finished_at_timestamp = (
+            int(workflow_execution.finished_at.timestamp())
+            if workflow_execution.finished_at
+            else int(datetime.now(UTC).timestamp())
+        )
+
+        return WorkflowFinishStreamResponse(
+            task_id=task_id,
+            workflow_run_id=workflow_execution.id,
+            data=WorkflowFinishStreamResponse.Data(
+                id=workflow_execution.id,
+                workflow_id=workflow_execution.workflow_id,
+                sequence_number=workflow_execution.sequence_number,
+                status=workflow_execution.status,
+                outputs=workflow_execution.outputs,
+                error=workflow_execution.error_message,
+                elapsed_time=workflow_execution.elapsed_time,
+                total_tokens=workflow_execution.total_tokens,
+                total_steps=workflow_execution.total_steps,
+                created_by=created_by,
+                created_at=int(workflow_execution.started_at.timestamp()),
+                finished_at=finished_at_timestamp,
+                files=self.fetch_files_from_node_outputs(workflow_execution.outputs),
+                exceptions_count=workflow_execution.exceptions_count,
+            ),
+        )
+
+    def workflow_node_start_to_stream_response(
+        self,
+        *,
+        event: QueueNodeStartedEvent,
+        task_id: str,
+        workflow_node_execution: NodeExecution,
+    ) -> Optional[NodeStartStreamResponse]:
+        if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}:
+            return None
+        if not workflow_node_execution.workflow_run_id:
+            return None
+
+        response = NodeStartStreamResponse(
+            task_id=task_id,
+            workflow_run_id=workflow_node_execution.workflow_run_id,
+            data=NodeStartStreamResponse.Data(
+                id=workflow_node_execution.id,
+                node_id=workflow_node_execution.node_id,
+                node_type=workflow_node_execution.node_type,
+                title=workflow_node_execution.title,
+                index=workflow_node_execution.index,
+                predecessor_node_id=workflow_node_execution.predecessor_node_id,
+                inputs=workflow_node_execution.inputs,
+                created_at=int(workflow_node_execution.created_at.timestamp()),
+                parallel_id=event.parallel_id,
+                parallel_start_node_id=event.parallel_start_node_id,
+                parent_parallel_id=event.parent_parallel_id,
+                parent_parallel_start_node_id=event.parent_parallel_start_node_id,
+                iteration_id=event.in_iteration_id,
+                loop_id=event.in_loop_id,
+                parallel_run_id=event.parallel_mode_run_id,
+                agent_strategy=event.agent_strategy,
+            ),
+        )
+
+        # extras logic
+        if event.node_type == NodeType.TOOL:
+            node_data = cast(ToolNodeData, event.node_data)
+            response.data.extras["icon"] = ToolManager.get_tool_icon(
+                tenant_id=self._application_generate_entity.app_config.tenant_id,
+                provider_type=node_data.provider_type,
+                provider_id=node_data.provider_id,
+            )
+
+        return response
+
+    def workflow_node_finish_to_stream_response(
+        self,
+        *,
+        event: QueueNodeSucceededEvent
+        | QueueNodeFailedEvent
+        | QueueNodeInIterationFailedEvent
+        | QueueNodeInLoopFailedEvent
+        | QueueNodeExceptionEvent,
+        task_id: str,
+        workflow_node_execution: NodeExecution,
+    ) -> Optional[NodeFinishStreamResponse]:
+        if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}:
+            return None
+        if not workflow_node_execution.workflow_run_id:
+            return None
+        if not workflow_node_execution.finished_at:
+            return None
+
+        return NodeFinishStreamResponse(
+            task_id=task_id,
+            workflow_run_id=workflow_node_execution.workflow_run_id,
+            data=NodeFinishStreamResponse.Data(
+                id=workflow_node_execution.id,
+                node_id=workflow_node_execution.node_id,
+                node_type=workflow_node_execution.node_type,
+                index=workflow_node_execution.index,
+                title=workflow_node_execution.title,
+                predecessor_node_id=workflow_node_execution.predecessor_node_id,
+                inputs=workflow_node_execution.inputs,
+                process_data=workflow_node_execution.process_data,
+                outputs=workflow_node_execution.outputs,
+                status=workflow_node_execution.status,
+                error=workflow_node_execution.error,
+                elapsed_time=workflow_node_execution.elapsed_time,
+                execution_metadata=workflow_node_execution.metadata,
+                created_at=int(workflow_node_execution.created_at.timestamp()),
+                finished_at=int(workflow_node_execution.finished_at.timestamp()),
+                files=self.fetch_files_from_node_outputs(workflow_node_execution.outputs or {}),
+                parallel_id=event.parallel_id,
+                parallel_start_node_id=event.parallel_start_node_id,
+                parent_parallel_id=event.parent_parallel_id,
+                parent_parallel_start_node_id=event.parent_parallel_start_node_id,
+                iteration_id=event.in_iteration_id,
+                loop_id=event.in_loop_id,
+            ),
+        )
+
+    def workflow_node_retry_to_stream_response(
+        self,
+        *,
+        event: QueueNodeRetryEvent,
+        task_id: str,
+        workflow_node_execution: NodeExecution,
+    ) -> Optional[Union[NodeRetryStreamResponse, NodeFinishStreamResponse]]:
+        if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}:
+            return None
+        if not workflow_node_execution.workflow_run_id:
+            return None
+        if not workflow_node_execution.finished_at:
+            return None
+
+        return NodeRetryStreamResponse(
+            task_id=task_id,
+            workflow_run_id=workflow_node_execution.workflow_run_id,
+            data=NodeRetryStreamResponse.Data(
+                id=workflow_node_execution.id,
+                node_id=workflow_node_execution.node_id,
+                node_type=workflow_node_execution.node_type,
+                index=workflow_node_execution.index,
+                title=workflow_node_execution.title,
+                predecessor_node_id=workflow_node_execution.predecessor_node_id,
+                inputs=workflow_node_execution.inputs,
+                process_data=workflow_node_execution.process_data,
+                outputs=workflow_node_execution.outputs,
+                status=workflow_node_execution.status,
+                error=workflow_node_execution.error,
+                elapsed_time=workflow_node_execution.elapsed_time,
+                execution_metadata=workflow_node_execution.metadata,
+                created_at=int(workflow_node_execution.created_at.timestamp()),
+                finished_at=int(workflow_node_execution.finished_at.timestamp()),
+                files=self.fetch_files_from_node_outputs(workflow_node_execution.outputs or {}),
+                parallel_id=event.parallel_id,
+                parallel_start_node_id=event.parallel_start_node_id,
+                parent_parallel_id=event.parent_parallel_id,
+                parent_parallel_start_node_id=event.parent_parallel_start_node_id,
+                iteration_id=event.in_iteration_id,
+                loop_id=event.in_loop_id,
+                retry_index=event.retry_index,
+            ),
+        )
+
+    def workflow_parallel_branch_start_to_stream_response(
+        self,
+        *,
+        task_id: str,
+        workflow_execution_id: str,
+        event: QueueParallelBranchRunStartedEvent,
+    ) -> ParallelBranchStartStreamResponse:
+        return ParallelBranchStartStreamResponse(
+            task_id=task_id,
+            workflow_run_id=workflow_execution_id,
+            data=ParallelBranchStartStreamResponse.Data(
+                parallel_id=event.parallel_id,
+                parallel_branch_id=event.parallel_start_node_id,
+                parent_parallel_id=event.parent_parallel_id,
+                parent_parallel_start_node_id=event.parent_parallel_start_node_id,
+                iteration_id=event.in_iteration_id,
+                loop_id=event.in_loop_id,
+                created_at=int(time.time()),
+            ),
+        )
+
+    def workflow_parallel_branch_finished_to_stream_response(
+        self,
+        *,
+        task_id: str,
+        workflow_execution_id: str,
+        event: QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent,
+    ) -> ParallelBranchFinishedStreamResponse:
+        return ParallelBranchFinishedStreamResponse(
+            task_id=task_id,
+            workflow_run_id=workflow_execution_id,
+            data=ParallelBranchFinishedStreamResponse.Data(
+                parallel_id=event.parallel_id,
+                parallel_branch_id=event.parallel_start_node_id,
+                parent_parallel_id=event.parent_parallel_id,
+                parent_parallel_start_node_id=event.parent_parallel_start_node_id,
+                iteration_id=event.in_iteration_id,
+                loop_id=event.in_loop_id,
+                status="succeeded" if isinstance(event, QueueParallelBranchRunSucceededEvent) else "failed",
+                error=event.error if isinstance(event, QueueParallelBranchRunFailedEvent) else None,
+                created_at=int(time.time()),
+            ),
+        )
+
+    def workflow_iteration_start_to_stream_response(
+        self,
+        *,
+        task_id: str,
+        workflow_execution_id: str,
+        event: QueueIterationStartEvent,
+    ) -> IterationNodeStartStreamResponse:
+        return IterationNodeStartStreamResponse(
+            task_id=task_id,
+            workflow_run_id=workflow_execution_id,
+            data=IterationNodeStartStreamResponse.Data(
+                id=event.node_id,
+                node_id=event.node_id,
+                node_type=event.node_type.value,
+                title=event.node_data.title,
+                created_at=int(time.time()),
+                extras={},
+                inputs=event.inputs or {},
+                metadata=event.metadata or {},
+                parallel_id=event.parallel_id,
+                parallel_start_node_id=event.parallel_start_node_id,
+            ),
+        )
+
+    def workflow_iteration_next_to_stream_response(
+        self,
+        *,
+        task_id: str,
+        workflow_execution_id: str,
+        event: QueueIterationNextEvent,
+    ) -> IterationNodeNextStreamResponse:
+        return IterationNodeNextStreamResponse(
+            task_id=task_id,
+            workflow_run_id=workflow_execution_id,
+            data=IterationNodeNextStreamResponse.Data(
+                id=event.node_id,
+                node_id=event.node_id,
+                node_type=event.node_type.value,
+                title=event.node_data.title,
+                index=event.index,
+                pre_iteration_output=event.output,
+                created_at=int(time.time()),
+                extras={},
+                parallel_id=event.parallel_id,
+                parallel_start_node_id=event.parallel_start_node_id,
+                parallel_mode_run_id=event.parallel_mode_run_id,
+                duration=event.duration,
+            ),
+        )
+
+    def workflow_iteration_completed_to_stream_response(
+        self,
+        *,
+        task_id: str,
+        workflow_execution_id: str,
+        event: QueueIterationCompletedEvent,
+    ) -> IterationNodeCompletedStreamResponse:
+        return IterationNodeCompletedStreamResponse(
+            task_id=task_id,
+            workflow_run_id=workflow_execution_id,
+            data=IterationNodeCompletedStreamResponse.Data(
+                id=event.node_id,
+                node_id=event.node_id,
+                node_type=event.node_type.value,
+                title=event.node_data.title,
+                outputs=event.outputs,
+                created_at=int(time.time()),
+                extras={},
+                inputs=event.inputs or {},
+                status=WorkflowNodeExecutionStatus.SUCCEEDED
+                if event.error is None
+                else WorkflowNodeExecutionStatus.FAILED,
+                error=None,
+                elapsed_time=(datetime.now(UTC).replace(tzinfo=None) - event.start_at).total_seconds(),
+                total_tokens=event.metadata.get("total_tokens", 0) if event.metadata else 0,
+                execution_metadata=event.metadata,
+                finished_at=int(time.time()),
+                steps=event.steps,
+                parallel_id=event.parallel_id,
+                parallel_start_node_id=event.parallel_start_node_id,
+            ),
+        )
+
+    def workflow_loop_start_to_stream_response(
+        self, *, task_id: str, workflow_execution_id: str, event: QueueLoopStartEvent
+    ) -> LoopNodeStartStreamResponse:
+        return LoopNodeStartStreamResponse(
+            task_id=task_id,
+            workflow_run_id=workflow_execution_id,
+            data=LoopNodeStartStreamResponse.Data(
+                id=event.node_id,
+                node_id=event.node_id,
+                node_type=event.node_type.value,
+                title=event.node_data.title,
+                created_at=int(time.time()),
+                extras={},
+                inputs=event.inputs or {},
+                metadata=event.metadata or {},
+                parallel_id=event.parallel_id,
+                parallel_start_node_id=event.parallel_start_node_id,
+            ),
+        )
+
+    def workflow_loop_next_to_stream_response(
+        self,
+        *,
+        task_id: str,
+        workflow_execution_id: str,
+        event: QueueLoopNextEvent,
+    ) -> LoopNodeNextStreamResponse:
+        return LoopNodeNextStreamResponse(
+            task_id=task_id,
+            workflow_run_id=workflow_execution_id,
+            data=LoopNodeNextStreamResponse.Data(
+                id=event.node_id,
+                node_id=event.node_id,
+                node_type=event.node_type.value,
+                title=event.node_data.title,
+                index=event.index,
+                pre_loop_output=event.output,
+                created_at=int(time.time()),
+                extras={},
+                parallel_id=event.parallel_id,
+                parallel_start_node_id=event.parallel_start_node_id,
+                parallel_mode_run_id=event.parallel_mode_run_id,
+                duration=event.duration,
+            ),
+        )
+
+    def workflow_loop_completed_to_stream_response(
+        self,
+        *,
+        task_id: str,
+        workflow_execution_id: str,
+        event: QueueLoopCompletedEvent,
+    ) -> LoopNodeCompletedStreamResponse:
+        return LoopNodeCompletedStreamResponse(
+            task_id=task_id,
+            workflow_run_id=workflow_execution_id,
+            data=LoopNodeCompletedStreamResponse.Data(
+                id=event.node_id,
+                node_id=event.node_id,
+                node_type=event.node_type.value,
+                title=event.node_data.title,
+                outputs=event.outputs,
+                created_at=int(time.time()),
+                extras={},
+                inputs=event.inputs or {},
+                status=WorkflowNodeExecutionStatus.SUCCEEDED
+                if event.error is None
+                else WorkflowNodeExecutionStatus.FAILED,
+                error=None,
+                elapsed_time=(datetime.now(UTC).replace(tzinfo=None) - event.start_at).total_seconds(),
+                total_tokens=event.metadata.get("total_tokens", 0) if event.metadata else 0,
+                execution_metadata=event.metadata,
+                finished_at=int(time.time()),
+                steps=event.steps,
+                parallel_id=event.parallel_id,
+                parallel_start_node_id=event.parallel_start_node_id,
+            ),
+        )
+
+    def fetch_files_from_node_outputs(self, outputs_dict: Mapping[str, Any] | None) -> Sequence[Mapping[str, Any]]:
+        """
+        Fetch files from node outputs
+        :param outputs_dict: node outputs dict
+        :return:
+        """
+        if not outputs_dict:
+            return []
+
+        files = [self._fetch_files_from_variable_value(output_value) for output_value in outputs_dict.values()]
+        # Remove None
+        files = [file for file in files if file]
+        # Flatten the list of sequences into a single list of mappings
+        flattened_files = [file for sublist in files if sublist for file in sublist]
+
+        # Convert to tuple to match Sequence type
+        return tuple(flattened_files)
+
+    def _fetch_files_from_variable_value(self, value: Union[dict, list]) -> Sequence[Mapping[str, Any]]:
+        """
+        Fetch files from variable value
+        :param value: variable value
+        :return:
+        """
+        if not value:
+            return []
+
+        files = []
+        if isinstance(value, list):
+            for item in value:
+                file = self._get_file_var_from_value(item)
+                if file:
+                    files.append(file)
+        elif isinstance(value, dict):
+            file = self._get_file_var_from_value(value)
+            if file:
+                files.append(file)
+
+        return files
+
+    def _get_file_var_from_value(self, value: Union[dict, list]) -> Mapping[str, Any] | None:
+        """
+        Get file var from value
+        :param value: variable value
+        :return:
+        """
+        if not value:
+            return None
+
+        if isinstance(value, dict) and value.get("dify_model_identity") == FILE_MODEL_IDENTITY:
+            return value
+        elif isinstance(value, File):
+            return value.to_dict()
+
+        return None
+
+    def handle_agent_log(self, task_id: str, event: QueueAgentLogEvent) -> AgentLogStreamResponse:
+        """
+        Handle agent log
+        :param task_id: task id
+        :param event: agent log event
+        :return:
+        """
+        return AgentLogStreamResponse(
+            task_id=task_id,
+            data=AgentLogStreamResponse.Data(
+                node_execution_id=event.node_execution_id,
+                id=event.id,
+                parent_id=event.parent_id,
+                label=event.label,
+                error=event.error,
+                status=event.status,
+                data=event.data,
+                metadata=event.metadata,
+                node_id=event.node_id,
+            ),
+        )
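
The converter is stateless apart from the generate entity (used for tenant scoping when fetching tool icons), so one instance can serve a whole pipeline run. A minimal sketch of the intended division of labor, with method names taken from the code above and a hypothetical pipeline class standing in for the real ones:

    class SomePipeline:
        def __init__(self, application_generate_entity, workflow_cycle_manager):
            # Persistence stays in WorkflowCycleManager; presentation moves here.
            self._workflow_cycle_manager = workflow_cycle_manager
            self._workflow_response_converter = WorkflowResponseConverter(
                application_generate_entity=application_generate_entity,
            )

        def on_workflow_started(self, task_id, workflow_execution):
            # Domain object in, stream response out; no queue or session access needed.
            return self._workflow_response_converter.workflow_start_to_stream_response(
                task_id=task_id,
                workflow_execution=workflow_execution,
            )
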
diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py
index f2ebd78b36..0291f49cac 100644
--- a/api/core/app/apps/workflow/generate_task_pipeline.py
+++ b/api/core/app/apps/workflow/generate_task_pipeline.py
@@ -8,6 +8,7 @@ from sqlalchemy.orm import Session
 from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY_YIELD_CPU_TIME
 from core.app.apps.base_app_queue_manager import AppQueueManager
+from core.app.apps.common.workflow_response_converter import WorkflowResponseConverter
 from core.app.entities.app_invoke_entities import (
     InvokeFrom,
     WorkflowAppGenerateEntity,
@@ -119,6 +120,10 @@ class WorkflowAppGenerateTaskPipeline:
             workflow_node_execution_repository=workflow_node_execution_repository,
         )
 
+        self._workflow_response_converter = WorkflowResponseConverter(
+            application_generate_entity=application_generate_entity,
+        )
+
         self._application_generate_entity = application_generate_entity
         self._workflow_id = workflow.id
         self._workflow_features_dict = workflow.features_dict
@@ -268,7 +273,7 @@ class WorkflowAppGenerateTaskPipeline:
                         workflow_id=self._workflow_id,
                     )
                 self._workflow_run_id = workflow_execution.id
-                start_resp = self._workflow_cycle_manager.workflow_start_to_stream_response(
+                start_resp = self._workflow_response_converter.workflow_start_to_stream_response(
                     task_id=self._application_generate_entity.task_id,
                     workflow_execution=workflow_execution,
                 )
@@ -285,7 +290,7 @@ class WorkflowAppGenerateTaskPipeline:
                         workflow_execution_id=self._workflow_run_id,
                         event=event,
                     )
-                    response = self._workflow_cycle_manager.workflow_node_retry_to_stream_response(
+                    response = self._workflow_response_converter.workflow_node_retry_to_stream_response(
                         event=event,
                         task_id=self._application_generate_entity.task_id,
                         workflow_node_execution=workflow_node_execution,
@@ -301,7 +306,7 @@ class WorkflowAppGenerateTaskPipeline:
                 workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start(
                     workflow_execution_id=self._workflow_run_id, event=event
                 )
-                node_start_response = self._workflow_cycle_manager.workflow_node_start_to_stream_response(
+                node_start_response = self._workflow_response_converter.workflow_node_start_to_stream_response(
                     event=event,
                     task_id=self._application_generate_entity.task_id,
                     workflow_node_execution=workflow_node_execution,
@@ -313,7 +318,7 @@ class WorkflowAppGenerateTaskPipeline:
                 workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success(
                     event=event
                 )
-                node_success_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response(
+                node_success_response = self._workflow_response_converter.workflow_node_finish_to_stream_response(
                     event=event,
                     task_id=self._application_generate_entity.task_id,
                     workflow_node_execution=workflow_node_execution,
@@ -331,7 +336,7 @@ class WorkflowAppGenerateTaskPipeline:
                 workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed(
                     event=event,
                 )
-                node_failed_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response(
+                node_failed_response = self._workflow_response_converter.workflow_node_finish_to_stream_response(
                     event=event,
                     task_id=self._application_generate_entity.task_id,
                     workflow_node_execution=workflow_node_execution,
@@ -344,10 +349,12 @@ class WorkflowAppGenerateTaskPipeline:
                 if not self._workflow_run_id:
                     raise ValueError("workflow run not initialized.")
 
-                parallel_start_resp = self._workflow_cycle_manager.workflow_parallel_branch_start_to_stream_response(
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_execution_id=self._workflow_run_id,
-                    event=event,
+                parallel_start_resp = (
+                    self._workflow_response_converter.workflow_parallel_branch_start_to_stream_response(
+                        task_id=self._application_generate_entity.task_id,
+                        workflow_execution_id=self._workflow_run_id,
+                        event=event,
+                    )
                 )
 
                 yield parallel_start_resp
@@ -357,7 +364,7 @@ class WorkflowAppGenerateTaskPipeline:
                     raise ValueError("workflow run not initialized.")
 
                 parallel_finish_resp = (
-                    self._workflow_cycle_manager.workflow_parallel_branch_finished_to_stream_response(
+                    self._workflow_response_converter.workflow_parallel_branch_finished_to_stream_response(
                         task_id=self._application_generate_entity.task_id,
                         workflow_execution_id=self._workflow_run_id,
                         event=event,
@@ -370,7 +377,7 @@ class WorkflowAppGenerateTaskPipeline:
                 if not self._workflow_run_id:
                     raise ValueError("workflow run not initialized.")
 
-                iter_start_resp = self._workflow_cycle_manager.workflow_iteration_start_to_stream_response(
+                iter_start_resp = self._workflow_response_converter.workflow_iteration_start_to_stream_response(
                     task_id=self._application_generate_entity.task_id,
                     workflow_execution_id=self._workflow_run_id,
                     event=event,
@@ -382,7 +389,7 @@ class WorkflowAppGenerateTaskPipeline:
                 if not self._workflow_run_id:
                     raise ValueError("workflow run not initialized.")
 
-                iter_next_resp = self._workflow_cycle_manager.workflow_iteration_next_to_stream_response(
+                iter_next_resp = self._workflow_response_converter.workflow_iteration_next_to_stream_response(
                     task_id=self._application_generate_entity.task_id,
                     workflow_execution_id=self._workflow_run_id,
                     event=event,
@@ -394,7 +401,7 @@ class WorkflowAppGenerateTaskPipeline:
                 if not self._workflow_run_id:
                     raise ValueError("workflow run not initialized.")
 
-                iter_finish_resp = self._workflow_cycle_manager.workflow_iteration_completed_to_stream_response(
+                iter_finish_resp = self._workflow_response_converter.workflow_iteration_completed_to_stream_response(
                     task_id=self._application_generate_entity.task_id,
                     workflow_execution_id=self._workflow_run_id,
                     event=event,
@@ -406,7 +413,7 @@ class WorkflowAppGenerateTaskPipeline:
                 if not self._workflow_run_id:
                     raise ValueError("workflow run not initialized.")
 
-                loop_start_resp = self._workflow_cycle_manager.workflow_loop_start_to_stream_response(
+                loop_start_resp = self._workflow_response_converter.workflow_loop_start_to_stream_response(
                     task_id=self._application_generate_entity.task_id,
                     workflow_execution_id=self._workflow_run_id,
                     event=event,
@@ -418,7 +425,7 @@ class WorkflowAppGenerateTaskPipeline:
                 if not self._workflow_run_id:
                     raise ValueError("workflow run not initialized.")
 
-                loop_next_resp = self._workflow_cycle_manager.workflow_loop_next_to_stream_response(
+                loop_next_resp = self._workflow_response_converter.workflow_loop_next_to_stream_response(
                     task_id=self._application_generate_entity.task_id,
                     workflow_execution_id=self._workflow_run_id,
                     event=event,
@@ -430,7 +437,7 @@ class WorkflowAppGenerateTaskPipeline:
                 if not self._workflow_run_id:
                     raise ValueError("workflow run not initialized.")
 
-                loop_finish_resp = self._workflow_cycle_manager.workflow_loop_completed_to_stream_response(
+                loop_finish_resp = self._workflow_response_converter.workflow_loop_completed_to_stream_response(
                     task_id=self._application_generate_entity.task_id,
                     workflow_execution_id=self._workflow_run_id,
                     event=event,
@@ -457,7 +464,7 @@ class WorkflowAppGenerateTaskPipeline:
                     # save workflow app log
                     self._save_workflow_app_log(session=session, workflow_execution=workflow_execution)
 
-                    workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response(
+                    workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
                         session=session,
                         task_id=self._application_generate_entity.task_id,
                         workflow_execution=workflow_execution,
@@ -485,7 +492,7 @@ class WorkflowAppGenerateTaskPipeline:
                     # save workflow app log
                     self._save_workflow_app_log(session=session, workflow_execution=workflow_execution)
 
-                    workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response(
+                    workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
                         session=session,
                        task_id=self._application_generate_entity.task_id,
                        workflow_execution=workflow_execution,
@@ -518,7 +525,7 @@ class WorkflowAppGenerateTaskPipeline:
                     # save workflow app log
                     self._save_workflow_app_log(session=session, workflow_execution=workflow_execution)
 
-                    workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response(
+                    workflow_finish_resp = self._workflow_response_converter.workflow_finish_to_stream_response(
                         session=session,
                         task_id=self._application_generate_entity.task_id,
                         workflow_execution=workflow_execution,
@@ -540,7 +547,7 @@ class WorkflowAppGenerateTaskPipeline:
                     delta_text, from_variable_selector=event.from_variable_selector
                 )
             elif isinstance(event, QueueAgentLogEvent):
-                yield self._workflow_cycle_manager.handle_agent_log(
+                yield self._workflow_response_converter.handle_agent_log(
                     task_id=self._application_generate_entity.task_id, event=event
                 )
             else:
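
Note the old blob hash: the file deleted below (index f2ebd78b36..0000000000) is the same blob as the pre-change api/core/app/apps/workflow/generate_task_pipeline.py above (index f2ebd78b36..0291f49cac), so the deletion removes a stale duplicate rather than unique logic. After this patch, every event branch in both pipelines follows the same two-step shape; a representative sketch, copied from the node-start hunks:

    # 1) persist state and get a domain object back
    workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start(
        workflow_execution_id=self._workflow_run_id, event=event
    )
    # 2) convert the domain object to a stream response (None for iteration/loop nodes)
    node_start_resp = self._workflow_response_converter.workflow_node_start_to_stream_response(
        event=event,
        task_id=self._application_generate_entity.task_id,
        workflow_node_execution=workflow_node_execution,
    )
    if node_start_resp:
        yield node_start_resp
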
diff --git a/api/core/workflow/workflow_app_generate_task_pipeline.py b/api/core/workflow/workflow_app_generate_task_pipeline.py
deleted file mode 100644
index f2ebd78b36..0000000000
--- a/api/core/workflow/workflow_app_generate_task_pipeline.py
+++ /dev/null
@@ -1,591 +0,0 @@
-import logging
-import time
-from collections.abc import Generator
-from typing import Optional, Union
-
-from sqlalchemy import select
-from sqlalchemy.orm import Session
-
-from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY_YIELD_CPU_TIME
-from core.app.apps.base_app_queue_manager import AppQueueManager
-from core.app.entities.app_invoke_entities import (
-    InvokeFrom,
-    WorkflowAppGenerateEntity,
-)
-from core.app.entities.queue_entities import (
-    QueueAgentLogEvent,
-    QueueErrorEvent,
-    QueueIterationCompletedEvent,
-    QueueIterationNextEvent,
-    QueueIterationStartEvent,
-    QueueLoopCompletedEvent,
-    QueueLoopNextEvent,
-    QueueLoopStartEvent,
-    QueueNodeExceptionEvent,
-    QueueNodeFailedEvent,
-    QueueNodeInIterationFailedEvent,
-    QueueNodeInLoopFailedEvent,
-    QueueNodeRetryEvent,
-    QueueNodeStartedEvent,
-    QueueNodeSucceededEvent,
-    QueueParallelBranchRunFailedEvent,
-    QueueParallelBranchRunStartedEvent,
-    QueueParallelBranchRunSucceededEvent,
-    QueuePingEvent,
-    QueueStopEvent,
-    QueueTextChunkEvent,
-    QueueWorkflowFailedEvent,
-    QueueWorkflowPartialSuccessEvent,
-    QueueWorkflowStartedEvent,
-    QueueWorkflowSucceededEvent,
-)
-from core.app.entities.task_entities import (
-    ErrorStreamResponse,
-    MessageAudioEndStreamResponse,
-    MessageAudioStreamResponse,
-    StreamResponse,
-    TextChunkStreamResponse,
-    WorkflowAppBlockingResponse,
-    WorkflowAppStreamResponse,
-    WorkflowFinishStreamResponse,
-    WorkflowStartStreamResponse,
-    WorkflowTaskState,
-)
-from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
-from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
-from core.ops.ops_trace_manager import TraceQueueManager
-from core.workflow.entities.workflow_execution_entities import WorkflowExecution
-from core.workflow.enums import SystemVariableKey
-from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository
-from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
-from core.workflow.workflow_cycle_manager import WorkflowCycleManager
-from extensions.ext_database import db
-from models.account import Account
-from models.enums import CreatorUserRole
-from models.model import EndUser
-from models.workflow import (
-    Workflow,
-    WorkflowAppLog,
-    WorkflowAppLogCreatedFrom,
-    WorkflowRun,
-    WorkflowRunStatus,
-)
-
-logger = logging.getLogger(__name__)
-
-
-class WorkflowAppGenerateTaskPipeline:
-    """
-    WorkflowAppGenerateTaskPipeline is a class that generate stream output and state management for Application.
-    """
-
-    def __init__(
-        self,
-        application_generate_entity: WorkflowAppGenerateEntity,
-        workflow: Workflow,
-        queue_manager: AppQueueManager,
-        user: Union[Account, EndUser],
-        stream: bool,
-        workflow_execution_repository: WorkflowExecutionRepository,
-        workflow_node_execution_repository: WorkflowNodeExecutionRepository,
-    ) -> None:
-        self._base_task_pipeline = BasedGenerateTaskPipeline(
-            application_generate_entity=application_generate_entity,
-            queue_manager=queue_manager,
-            stream=stream,
-        )
-
-        if isinstance(user, EndUser):
-            self._user_id = user.id
-            user_session_id = user.session_id
-            self._created_by_role = CreatorUserRole.END_USER
-        elif isinstance(user, Account):
-            self._user_id = user.id
-            user_session_id = user.id
-            self._created_by_role = CreatorUserRole.ACCOUNT
-        else:
-            raise ValueError(f"Invalid user type: {type(user)}")
-
-        self._workflow_cycle_manager = WorkflowCycleManager(
-            application_generate_entity=application_generate_entity,
-            workflow_system_variables={
-                SystemVariableKey.FILES: application_generate_entity.files,
-                SystemVariableKey.USER_ID: user_session_id,
-                SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id,
-                SystemVariableKey.WORKFLOW_ID: workflow.id,
-                SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id,
-            },
-            workflow_execution_repository=workflow_execution_repository,
-            workflow_node_execution_repository=workflow_node_execution_repository,
-        )
-
-        self._application_generate_entity = application_generate_entity
-        self._workflow_id = workflow.id
-        self._workflow_features_dict = workflow.features_dict
-        self._task_state = WorkflowTaskState()
-        self._workflow_run_id = ""
-
-    def process(self) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:
-        """
-        Process generate task pipeline.
-        :return:
-        """
-        generator = self._wrapper_process_stream_response(trace_manager=self._application_generate_entity.trace_manager)
-        if self._base_task_pipeline._stream:
-            return self._to_stream_response(generator)
-        else:
-            return self._to_blocking_response(generator)
-
-    def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]) -> WorkflowAppBlockingResponse:
-        """
-        To blocking response.
-        :return:
-        """
-        for stream_response in generator:
-            if isinstance(stream_response, ErrorStreamResponse):
-                raise stream_response.err
-            elif isinstance(stream_response, WorkflowFinishStreamResponse):
-                response = WorkflowAppBlockingResponse(
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_run_id=stream_response.data.id,
-                    data=WorkflowAppBlockingResponse.Data(
-                        id=stream_response.data.id,
-                        workflow_id=stream_response.data.workflow_id,
-                        status=stream_response.data.status,
-                        outputs=stream_response.data.outputs,
-                        error=stream_response.data.error,
-                        elapsed_time=stream_response.data.elapsed_time,
-                        total_tokens=stream_response.data.total_tokens,
-                        total_steps=stream_response.data.total_steps,
-                        created_at=int(stream_response.data.created_at),
-                        finished_at=int(stream_response.data.finished_at),
-                    ),
-                )
-
-                return response
-            else:
-                continue
-
-        raise ValueError("queue listening stopped unexpectedly.")
-
-    def _to_stream_response(
-        self, generator: Generator[StreamResponse, None, None]
-    ) -> Generator[WorkflowAppStreamResponse, None, None]:
-        """
-        To stream response.
-        :return:
-        """
-        workflow_run_id = None
-        for stream_response in generator:
-            if isinstance(stream_response, WorkflowStartStreamResponse):
-                workflow_run_id = stream_response.workflow_run_id
-
-            yield WorkflowAppStreamResponse(workflow_run_id=workflow_run_id, stream_response=stream_response)
-
-    def _listen_audio_msg(self, publisher: AppGeneratorTTSPublisher | None, task_id: str):
-        if not publisher:
-            return None
-        audio_msg = publisher.check_and_get_audio()
-        if audio_msg and isinstance(audio_msg, AudioTrunk) and audio_msg.status != "finish":
-            return MessageAudioStreamResponse(audio=audio_msg.audio, task_id=task_id)
-        return None
-
-    def _wrapper_process_stream_response(
-        self, trace_manager: Optional[TraceQueueManager] = None
-    ) -> Generator[StreamResponse, None, None]:
-        tts_publisher = None
-        task_id = self._application_generate_entity.task_id
-        tenant_id = self._application_generate_entity.app_config.tenant_id
-        features_dict = self._workflow_features_dict
-
-        if (
-            features_dict.get("text_to_speech")
-            and features_dict["text_to_speech"].get("enabled")
-            and features_dict["text_to_speech"].get("autoPlay") == "enabled"
-        ):
-            tts_publisher = AppGeneratorTTSPublisher(
-                tenant_id, features_dict["text_to_speech"].get("voice"), features_dict["text_to_speech"].get("language")
-            )
-
-        for response in self._process_stream_response(tts_publisher=tts_publisher, trace_manager=trace_manager):
-            while True:
-                audio_response = self._listen_audio_msg(publisher=tts_publisher, task_id=task_id)
-                if audio_response:
-                    yield audio_response
-                else:
-                    break
-            yield response
-
-        start_listener_time = time.time()
-        while (time.time() - start_listener_time) < TTS_AUTO_PLAY_TIMEOUT:
-            try:
-                if not tts_publisher:
-                    break
-                audio_trunk = tts_publisher.check_and_get_audio()
-                if audio_trunk is None:
-                    # release cpu
-                    # sleep 20 ms ( 40ms => 1280 byte audio file,20ms => 640 byte audio file)
-                    time.sleep(TTS_AUTO_PLAY_YIELD_CPU_TIME)
-                    continue
-                if audio_trunk.status == "finish":
-                    break
-                else:
-                    yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id)
-            except Exception:
-                logger.exception(f"Fails to get audio trunk, task_id: {task_id}")
-                break
-        if tts_publisher:
-            yield MessageAudioEndStreamResponse(audio="", task_id=task_id)
-
-    def _process_stream_response(
-        self,
-        tts_publisher: Optional[AppGeneratorTTSPublisher] = None,
-        trace_manager: Optional[TraceQueueManager] = None,
-    ) -> Generator[StreamResponse, None, None]:
-        """
-        Process stream response.
-        :return:
-        """
-        graph_runtime_state = None
-
-        for queue_message in self._base_task_pipeline._queue_manager.listen():
-            event = queue_message.event
-
-            if isinstance(event, QueuePingEvent):
-                yield self._base_task_pipeline._ping_stream_response()
-            elif isinstance(event, QueueErrorEvent):
-                err = self._base_task_pipeline._handle_error(event=event)
-                yield self._base_task_pipeline._error_to_stream_response(err)
-                break
-            elif isinstance(event, QueueWorkflowStartedEvent):
-                # override graph runtime state
-                graph_runtime_state = event.graph_runtime_state
-
-                with Session(db.engine, expire_on_commit=False) as session:
-                    # init workflow run
-                    workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start(
-                        session=session,
-                        workflow_id=self._workflow_id,
-                    )
-                self._workflow_run_id = workflow_execution.id
-                start_resp = self._workflow_cycle_manager.workflow_start_to_stream_response(
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_execution=workflow_execution,
-                )
-
-                yield start_resp
-            elif isinstance(
-                event,
-                QueueNodeRetryEvent,
-            ):
-                if not self._workflow_run_id:
-                    raise ValueError("workflow run not initialized.")
-                with Session(db.engine, expire_on_commit=False) as session:
-                    workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried(
-                        workflow_execution_id=self._workflow_run_id,
-                        event=event,
-                    )
-                    response = self._workflow_cycle_manager.workflow_node_retry_to_stream_response(
-                        event=event,
-                        task_id=self._application_generate_entity.task_id,
-                        workflow_node_execution=workflow_node_execution,
-                    )
-                    session.commit()
-
-                if response:
-                    yield response
-            elif isinstance(event, QueueNodeStartedEvent):
-                if not self._workflow_run_id:
-                    raise ValueError("workflow run not initialized.")
-
-                workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start(
-                    workflow_execution_id=self._workflow_run_id, event=event
-                )
-                node_start_response = self._workflow_cycle_manager.workflow_node_start_to_stream_response(
-                    event=event,
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_node_execution=workflow_node_execution,
-                )
-
-                if node_start_response:
-                    yield node_start_response
-            elif isinstance(event, QueueNodeSucceededEvent):
-                workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success(
-                    event=event
-                )
-                node_success_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response(
-                    event=event,
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_node_execution=workflow_node_execution,
-                )
-
-                if node_success_response:
-                    yield node_success_response
-            elif isinstance(
-                event,
-                QueueNodeFailedEvent
-                | QueueNodeInIterationFailedEvent
-                | QueueNodeInLoopFailedEvent
-                | QueueNodeExceptionEvent,
-            ):
-                workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed(
-                    event=event,
-                )
-                node_failed_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response(
-                    event=event,
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_node_execution=workflow_node_execution,
-                )
-
-                if node_failed_response:
-                    yield node_failed_response
-
-            elif isinstance(event, QueueParallelBranchRunStartedEvent):
-                if not self._workflow_run_id:
-                    raise ValueError("workflow run not initialized.")
-
-                parallel_start_resp = self._workflow_cycle_manager.workflow_parallel_branch_start_to_stream_response(
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_execution_id=self._workflow_run_id,
-                    event=event,
-                )
-
-                yield parallel_start_resp
-
-            elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
-                if not self._workflow_run_id:
-                    raise ValueError("workflow run not initialized.")
-
-                parallel_finish_resp = (
-                    self._workflow_cycle_manager.workflow_parallel_branch_finished_to_stream_response(
-                        task_id=self._application_generate_entity.task_id,
-                        workflow_execution_id=self._workflow_run_id,
-                        event=event,
-                    )
-                )
-
-                yield parallel_finish_resp
-
-            elif isinstance(event, QueueIterationStartEvent):
-                if not self._workflow_run_id:
-                    raise ValueError("workflow run not initialized.")
-
-                iter_start_resp = self._workflow_cycle_manager.workflow_iteration_start_to_stream_response(
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_execution_id=self._workflow_run_id,
-                    event=event,
-                )
-
-                yield iter_start_resp
-
-            elif isinstance(event, QueueIterationNextEvent):
-                if not self._workflow_run_id:
-                    raise ValueError("workflow run not initialized.")
-
-                iter_next_resp = self._workflow_cycle_manager.workflow_iteration_next_to_stream_response(
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_execution_id=self._workflow_run_id,
-                    event=event,
-                )
-
-                yield iter_next_resp
-
-            elif isinstance(event, QueueIterationCompletedEvent):
-                if not self._workflow_run_id:
-                    raise ValueError("workflow run not initialized.")
-
-                iter_finish_resp = self._workflow_cycle_manager.workflow_iteration_completed_to_stream_response(
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_execution_id=self._workflow_run_id,
-                    event=event,
-                )
-
-                yield iter_finish_resp
-
-            elif isinstance(event, QueueLoopStartEvent):
-                if not self._workflow_run_id:
-                    raise ValueError("workflow run not initialized.")
-
-                loop_start_resp = self._workflow_cycle_manager.workflow_loop_start_to_stream_response(
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_execution_id=self._workflow_run_id,
-                    event=event,
-                )
-
-                yield loop_start_resp
-
-            elif isinstance(event, QueueLoopNextEvent):
-                if not self._workflow_run_id:
-                    raise ValueError("workflow run not initialized.")
-
-                loop_next_resp = self._workflow_cycle_manager.workflow_loop_next_to_stream_response(
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_execution_id=self._workflow_run_id,
-                    event=event,
-                )
-
-                yield loop_next_resp
-
-            elif isinstance(event, QueueLoopCompletedEvent):
-                if not self._workflow_run_id:
-                    raise ValueError("workflow run not initialized.")
-
-                loop_finish_resp = self._workflow_cycle_manager.workflow_loop_completed_to_stream_response(
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_execution_id=self._workflow_run_id,
-                    event=event,
-                )
-
-                yield loop_finish_resp
-
-            elif isinstance(event, QueueWorkflowSucceededEvent):
-                if not self._workflow_run_id:
-                    raise ValueError("workflow run not initialized.")
-                if not graph_runtime_state:
-                    raise ValueError("graph runtime state not initialized.")
-
-                with Session(db.engine, expire_on_commit=False) as session:
-                    workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success(
-                        workflow_run_id=self._workflow_run_id,
-                        total_tokens=graph_runtime_state.total_tokens,
-                        total_steps=graph_runtime_state.node_run_steps,
-                        outputs=event.outputs,
-                        conversation_id=None,
-                        trace_manager=trace_manager,
-                    )
-
-                    # save workflow app log
-                    self._save_workflow_app_log(session=session, workflow_execution=workflow_execution)
 
-                    workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response(
-                        session=session,
-                        task_id=self._application_generate_entity.task_id,
-                        workflow_execution=workflow_execution,
-                    )
-                    session.commit()
-
-                yield workflow_finish_resp
-            elif isinstance(event, QueueWorkflowPartialSuccessEvent):
-                if not self._workflow_run_id:
-                    raise ValueError("workflow run not initialized.")
-                if not graph_runtime_state:
-                    raise ValueError("graph runtime state not initialized.")
-
-                with Session(db.engine, expire_on_commit=False) as session:
-                    workflow_execution = self._workflow_cycle_manager.handle_workflow_run_partial_success(
-                        workflow_run_id=self._workflow_run_id,
-                        total_tokens=graph_runtime_state.total_tokens,
-                        total_steps=graph_runtime_state.node_run_steps,
-                        outputs=event.outputs,
-                        exceptions_count=event.exceptions_count,
-                        conversation_id=None,
-                        trace_manager=trace_manager,
-                    )
-
-                    # save workflow app log
-                    self._save_workflow_app_log(session=session, workflow_execution=workflow_execution)
-
-                    workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response(
-                        session=session,
-                        task_id=self._application_generate_entity.task_id,
-                        workflow_execution=workflow_execution,
-                    )
-                    session.commit()
-
-                yield workflow_finish_resp
-            elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent):
-                if not self._workflow_run_id:
-                    raise ValueError("workflow run not initialized.")
-                if not graph_runtime_state:
-                    raise ValueError("graph runtime state not initialized.")
-
-                with Session(db.engine, expire_on_commit=False) as session:
-                    workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed(
-                        workflow_run_id=self._workflow_run_id,
-                        total_tokens=graph_runtime_state.total_tokens,
-                        total_steps=graph_runtime_state.node_run_steps,
-                        status=WorkflowRunStatus.FAILED
-                        if isinstance(event, QueueWorkflowFailedEvent)
-                        else WorkflowRunStatus.STOPPED,
-                        error_message=event.error
-                        if isinstance(event, QueueWorkflowFailedEvent)
-                        else event.get_stop_reason(),
-                        conversation_id=None,
-                        trace_manager=trace_manager,
-                        exceptions_count=event.exceptions_count if isinstance(event, QueueWorkflowFailedEvent) else 0,
-                    )
-
-                    # save workflow app log
-                    self._save_workflow_app_log(session=session, workflow_execution=workflow_execution)
-
-                    workflow_finish_resp = self._workflow_cycle_manager.workflow_finish_to_stream_response(
-                        session=session,
-                        task_id=self._application_generate_entity.task_id,
-                        workflow_execution=workflow_execution,
-                    )
-                    session.commit()
-
-                yield workflow_finish_resp
-            elif isinstance(event, QueueTextChunkEvent):
-                delta_text = event.text
-                if delta_text is None:
-                    continue
-
-                # only publish tts message at text chunk streaming
-                if tts_publisher:
-                    tts_publisher.publish(queue_message)
-
-                self._task_state.answer += delta_text
-                yield self._text_chunk_to_stream_response(
-                    delta_text, from_variable_selector=event.from_variable_selector
-                )
-            elif isinstance(event, QueueAgentLogEvent):
-                yield self._workflow_cycle_manager.handle_agent_log(
-                    task_id=self._application_generate_entity.task_id, event=event
-                )
-            else:
-                continue
-
-        if tts_publisher:
-            tts_publisher.publish(None)
-
-    def _save_workflow_app_log(self, *, session: Session, workflow_execution: WorkflowExecution) -> None:
-        workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id))
-        assert workflow_run is not None
-        invoke_from = self._application_generate_entity.invoke_from
-        if invoke_from == InvokeFrom.SERVICE_API:
-            created_from = WorkflowAppLogCreatedFrom.SERVICE_API
-        elif invoke_from == InvokeFrom.EXPLORE:
-            created_from = WorkflowAppLogCreatedFrom.INSTALLED_APP
-        elif invoke_from == InvokeFrom.WEB_APP:
-            created_from = WorkflowAppLogCreatedFrom.WEB_APP
-        else:
-            # not save log for debugging
-            return
-
-        workflow_app_log = WorkflowAppLog()
-        workflow_app_log.tenant_id = workflow_run.tenant_id
-        workflow_app_log.app_id = workflow_run.app_id
-        workflow_app_log.workflow_id = workflow_run.workflow_id
-        workflow_app_log.workflow_run_id = workflow_run.id
-        workflow_app_log.created_from = created_from.value
-        workflow_app_log.created_by_role = self._created_by_role
-        workflow_app_log.created_by = self._user_id
-
-        session.add(workflow_app_log)
-        session.commit()
-
-    def _text_chunk_to_stream_response(
-        self, text: str, from_variable_selector: Optional[list[str]] = None
-    ) -> TextChunkStreamResponse:
-        """
-        Handle completed event.
-        :param text: text
-        :return:
-        """
-        response = TextChunkStreamResponse(
-            task_id=self._application_generate_entity.task_id,
-            data=TextChunkStreamResponse.Data(text=text, from_variable_selector=from_variable_selector),
-        )
-
-        return response
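
The file-extraction helpers travel with the converter. An illustrative call, with made-up output values (file dicts are recognized by the FILE_MODEL_IDENTITY marker; everything else is skipped):

    outputs = {
        "text": "done",
        "attachments": [{"dify_model_identity": FILE_MODEL_IDENTITY, "id": "file-1"}],
    }
    files = converter.fetch_files_from_node_outputs(outputs)
    # -> ({"dify_model_identity": FILE_MODEL_IDENTITY, "id": "file-1"},)
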
 from core.workflow.enums import SystemVariableKey
-from core.workflow.nodes import NodeType
-from core.workflow.nodes.tool.entities import ToolNodeData
 from core.workflow.repository.workflow_execution_repository import WorkflowExecutionRepository
 from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
 from core.workflow.workflow_entry import WorkflowEntry
 from models import (
-    Account,
-    CreatorUserRole,
-    EndUser,
     Workflow,
-    WorkflowNodeExecutionStatus,
     WorkflowRun,
     WorkflowRunStatus,
 )
@@ -416,506 +381,8 @@ class WorkflowCycleManager:
 
         return domain_execution
 
-    def workflow_start_to_stream_response(
-        self,
-        *,
-        task_id: str,
-        workflow_execution: WorkflowExecution,
-    ) -> WorkflowStartStreamResponse:
-        return WorkflowStartStreamResponse(
-            task_id=task_id,
-            workflow_run_id=workflow_execution.id,
-            data=WorkflowStartStreamResponse.Data(
-                id=workflow_execution.id,
-                workflow_id=workflow_execution.workflow_id,
-                sequence_number=workflow_execution.sequence_number,
-                inputs=workflow_execution.inputs,
-                created_at=int(workflow_execution.started_at.timestamp()),
-            ),
-        )
-
-    def workflow_finish_to_stream_response(
-        self,
-        *,
-        session: Session,
-        task_id: str,
-        workflow_execution: WorkflowExecution,
-    ) -> WorkflowFinishStreamResponse:
-        created_by = None
-        workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == workflow_execution.id))
-        assert workflow_run is not None
-        if workflow_run.created_by_role == CreatorUserRole.ACCOUNT:
-            stmt = select(Account).where(Account.id == workflow_run.created_by)
-            account = session.scalar(stmt)
-            if account:
-                created_by = {
-                    "id": account.id,
-                    "name": account.name,
-                    "email": account.email,
-                }
-        elif workflow_run.created_by_role == CreatorUserRole.END_USER:
-            stmt = select(EndUser).where(EndUser.id == workflow_run.created_by)
-            end_user = session.scalar(stmt)
-            if end_user:
-                created_by = {
-                    "id": end_user.id,
-                    "user": end_user.session_id,
-                }
-        else:
-            raise NotImplementedError(f"unknown created_by_role: {workflow_run.created_by_role}")
-
-        # Handle the case where finished_at is None by using current time as default
-        finished_at_timestamp = (
-            int(workflow_execution.finished_at.timestamp())
-            if workflow_execution.finished_at
-            else int(datetime.now(UTC).timestamp())
-        )
-
-        return WorkflowFinishStreamResponse(
-            task_id=task_id,
-            workflow_run_id=workflow_execution.id,
-            data=WorkflowFinishStreamResponse.Data(
-                id=workflow_execution.id,
-                workflow_id=workflow_execution.workflow_id,
-                sequence_number=workflow_execution.sequence_number,
-                status=workflow_execution.status,
-                outputs=workflow_execution.outputs,
-                error=workflow_execution.error_message,
-                elapsed_time=workflow_execution.elapsed_time,
-                total_tokens=workflow_execution.total_tokens,
-                total_steps=workflow_execution.total_steps,
-                created_by=created_by,
-                created_at=int(workflow_execution.started_at.timestamp()),
-                finished_at=finished_at_timestamp,
-                files=self.fetch_files_from_node_outputs(workflow_execution.outputs),
-                exceptions_count=workflow_execution.exceptions_count,
-            ),
-        )
-
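The `created_by` block in the removed `workflow_finish_to_stream_response` is the one piece of real logic here: the payload shape depends on who triggered the run. Distilled into a sketch, using only the models named in the removed imports:

# Sketch: resolve the finish-event `created_by` payload by creator role.
def resolve_created_by(session, workflow_run):
    if workflow_run.created_by_role == CreatorUserRole.ACCOUNT:
        account = session.scalar(select(Account).where(Account.id == workflow_run.created_by))
        return {"id": account.id, "name": account.name, "email": account.email} if account else None
    if workflow_run.created_by_role == CreatorUserRole.END_USER:
        end_user = session.scalar(select(EndUser).where(EndUser.id == workflow_run.created_by))
        return {"id": end_user.id, "user": end_user.session_id} if end_user else None
    raise NotImplementedError(f"unknown created_by_role: {workflow_run.created_by_role}")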
-    def workflow_node_start_to_stream_response(
-        self,
-        *,
-        event: QueueNodeStartedEvent,
-        task_id: str,
-        workflow_node_execution: NodeExecution,
-    ) -> Optional[NodeStartStreamResponse]:
-        if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}:
-            return None
-        if not workflow_node_execution.workflow_run_id:
-            return None
-
-        response = NodeStartStreamResponse(
-            task_id=task_id,
-            workflow_run_id=workflow_node_execution.workflow_run_id,
-            data=NodeStartStreamResponse.Data(
-                id=workflow_node_execution.id,
-                node_id=workflow_node_execution.node_id,
-                node_type=workflow_node_execution.node_type,
-                title=workflow_node_execution.title,
-                index=workflow_node_execution.index,
-                predecessor_node_id=workflow_node_execution.predecessor_node_id,
-                inputs=workflow_node_execution.inputs,
-                created_at=int(workflow_node_execution.created_at.timestamp()),
-                parallel_id=event.parallel_id,
-                parallel_start_node_id=event.parallel_start_node_id,
-                parent_parallel_id=event.parent_parallel_id,
-                parent_parallel_start_node_id=event.parent_parallel_start_node_id,
-                iteration_id=event.in_iteration_id,
-                loop_id=event.in_loop_id,
-                parallel_run_id=event.parallel_mode_run_id,
-                agent_strategy=event.agent_strategy,
-            ),
-        )
-
-        # extras logic
-        if event.node_type == NodeType.TOOL:
-            node_data = cast(ToolNodeData, event.node_data)
-            response.data.extras["icon"] = ToolManager.get_tool_icon(
-                tenant_id=self._application_generate_entity.app_config.tenant_id,
-                provider_type=node_data.provider_type,
-                provider_id=node_data.provider_id,
-            )
-
-        return response
-
-    def workflow_node_finish_to_stream_response(
-        self,
-        *,
-        event: QueueNodeSucceededEvent
-        | QueueNodeFailedEvent
-        | QueueNodeInIterationFailedEvent
-        | QueueNodeInLoopFailedEvent
-        | QueueNodeExceptionEvent,
-        task_id: str,
-        workflow_node_execution: NodeExecution,
-    ) -> Optional[NodeFinishStreamResponse]:
-        if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}:
-            return None
-        if not workflow_node_execution.workflow_run_id:
-            return None
-        if not workflow_node_execution.finished_at:
-            return None
-
-        return NodeFinishStreamResponse(
-            task_id=task_id,
-            workflow_run_id=workflow_node_execution.workflow_run_id,
-            data=NodeFinishStreamResponse.Data(
-                id=workflow_node_execution.id,
-                node_id=workflow_node_execution.node_id,
-                node_type=workflow_node_execution.node_type,
-                index=workflow_node_execution.index,
-                title=workflow_node_execution.title,
-                predecessor_node_id=workflow_node_execution.predecessor_node_id,
-                inputs=workflow_node_execution.inputs,
-                process_data=workflow_node_execution.process_data,
-                outputs=workflow_node_execution.outputs,
-                status=workflow_node_execution.status,
-                error=workflow_node_execution.error,
-                elapsed_time=workflow_node_execution.elapsed_time,
-                execution_metadata=workflow_node_execution.metadata,
-                created_at=int(workflow_node_execution.created_at.timestamp()),
-                finished_at=int(workflow_node_execution.finished_at.timestamp()),
-                files=self.fetch_files_from_node_outputs(workflow_node_execution.outputs or {}),
-                parallel_id=event.parallel_id,
-                parallel_start_node_id=event.parallel_start_node_id,
-                parent_parallel_id=event.parent_parallel_id,
-                parent_parallel_start_node_id=event.parent_parallel_start_node_id,
-                iteration_id=event.in_iteration_id,
-                loop_id=event.in_loop_id,
-            ),
-        )
-
-    def workflow_node_retry_to_stream_response(
-        self,
-        *,
-        event: QueueNodeRetryEvent,
-        task_id: str,
-        workflow_node_execution: NodeExecution,
-    ) -> Optional[Union[NodeRetryStreamResponse, NodeFinishStreamResponse]]:
-        if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}:
-            return None
-        if not workflow_node_execution.workflow_run_id:
-            return None
-        if not workflow_node_execution.finished_at:
-            return None
-
-        return NodeRetryStreamResponse(
-            task_id=task_id,
-            workflow_run_id=workflow_node_execution.workflow_run_id,
-            data=NodeRetryStreamResponse.Data(
-                id=workflow_node_execution.id,
-                node_id=workflow_node_execution.node_id,
-                node_type=workflow_node_execution.node_type,
-                index=workflow_node_execution.index,
-                title=workflow_node_execution.title,
-                predecessor_node_id=workflow_node_execution.predecessor_node_id,
-                inputs=workflow_node_execution.inputs,
-                process_data=workflow_node_execution.process_data,
-                outputs=workflow_node_execution.outputs,
-                status=workflow_node_execution.status,
-                error=workflow_node_execution.error,
-                elapsed_time=workflow_node_execution.elapsed_time,
-                execution_metadata=workflow_node_execution.metadata,
-                created_at=int(workflow_node_execution.created_at.timestamp()),
-                finished_at=int(workflow_node_execution.finished_at.timestamp()),
-                files=self.fetch_files_from_node_outputs(workflow_node_execution.outputs or {}),
-                parallel_id=event.parallel_id,
-                parallel_start_node_id=event.parallel_start_node_id,
-                parent_parallel_id=event.parent_parallel_id,
-                parent_parallel_start_node_id=event.parent_parallel_start_node_id,
-                iteration_id=event.in_iteration_id,
-                loop_id=event.in_loop_id,
-                retry_index=event.retry_index,
-            ),
-        )
-
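All three node-level converters above share the same emission guard: iteration and loop containers never produce node events (they stream through their own event family), and executions without a workflow run id, or without a finish time where one is required, are silently skipped. Condensed as a sketch, not an API in the codebase:

# Sketch of the shared guard; the finish/retry converters additionally
# require `finished_at` before emitting, the start converter does not.
def should_emit(node_execution, require_finished: bool) -> bool:
    if node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}:
        return False
    if not node_execution.workflow_run_id:
        return False
    return bool(node_execution.finished_at) if require_finished else True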
-    def workflow_parallel_branch_start_to_stream_response(
-        self,
-        *,
-        task_id: str,
-        workflow_execution_id: str,
-        event: QueueParallelBranchRunStartedEvent,
-    ) -> ParallelBranchStartStreamResponse:
-        return ParallelBranchStartStreamResponse(
-            task_id=task_id,
-            workflow_run_id=workflow_execution_id,
-            data=ParallelBranchStartStreamResponse.Data(
-                parallel_id=event.parallel_id,
-                parallel_branch_id=event.parallel_start_node_id,
-                parent_parallel_id=event.parent_parallel_id,
-                parent_parallel_start_node_id=event.parent_parallel_start_node_id,
-                iteration_id=event.in_iteration_id,
-                loop_id=event.in_loop_id,
-                created_at=int(time.time()),
-            ),
-        )
-
-    def workflow_parallel_branch_finished_to_stream_response(
-        self,
-        *,
-        task_id: str,
-        workflow_execution_id: str,
-        event: QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent,
-    ) -> ParallelBranchFinishedStreamResponse:
-        return ParallelBranchFinishedStreamResponse(
-            task_id=task_id,
-            workflow_run_id=workflow_execution_id,
-            data=ParallelBranchFinishedStreamResponse.Data(
-                parallel_id=event.parallel_id,
-                parallel_branch_id=event.parallel_start_node_id,
-                parent_parallel_id=event.parent_parallel_id,
-                parent_parallel_start_node_id=event.parent_parallel_start_node_id,
-                iteration_id=event.in_iteration_id,
-                loop_id=event.in_loop_id,
-                status="succeeded" if isinstance(event, QueueParallelBranchRunSucceededEvent) else "failed",
-                error=event.error if isinstance(event, QueueParallelBranchRunFailedEvent) else None,
-                created_at=int(time.time()),
-            ),
-        )
-
-    def workflow_iteration_start_to_stream_response(
-        self,
-        *,
-        task_id: str,
-        workflow_execution_id: str,
-        event: QueueIterationStartEvent,
-    ) -> IterationNodeStartStreamResponse:
-        return IterationNodeStartStreamResponse(
-            task_id=task_id,
-            workflow_run_id=workflow_execution_id,
-            data=IterationNodeStartStreamResponse.Data(
-                id=event.node_id,
-                node_id=event.node_id,
-                node_type=event.node_type.value,
-                title=event.node_data.title,
-                created_at=int(time.time()),
-                extras={},
-                inputs=event.inputs or {},
-                metadata=event.metadata or {},
-                parallel_id=event.parallel_id,
-                parallel_start_node_id=event.parallel_start_node_id,
-            ),
-        )
-
-    def workflow_iteration_next_to_stream_response(
-        self,
-        *,
-        task_id: str,
-        workflow_execution_id: str,
-        event: QueueIterationNextEvent,
-    ) -> IterationNodeNextStreamResponse:
-        return IterationNodeNextStreamResponse(
-            task_id=task_id,
-            workflow_run_id=workflow_execution_id,
-            data=IterationNodeNextStreamResponse.Data(
-                id=event.node_id,
-                node_id=event.node_id,
-                node_type=event.node_type.value,
-                title=event.node_data.title,
-                index=event.index,
-                pre_iteration_output=event.output,
-                created_at=int(time.time()),
-                extras={},
-                parallel_id=event.parallel_id,
-                parallel_start_node_id=event.parallel_start_node_id,
-                parallel_mode_run_id=event.parallel_mode_run_id,
-                duration=event.duration,
-            ),
-        )
-
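The branch-finished converter above collapses two queue event types into one payload; the only event-dependent fields are status and error. As a sketch:

# Sketch: status/error derivation in the removed branch-finished converter.
succeeded = isinstance(event, QueueParallelBranchRunSucceededEvent)
status = "succeeded" if succeeded else "failed"
error = event.error if not succeeded else None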
-    def workflow_iteration_completed_to_stream_response(
-        self,
-        *,
-        task_id: str,
-        workflow_execution_id: str,
-        event: QueueIterationCompletedEvent,
-    ) -> IterationNodeCompletedStreamResponse:
-        return IterationNodeCompletedStreamResponse(
-            task_id=task_id,
-            workflow_run_id=workflow_execution_id,
-            data=IterationNodeCompletedStreamResponse.Data(
-                id=event.node_id,
-                node_id=event.node_id,
-                node_type=event.node_type.value,
-                title=event.node_data.title,
-                outputs=event.outputs,
-                created_at=int(time.time()),
-                extras={},
-                inputs=event.inputs or {},
-                status=WorkflowNodeExecutionStatus.SUCCEEDED
-                if event.error is None
-                else WorkflowNodeExecutionStatus.FAILED,
-                error=None,
-                elapsed_time=(datetime.now(UTC).replace(tzinfo=None) - event.start_at).total_seconds(),
-                total_tokens=event.metadata.get("total_tokens", 0) if event.metadata else 0,
-                execution_metadata=event.metadata,
-                finished_at=int(time.time()),
-                steps=event.steps,
-                parallel_id=event.parallel_id,
-                parallel_start_node_id=event.parallel_start_node_id,
-            ),
-        )
-
-    def workflow_loop_start_to_stream_response(
-        self, *, task_id: str, workflow_execution_id: str, event: QueueLoopStartEvent
-    ) -> LoopNodeStartStreamResponse:
-        return LoopNodeStartStreamResponse(
-            task_id=task_id,
-            workflow_run_id=workflow_execution_id,
-            data=LoopNodeStartStreamResponse.Data(
-                id=event.node_id,
-                node_id=event.node_id,
-                node_type=event.node_type.value,
-                title=event.node_data.title,
-                created_at=int(time.time()),
-                extras={},
-                inputs=event.inputs or {},
-                metadata=event.metadata or {},
-                parallel_id=event.parallel_id,
-                parallel_start_node_id=event.parallel_start_node_id,
-            ),
-        )
-
-    def workflow_loop_next_to_stream_response(
-        self,
-        *,
-        task_id: str,
-        workflow_execution_id: str,
-        event: QueueLoopNextEvent,
-    ) -> LoopNodeNextStreamResponse:
-        return LoopNodeNextStreamResponse(
-            task_id=task_id,
-            workflow_run_id=workflow_execution_id,
-            data=LoopNodeNextStreamResponse.Data(
-                id=event.node_id,
-                node_id=event.node_id,
-                node_type=event.node_type.value,
-                title=event.node_data.title,
-                index=event.index,
-                pre_loop_output=event.output,
-                created_at=int(time.time()),
-                extras={},
-                parallel_id=event.parallel_id,
-                parallel_start_node_id=event.parallel_start_node_id,
-                parallel_mode_run_id=event.parallel_mode_run_id,
-                duration=event.duration,
-            ),
-        )
-
-    def workflow_loop_completed_to_stream_response(
-        self,
-        *,
-        task_id: str,
-        workflow_execution_id: str,
-        event: QueueLoopCompletedEvent,
-    ) -> LoopNodeCompletedStreamResponse:
-        return LoopNodeCompletedStreamResponse(
-            task_id=task_id,
-            workflow_run_id=workflow_execution_id,
-            data=LoopNodeCompletedStreamResponse.Data(
-                id=event.node_id,
-                node_id=event.node_id,
-                node_type=event.node_type.value,
-                title=event.node_data.title,
-                outputs=event.outputs,
-                created_at=int(time.time()),
-                extras={},
-                inputs=event.inputs or {},
-                status=WorkflowNodeExecutionStatus.SUCCEEDED
-                if event.error is None
-                else WorkflowNodeExecutionStatus.FAILED,
-                error=None,
-                elapsed_time=(datetime.now(UTC).replace(tzinfo=None) - event.start_at).total_seconds(),
-                total_tokens=event.metadata.get("total_tokens", 0) if event.metadata else 0,
-                execution_metadata=event.metadata,
-                finished_at=int(time.time()),
-                steps=event.steps,
-                parallel_id=event.parallel_id,
-                parallel_start_node_id=event.parallel_start_node_id,
-            ),
-        )
-
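Worth noting in the completed-event converters above: `event.start_at` is a naive UTC datetime, so "now" is taken as `datetime.now(UTC)` and stripped of tzinfo before subtraction; mixing aware and naive datetimes would raise a TypeError instead. A self-contained illustration with a hypothetical start_at:

from datetime import UTC, datetime, timedelta

# start_at arrives naive-UTC on the queue events; simulate one 3s in the past.
start_at = datetime.now(UTC).replace(tzinfo=None) - timedelta(seconds=3)
elapsed_time = (datetime.now(UTC).replace(tzinfo=None) - start_at).total_seconds()
assert elapsed_time >= 3.0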
-    def fetch_files_from_node_outputs(self, outputs_dict: Mapping[str, Any] | None) -> Sequence[Mapping[str, Any]]:
-        """
-        Fetch files from node outputs
-        :param outputs_dict: node outputs dict
-        :return:
-        """
-        if not outputs_dict:
-            return []
-
-        files = [self._fetch_files_from_variable_value(output_value) for output_value in outputs_dict.values()]
-        # Remove None
-        files = [file for file in files if file]
-        # Flatten the list of sequences into a single list of mappings
-        flattened_files = [file for sublist in files if sublist for file in sublist]
-
-        # Convert to tuple to match Sequence type
-        return tuple(flattened_files)
-
-    def _fetch_files_from_variable_value(self, value: Union[dict, list]) -> Sequence[Mapping[str, Any]]:
-        """
-        Fetch files from variable value
-        :param value: variable value
-        :return:
-        """
-        if not value:
-            return []
-
-        files = []
-        if isinstance(value, list):
-            for item in value:
-                file = self._get_file_var_from_value(item)
-                if file:
-                    files.append(file)
-        elif isinstance(value, dict):
-            file = self._get_file_var_from_value(value)
-            if file:
-                files.append(file)
-
-        return files
-
-    def _get_file_var_from_value(self, value: Union[dict, list]) -> Mapping[str, Any] | None:
-        """
-        Get file var from value
-        :param value: variable value
-        :return:
-        """
-        if not value:
-            return None
-
-        if isinstance(value, dict) and value.get("dify_model_identity") == FILE_MODEL_IDENTITY:
-            return value
-        elif isinstance(value, File):
-            return value.to_dict()
-
-        return None
-
     def _get_workflow_execution_or_raise_error(self, id: str, /) -> WorkflowExecution:
         execution = self._workflow_execution_repository.get(id)
         if not execution:
             raise WorkflowRunNotFoundError(id)
         return execution
-
-    def handle_agent_log(self, task_id: str, event: QueueAgentLogEvent) -> AgentLogStreamResponse:
-        """
-        Handle agent log
-        :param task_id: task id
-        :param event: agent log event
-        :return:
-        """
-        return AgentLogStreamResponse(
-            task_id=task_id,
-            data=AgentLogStreamResponse.Data(
-                node_execution_id=event.node_execution_id,
-                id=event.id,
-                parent_id=event.parent_id,
-                label=event.label,
-                error=event.error,
-                status=event.status,
-                data=event.data,
-                metadata=event.metadata,
-                node_id=event.node_id,
-            ),
-        )
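The file-extraction helpers removed above implement a two-level scan: every output value is checked (lists one level deep, then dicts), a value counts as a file if it is a mapping tagged with FILE_MODEL_IDENTITY or a core.file File (serialized via to_dict()), and the per-value hits are flattened into a single tuple. Roughly, as a usage sketch with hypothetical outputs and a hypothetical WorkflowResponseConverter instance:

# Hypothetical node outputs: one bare value, one file mapping, one list of them.
outputs = {
    "text": "done",
    "cover": {"dify_model_identity": FILE_MODEL_IDENTITY, "url": "https://example.com/a.png"},
    "gallery": [{"dify_model_identity": FILE_MODEL_IDENTITY, "url": "https://example.com/b.png"}],
}
files = converter.fetch_files_from_node_outputs(outputs)
# -> a 2-tuple containing both file mappings; the plain "done" value is skipped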