fix: exclude additional unreachable nodes (#16329)

This commit is contained in:
Novice 2025-03-20 16:53:56 +08:00 committed by GitHub
parent 3e84c77bbb
commit 437dcbdd68
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 370 additions and 1 deletions

View File

@ -65,7 +65,7 @@ class StreamProcessor(ABC):
# Issues: #13626
if (
finished_node_id in self.graph.node_parallel_mapping
and edge.target_node_id not in self.graph.parallel_mapping
and edge.target_node_id not in self.graph.node_parallel_mapping
):
continue
unreachable_first_node_ids.append(edge.target_node_id)

View File

@ -1,5 +1,8 @@
from unittest.mock import patch
import pytest
from flask import Flask
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult
from core.workflow.entities.variable_pool import VariablePool
@ -17,12 +20,20 @@ from core.workflow.graph_engine.entities.event import (
from core.workflow.graph_engine.entities.graph import Graph
from core.workflow.graph_engine.entities.runtime_route_state import RouteNodeState
from core.workflow.graph_engine.graph_engine import GraphEngine
from core.workflow.nodes.code.code_node import CodeNode
from core.workflow.nodes.event import RunCompletedEvent, RunStreamChunkEvent
from core.workflow.nodes.llm.node import LLMNode
from core.workflow.nodes.question_classifier.question_classifier_node import QuestionClassifierNode
from models.enums import UserFrom
from models.workflow import WorkflowNodeExecutionStatus, WorkflowType
@pytest.fixture
def app():
app = Flask(__name__)
return app
@patch("extensions.ext_database.db.session.remove")
@patch("extensions.ext_database.db.session.close")
def test_run_parallel_in_workflow(mock_close, mock_remove):
@ -502,3 +513,361 @@ def test_run_branch(mock_close, mock_remove):
assert isinstance(items[9], GraphRunSucceededEvent)
# print(graph_engine.graph_runtime_state.model_dump_json(indent=2))
@patch("extensions.ext_database.db.session.remove")
@patch("extensions.ext_database.db.session.close")
def test_condition_parallel_correct_output(mock_close, mock_remove, app):
"""issue #16238, workflow got unexpected additional output"""
graph_config = {
"edges": [
{
"data": {
"isInIteration": False,
"isInLoop": False,
"sourceType": "question-classifier",
"targetType": "question-classifier",
},
"id": "1742382406742-1-1742382480077-target",
"source": "1742382406742",
"sourceHandle": "1",
"target": "1742382480077",
"targetHandle": "target",
"type": "custom",
"zIndex": 0,
},
{
"data": {
"isInIteration": False,
"isInLoop": False,
"sourceType": "question-classifier",
"targetType": "answer",
},
"id": "1742382480077-1-1742382531085-target",
"source": "1742382480077",
"sourceHandle": "1",
"target": "1742382531085",
"targetHandle": "target",
"type": "custom",
"zIndex": 0,
},
{
"data": {
"isInIteration": False,
"isInLoop": False,
"sourceType": "question-classifier",
"targetType": "answer",
},
"id": "1742382480077-2-1742382534798-target",
"source": "1742382480077",
"sourceHandle": "2",
"target": "1742382534798",
"targetHandle": "target",
"type": "custom",
"zIndex": 0,
},
{
"data": {
"isInIteration": False,
"isInLoop": False,
"sourceType": "question-classifier",
"targetType": "answer",
},
"id": "1742382480077-1742382525856-1742382538517-target",
"source": "1742382480077",
"sourceHandle": "1742382525856",
"target": "1742382538517",
"targetHandle": "target",
"type": "custom",
"zIndex": 0,
},
{
"data": {"isInLoop": False, "sourceType": "start", "targetType": "question-classifier"},
"id": "1742382361944-source-1742382406742-target",
"source": "1742382361944",
"sourceHandle": "source",
"target": "1742382406742",
"targetHandle": "target",
"type": "custom",
"zIndex": 0,
},
{
"data": {
"isInIteration": False,
"isInLoop": False,
"sourceType": "question-classifier",
"targetType": "code",
},
"id": "1742382406742-1-1742451801533-target",
"source": "1742382406742",
"sourceHandle": "1",
"target": "1742451801533",
"targetHandle": "target",
"type": "custom",
"zIndex": 0,
},
{
"data": {"isInLoop": False, "sourceType": "code", "targetType": "answer"},
"id": "1742451801533-source-1742434464898-target",
"source": "1742451801533",
"sourceHandle": "source",
"target": "1742434464898",
"targetHandle": "target",
"type": "custom",
"zIndex": 0,
},
],
"nodes": [
{
"data": {"desc": "", "selected": False, "title": "开始", "type": "start", "variables": []},
"height": 54,
"id": "1742382361944",
"position": {"x": 30, "y": 286},
"positionAbsolute": {"x": 30, "y": 286},
"sourcePosition": "right",
"targetPosition": "left",
"type": "custom",
"width": 244,
},
{
"data": {
"classes": [{"id": "1", "name": "financial"}, {"id": "2", "name": "other"}],
"desc": "",
"instruction": "",
"instructions": "",
"model": {
"completion_params": {"temperature": 0.7},
"mode": "chat",
"name": "qwen-max-latest",
"provider": "langgenius/tongyi/tongyi",
},
"query_variable_selector": ["1742382361944", "sys.query"],
"selected": False,
"title": "qc",
"topics": [],
"type": "question-classifier",
"vision": {"enabled": False},
},
"height": 172,
"id": "1742382406742",
"position": {"x": 334, "y": 286},
"positionAbsolute": {"x": 334, "y": 286},
"selected": False,
"sourcePosition": "right",
"targetPosition": "left",
"type": "custom",
"width": 244,
},
{
"data": {
"classes": [
{"id": "1", "name": "VAT"},
{"id": "2", "name": "Stamp Duty"},
{"id": "1742382525856", "name": "other"},
],
"desc": "",
"instruction": "",
"instructions": "",
"model": {
"completion_params": {"temperature": 0.7},
"mode": "chat",
"name": "qwen-max-latest",
"provider": "langgenius/tongyi/tongyi",
},
"query_variable_selector": ["1742382361944", "sys.query"],
"selected": False,
"title": "qc 2",
"topics": [],
"type": "question-classifier",
"vision": {"enabled": False},
},
"height": 210,
"id": "1742382480077",
"position": {"x": 638, "y": 452},
"positionAbsolute": {"x": 638, "y": 452},
"selected": False,
"sourcePosition": "right",
"targetPosition": "left",
"type": "custom",
"width": 244,
},
{
"data": {
"answer": "VAT:{{#sys.query#}}\n",
"desc": "",
"selected": False,
"title": "answer 2",
"type": "answer",
"variables": [],
},
"height": 105,
"id": "1742382531085",
"position": {"x": 942, "y": 486.5},
"positionAbsolute": {"x": 942, "y": 486.5},
"selected": False,
"sourcePosition": "right",
"targetPosition": "left",
"type": "custom",
"width": 244,
},
{
"data": {
"answer": "Stamp Duty:{{#sys.query#}}\n",
"desc": "",
"selected": False,
"title": "answer 3",
"type": "answer",
"variables": [],
},
"height": 105,
"id": "1742382534798",
"position": {"x": 942, "y": 631.5},
"positionAbsolute": {"x": 942, "y": 631.5},
"selected": False,
"sourcePosition": "right",
"targetPosition": "left",
"type": "custom",
"width": 244,
},
{
"data": {
"answer": "other:{{#sys.query#}}\n",
"desc": "",
"selected": False,
"title": "answer 4",
"type": "answer",
"variables": [],
},
"height": 105,
"id": "1742382538517",
"position": {"x": 942, "y": 776.5},
"positionAbsolute": {"x": 942, "y": 776.5},
"selected": False,
"sourcePosition": "right",
"targetPosition": "left",
"type": "custom",
"width": 244,
},
{
"data": {
"answer": "{{#1742451801533.result#}}",
"desc": "",
"selected": False,
"title": "Answer 5",
"type": "answer",
"variables": [],
},
"height": 105,
"id": "1742434464898",
"position": {"x": 942, "y": 274.70425695336615},
"positionAbsolute": {"x": 942, "y": 274.70425695336615},
"selected": True,
"sourcePosition": "right",
"targetPosition": "left",
"type": "custom",
"width": 244,
},
{
"data": {
"code": '\ndef main(arg1: str, arg2: str) -> dict:\n return {\n "result": arg1 + arg2,\n }\n', # noqa: E501
"code_language": "python3",
"desc": "",
"outputs": {"result": {"children": None, "type": "string"}},
"selected": False,
"title": "Code",
"type": "code",
"variables": [
{"value_selector": ["sys", "query"], "variable": "arg1"},
{"value_selector": ["sys", "query"], "variable": "arg2"},
],
},
"height": 54,
"id": "1742451801533",
"position": {"x": 627.8839285786928, "y": 286},
"positionAbsolute": {"x": 627.8839285786928, "y": 286},
"selected": False,
"sourcePosition": "right",
"targetPosition": "left",
"type": "custom",
"width": 244,
},
],
}
graph = Graph.init(graph_config)
# construct variable pool
pool = VariablePool(
system_variables={
SystemVariableKey.QUERY: "dify",
SystemVariableKey.FILES: [],
SystemVariableKey.CONVERSATION_ID: "abababa",
SystemVariableKey.USER_ID: "1",
},
user_inputs={},
environment_variables=[],
)
pool.add(["pe", "list_output"], ["dify-1", "dify-2"])
variable_pool = VariablePool(
system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, user_inputs={"query": "hi"}
)
graph_engine = GraphEngine(
tenant_id="111",
app_id="222",
workflow_type=WorkflowType.CHAT,
workflow_id="333",
graph_config=graph_config,
user_id="444",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.WEB_APP,
call_depth=0,
graph=graph,
variable_pool=variable_pool,
max_execution_steps=500,
max_execution_time=1200,
)
def qc_generator(self):
yield RunCompletedEvent(
run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={},
process_data={},
outputs={"class_name": "financial", "class_id": "1"},
metadata={
NodeRunMetadataKey.TOTAL_TOKENS: 1,
NodeRunMetadataKey.TOTAL_PRICE: 1,
NodeRunMetadataKey.CURRENCY: "USD",
},
edge_source_handle="1",
)
)
def code_generator(self):
yield RunCompletedEvent(
run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={},
process_data={},
outputs={"result": "dify 123"},
metadata={
NodeRunMetadataKey.TOTAL_TOKENS: 1,
NodeRunMetadataKey.TOTAL_PRICE: 1,
NodeRunMetadataKey.CURRENCY: "USD",
},
)
)
with patch.object(QuestionClassifierNode, "_run", new=qc_generator):
with app.app_context():
with patch.object(CodeNode, "_run", new=code_generator):
generator = graph_engine.run()
stream_content = ""
res_content = "VAT:\ndify 123"
for item in generator:
if isinstance(item, NodeRunStreamChunkEvent):
stream_content += f"{item.chunk_content}\n"
if isinstance(item, GraphRunSucceededEvent):
assert item.outputs == {"answer": res_content}
assert stream_content == res_content + "\n"