diff --git a/api/core/app/apps/advanced_chat/generate_response_converter.py b/api/core/app/apps/advanced_chat/generate_response_converter.py
index 80e8e22e88..08069332ba 100644
--- a/api/core/app/apps/advanced_chat/generate_response_converter.py
+++ b/api/core/app/apps/advanced_chat/generate_response_converter.py
@@ -8,6 +8,8 @@ from core.app.entities.task_entities import (
     ChatbotAppStreamResponse,
     ErrorStreamResponse,
     MessageEndStreamResponse,
+    NodeFinishStreamResponse,
+    NodeStartStreamResponse,
     PingStreamResponse,
 )
 
@@ -111,6 +113,8 @@ class AdvancedChatAppGenerateResponseConverter(AppGenerateResponseConverter):
             if isinstance(sub_stream_response, ErrorStreamResponse):
                 data = cls._error_to_stream_response(sub_stream_response.err)
                 response_chunk.update(data)
+            elif isinstance(sub_stream_response, NodeStartStreamResponse | NodeFinishStreamResponse):
+                response_chunk.update(sub_stream_response.to_ignore_detail_dict())
             else:
                 response_chunk.update(sub_stream_response.to_dict())
 
diff --git a/api/core/app/apps/workflow/generate_response_converter.py b/api/core/app/apps/workflow/generate_response_converter.py
index d907b82c99..88bde58ba0 100644
--- a/api/core/app/apps/workflow/generate_response_converter.py
+++ b/api/core/app/apps/workflow/generate_response_converter.py
@@ -5,6 +5,8 @@ from typing import cast
 from core.app.apps.base_app_generate_response_converter import AppGenerateResponseConverter
 from core.app.entities.task_entities import (
     ErrorStreamResponse,
+    NodeFinishStreamResponse,
+    NodeStartStreamResponse,
     PingStreamResponse,
     WorkflowAppBlockingResponse,
     WorkflowAppStreamResponse,
@@ -68,4 +70,24 @@ class WorkflowAppGenerateResponseConverter(AppGenerateResponseConverter):
         :param stream_response: stream response
         :return:
         """
-        return cls.convert_stream_full_response(stream_response)
+        for chunk in stream_response:
+            chunk = cast(WorkflowAppStreamResponse, chunk)
+            sub_stream_response = chunk.stream_response
+
+            if isinstance(sub_stream_response, PingStreamResponse):
+                yield 'ping'
+                continue
+
+            response_chunk = {
+                'event': sub_stream_response.event.value,
+                'workflow_run_id': chunk.workflow_run_id,
+            }
+
+            if isinstance(sub_stream_response, ErrorStreamResponse):
+                data = cls._error_to_stream_response(sub_stream_response.err)
+                response_chunk.update(data)
+            elif isinstance(sub_stream_response, NodeStartStreamResponse | NodeFinishStreamResponse):
+                response_chunk.update(sub_stream_response.to_ignore_detail_dict())
+            else:
+                response_chunk.update(sub_stream_response.to_dict())
+            yield json.dumps(response_chunk)
diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py
index 4994efe2e9..1a11ac9aa3 100644
--- a/api/core/app/entities/task_entities.py
+++ b/api/core/app/entities/task_entities.py
@@ -246,6 +246,24 @@ class NodeStartStreamResponse(StreamResponse):
     workflow_run_id: str
     data: Data
 
+    def to_ignore_detail_dict(self):
+        return {
+            "event": self.event.value,
+            "task_id": self.task_id,
+            "workflow_run_id": self.workflow_run_id,
+            "data": {
+                "id": self.data.id,
+                "node_id": self.data.node_id,
+                "node_type": self.data.node_type,
+                "title": self.data.title,
+                "index": self.data.index,
+                "predecessor_node_id": self.data.predecessor_node_id,
+                "inputs": None,
+                "created_at": self.data.created_at,
+                "extras": {}
+            }
+        }
+
 
 class NodeFinishStreamResponse(StreamResponse):
     """
@@ -276,6 +294,31 @@ class NodeFinishStreamResponse(StreamResponse):
     workflow_run_id: str
     data: Data
 
+    def to_ignore_detail_dict(self):
+        return {
+            "event": self.event.value,
+            "task_id": self.task_id,
+            "workflow_run_id": self.workflow_run_id,
+            "data": {
+                "id": self.data.id,
+                "node_id": self.data.node_id,
+                "node_type": self.data.node_type,
+                "title": self.data.title,
+                "index": self.data.index,
+                "predecessor_node_id": self.data.predecessor_node_id,
+                "inputs": None,
+                "process_data": None,
+                "outputs": None,
+                "status": self.data.status,
+                "error": None,
+                "elapsed_time": self.data.elapsed_time,
+                "execution_metadata": None,
+                "created_at": self.data.created_at,
+                "finished_at": self.data.finished_at,
+                "files": []
+            }
+        }
+
 
 class TextChunkStreamResponse(StreamResponse):
     """