diff --git a/api/core/workflow/nodes/answer/base_stream_processor.py b/api/core/workflow/nodes/answer/base_stream_processor.py index f33ba6cd5d..e4f2478890 100644 --- a/api/core/workflow/nodes/answer/base_stream_processor.py +++ b/api/core/workflow/nodes/answer/base_stream_processor.py @@ -65,7 +65,7 @@ class StreamProcessor(ABC): # Issues: #13626 if ( finished_node_id in self.graph.node_parallel_mapping - and edge.target_node_id not in self.graph.parallel_mapping + and edge.target_node_id not in self.graph.node_parallel_mapping ): continue unreachable_first_node_ids.append(edge.target_node_id) diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py index b7d8f69e8c..2a29ad3e41 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_graph_engine.py @@ -1,5 +1,8 @@ from unittest.mock import patch +import pytest +from flask import Flask + from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult from core.workflow.entities.variable_pool import VariablePool @@ -17,12 +20,20 @@ from core.workflow.graph_engine.entities.event import ( from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.runtime_route_state import RouteNodeState from core.workflow.graph_engine.graph_engine import GraphEngine +from core.workflow.nodes.code.code_node import CodeNode from core.workflow.nodes.event import RunCompletedEvent, RunStreamChunkEvent from core.workflow.nodes.llm.node import LLMNode +from core.workflow.nodes.question_classifier.question_classifier_node import QuestionClassifierNode from models.enums import UserFrom from models.workflow import WorkflowNodeExecutionStatus, WorkflowType +@pytest.fixture +def app(): + app = Flask(__name__) + return app + + @patch("extensions.ext_database.db.session.remove") @patch("extensions.ext_database.db.session.close") def test_run_parallel_in_workflow(mock_close, mock_remove): @@ -502,3 +513,361 @@ def test_run_branch(mock_close, mock_remove): assert isinstance(items[9], GraphRunSucceededEvent) # print(graph_engine.graph_runtime_state.model_dump_json(indent=2)) + + +@patch("extensions.ext_database.db.session.remove") +@patch("extensions.ext_database.db.session.close") +def test_condition_parallel_correct_output(mock_close, mock_remove, app): + """issue #16238, workflow got unexpected additional output""" + + graph_config = { + "edges": [ + { + "data": { + "isInIteration": False, + "isInLoop": False, + "sourceType": "question-classifier", + "targetType": "question-classifier", + }, + "id": "1742382406742-1-1742382480077-target", + "source": "1742382406742", + "sourceHandle": "1", + "target": "1742382480077", + "targetHandle": "target", + "type": "custom", + "zIndex": 0, + }, + { + "data": { + "isInIteration": False, + "isInLoop": False, + "sourceType": "question-classifier", + "targetType": "answer", + }, + "id": "1742382480077-1-1742382531085-target", + "source": "1742382480077", + "sourceHandle": "1", + "target": "1742382531085", + "targetHandle": "target", + "type": "custom", + "zIndex": 0, + }, + { + "data": { + "isInIteration": False, + "isInLoop": False, + "sourceType": "question-classifier", + "targetType": "answer", + }, + "id": "1742382480077-2-1742382534798-target", + "source": "1742382480077", + "sourceHandle": "2", + "target": "1742382534798", + "targetHandle": "target", + "type": "custom", + "zIndex": 0, + }, + { + "data": { + "isInIteration": False, + "isInLoop": False, + "sourceType": "question-classifier", + "targetType": "answer", + }, + "id": "1742382480077-1742382525856-1742382538517-target", + "source": "1742382480077", + "sourceHandle": "1742382525856", + "target": "1742382538517", + "targetHandle": "target", + "type": "custom", + "zIndex": 0, + }, + { + "data": {"isInLoop": False, "sourceType": "start", "targetType": "question-classifier"}, + "id": "1742382361944-source-1742382406742-target", + "source": "1742382361944", + "sourceHandle": "source", + "target": "1742382406742", + "targetHandle": "target", + "type": "custom", + "zIndex": 0, + }, + { + "data": { + "isInIteration": False, + "isInLoop": False, + "sourceType": "question-classifier", + "targetType": "code", + }, + "id": "1742382406742-1-1742451801533-target", + "source": "1742382406742", + "sourceHandle": "1", + "target": "1742451801533", + "targetHandle": "target", + "type": "custom", + "zIndex": 0, + }, + { + "data": {"isInLoop": False, "sourceType": "code", "targetType": "answer"}, + "id": "1742451801533-source-1742434464898-target", + "source": "1742451801533", + "sourceHandle": "source", + "target": "1742434464898", + "targetHandle": "target", + "type": "custom", + "zIndex": 0, + }, + ], + "nodes": [ + { + "data": {"desc": "", "selected": False, "title": "开始", "type": "start", "variables": []}, + "height": 54, + "id": "1742382361944", + "position": {"x": 30, "y": 286}, + "positionAbsolute": {"x": 30, "y": 286}, + "sourcePosition": "right", + "targetPosition": "left", + "type": "custom", + "width": 244, + }, + { + "data": { + "classes": [{"id": "1", "name": "financial"}, {"id": "2", "name": "other"}], + "desc": "", + "instruction": "", + "instructions": "", + "model": { + "completion_params": {"temperature": 0.7}, + "mode": "chat", + "name": "qwen-max-latest", + "provider": "langgenius/tongyi/tongyi", + }, + "query_variable_selector": ["1742382361944", "sys.query"], + "selected": False, + "title": "qc", + "topics": [], + "type": "question-classifier", + "vision": {"enabled": False}, + }, + "height": 172, + "id": "1742382406742", + "position": {"x": 334, "y": 286}, + "positionAbsolute": {"x": 334, "y": 286}, + "selected": False, + "sourcePosition": "right", + "targetPosition": "left", + "type": "custom", + "width": 244, + }, + { + "data": { + "classes": [ + {"id": "1", "name": "VAT"}, + {"id": "2", "name": "Stamp Duty"}, + {"id": "1742382525856", "name": "other"}, + ], + "desc": "", + "instruction": "", + "instructions": "", + "model": { + "completion_params": {"temperature": 0.7}, + "mode": "chat", + "name": "qwen-max-latest", + "provider": "langgenius/tongyi/tongyi", + }, + "query_variable_selector": ["1742382361944", "sys.query"], + "selected": False, + "title": "qc 2", + "topics": [], + "type": "question-classifier", + "vision": {"enabled": False}, + }, + "height": 210, + "id": "1742382480077", + "position": {"x": 638, "y": 452}, + "positionAbsolute": {"x": 638, "y": 452}, + "selected": False, + "sourcePosition": "right", + "targetPosition": "left", + "type": "custom", + "width": 244, + }, + { + "data": { + "answer": "VAT:{{#sys.query#}}\n", + "desc": "", + "selected": False, + "title": "answer 2", + "type": "answer", + "variables": [], + }, + "height": 105, + "id": "1742382531085", + "position": {"x": 942, "y": 486.5}, + "positionAbsolute": {"x": 942, "y": 486.5}, + "selected": False, + "sourcePosition": "right", + "targetPosition": "left", + "type": "custom", + "width": 244, + }, + { + "data": { + "answer": "Stamp Duty:{{#sys.query#}}\n", + "desc": "", + "selected": False, + "title": "answer 3", + "type": "answer", + "variables": [], + }, + "height": 105, + "id": "1742382534798", + "position": {"x": 942, "y": 631.5}, + "positionAbsolute": {"x": 942, "y": 631.5}, + "selected": False, + "sourcePosition": "right", + "targetPosition": "left", + "type": "custom", + "width": 244, + }, + { + "data": { + "answer": "other:{{#sys.query#}}\n", + "desc": "", + "selected": False, + "title": "answer 4", + "type": "answer", + "variables": [], + }, + "height": 105, + "id": "1742382538517", + "position": {"x": 942, "y": 776.5}, + "positionAbsolute": {"x": 942, "y": 776.5}, + "selected": False, + "sourcePosition": "right", + "targetPosition": "left", + "type": "custom", + "width": 244, + }, + { + "data": { + "answer": "{{#1742451801533.result#}}", + "desc": "", + "selected": False, + "title": "Answer 5", + "type": "answer", + "variables": [], + }, + "height": 105, + "id": "1742434464898", + "position": {"x": 942, "y": 274.70425695336615}, + "positionAbsolute": {"x": 942, "y": 274.70425695336615}, + "selected": True, + "sourcePosition": "right", + "targetPosition": "left", + "type": "custom", + "width": 244, + }, + { + "data": { + "code": '\ndef main(arg1: str, arg2: str) -> dict:\n return {\n "result": arg1 + arg2,\n }\n', # noqa: E501 + "code_language": "python3", + "desc": "", + "outputs": {"result": {"children": None, "type": "string"}}, + "selected": False, + "title": "Code", + "type": "code", + "variables": [ + {"value_selector": ["sys", "query"], "variable": "arg1"}, + {"value_selector": ["sys", "query"], "variable": "arg2"}, + ], + }, + "height": 54, + "id": "1742451801533", + "position": {"x": 627.8839285786928, "y": 286}, + "positionAbsolute": {"x": 627.8839285786928, "y": 286}, + "selected": False, + "sourcePosition": "right", + "targetPosition": "left", + "type": "custom", + "width": 244, + }, + ], + } + graph = Graph.init(graph_config) + + # construct variable pool + pool = VariablePool( + system_variables={ + SystemVariableKey.QUERY: "dify", + SystemVariableKey.FILES: [], + SystemVariableKey.CONVERSATION_ID: "abababa", + SystemVariableKey.USER_ID: "1", + }, + user_inputs={}, + environment_variables=[], + ) + pool.add(["pe", "list_output"], ["dify-1", "dify-2"]) + variable_pool = VariablePool( + system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, user_inputs={"query": "hi"} + ) + + graph_engine = GraphEngine( + tenant_id="111", + app_id="222", + workflow_type=WorkflowType.CHAT, + workflow_id="333", + graph_config=graph_config, + user_id="444", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.WEB_APP, + call_depth=0, + graph=graph, + variable_pool=variable_pool, + max_execution_steps=500, + max_execution_time=1200, + ) + + def qc_generator(self): + yield RunCompletedEvent( + run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.SUCCEEDED, + inputs={}, + process_data={}, + outputs={"class_name": "financial", "class_id": "1"}, + metadata={ + NodeRunMetadataKey.TOTAL_TOKENS: 1, + NodeRunMetadataKey.TOTAL_PRICE: 1, + NodeRunMetadataKey.CURRENCY: "USD", + }, + edge_source_handle="1", + ) + ) + + def code_generator(self): + yield RunCompletedEvent( + run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.SUCCEEDED, + inputs={}, + process_data={}, + outputs={"result": "dify 123"}, + metadata={ + NodeRunMetadataKey.TOTAL_TOKENS: 1, + NodeRunMetadataKey.TOTAL_PRICE: 1, + NodeRunMetadataKey.CURRENCY: "USD", + }, + ) + ) + + with patch.object(QuestionClassifierNode, "_run", new=qc_generator): + with app.app_context(): + with patch.object(CodeNode, "_run", new=code_generator): + generator = graph_engine.run() + stream_content = "" + res_content = "VAT:\ndify 123" + for item in generator: + if isinstance(item, NodeRunStreamChunkEvent): + stream_content += f"{item.chunk_content}\n" + if isinstance(item, GraphRunSucceededEvent): + assert item.outputs == {"answer": res_content} + assert stream_content == res_content + "\n"