This commit is contained in:
takatost 2024-08-25 22:02:21 +08:00
parent 4771e85630
commit 6c61776ee1
16 changed files with 1304 additions and 1445 deletions

View File

@ -53,7 +53,10 @@ class HttpRequestNode(BaseNode):
node_data: HttpRequestNodeData = cast(HttpRequestNodeData, self.node_data) node_data: HttpRequestNodeData = cast(HttpRequestNodeData, self.node_data)
# TODO: Switch to use segment directly # TODO: Switch to use segment directly
if node_data.authorization.config and node_data.authorization.config.api_key: if node_data.authorization.config and node_data.authorization.config.api_key:
node_data.authorization.config.api_key = parser.convert_template(template=node_data.authorization.config.api_key, variable_pool=variable_pool).text node_data.authorization.config.api_key = parser.convert_template(
template=node_data.authorization.config.api_key,
variable_pool=self.graph_runtime_state.variable_pool
).text
# init http executor # init http executor
http_executor = None http_executor = None

View File

@ -1,17 +1,72 @@
import time
import uuid
from os import getenv from os import getenv
from typing import cast
import pytest import pytest
from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.node_entities import UserFrom from core.workflow.entities.node_entities import NodeRunResult, UserFrom
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.graph import Graph
from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.nodes.code.code_node import CodeNode from core.workflow.nodes.code.code_node import CodeNode
from models.workflow import WorkflowNodeExecutionStatus from core.workflow.nodes.code.entities import CodeNodeData
from models.workflow import WorkflowNodeExecutionStatus, WorkflowType
from tests.integration_tests.workflow.nodes.__mock.code_executor import setup_code_executor_mock from tests.integration_tests.workflow.nodes.__mock.code_executor import setup_code_executor_mock
CODE_MAX_STRING_LENGTH = int(getenv("CODE_MAX_STRING_LENGTH", "10000")) CODE_MAX_STRING_LENGTH = int(getenv("CODE_MAX_STRING_LENGTH", "10000"))
def init_code_node(code_config: dict):
graph_config = {
"edges": [
{
"id": "start-source-code-target",
"source": "start",
"target": "code",
},
],
"nodes": [{"data": {"type": "start"}, "id": "start"}, code_config],
}
graph = Graph.init(graph_config=graph_config)
init_params = GraphInitParams(
tenant_id="1",
app_id="1",
workflow_type=WorkflowType.WORKFLOW,
workflow_id="1",
graph_config=graph_config,
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)
# construct variable pool
variable_pool = VariablePool(
system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"},
user_inputs={},
environment_variables=[],
conversation_variables=[],
)
variable_pool.add(["code", "123", "args1"], 1)
variable_pool.add(["code", "123", "args2"], 2)
node = CodeNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph=graph,
graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()),
config=code_config,
)
return node
@pytest.mark.parametrize("setup_code_executor_mock", [["none"]], indirect=True) @pytest.mark.parametrize("setup_code_executor_mock", [["none"]], indirect=True)
def test_execute_code(setup_code_executor_mock): def test_execute_code(setup_code_executor_mock):
code = """ code = """
@ -22,16 +77,9 @@ def test_execute_code(setup_code_executor_mock):
""" """
# trim first 4 spaces at the beginning of each line # trim first 4 spaces at the beginning of each line
code = "\n".join([line[4:] for line in code.split("\n")]) code = "\n".join([line[4:] for line in code.split("\n")])
node = CodeNode(
id='test', code_config = {
tenant_id="1", "id": "code",
app_id="1",
workflow_id="1",
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.WEB_APP,
config={
"id": "1",
"data": { "data": {
"outputs": { "outputs": {
"result": { "result": {
@ -50,17 +98,15 @@ def test_execute_code(setup_code_executor_mock):
"code_language": "python3", "code_language": "python3",
"code": code, "code": code,
}, },
}, }
)
# construct variable pool node = init_code_node(code_config)
pool = VariablePool(system_variables={}, user_inputs={}, environment_variables=[])
pool.add(["1", "123", "args1"], 1)
pool.add(["1", "123", "args2"], 2)
# execute node # execute node
result = node.run(pool) result = node._run()
assert isinstance(result, NodeRunResult)
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert result.outputs["result"] == 3 assert result.outputs["result"] == 3
assert result.error is None assert result.error is None
@ -75,16 +121,9 @@ def test_execute_code_output_validator(setup_code_executor_mock):
""" """
# trim first 4 spaces at the beginning of each line # trim first 4 spaces at the beginning of each line
code = "\n".join([line[4:] for line in code.split("\n")]) code = "\n".join([line[4:] for line in code.split("\n")])
node = CodeNode(
id='test', code_config = {
tenant_id="1", "id": "code",
app_id="1",
workflow_id="1",
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.WEB_APP,
config={
"id": "1",
"data": { "data": {
"outputs": { "outputs": {
"result": { "result": {
@ -103,17 +142,13 @@ def test_execute_code_output_validator(setup_code_executor_mock):
"code_language": "python3", "code_language": "python3",
"code": code, "code": code,
}, },
}, }
)
# construct variable pool node = init_code_node(code_config)
pool = VariablePool(system_variables={}, user_inputs={}, environment_variables=[])
pool.add(["1", "123", "args1"], 1)
pool.add(["1", "123", "args2"], 2)
# execute node # execute node
result = node.run(pool) result = node._run()
assert isinstance(result, NodeRunResult)
assert result.status == WorkflowNodeExecutionStatus.FAILED assert result.status == WorkflowNodeExecutionStatus.FAILED
assert result.error == "Output variable `result` must be a string" assert result.error == "Output variable `result` must be a string"
@ -129,15 +164,9 @@ def test_execute_code_output_validator_depth():
""" """
# trim first 4 spaces at the beginning of each line # trim first 4 spaces at the beginning of each line
code = "\n".join([line[4:] for line in code.split("\n")]) code = "\n".join([line[4:] for line in code.split("\n")])
node = CodeNode(
tenant_id="1", code_config = {
app_id="1", "id": "code",
workflow_id="1",
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.WEB_APP,
config={
"id": "1",
"data": { "data": {
"outputs": { "outputs": {
"string_validator": { "string_validator": {
@ -186,8 +215,9 @@ def test_execute_code_output_validator_depth():
"code_language": "python3", "code_language": "python3",
"code": code, "code": code,
}, },
}, }
)
node = init_code_node(code_config)
# construct result # construct result
result = { result = {
@ -198,6 +228,8 @@ def test_execute_code_output_validator_depth():
"object_validator": {"result": 1, "depth": {"depth": {"depth": 1}}}, "object_validator": {"result": 1, "depth": {"depth": {"depth": 1}}},
} }
node.node_data = cast(CodeNodeData, node.node_data)
# validate # validate
node._transform_result(result, node.node_data.outputs) node._transform_result(result, node.node_data.outputs)
@ -252,15 +284,9 @@ def test_execute_code_output_object_list():
""" """
# trim first 4 spaces at the beginning of each line # trim first 4 spaces at the beginning of each line
code = "\n".join([line[4:] for line in code.split("\n")]) code = "\n".join([line[4:] for line in code.split("\n")])
node = CodeNode(
tenant_id="1", code_config = {
app_id="1", "id": "code",
workflow_id="1",
user_id="1",
invoke_from=InvokeFrom.WEB_APP,
user_from=UserFrom.ACCOUNT,
config={
"id": "1",
"data": { "data": {
"outputs": { "outputs": {
"object_list": { "object_list": {
@ -279,8 +305,9 @@ def test_execute_code_output_object_list():
"code_language": "python3", "code_language": "python3",
"code": code, "code": code,
}, },
}, }
)
node = init_code_node(code_config)
# construct result # construct result
result = { result = {
@ -297,6 +324,8 @@ def test_execute_code_output_object_list():
] ]
} }
node.node_data = cast(CodeNodeData, node.node_data)
# validate # validate
node._transform_result(result, node.node_data.outputs) node._transform_result(result, node.node_data.outputs)

View File

@ -1,3 +1,5 @@
import time
import uuid
from urllib.parse import urlencode from urllib.parse import urlencode
import pytest import pytest
@ -5,27 +7,63 @@ import pytest
from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.node_entities import UserFrom from core.workflow.entities.node_entities import UserFrom
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.graph import Graph
from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.nodes.http_request.http_request_node import HttpRequestNode from core.workflow.nodes.http_request.http_request_node import HttpRequestNode
from models.workflow import WorkflowType
from tests.integration_tests.workflow.nodes.__mock.http import setup_http_mock from tests.integration_tests.workflow.nodes.__mock.http import setup_http_mock
BASIC_NODE_DATA = {
"tenant_id": "1", def init_http_node(config: dict):
"app_id": "1", graph_config = {
"workflow_id": "1", "edges": [
"user_id": "1", {
"user_from": UserFrom.ACCOUNT, "id": "start-source-next-target",
"invoke_from": InvokeFrom.WEB_APP, "source": "start",
"target": "1",
},
],
"nodes": [{"data": {"type": "start"}, "id": "start"}, config],
} }
graph = Graph.init(graph_config=graph_config)
init_params = GraphInitParams(
tenant_id="1",
app_id="1",
workflow_type=WorkflowType.WORKFLOW,
workflow_id="1",
graph_config=graph_config,
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)
# construct variable pool # construct variable pool
pool = VariablePool(system_variables={}, user_inputs={}, environment_variables=[]) variable_pool = VariablePool(
pool.add(["a", "b123", "args1"], 1) system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"},
pool.add(["a", "b123", "args2"], 2) user_inputs={},
environment_variables=[],
conversation_variables=[],
)
variable_pool.add(["a", "b123", "args1"], 1)
variable_pool.add(["a", "b123", "args2"], 2)
return HttpRequestNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph=graph,
graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()),
config=config,
)
@pytest.mark.parametrize("setup_http_mock", [["none"]], indirect=True) @pytest.mark.parametrize("setup_http_mock", [["none"]], indirect=True)
def test_get(setup_http_mock): def test_get(setup_http_mock):
node = HttpRequestNode( node = init_http_node(
config={ config={
"id": "1", "id": "1",
"data": { "data": {
@ -45,12 +83,11 @@ def test_get(setup_http_mock):
"params": "A:b", "params": "A:b",
"body": None, "body": None,
}, },
}, }
**BASIC_NODE_DATA,
) )
result = node.run(pool) result = node._run()
assert result.process_data is not None
data = result.process_data.get("request", "") data = result.process_data.get("request", "")
assert "?A=b" in data assert "?A=b" in data
@ -59,7 +96,7 @@ def test_get(setup_http_mock):
@pytest.mark.parametrize("setup_http_mock", [["none"]], indirect=True) @pytest.mark.parametrize("setup_http_mock", [["none"]], indirect=True)
def test_no_auth(setup_http_mock): def test_no_auth(setup_http_mock):
node = HttpRequestNode( node = init_http_node(
config={ config={
"id": "1", "id": "1",
"data": { "data": {
@ -75,12 +112,11 @@ def test_no_auth(setup_http_mock):
"params": "A:b", "params": "A:b",
"body": None, "body": None,
}, },
}, }
**BASIC_NODE_DATA,
) )
result = node.run(pool) result = node._run()
assert result.process_data is not None
data = result.process_data.get("request", "") data = result.process_data.get("request", "")
assert "?A=b" in data assert "?A=b" in data
@ -89,7 +125,7 @@ def test_no_auth(setup_http_mock):
@pytest.mark.parametrize("setup_http_mock", [["none"]], indirect=True) @pytest.mark.parametrize("setup_http_mock", [["none"]], indirect=True)
def test_custom_authorization_header(setup_http_mock): def test_custom_authorization_header(setup_http_mock):
node = HttpRequestNode( node = init_http_node(
config={ config={
"id": "1", "id": "1",
"data": { "data": {
@ -109,12 +145,11 @@ def test_custom_authorization_header(setup_http_mock):
"params": "A:b", "params": "A:b",
"body": None, "body": None,
}, },
}, }
**BASIC_NODE_DATA,
) )
result = node.run(pool) result = node._run()
assert result.process_data is not None
data = result.process_data.get("request", "") data = result.process_data.get("request", "")
assert "?A=b" in data assert "?A=b" in data
@ -123,7 +158,7 @@ def test_custom_authorization_header(setup_http_mock):
@pytest.mark.parametrize("setup_http_mock", [["none"]], indirect=True) @pytest.mark.parametrize("setup_http_mock", [["none"]], indirect=True)
def test_template(setup_http_mock): def test_template(setup_http_mock):
node = HttpRequestNode( node = init_http_node(
config={ config={
"id": "1", "id": "1",
"data": { "data": {
@ -143,11 +178,11 @@ def test_template(setup_http_mock):
"params": "A:b\nTemplate:{{#a.b123.args2#}}", "params": "A:b\nTemplate:{{#a.b123.args2#}}",
"body": None, "body": None,
}, },
}, }
**BASIC_NODE_DATA,
) )
result = node.run(pool) result = node._run()
assert result.process_data is not None
data = result.process_data.get("request", "") data = result.process_data.get("request", "")
assert "?A=b" in data assert "?A=b" in data
@ -158,7 +193,7 @@ def test_template(setup_http_mock):
@pytest.mark.parametrize("setup_http_mock", [["none"]], indirect=True) @pytest.mark.parametrize("setup_http_mock", [["none"]], indirect=True)
def test_json(setup_http_mock): def test_json(setup_http_mock):
node = HttpRequestNode( node = init_http_node(
config={ config={
"id": "1", "id": "1",
"data": { "data": {
@ -178,11 +213,11 @@ def test_json(setup_http_mock):
"params": "A:b", "params": "A:b",
"body": {"type": "json", "data": '{"a": "{{#a.b123.args1#}}"}'}, "body": {"type": "json", "data": '{"a": "{{#a.b123.args1#}}"}'},
}, },
}, }
**BASIC_NODE_DATA,
) )
result = node.run(pool) result = node._run()
assert result.process_data is not None
data = result.process_data.get("request", "") data = result.process_data.get("request", "")
assert '{"a": "1"}' in data assert '{"a": "1"}' in data
@ -190,7 +225,7 @@ def test_json(setup_http_mock):
def test_x_www_form_urlencoded(setup_http_mock): def test_x_www_form_urlencoded(setup_http_mock):
node = HttpRequestNode( node = init_http_node(
config={ config={
"id": "1", "id": "1",
"data": { "data": {
@ -210,11 +245,11 @@ def test_x_www_form_urlencoded(setup_http_mock):
"params": "A:b", "params": "A:b",
"body": {"type": "x-www-form-urlencoded", "data": "a:{{#a.b123.args1#}}\nb:{{#a.b123.args2#}}"}, "body": {"type": "x-www-form-urlencoded", "data": "a:{{#a.b123.args1#}}\nb:{{#a.b123.args2#}}"},
}, },
}, }
**BASIC_NODE_DATA,
) )
result = node.run(pool) result = node._run()
assert result.process_data is not None
data = result.process_data.get("request", "") data = result.process_data.get("request", "")
assert "a=1&b=2" in data assert "a=1&b=2" in data
@ -222,7 +257,7 @@ def test_x_www_form_urlencoded(setup_http_mock):
def test_form_data(setup_http_mock): def test_form_data(setup_http_mock):
node = HttpRequestNode( node = init_http_node(
config={ config={
"id": "1", "id": "1",
"data": { "data": {
@ -242,11 +277,11 @@ def test_form_data(setup_http_mock):
"params": "A:b", "params": "A:b",
"body": {"type": "form-data", "data": "a:{{#a.b123.args1#}}\nb:{{#a.b123.args2#}}"}, "body": {"type": "form-data", "data": "a:{{#a.b123.args1#}}\nb:{{#a.b123.args2#}}"},
}, },
}, }
**BASIC_NODE_DATA,
) )
result = node.run(pool) result = node._run()
assert result.process_data is not None
data = result.process_data.get("request", "") data = result.process_data.get("request", "")
assert 'form-data; name="a"' in data assert 'form-data; name="a"' in data
@ -257,7 +292,7 @@ def test_form_data(setup_http_mock):
def test_none_data(setup_http_mock): def test_none_data(setup_http_mock):
node = HttpRequestNode( node = init_http_node(
config={ config={
"id": "1", "id": "1",
"data": { "data": {
@ -277,11 +312,11 @@ def test_none_data(setup_http_mock):
"params": "A:b", "params": "A:b",
"body": {"type": "none", "data": "123123123"}, "body": {"type": "none", "data": "123123123"},
}, },
}, }
**BASIC_NODE_DATA,
) )
result = node.run(pool) result = node._run()
assert result.process_data is not None
data = result.process_data.get("request", "") data = result.process_data.get("request", "")
assert "X-Header: 123" in data assert "X-Header: 123" in data
@ -289,7 +324,7 @@ def test_none_data(setup_http_mock):
def test_mock_404(setup_http_mock): def test_mock_404(setup_http_mock):
node = HttpRequestNode( node = init_http_node(
config={ config={
"id": "1", "id": "1",
"data": { "data": {
@ -305,19 +340,19 @@ def test_mock_404(setup_http_mock):
"params": "", "params": "",
"headers": "X-Header:123", "headers": "X-Header:123",
}, },
}, }
**BASIC_NODE_DATA,
) )
result = node.run(pool) result = node._run()
assert result.outputs is not None
resp = result.outputs resp = result.outputs
assert 404 == resp.get("status_code") assert 404 == resp.get("status_code")
assert "Not Found" in resp.get("body") assert "Not Found" in resp.get("body", "")
def test_multi_colons_parse(setup_http_mock): def test_multi_colons_parse(setup_http_mock):
node = HttpRequestNode( node = init_http_node(
config={ config={
"id": "1", "id": "1",
"data": { "data": {
@ -333,13 +368,14 @@ def test_multi_colons_parse(setup_http_mock):
"headers": "Referer:http://example3.com\nRedirect:http://example4.com", "headers": "Referer:http://example3.com\nRedirect:http://example4.com",
"body": {"type": "form-data", "data": "Referer:http://example5.com\nRedirect:http://example6.com"}, "body": {"type": "form-data", "data": "Referer:http://example5.com\nRedirect:http://example6.com"},
}, },
}, }
**BASIC_NODE_DATA,
) )
result = node.run(pool) result = node._run()
assert result.process_data is not None
assert result.outputs is not None
resp = result.outputs resp = result.outputs
assert urlencode({"Redirect": "http://example2.com"}) in result.process_data.get("request") assert urlencode({"Redirect": "http://example2.com"}) in result.process_data.get("request", "")
assert 'form-data; name="Redirect"\n\nhttp://example6.com' in result.process_data.get("request") assert 'form-data; name="Redirect"\n\nhttp://example6.com' in result.process_data.get("request", "")
assert "http://example3.com" == resp.get("headers").get("referer") assert "http://example3.com" == resp.get("headers", {}).get("referer")

View File

@ -1,5 +1,8 @@
import json import json
import os import os
import time
import uuid
from collections.abc import Generator
from unittest.mock import MagicMock from unittest.mock import MagicMock
import pytest import pytest
@ -13,25 +16,74 @@ from core.model_runtime.model_providers import ModelProviderFactory
from core.workflow.entities.node_entities import UserFrom from core.workflow.entities.node_entities import UserFrom
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.graph import Graph
from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.nodes.event import RunCompletedEvent
from core.workflow.nodes.llm.llm_node import LLMNode from core.workflow.nodes.llm.llm_node import LLMNode
from extensions.ext_database import db from extensions.ext_database import db
from models.provider import ProviderType from models.provider import ProviderType
from models.workflow import WorkflowNodeExecutionStatus from models.workflow import WorkflowNodeExecutionStatus, WorkflowType
"""FOR MOCK FIXTURES, DO NOT REMOVE""" """FOR MOCK FIXTURES, DO NOT REMOVE"""
from tests.integration_tests.model_runtime.__mock.openai import setup_openai_mock from tests.integration_tests.model_runtime.__mock.openai import setup_openai_mock
from tests.integration_tests.workflow.nodes.__mock.code_executor import setup_code_executor_mock from tests.integration_tests.workflow.nodes.__mock.code_executor import setup_code_executor_mock
@pytest.mark.parametrize("setup_openai_mock", [["chat"]], indirect=True) def init_llm_node(config: dict) -> LLMNode:
def test_execute_llm(setup_openai_mock): graph_config = {
node = LLMNode( "edges": [
{
"id": "start-source-next-target",
"source": "start",
"target": "llm",
},
],
"nodes": [{"data": {"type": "start"}, "id": "start"}, config],
}
graph = Graph.init(graph_config=graph_config)
init_params = GraphInitParams(
tenant_id="1", tenant_id="1",
app_id="1", app_id="1",
workflow_type=WorkflowType.WORKFLOW,
workflow_id="1", workflow_id="1",
graph_config=graph_config,
user_id="1", user_id="1",
invoke_from=InvokeFrom.WEB_APP,
user_from=UserFrom.ACCOUNT, user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)
# construct variable pool
variable_pool = VariablePool(
system_variables={
SystemVariableKey.QUERY: "what's the weather today?",
SystemVariableKey.FILES: [],
SystemVariableKey.CONVERSATION_ID: "abababa",
SystemVariableKey.USER_ID: "aaa",
},
user_inputs={},
environment_variables=[],
conversation_variables=[],
)
variable_pool.add(["abc", "output"], "sunny")
node = LLMNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph=graph,
graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()),
config=config,
)
return node
@pytest.mark.parametrize("setup_openai_mock", [["chat"]], indirect=True)
def test_execute_llm(setup_openai_mock):
node = init_llm_node(
config={ config={
"id": "llm", "id": "llm",
"data": { "data": {
@ -49,19 +101,6 @@ def test_execute_llm(setup_openai_mock):
}, },
) )
# construct variable pool
pool = VariablePool(
system_variables={
SystemVariableKey.QUERY: "what's the weather today?",
SystemVariableKey.FILES: [],
SystemVariableKey.CONVERSATION_ID: "abababa",
SystemVariableKey.USER_ID: "aaa",
},
user_inputs={},
environment_variables=[],
)
pool.add(["abc", "output"], "sunny")
credentials = {"openai_api_key": os.environ.get("OPENAI_API_KEY")} credentials = {"openai_api_key": os.environ.get("OPENAI_API_KEY")}
provider_instance = ModelProviderFactory().get_provider_instance("openai") provider_instance = ModelProviderFactory().get_provider_instance("openai")
@ -80,13 +119,15 @@ def test_execute_llm(setup_openai_mock):
model_type_instance=model_type_instance, model_type_instance=model_type_instance,
) )
model_instance = ModelInstance(provider_model_bundle=provider_model_bundle, model="gpt-3.5-turbo") model_instance = ModelInstance(provider_model_bundle=provider_model_bundle, model="gpt-3.5-turbo")
model_schema = model_type_instance.get_model_schema("gpt-3.5-turbo")
assert model_schema is not None
model_config = ModelConfigWithCredentialsEntity( model_config = ModelConfigWithCredentialsEntity(
model="gpt-3.5-turbo", model="gpt-3.5-turbo",
provider="openai", provider="openai",
mode="chat", mode="chat",
credentials=credentials, credentials=credentials,
parameters={}, parameters={},
model_schema=model_type_instance.get_model_schema("gpt-3.5-turbo"), model_schema=model_schema,
provider_model_bundle=provider_model_bundle, provider_model_bundle=provider_model_bundle,
) )
@ -96,11 +137,16 @@ def test_execute_llm(setup_openai_mock):
node._fetch_model_config = MagicMock(return_value=(model_instance, model_config)) node._fetch_model_config = MagicMock(return_value=(model_instance, model_config))
# execute node # execute node
result = node.run(pool) result = node._run()
assert isinstance(result, Generator)
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED for item in result:
assert result.outputs["text"] is not None if isinstance(item, RunCompletedEvent):
assert result.outputs["usage"]["total_tokens"] > 0 assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert item.run_result.process_data is not None
assert item.run_result.outputs is not None
assert item.run_result.outputs.get("text") is not None
assert item.run_result.outputs.get("usage", {})["total_tokens"] > 0
@pytest.mark.parametrize("setup_code_executor_mock", [["none"]], indirect=True) @pytest.mark.parametrize("setup_code_executor_mock", [["none"]], indirect=True)
@ -109,13 +155,7 @@ def test_execute_llm_with_jinja2(setup_code_executor_mock, setup_openai_mock):
""" """
Test execute LLM node with jinja2 Test execute LLM node with jinja2
""" """
node = LLMNode( node = init_llm_node(
tenant_id="1",
app_id="1",
workflow_id="1",
user_id="1",
invoke_from=InvokeFrom.WEB_APP,
user_from=UserFrom.ACCOUNT,
config={ config={
"id": "llm", "id": "llm",
"data": { "data": {
@ -149,19 +189,6 @@ def test_execute_llm_with_jinja2(setup_code_executor_mock, setup_openai_mock):
}, },
) )
# construct variable pool
pool = VariablePool(
system_variables={
SystemVariableKey.QUERY: "what's the weather today?",
SystemVariableKey.FILES: [],
SystemVariableKey.CONVERSATION_ID: "abababa",
SystemVariableKey.USER_ID: "aaa",
},
user_inputs={},
environment_variables=[],
)
pool.add(["abc", "output"], "sunny")
credentials = {"openai_api_key": os.environ.get("OPENAI_API_KEY")} credentials = {"openai_api_key": os.environ.get("OPENAI_API_KEY")}
provider_instance = ModelProviderFactory().get_provider_instance("openai") provider_instance = ModelProviderFactory().get_provider_instance("openai")
@ -181,14 +208,15 @@ def test_execute_llm_with_jinja2(setup_code_executor_mock, setup_openai_mock):
) )
model_instance = ModelInstance(provider_model_bundle=provider_model_bundle, model="gpt-3.5-turbo") model_instance = ModelInstance(provider_model_bundle=provider_model_bundle, model="gpt-3.5-turbo")
model_schema = model_type_instance.get_model_schema("gpt-3.5-turbo")
assert model_schema is not None
model_config = ModelConfigWithCredentialsEntity( model_config = ModelConfigWithCredentialsEntity(
model="gpt-3.5-turbo", model="gpt-3.5-turbo",
provider="openai", provider="openai",
mode="chat", mode="chat",
credentials=credentials, credentials=credentials,
parameters={}, parameters={},
model_schema=model_type_instance.get_model_schema("gpt-3.5-turbo"), model_schema=model_schema,
provider_model_bundle=provider_model_bundle, provider_model_bundle=provider_model_bundle,
) )
@ -198,8 +226,11 @@ def test_execute_llm_with_jinja2(setup_code_executor_mock, setup_openai_mock):
node._fetch_model_config = MagicMock(return_value=(model_instance, model_config)) node._fetch_model_config = MagicMock(return_value=(model_instance, model_config))
# execute node # execute node
result = node.run(pool) result = node._run()
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED for item in result:
assert "sunny" in json.dumps(result.process_data) if isinstance(item, RunCompletedEvent):
assert "what's the weather today?" in json.dumps(result.process_data) assert item.run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert item.run_result.process_data is not None
assert "sunny" in json.dumps(item.run_result.process_data)
assert "what's the weather today?" in json.dumps(item.run_result.process_data)

View File

@ -1,5 +1,7 @@
import json import json
import os import os
import time
import uuid
from typing import Optional from typing import Optional
from unittest.mock import MagicMock from unittest.mock import MagicMock
@ -14,12 +16,15 @@ from core.model_runtime.model_providers.model_provider_factory import ModelProvi
from core.workflow.entities.node_entities import UserFrom from core.workflow.entities.node_entities import UserFrom
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from core.workflow.enums import SystemVariableKey from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.graph import Graph
from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.nodes.parameter_extractor.parameter_extractor_node import ParameterExtractorNode from core.workflow.nodes.parameter_extractor.parameter_extractor_node import ParameterExtractorNode
from extensions.ext_database import db from extensions.ext_database import db
from models.provider import ProviderType from models.provider import ProviderType
"""FOR MOCK FIXTURES, DO NOT REMOVE""" """FOR MOCK FIXTURES, DO NOT REMOVE"""
from models.workflow import WorkflowNodeExecutionStatus from models.workflow import WorkflowNodeExecutionStatus, WorkflowType
from tests.integration_tests.model_runtime.__mock.anthropic import setup_anthropic_mock from tests.integration_tests.model_runtime.__mock.anthropic import setup_anthropic_mock
from tests.integration_tests.model_runtime.__mock.openai import setup_openai_mock from tests.integration_tests.model_runtime.__mock.openai import setup_openai_mock
@ -46,13 +51,15 @@ def get_mocked_fetch_model_config(
model_type_instance=model_type_instance, model_type_instance=model_type_instance,
) )
model_instance = ModelInstance(provider_model_bundle=provider_model_bundle, model=model) model_instance = ModelInstance(provider_model_bundle=provider_model_bundle, model=model)
model_schema = model_type_instance.get_model_schema(model)
assert model_schema is not None
model_config = ModelConfigWithCredentialsEntity( model_config = ModelConfigWithCredentialsEntity(
model=model, model=model,
provider=provider, provider=provider,
mode=mode, mode=mode,
credentials=credentials, credentials=credentials,
parameters={}, parameters={},
model_schema=model_type_instance.get_model_schema(model), model_schema=model_schema,
provider_model_bundle=provider_model_bundle, provider_model_bundle=provider_model_bundle,
) )
@ -73,18 +80,62 @@ def get_mocked_fetch_memory(memory_text: str):
return MagicMock(return_value=MemoryMock()) return MagicMock(return_value=MemoryMock())
def init_parameter_extractor_node(config: dict):
graph_config = {
"edges": [
{
"id": "start-source-next-target",
"source": "start",
"target": "llm",
},
],
"nodes": [{"data": {"type": "start"}, "id": "start"}, config],
}
graph = Graph.init(graph_config=graph_config)
init_params = GraphInitParams(
tenant_id="1",
app_id="1",
workflow_type=WorkflowType.WORKFLOW,
workflow_id="1",
graph_config=graph_config,
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)
# construct variable pool
variable_pool = VariablePool(
system_variables={
SystemVariableKey.QUERY: "what's the weather in SF",
SystemVariableKey.FILES: [],
SystemVariableKey.CONVERSATION_ID: "abababa",
SystemVariableKey.USER_ID: "aaa",
},
user_inputs={},
environment_variables=[],
conversation_variables=[],
)
variable_pool.add(["a", "b123", "args1"], 1)
variable_pool.add(["a", "b123", "args2"], 2)
return ParameterExtractorNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph=graph,
graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()),
config=config,
)
@pytest.mark.parametrize("setup_openai_mock", [["chat"]], indirect=True) @pytest.mark.parametrize("setup_openai_mock", [["chat"]], indirect=True)
def test_function_calling_parameter_extractor(setup_openai_mock): def test_function_calling_parameter_extractor(setup_openai_mock):
""" """
Test function calling for parameter extractor. Test function calling for parameter extractor.
""" """
node = ParameterExtractorNode( node = init_parameter_extractor_node(
tenant_id="1",
app_id="1",
workflow_id="1",
user_id="1",
invoke_from=InvokeFrom.WEB_APP,
user_from=UserFrom.ACCOUNT,
config={ config={
"id": "llm", "id": "llm",
"data": { "data": {
@ -97,7 +148,7 @@ def test_function_calling_parameter_extractor(setup_openai_mock):
"reasoning_mode": "function_call", "reasoning_mode": "function_call",
"memory": None, "memory": None,
}, },
}, }
) )
node._fetch_model_config = get_mocked_fetch_model_config( node._fetch_model_config = get_mocked_fetch_model_config(
@ -120,9 +171,10 @@ def test_function_calling_parameter_extractor(setup_openai_mock):
environment_variables=[], environment_variables=[],
) )
result = node.run(pool) result = node._run()
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert result.outputs.get("location") == "kawaii" assert result.outputs.get("location") == "kawaii"
assert result.outputs.get("__reason") == None assert result.outputs.get("__reason") == None
@ -132,13 +184,7 @@ def test_instructions(setup_openai_mock):
""" """
Test chat parameter extractor. Test chat parameter extractor.
""" """
node = ParameterExtractorNode( node = init_parameter_extractor_node(
tenant_id="1",
app_id="1",
workflow_id="1",
user_id="1",
invoke_from=InvokeFrom.WEB_APP,
user_from=UserFrom.ACCOUNT,
config={ config={
"id": "llm", "id": "llm",
"data": { "data": {
@ -162,29 +208,19 @@ def test_instructions(setup_openai_mock):
) )
db.session.close = MagicMock() db.session.close = MagicMock()
# construct variable pool result = node._run()
pool = VariablePool(
system_variables={
SystemVariableKey.QUERY: "what's the weather in SF",
SystemVariableKey.FILES: [],
SystemVariableKey.CONVERSATION_ID: "abababa",
SystemVariableKey.USER_ID: "aaa",
},
user_inputs={},
environment_variables=[],
)
result = node.run(pool)
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert result.outputs.get("location") == "kawaii" assert result.outputs.get("location") == "kawaii"
assert result.outputs.get("__reason") == None assert result.outputs.get("__reason") == None
process_data = result.process_data process_data = result.process_data
assert process_data is not None
process_data.get("prompts") process_data.get("prompts")
for prompt in process_data.get("prompts"): for prompt in process_data.get("prompts", []):
if prompt.get("role") == "system": if prompt.get("role") == "system":
assert "what's the weather in SF" in prompt.get("text") assert "what's the weather in SF" in prompt.get("text")
@ -194,13 +230,7 @@ def test_chat_parameter_extractor(setup_anthropic_mock):
""" """
Test chat parameter extractor. Test chat parameter extractor.
""" """
node = ParameterExtractorNode( node = init_parameter_extractor_node(
tenant_id="1",
app_id="1",
workflow_id="1",
user_id="1",
invoke_from=InvokeFrom.WEB_APP,
user_from=UserFrom.ACCOUNT,
config={ config={
"id": "llm", "id": "llm",
"data": { "data": {
@ -224,27 +254,17 @@ def test_chat_parameter_extractor(setup_anthropic_mock):
) )
db.session.close = MagicMock() db.session.close = MagicMock()
# construct variable pool result = node._run()
pool = VariablePool(
system_variables={
SystemVariableKey.QUERY: "what's the weather in SF",
SystemVariableKey.FILES: [],
SystemVariableKey.CONVERSATION_ID: "abababa",
SystemVariableKey.USER_ID: "aaa",
},
user_inputs={},
environment_variables=[],
)
result = node.run(pool)
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert result.outputs.get("location") == "" assert result.outputs.get("location") == ""
assert ( assert (
result.outputs.get("__reason") result.outputs.get("__reason")
== "Failed to extract result from function call or text response, using empty result." == "Failed to extract result from function call or text response, using empty result."
) )
prompts = result.process_data.get("prompts") assert result.process_data is not None
prompts = result.process_data.get("prompts", [])
for prompt in prompts: for prompt in prompts:
if prompt.get("role") == "user": if prompt.get("role") == "user":
@ -257,13 +277,7 @@ def test_completion_parameter_extractor(setup_openai_mock):
""" """
Test completion parameter extractor. Test completion parameter extractor.
""" """
node = ParameterExtractorNode( node = init_parameter_extractor_node(
tenant_id="1",
app_id="1",
workflow_id="1",
user_id="1",
invoke_from=InvokeFrom.WEB_APP,
user_from=UserFrom.ACCOUNT,
config={ config={
"id": "llm", "id": "llm",
"data": { "data": {
@ -292,28 +306,18 @@ def test_completion_parameter_extractor(setup_openai_mock):
) )
db.session.close = MagicMock() db.session.close = MagicMock()
# construct variable pool result = node._run()
pool = VariablePool(
system_variables={
SystemVariableKey.QUERY: "what's the weather in SF",
SystemVariableKey.FILES: [],
SystemVariableKey.CONVERSATION_ID: "abababa",
SystemVariableKey.USER_ID: "aaa",
},
user_inputs={},
environment_variables=[],
)
result = node.run(pool)
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert result.outputs.get("location") == "" assert result.outputs.get("location") == ""
assert ( assert (
result.outputs.get("__reason") result.outputs.get("__reason")
== "Failed to extract result from function call or text response, using empty result." == "Failed to extract result from function call or text response, using empty result."
) )
assert len(result.process_data.get("prompts")) == 1 assert result.process_data is not None
assert "SF" in result.process_data.get("prompts")[0].get("text") assert len(result.process_data.get("prompts", [])) == 1
assert "SF" in result.process_data.get("prompts", [])[0].get("text")
def test_extract_json_response(): def test_extract_json_response():
@ -321,13 +325,7 @@ def test_extract_json_response():
Test extract json response. Test extract json response.
""" """
node = ParameterExtractorNode( node = init_parameter_extractor_node(
tenant_id="1",
app_id="1",
workflow_id="1",
user_id="1",
invoke_from=InvokeFrom.WEB_APP,
user_from=UserFrom.ACCOUNT,
config={ config={
"id": "llm", "id": "llm",
"data": { "data": {
@ -356,6 +354,7 @@ def test_extract_json_response():
hello world. hello world.
""") """)
assert result is not None
assert result["location"] == "kawaii" assert result["location"] == "kawaii"
@ -364,13 +363,7 @@ def test_chat_parameter_extractor_with_memory(setup_anthropic_mock):
""" """
Test chat parameter extractor with memory. Test chat parameter extractor with memory.
""" """
node = ParameterExtractorNode( node = init_parameter_extractor_node(
tenant_id="1",
app_id="1",
workflow_id="1",
user_id="1",
invoke_from=InvokeFrom.WEB_APP,
user_from=UserFrom.ACCOUNT,
config={ config={
"id": "llm", "id": "llm",
"data": { "data": {
@ -395,27 +388,17 @@ def test_chat_parameter_extractor_with_memory(setup_anthropic_mock):
node._fetch_memory = get_mocked_fetch_memory("customized memory") node._fetch_memory = get_mocked_fetch_memory("customized memory")
db.session.close = MagicMock() db.session.close = MagicMock()
# construct variable pool result = node._run()
pool = VariablePool(
system_variables={
SystemVariableKey.QUERY: "what's the weather in SF",
SystemVariableKey.FILES: [],
SystemVariableKey.CONVERSATION_ID: "abababa",
SystemVariableKey.USER_ID: "aaa",
},
user_inputs={},
environment_variables=[],
)
result = node.run(pool)
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert result.outputs.get("location") == "" assert result.outputs.get("location") == ""
assert ( assert (
result.outputs.get("__reason") result.outputs.get("__reason")
== "Failed to extract result from function call or text response, using empty result." == "Failed to extract result from function call or text response, using empty result."
) )
prompts = result.process_data.get("prompts") assert result.process_data is not None
prompts = result.process_data.get("prompts", [])
latest_role = None latest_role = None
for prompt in prompts: for prompt in prompts:

View File

@ -1,23 +1,23 @@
import time
import uuid
import pytest import pytest
from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.node_entities import UserFrom from core.workflow.entities.node_entities import UserFrom
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.graph import Graph
from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode
from models.workflow import WorkflowNodeExecutionStatus from models.workflow import WorkflowNodeExecutionStatus, WorkflowType
from tests.integration_tests.workflow.nodes.__mock.code_executor import setup_code_executor_mock from tests.integration_tests.workflow.nodes.__mock.code_executor import setup_code_executor_mock
@pytest.mark.parametrize("setup_code_executor_mock", [["none"]], indirect=True) @pytest.mark.parametrize("setup_code_executor_mock", [["none"]], indirect=True)
def test_execute_code(setup_code_executor_mock): def test_execute_code(setup_code_executor_mock):
code = """{{args2}}""" code = """{{args2}}"""
node = TemplateTransformNode(
tenant_id="1",
app_id="1",
workflow_id="1",
user_id="1",
invoke_from=InvokeFrom.WEB_APP,
user_from=UserFrom.END_USER,
config = { config = {
"id": "1", "id": "1",
"data": { "data": {
@ -31,16 +31,54 @@ def test_execute_code(setup_code_executor_mock):
], ],
"template": code, "template": code,
}, },
}
graph_config = {
"edges": [
{
"id": "start-source-next-target",
"source": "start",
"target": "1",
}, },
],
"nodes": [{"data": {"type": "start"}, "id": "start"}, config],
}
graph = Graph.init(graph_config=graph_config)
init_params = GraphInitParams(
tenant_id="1",
app_id="1",
workflow_type=WorkflowType.WORKFLOW,
workflow_id="1",
graph_config=graph_config,
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
) )
# construct variable pool # construct variable pool
pool = VariablePool(system_variables={}, user_inputs={}, environment_variables=[]) variable_pool = VariablePool(
pool.add(["1", "123", "args1"], 1) system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"},
pool.add(["1", "123", "args2"], 3) user_inputs={},
environment_variables=[],
conversation_variables=[],
)
variable_pool.add(["1", "123", "args1"], 1)
variable_pool.add(["1", "123", "args2"], 3)
node = TemplateTransformNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph=graph,
graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()),
config=config,
)
# execute node # execute node
result = node.run(pool) result = node._run()
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert result.outputs["output"] == "3" assert result.outputs["output"] == "3"

View File

@ -1,21 +1,62 @@
import time
import uuid
from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.node_entities import UserFrom from core.workflow.entities.node_entities import NodeRunResult, UserFrom
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.graph import Graph
from core.workflow.graph_engine.entities.graph_init_params import GraphInitParams
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.nodes.tool.tool_node import ToolNode from core.workflow.nodes.tool.tool_node import ToolNode
from models.workflow import WorkflowNodeExecutionStatus from models.workflow import WorkflowNodeExecutionStatus, WorkflowType
def init_tool_node(config: dict):
graph_config = {
"edges": [
{
"id": "start-source-next-target",
"source": "start",
"target": "1",
},
],
"nodes": [{"data": {"type": "start"}, "id": "start"}, config],
}
graph = Graph.init(graph_config=graph_config)
init_params = GraphInitParams(
tenant_id="1",
app_id="1",
workflow_type=WorkflowType.WORKFLOW,
workflow_id="1",
graph_config=graph_config,
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)
# construct variable pool
variable_pool = VariablePool(
system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"},
user_inputs={},
environment_variables=[],
conversation_variables=[],
)
return ToolNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph=graph,
graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()),
config=config,
)
def test_tool_variable_invoke(): def test_tool_variable_invoke():
pool = VariablePool(system_variables={}, user_inputs={}, environment_variables=[]) node = init_tool_node(
pool.add(["1", "123", "args1"], "1+1")
node = ToolNode(
tenant_id="1",
app_id="1",
workflow_id="1",
user_id="1",
invoke_from=InvokeFrom.WEB_APP,
user_from=UserFrom.ACCOUNT,
config={ config={
"id": "1", "id": "1",
"data": { "data": {
@ -34,28 +75,22 @@ def test_tool_variable_invoke():
} }
}, },
}, },
}, }
) )
# execute node node.graph_runtime_state.variable_pool.add(["1", "123", "args1"], "1+1")
result = node.run(pool)
# execute node
result = node._run()
assert isinstance(result, NodeRunResult)
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert "2" in result.outputs["text"] assert "2" in result.outputs["text"]
assert result.outputs["files"] == [] assert result.outputs["files"] == []
def test_tool_mixed_invoke(): def test_tool_mixed_invoke():
pool = VariablePool(system_variables={}, user_inputs={}, environment_variables=[]) node = init_tool_node(
pool.add(["1", "args1"], "1+1")
node = ToolNode(
tenant_id="1",
app_id="1",
workflow_id="1",
user_id="1",
invoke_from=InvokeFrom.WEB_APP,
user_from=UserFrom.ACCOUNT,
config={ config={
"id": "1", "id": "1",
"data": { "data": {
@ -74,12 +109,15 @@ def test_tool_mixed_invoke():
} }
}, },
}, },
}, }
) )
# execute node node.graph_runtime_state.variable_pool.add(["1", "args1"], "1+1")
result = node.run(pool)
# execute node
result = node._run()
assert isinstance(result, NodeRunResult)
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert "2" in result.outputs["text"] assert "2" in result.outputs["text"]
assert result.outputs["files"] == [] assert result.outputs["files"] == []

View File

@ -32,33 +32,22 @@ def test_init():
"id": "http-source-answer2-target", "id": "http-source-answer2-target",
"source": "http", "source": "http",
"target": "answer2", "target": "answer2",
} },
], ],
"nodes": [ "nodes": [
{ {"data": {"type": "start"}, "id": "start"},
"data": {
"type": "start"
},
"id": "start"
},
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm" "id": "llm",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer", "answer": "1"},
"type": "answer",
"title": "answer",
"answer": "1"
},
"id": "answer", "id": "answer",
}, },
{ {
"data": { "data": {"type": "question-classifier"},
"type": "question-classifier"
},
"id": "qc", "id": "qc",
}, },
{ {
@ -68,19 +57,13 @@ def test_init():
"id": "http", "id": "http",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer", "answer": "1"},
"type": "answer",
"title": "answer",
"answer": "1"
},
"id": "answer2", "id": "answer2",
} },
], ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
start_node_id = "start" start_node_id = "start"
@ -127,7 +110,7 @@ def test__init_iteration_graph():
"source": "code", "source": "code",
"sourceHandle": "source", "sourceHandle": "source",
"target": "iteration", "target": "iteration",
} },
], ],
"nodes": [ "nodes": [
{ {
@ -143,17 +126,11 @@ def test__init_iteration_graph():
"id": "llm", "id": "llm",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer", "answer": "1"},
"type": "answer",
"title": "answer",
"answer": "1"
},
"id": "answer", "id": "answer",
}, },
{ {
"data": { "data": {"type": "iteration"},
"type": "iteration"
},
"id": "iteration", "id": "iteration",
}, },
{ {
@ -171,11 +148,7 @@ def test__init_iteration_graph():
"parentId": "iteration", "parentId": "iteration",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer", "answer": "1"},
"type": "answer",
"title": "answer",
"answer": "1"
},
"id": "answer-in-iteration", "id": "answer-in-iteration",
"parentId": "iteration", "parentId": "iteration",
}, },
@ -184,27 +157,18 @@ def test__init_iteration_graph():
"type": "code", "type": "code",
}, },
"id": "code", "id": "code",
} },
] ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config, root_node_id="template-transform-in-iteration")
graph_config=graph_config,
root_node_id="template-transform-in-iteration"
)
graph.add_extra_edge( graph.add_extra_edge(
source_node_id="answer-in-iteration", source_node_id="answer-in-iteration",
target_node_id="template-transform-in-iteration", target_node_id="template-transform-in-iteration",
run_condition=RunCondition( run_condition=RunCondition(
type="condition", type="condition",
conditions=[ conditions=[Condition(variable_selector=["iteration", "index"], comparison_operator="", value="5")],
Condition( ),
variable_selector=["iteration", "index"],
comparison_operator="",
value="5"
)
]
)
) )
# iteration: # iteration:
@ -248,47 +212,36 @@ def test_parallels_graph():
"id": "llm3-source-answer-target", "id": "llm3-source-answer-target",
"source": "llm3", "source": "llm3",
"target": "answer", "target": "answer",
} },
], ],
"nodes": [ "nodes": [
{"data": {"type": "start"}, "id": "start"},
{ {
"data": { "data": {
"type": "start" "type": "llm",
}, },
"id": "start" "id": "llm1",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm1" "id": "llm2",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm2" "id": "llm3",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer", "answer": "1"},
"type": "llm",
},
"id": "llm3"
},
{
"data": {
"type": "answer",
"title": "answer",
"answer": "1"
},
"id": "answer", "id": "answer",
}, },
], ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
assert graph.root_node_id == "start" assert graph.root_node_id == "start"
for i in range(3): for i in range(3):
@ -330,47 +283,36 @@ def test_parallels_graph2():
"id": "llm2-source-answer-target", "id": "llm2-source-answer-target",
"source": "llm2", "source": "llm2",
"target": "answer", "target": "answer",
} },
], ],
"nodes": [ "nodes": [
{"data": {"type": "start"}, "id": "start"},
{ {
"data": { "data": {
"type": "start" "type": "llm",
}, },
"id": "start" "id": "llm1",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm1" "id": "llm2",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm2" "id": "llm3",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer", "answer": "1"},
"type": "llm",
},
"id": "llm3"
},
{
"data": {
"type": "answer",
"title": "answer",
"answer": "1"
},
"id": "answer", "id": "answer",
}, },
], ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
assert graph.root_node_id == "start" assert graph.root_node_id == "start"
for i in range(3): for i in range(3):
@ -407,44 +349,33 @@ def test_parallels_graph3():
}, },
], ],
"nodes": [ "nodes": [
{"data": {"type": "start"}, "id": "start"},
{ {
"data": { "data": {
"type": "start" "type": "llm",
}, },
"id": "start" "id": "llm1",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm1" "id": "llm2",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm2" "id": "llm3",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer", "answer": "1"},
"type": "llm",
},
"id": "llm3"
},
{
"data": {
"type": "answer",
"title": "answer",
"answer": "1"
},
"id": "answer", "id": "answer",
}, },
], ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
assert graph.root_node_id == "start" assert graph.root_node_id == "start"
for i in range(3): for i in range(3):
@ -504,65 +435,54 @@ def test_parallels_graph4():
"id": "code3-source-answer-target", "id": "code3-source-answer-target",
"source": "code3", "source": "code3",
"target": "answer", "target": "answer",
} },
], ],
"nodes": [ "nodes": [
{ {"data": {"type": "start"}, "id": "start"},
"data": {
"type": "start"
},
"id": "start"
},
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm1" "id": "llm1",
}, },
{ {
"data": { "data": {
"type": "code", "type": "code",
}, },
"id": "code1" "id": "code1",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm2" "id": "llm2",
}, },
{ {
"data": { "data": {
"type": "code", "type": "code",
}, },
"id": "code2" "id": "code2",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm3" "id": "llm3",
}, },
{ {
"data": { "data": {
"type": "code", "type": "code",
}, },
"id": "code3" "id": "code3",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer", "answer": "1"},
"type": "answer",
"title": "answer",
"answer": "1"
},
"id": "answer", "id": "answer",
}, },
], ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
assert graph.root_node_id == "start" assert graph.root_node_id == "start"
for i in range(3): for i in range(3):
@ -641,77 +561,66 @@ def test_parallels_graph5():
"id": "code2-source-answer-target", "id": "code2-source-answer-target",
"source": "code2", "source": "code2",
"target": "answer", "target": "answer",
} },
], ],
"nodes": [ "nodes": [
{ {"data": {"type": "start"}, "id": "start"},
"data": {
"type": "start"
},
"id": "start"
},
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm1" "id": "llm1",
}, },
{ {
"data": { "data": {
"type": "code", "type": "code",
}, },
"id": "code1" "id": "code1",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm2" "id": "llm2",
}, },
{ {
"data": { "data": {
"type": "code", "type": "code",
}, },
"id": "code2" "id": "code2",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm3" "id": "llm3",
}, },
{ {
"data": { "data": {
"type": "code", "type": "code",
}, },
"id": "code3" "id": "code3",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer", "answer": "1"},
"type": "answer",
"title": "answer",
"answer": "1"
},
"id": "answer", "id": "answer",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm4" "id": "llm4",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm5" "id": "llm5",
}, },
], ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
assert graph.root_node_id == "start" assert graph.root_node_id == "start"
for i in range(5): for i in range(5):
@ -786,65 +695,54 @@ def test_parallels_graph6():
"id": "code3-source-answer-target", "id": "code3-source-answer-target",
"source": "code3", "source": "code3",
"target": "answer", "target": "answer",
} },
], ],
"nodes": [ "nodes": [
{ {"data": {"type": "start"}, "id": "start"},
"data": {
"type": "start"
},
"id": "start"
},
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm1" "id": "llm1",
}, },
{ {
"data": { "data": {
"type": "code", "type": "code",
}, },
"id": "code1" "id": "code1",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm2" "id": "llm2",
}, },
{ {
"data": { "data": {
"type": "code", "type": "code",
}, },
"id": "code2" "id": "code2",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm3" "id": "llm3",
}, },
{ {
"data": { "data": {
"type": "code", "type": "code",
}, },
"id": "code3" "id": "code3",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer", "answer": "1"},
"type": "answer",
"title": "answer",
"answer": "1"
},
"id": "answer", "id": "answer",
}, },
], ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
assert graph.root_node_id == "start" assert graph.root_node_id == "start"
for i in range(3): for i in range(3):

View File

@ -22,8 +22,8 @@ from core.workflow.nodes.llm.llm_node import LLMNode
from models.workflow import WorkflowNodeExecutionStatus, WorkflowType from models.workflow import WorkflowNodeExecutionStatus, WorkflowType
@patch('extensions.ext_database.db.session.remove') @patch("extensions.ext_database.db.session.remove")
@patch('extensions.ext_database.db.session.close') @patch("extensions.ext_database.db.session.close")
def test_run_parallel_in_workflow(mock_close, mock_remove): def test_run_parallel_in_workflow(mock_close, mock_remove):
graph_config = { graph_config = {
"edges": [ "edges": [
@ -51,85 +51,61 @@ def test_run_parallel_in_workflow(mock_close, mock_remove):
"id": "5", "id": "5",
"source": "llm3", "source": "llm3",
"target": "end2", "target": "end2",
} },
], ],
"nodes": [ "nodes": [
{ {
"data": { "data": {
"type": "start", "type": "start",
"title": "start", "title": "start",
"variables": [{ "variables": [
{
"label": "query", "label": "query",
"max_length": 48, "max_length": 48,
"options": [], "options": [],
"required": True, "required": True,
"type": "text-input", "type": "text-input",
"variable": "query" "variable": "query",
}] }
],
}, },
"id": "start" "id": "start",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
"title": "llm1", "title": "llm1",
"context": { "context": {"enabled": False, "variable_selector": []},
"enabled": False,
"variable_selector": []
},
"model": { "model": {
"completion_params": { "completion_params": {"temperature": 0.7},
"temperature": 0.7
},
"mode": "chat", "mode": "chat",
"name": "gpt-4o", "name": "gpt-4o",
"provider": "openai" "provider": "openai",
}, },
"prompt_template": [{ "prompt_template": [
"role": "system", {"role": "system", "text": "say hi"},
"text": "say hi" {"role": "user", "text": "{{#start.query#}}"},
}, { ],
"role": "user", "vision": {"configs": {"detail": "high"}, "enabled": False},
"text": "{{#start.query#}}"
}],
"vision": {
"configs": {
"detail": "high"
}, },
"enabled": False "id": "llm1",
}
},
"id": "llm1"
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
"title": "llm2", "title": "llm2",
"context": { "context": {"enabled": False, "variable_selector": []},
"enabled": False,
"variable_selector": []
},
"model": { "model": {
"completion_params": { "completion_params": {"temperature": 0.7},
"temperature": 0.7
},
"mode": "chat", "mode": "chat",
"name": "gpt-4o", "name": "gpt-4o",
"provider": "openai" "provider": "openai",
}, },
"prompt_template": [{ "prompt_template": [
"role": "system", {"role": "system", "text": "say bye"},
"text": "say bye" {"role": "user", "text": "{{#start.query#}}"},
}, { ],
"role": "user", "vision": {"configs": {"detail": "high"}, "enabled": False},
"text": "{{#start.query#}}"
}],
"vision": {
"configs": {
"detail": "high"
},
"enabled": False
}
}, },
"id": "llm2", "id": "llm2",
}, },
@ -137,31 +113,18 @@ def test_run_parallel_in_workflow(mock_close, mock_remove):
"data": { "data": {
"type": "llm", "type": "llm",
"title": "llm3", "title": "llm3",
"context": { "context": {"enabled": False, "variable_selector": []},
"enabled": False,
"variable_selector": []
},
"model": { "model": {
"completion_params": { "completion_params": {"temperature": 0.7},
"temperature": 0.7
},
"mode": "chat", "mode": "chat",
"name": "gpt-4o", "name": "gpt-4o",
"provider": "openai" "provider": "openai",
}, },
"prompt_template": [{ "prompt_template": [
"role": "system", {"role": "system", "text": "say good morning"},
"text": "say good morning" {"role": "user", "text": "{{#start.query#}}"},
}, { ],
"role": "user", "vision": {"configs": {"detail": "high"}, "enabled": False},
"text": "{{#start.query#}}"
}],
"vision": {
"configs": {
"detail": "high"
},
"enabled": False
}
}, },
"id": "llm3", "id": "llm3",
}, },
@ -169,13 +132,10 @@ def test_run_parallel_in_workflow(mock_close, mock_remove):
"data": { "data": {
"type": "end", "type": "end",
"title": "end1", "title": "end1",
"outputs": [{ "outputs": [
"value_selector": ["llm2", "text"], {"value_selector": ["llm2", "text"], "variable": "result2"},
"variable": "result2" {"value_selector": ["start", "query"], "variable": "query"},
}, { ],
"value_selector": ["start", "query"],
"variable": "query"
}],
}, },
"id": "end1", "id": "end1",
}, },
@ -183,29 +143,21 @@ def test_run_parallel_in_workflow(mock_close, mock_remove):
"data": { "data": {
"type": "end", "type": "end",
"title": "end2", "title": "end2",
"outputs": [{ "outputs": [
"value_selector": ["llm1", "text"], {"value_selector": ["llm1", "text"], "variable": "result1"},
"variable": "result1" {"value_selector": ["llm3", "text"], "variable": "result3"},
}, { ],
"value_selector": ["llm3", "text"],
"variable": "result3"
}],
}, },
"id": "end2", "id": "end2",
} },
], ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
variable_pool = VariablePool(system_variables={ variable_pool = VariablePool(
SystemVariableKey.FILES: [], system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, user_inputs={"query": "hi"}
SystemVariableKey.USER_ID: 'aaa' )
}, user_inputs={
"query": "hi"
})
graph_engine = GraphEngine( graph_engine = GraphEngine(
tenant_id="111", tenant_id="111",
@ -220,19 +172,14 @@ def test_run_parallel_in_workflow(mock_close, mock_remove):
graph=graph, graph=graph,
variable_pool=variable_pool, variable_pool=variable_pool,
max_execution_steps=500, max_execution_steps=500,
max_execution_time=1200 max_execution_time=1200,
) )
def llm_generator(self): def llm_generator(self):
contents = [ contents = ["hi", "bye", "good morning"]
'hi',
'bye',
'good morning'
]
yield RunStreamChunkEvent( yield RunStreamChunkEvent(
chunk_content=contents[int(self.node_id[-1]) - 1], chunk_content=contents[int(self.node_id[-1]) - 1], from_variable_selector=[self.node_id, "text"]
from_variable_selector=[self.node_id, 'text']
) )
yield RunCompletedEvent( yield RunCompletedEvent(
@ -244,14 +191,14 @@ def test_run_parallel_in_workflow(mock_close, mock_remove):
metadata={ metadata={
NodeRunMetadataKey.TOTAL_TOKENS: 1, NodeRunMetadataKey.TOTAL_TOKENS: 1,
NodeRunMetadataKey.TOTAL_PRICE: 1, NodeRunMetadataKey.TOTAL_PRICE: 1,
NodeRunMetadataKey.CURRENCY: 'USD' NodeRunMetadataKey.CURRENCY: "USD",
} },
) )
) )
# print("") # print("")
with patch.object(LLMNode, '_run', new=llm_generator): with patch.object(LLMNode, "_run", new=llm_generator):
items = [] items = []
generator = graph_engine.run() generator = graph_engine.run()
for item in generator: for item in generator:
@ -263,21 +210,19 @@ def test_run_parallel_in_workflow(mock_close, mock_remove):
assert not isinstance(item, NodeRunFailedEvent) assert not isinstance(item, NodeRunFailedEvent)
assert not isinstance(item, GraphRunFailedEvent) assert not isinstance(item, GraphRunFailedEvent)
if isinstance(item, BaseNodeEvent) and item.route_node_state.node_id in [ if isinstance(item, BaseNodeEvent) and item.route_node_state.node_id in ["llm2", "llm3", "end1", "end2"]:
'llm2', 'llm3', 'end1', 'end2'
]:
assert item.parallel_id is not None assert item.parallel_id is not None
assert len(items) == 18 assert len(items) == 18
assert isinstance(items[0], GraphRunStartedEvent) assert isinstance(items[0], GraphRunStartedEvent)
assert isinstance(items[1], NodeRunStartedEvent) assert isinstance(items[1], NodeRunStartedEvent)
assert items[1].route_node_state.node_id == 'start' assert items[1].route_node_state.node_id == "start"
assert isinstance(items[2], NodeRunSucceededEvent) assert isinstance(items[2], NodeRunSucceededEvent)
assert items[2].route_node_state.node_id == 'start' assert items[2].route_node_state.node_id == "start"
@patch('extensions.ext_database.db.session.remove') @patch("extensions.ext_database.db.session.remove")
@patch('extensions.ext_database.db.session.close') @patch("extensions.ext_database.db.session.close")
def test_run_parallel_in_chatflow(mock_close, mock_remove): def test_run_parallel_in_chatflow(mock_close, mock_remove):
graph_config = { graph_config = {
"edges": [ "edges": [
@ -305,69 +250,41 @@ def test_run_parallel_in_chatflow(mock_close, mock_remove):
"id": "5", "id": "5",
"source": "answer3", "source": "answer3",
"target": "answer5", "target": "answer5",
} },
], ],
"nodes": [ "nodes": [
{"data": {"type": "start", "title": "start"}, "id": "start"},
{"data": {"type": "answer", "title": "answer1", "answer": "1"}, "id": "answer1"},
{ {
"data": { "data": {"type": "answer", "title": "answer2", "answer": "2"},
"type": "start",
"title": "start"
},
"id": "start"
},
{
"data": {
"type": "answer",
"title": "answer1",
"answer": "1"
},
"id": "answer1"
},
{
"data": {
"type": "answer",
"title": "answer2",
"answer": "2"
},
"id": "answer2", "id": "answer2",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer3", "answer": "3"},
"type": "answer",
"title": "answer3",
"answer": "3"
},
"id": "answer3", "id": "answer3",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer4", "answer": "4"},
"type": "answer",
"title": "answer4",
"answer": "4"
},
"id": "answer4", "id": "answer4",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer5", "answer": "5"},
"type": "answer",
"title": "answer5",
"answer": "5"
},
"id": "answer5", "id": "answer5",
} },
], ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
variable_pool = VariablePool(system_variables={ variable_pool = VariablePool(
SystemVariableKey.QUERY: 'what\'s the weather in SF', system_variables={
SystemVariableKey.QUERY: "what's the weather in SF",
SystemVariableKey.FILES: [], SystemVariableKey.FILES: [],
SystemVariableKey.CONVERSATION_ID: 'abababa', SystemVariableKey.CONVERSATION_ID: "abababa",
SystemVariableKey.USER_ID: 'aaa' SystemVariableKey.USER_ID: "aaa",
}, user_inputs={}) },
user_inputs={},
)
graph_engine = GraphEngine( graph_engine = GraphEngine(
tenant_id="111", tenant_id="111",
@ -382,7 +299,7 @@ def test_run_parallel_in_chatflow(mock_close, mock_remove):
graph=graph, graph=graph,
variable_pool=variable_pool, variable_pool=variable_pool,
max_execution_steps=500, max_execution_steps=500,
max_execution_time=1200 max_execution_time=1200,
) )
# print("") # print("")
@ -399,135 +316,155 @@ def test_run_parallel_in_chatflow(mock_close, mock_remove):
assert not isinstance(item, GraphRunFailedEvent) assert not isinstance(item, GraphRunFailedEvent)
if isinstance(item, BaseNodeEvent) and item.route_node_state.node_id in [ if isinstance(item, BaseNodeEvent) and item.route_node_state.node_id in [
'answer2', 'answer3', 'answer4', 'answer5' "answer2",
"answer3",
"answer4",
"answer5",
]: ]:
assert item.parallel_id is not None assert item.parallel_id is not None
assert len(items) == 23 assert len(items) == 23
assert isinstance(items[0], GraphRunStartedEvent) assert isinstance(items[0], GraphRunStartedEvent)
assert isinstance(items[1], NodeRunStartedEvent) assert isinstance(items[1], NodeRunStartedEvent)
assert items[1].route_node_state.node_id == 'start' assert items[1].route_node_state.node_id == "start"
assert isinstance(items[2], NodeRunSucceededEvent) assert isinstance(items[2], NodeRunSucceededEvent)
assert items[2].route_node_state.node_id == 'start' assert items[2].route_node_state.node_id == "start"
@patch('extensions.ext_database.db.session.remove') @patch("extensions.ext_database.db.session.remove")
@patch('extensions.ext_database.db.session.close') @patch("extensions.ext_database.db.session.close")
def test_run_branch(mock_close, mock_remove): def test_run_branch(mock_close, mock_remove):
graph_config = { graph_config = {
"edges": [{ "edges": [
{
"id": "1", "id": "1",
"source": "start", "source": "start",
"target": "if-else-1", "target": "if-else-1",
}, { },
{
"id": "2", "id": "2",
"source": "if-else-1", "source": "if-else-1",
"sourceHandle": "true", "sourceHandle": "true",
"target": "answer-1", "target": "answer-1",
}, { },
{
"id": "3", "id": "3",
"source": "if-else-1", "source": "if-else-1",
"sourceHandle": "false", "sourceHandle": "false",
"target": "if-else-2", "target": "if-else-2",
}, { },
{
"id": "4", "id": "4",
"source": "if-else-2", "source": "if-else-2",
"sourceHandle": "true", "sourceHandle": "true",
"target": "answer-2", "target": "answer-2",
}, { },
{
"id": "5", "id": "5",
"source": "if-else-2", "source": "if-else-2",
"sourceHandle": "false", "sourceHandle": "false",
"target": "answer-3", "target": "answer-3",
}], },
"nodes": [{ ],
"nodes": [
{
"data": { "data": {
"title": "Start", "title": "Start",
"type": "start", "type": "start",
"variables": [{ "variables": [
{
"label": "uid", "label": "uid",
"max_length": 48, "max_length": 48,
"options": [], "options": [],
"required": True, "required": True,
"type": "text-input", "type": "text-input",
"variable": "uid" "variable": "uid",
}] }
],
}, },
"id": "start" "id": "start",
}, {
"data": {
"answer": "1 {{#start.uid#}}",
"title": "Answer",
"type": "answer",
"variables": []
}, },
{
"data": {"answer": "1 {{#start.uid#}}", "title": "Answer", "type": "answer", "variables": []},
"id": "answer-1", "id": "answer-1",
}, { },
{
"data": { "data": {
"cases": [{ "cases": [
{
"case_id": "true", "case_id": "true",
"conditions": [{ "conditions": [
{
"comparison_operator": "contains", "comparison_operator": "contains",
"id": "b0f02473-08b6-4a81-af91-15345dcb2ec8", "id": "b0f02473-08b6-4a81-af91-15345dcb2ec8",
"value": "hi", "value": "hi",
"varType": "string", "varType": "string",
"variable_selector": ["sys", "query"] "variable_selector": ["sys", "query"],
}], }
],
"id": "true", "id": "true",
"logical_operator": "and" "logical_operator": "and",
}], }
],
"desc": "", "desc": "",
"title": "IF/ELSE", "title": "IF/ELSE",
"type": "if-else" "type": "if-else",
}, },
"id": "if-else-1", "id": "if-else-1",
}, { },
{
"data": { "data": {
"cases": [{ "cases": [
{
"case_id": "true", "case_id": "true",
"conditions": [{ "conditions": [
{
"comparison_operator": "contains", "comparison_operator": "contains",
"id": "ae895199-5608-433b-b5f0-0997ae1431e4", "id": "ae895199-5608-433b-b5f0-0997ae1431e4",
"value": "takatost", "value": "takatost",
"varType": "string", "varType": "string",
"variable_selector": ["sys", "query"] "variable_selector": ["sys", "query"],
}], }
],
"id": "true", "id": "true",
"logical_operator": "and" "logical_operator": "and",
}], }
],
"title": "IF/ELSE 2", "title": "IF/ELSE 2",
"type": "if-else" "type": "if-else",
}, },
"id": "if-else-2", "id": "if-else-2",
}, { },
{
"data": { "data": {
"answer": "2", "answer": "2",
"title": "Answer 2", "title": "Answer 2",
"type": "answer", "type": "answer",
}, },
"id": "answer-2", "id": "answer-2",
}, { },
{
"data": { "data": {
"answer": "3", "answer": "3",
"title": "Answer 3", "title": "Answer 3",
"type": "answer", "type": "answer",
}, },
"id": "answer-3", "id": "answer-3",
}] },
],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
variable_pool = VariablePool(system_variables={ variable_pool = VariablePool(
SystemVariableKey.QUERY: 'hi', system_variables={
SystemVariableKey.QUERY: "hi",
SystemVariableKey.FILES: [], SystemVariableKey.FILES: [],
SystemVariableKey.CONVERSATION_ID: 'abababa', SystemVariableKey.CONVERSATION_ID: "abababa",
SystemVariableKey.USER_ID: 'aaa' SystemVariableKey.USER_ID: "aaa",
}, user_inputs={ },
"uid": "takato" user_inputs={"uid": "takato"},
}) )
graph_engine = GraphEngine( graph_engine = GraphEngine(
tenant_id="111", tenant_id="111",
@ -542,7 +479,7 @@ def test_run_branch(mock_close, mock_remove):
graph=graph, graph=graph,
variable_pool=variable_pool, variable_pool=variable_pool,
max_execution_steps=500, max_execution_steps=500,
max_execution_time=1200 max_execution_time=1200,
) )
# print("") # print("")
@ -554,15 +491,15 @@ def test_run_branch(mock_close, mock_remove):
items.append(item) items.append(item)
assert len(items) == 10 assert len(items) == 10
assert items[3].route_node_state.node_id == 'if-else-1' assert items[3].route_node_state.node_id == "if-else-1"
assert items[4].route_node_state.node_id == 'if-else-1' assert items[4].route_node_state.node_id == "if-else-1"
assert isinstance(items[5], NodeRunStreamChunkEvent) assert isinstance(items[5], NodeRunStreamChunkEvent)
assert items[5].chunk_content == '1 ' assert items[5].chunk_content == "1 "
assert isinstance(items[6], NodeRunStreamChunkEvent) assert isinstance(items[6], NodeRunStreamChunkEvent)
assert items[6].chunk_content == 'takato' assert items[6].chunk_content == "takato"
assert items[7].route_node_state.node_id == 'answer-1' assert items[7].route_node_state.node_id == "answer-1"
assert items[8].route_node_state.node_id == 'answer-1' assert items[8].route_node_state.node_id == "answer-1"
assert items[8].route_node_state.node_run_result.outputs['answer'] == '1 takato' assert items[8].route_node_state.node_run_result.outputs["answer"] == "1 takato"
assert isinstance(items[9], GraphRunSucceededEvent) assert isinstance(items[9], GraphRunSucceededEvent)
# print(graph_engine.graph_runtime_state.model_dump_json(indent=2)) # print(graph_engine.graph_runtime_state.model_dump_json(indent=2))

View File

@ -24,61 +24,52 @@ def test_execute_answer():
}, },
], ],
"nodes": [ "nodes": [
{ {"data": {"type": "start"}, "id": "start"},
"data": {
"type": "start"
},
"id": "start"
},
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm" "id": "llm",
}, },
] ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
init_params = GraphInitParams( init_params = GraphInitParams(
tenant_id='1', tenant_id="1",
app_id='1', app_id="1",
workflow_type=WorkflowType.WORKFLOW, workflow_type=WorkflowType.WORKFLOW,
workflow_id='1', workflow_id="1",
graph_config=graph_config, graph_config=graph_config,
user_id='1', user_id="1",
user_from=UserFrom.ACCOUNT, user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER, invoke_from=InvokeFrom.DEBUGGER,
call_depth=0 call_depth=0,
) )
# construct variable pool # construct variable pool
pool = VariablePool(system_variables={ pool = VariablePool(
SystemVariableKey.FILES: [], system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"},
SystemVariableKey.USER_ID: 'aaa' user_inputs={},
}, user_inputs={}, environment_variables=[]) environment_variables=[],
pool.add(['start', 'weather'], 'sunny') )
pool.add(['llm', 'text'], 'You are a helpful AI.') pool.add(["start", "weather"], "sunny")
pool.add(["llm", "text"], "You are a helpful AI.")
node = AnswerNode( node = AnswerNode(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
graph_init_params=init_params, graph_init_params=init_params,
graph=graph, graph=graph,
graph_runtime_state=GraphRuntimeState( graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()),
variable_pool=pool,
start_at=time.perf_counter()
),
config={ config={
'id': 'answer', "id": "answer",
'data': { "data": {
'title': '123', "title": "123",
'type': 'answer', "type": "answer",
'answer': 'Today\'s weather is {{#start.weather#}}\n{{#llm.text#}}\n{{img}}\nFin.' "answer": "Today's weather is {{#start.weather#}}\n{{#llm.text#}}\n{{img}}\nFin.",
} },
} },
) )
# Mock db.session.close() # Mock db.session.close()
@ -88,4 +79,4 @@ def test_execute_answer():
result = node._run() result = node._run()
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs['answer'] == "Today's weather is sunny\nYou are a helpful AI.\n{{img}}\nFin." assert result.outputs["answer"] == "Today's weather is sunny\nYou are a helpful AI.\n{{img}}\nFin."

View File

@ -54,72 +54,56 @@ def test_init():
"id": "llm1-source-answer-target", "id": "llm1-source-answer-target",
"source": "llm1", "source": "llm1",
"target": "answer", "target": "answer",
} },
], ],
"nodes": [ "nodes": [
{"data": {"type": "start"}, "id": "start"},
{ {
"data": { "data": {
"type": "start" "type": "llm",
}, },
"id": "start" "id": "llm1",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm1" "id": "llm2",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm2" "id": "llm3",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm3" "id": "llm4",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm4" "id": "llm5",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer", "answer": "1{{#llm2.text#}}2"},
"type": "llm",
},
"id": "llm5"
},
{
"data": {
"type": "answer",
"title": "answer",
"answer": "1{{#llm2.text#}}2"
},
"id": "answer", "id": "answer",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer2", "answer": "1{{#llm3.text#}}2"},
"type": "answer",
"title": "answer2",
"answer": "1{{#llm3.text#}}2"
},
"id": "answer2", "id": "answer2",
}, },
], ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
answer_stream_generate_route = AnswerStreamGeneratorRouter.init( answer_stream_generate_route = AnswerStreamGeneratorRouter.init(
node_id_config_mapping=graph.node_id_config_mapping, node_id_config_mapping=graph.node_id_config_mapping, reverse_edge_mapping=graph.reverse_edge_mapping
reverse_edge_mapping=graph.reverse_edge_mapping
) )
assert answer_stream_generate_route.answer_dependencies['answer'] == ['answer2'] assert answer_stream_generate_route.answer_dependencies["answer"] == ["answer2"]
assert answer_stream_generate_route.answer_dependencies['answer2'] == [] assert answer_stream_generate_route.answer_dependencies["answer2"] == []

View File

@ -18,7 +18,7 @@ from core.workflow.nodes.start.entities import StartNodeData
def _recursive_process(graph: Graph, next_node_id: str) -> Generator[GraphEngineEvent, None, None]: def _recursive_process(graph: Graph, next_node_id: str) -> Generator[GraphEngineEvent, None, None]:
if next_node_id == 'start': if next_node_id == "start":
yield from _publish_events(graph, next_node_id) yield from _publish_events(graph, next_node_id)
for edge in graph.edge_mapping.get(next_node_id, []): for edge in graph.edge_mapping.get(next_node_id, []):
@ -29,10 +29,7 @@ def _recursive_process(graph: Graph, next_node_id: str) -> Generator[GraphEngine
def _publish_events(graph: Graph, next_node_id: str) -> Generator[GraphEngineEvent, None, None]: def _publish_events(graph: Graph, next_node_id: str) -> Generator[GraphEngineEvent, None, None]:
route_node_state = RouteNodeState( route_node_state = RouteNodeState(node_id=next_node_id, start_at=datetime.now(timezone.utc).replace(tzinfo=None))
node_id=next_node_id,
start_at=datetime.now(timezone.utc).replace(tzinfo=None)
)
parallel_id = graph.node_parallel_mapping.get(next_node_id) parallel_id = graph.node_parallel_mapping.get(next_node_id)
parallel_start_node_id = None parallel_start_node_id = None
@ -52,10 +49,10 @@ def _publish_events(graph: Graph, next_node_id: str) -> Generator[GraphEngineEve
node_data=mock_node_data, node_data=mock_node_data,
route_node_state=route_node_state, route_node_state=route_node_state,
parallel_id=graph.node_parallel_mapping.get(next_node_id), parallel_id=graph.node_parallel_mapping.get(next_node_id),
parallel_start_node_id=parallel_start_node_id parallel_start_node_id=parallel_start_node_id,
) )
if 'llm' in next_node_id: if "llm" in next_node_id:
length = int(next_node_id[-1]) length = int(next_node_id[-1])
for i in range(0, length): for i in range(0, length):
yield NodeRunStreamChunkEvent( yield NodeRunStreamChunkEvent(
@ -67,7 +64,7 @@ def _publish_events(graph: Graph, next_node_id: str) -> Generator[GraphEngineEve
route_node_state=route_node_state, route_node_state=route_node_state,
from_variable_selector=[next_node_id, "text"], from_variable_selector=[next_node_id, "text"],
parallel_id=parallel_id, parallel_id=parallel_id,
parallel_start_node_id=parallel_start_node_id parallel_start_node_id=parallel_start_node_id,
) )
route_node_state.status = RouteNodeState.Status.SUCCESS route_node_state.status = RouteNodeState.Status.SUCCESS
@ -79,7 +76,7 @@ def _publish_events(graph: Graph, next_node_id: str) -> Generator[GraphEngineEve
node_data=mock_node_data, node_data=mock_node_data,
route_node_state=route_node_state, route_node_state=route_node_state,
parallel_id=parallel_id, parallel_id=parallel_id,
parallel_start_node_id=parallel_start_node_id parallel_start_node_id=parallel_start_node_id,
) )
@ -135,90 +132,75 @@ def test_process():
"id": "llm1-source-answer-target", "id": "llm1-source-answer-target",
"source": "llm1", "source": "llm1",
"target": "answer", "target": "answer",
} },
], ],
"nodes": [ "nodes": [
{"data": {"type": "start"}, "id": "start"},
{ {
"data": { "data": {
"type": "start" "type": "llm",
}, },
"id": "start" "id": "llm1",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm1" "id": "llm2",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm2" "id": "llm3",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm3" "id": "llm4",
}, },
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm4" "id": "llm5",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer", "answer": "a{{#llm2.text#}}b"},
"type": "llm",
},
"id": "llm5"
},
{
"data": {
"type": "answer",
"title": "answer",
"answer": "a{{#llm2.text#}}b"
},
"id": "answer", "id": "answer",
}, },
{ {
"data": { "data": {"type": "answer", "title": "answer2", "answer": "c{{#llm3.text#}}d"},
"type": "answer",
"title": "answer2",
"answer": "c{{#llm3.text#}}d"
},
"id": "answer2", "id": "answer2",
}, },
], ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
variable_pool = VariablePool(system_variables={ variable_pool = VariablePool(
SystemVariableKey.QUERY: 'what\'s the weather in SF', system_variables={
SystemVariableKey.QUERY: "what's the weather in SF",
SystemVariableKey.FILES: [], SystemVariableKey.FILES: [],
SystemVariableKey.CONVERSATION_ID: 'abababa', SystemVariableKey.CONVERSATION_ID: "abababa",
SystemVariableKey.USER_ID: 'aaa' SystemVariableKey.USER_ID: "aaa",
}, user_inputs={}) },
user_inputs={},
answer_stream_processor = AnswerStreamProcessor(
graph=graph,
variable_pool=variable_pool
) )
answer_stream_processor = AnswerStreamProcessor(graph=graph, variable_pool=variable_pool)
def graph_generator() -> Generator[GraphEngineEvent, None, None]: def graph_generator() -> Generator[GraphEngineEvent, None, None]:
# print("") # print("")
for event in _recursive_process(graph, "start"): for event in _recursive_process(graph, "start"):
# print("[ORIGIN]", event.__class__.__name__ + ":", event.route_node_state.node_id, # print("[ORIGIN]", event.__class__.__name__ + ":", event.route_node_state.node_id,
# " " + (event.chunk_content if isinstance(event, NodeRunStreamChunkEvent) else "")) # " " + (event.chunk_content if isinstance(event, NodeRunStreamChunkEvent) else ""))
if isinstance(event, NodeRunSucceededEvent): if isinstance(event, NodeRunSucceededEvent):
if 'llm' in event.route_node_state.node_id: if "llm" in event.route_node_state.node_id:
variable_pool.add( variable_pool.add(
[event.route_node_state.node_id, "text"], [event.route_node_state.node_id, "text"],
"".join(str(i) for i in range(0, int(event.route_node_state.node_id[-1]))) "".join(str(i) for i in range(0, int(event.route_node_state.node_id[-1]))),
) )
yield event yield event

View File

@ -17,41 +17,43 @@ from models.workflow import WorkflowNodeExecutionStatus, WorkflowType
def test_run(): def test_run():
graph_config = { graph_config = {
"edges": [{ "edges": [
{
"id": "start-source-pe-target", "id": "start-source-pe-target",
"source": "start", "source": "start",
"target": "pe", "target": "pe",
}, { },
{
"id": "iteration-1-source-answer-3-target", "id": "iteration-1-source-answer-3-target",
"source": "iteration-1", "source": "iteration-1",
"target": "answer-3", "target": "answer-3",
}, { },
{
"id": "tt-source-if-else-target", "id": "tt-source-if-else-target",
"source": "tt", "source": "tt",
"target": "if-else", "target": "if-else",
}, { },
{
"id": "if-else-true-answer-2-target", "id": "if-else-true-answer-2-target",
"source": "if-else", "source": "if-else",
"sourceHandle": "true", "sourceHandle": "true",
"target": "answer-2", "target": "answer-2",
}, { },
{
"id": "if-else-false-answer-4-target", "id": "if-else-false-answer-4-target",
"source": "if-else", "source": "if-else",
"sourceHandle": "false", "sourceHandle": "false",
"target": "answer-4", "target": "answer-4",
}, { },
{
"id": "pe-source-iteration-1-target", "id": "pe-source-iteration-1-target",
"source": "pe", "source": "pe",
"target": "iteration-1", "target": "iteration-1",
}],
"nodes": [{
"data": {
"title": "Start",
"type": "start",
"variables": []
}, },
"id": "start" ],
}, { "nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{
"data": { "data": {
"iterator_selector": ["pe", "list_output"], "iterator_selector": ["pe", "list_output"],
"output_selector": ["tt", "output"], "output_selector": ["tt", "output"],
@ -62,114 +64,105 @@ def test_run():
"type": "iteration", "type": "iteration",
}, },
"id": "iteration-1", "id": "iteration-1",
}, { },
{
"data": { "data": {
"answer": "{{#tt.output#}}", "answer": "{{#tt.output#}}",
"iteration_id": "iteration-1", "iteration_id": "iteration-1",
"title": "answer 2", "title": "answer 2",
"type": "answer" "type": "answer",
}, },
"id": "answer-2" "id": "answer-2",
}, { },
{
"data": { "data": {
"iteration_id": "iteration-1", "iteration_id": "iteration-1",
"template": "{{ arg1 }} 123", "template": "{{ arg1 }} 123",
"title": "template transform", "title": "template transform",
"type": "template-transform", "type": "template-transform",
"variables": [{ "variables": [{"value_selector": ["sys", "query"], "variable": "arg1"}],
"value_selector": ["sys", "query"],
"variable": "arg1"
}]
}, },
"id": "tt", "id": "tt",
}, {
"data": {
"answer": "{{#iteration-1.output#}}88888",
"title": "answer 3",
"type": "answer"
}, },
{
"data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"},
"id": "answer-3", "id": "answer-3",
}, { },
{
"data": { "data": {
"conditions": [{ "conditions": [
{
"comparison_operator": "is", "comparison_operator": "is",
"id": "1721916275284", "id": "1721916275284",
"value": "hi", "value": "hi",
"variable_selector": ["sys", "query"] "variable_selector": ["sys", "query"],
}], }
],
"iteration_id": "iteration-1", "iteration_id": "iteration-1",
"logical_operator": "and", "logical_operator": "and",
"title": "if", "title": "if",
"type": "if-else" "type": "if-else",
}, },
"id": "if-else", "id": "if-else",
}, {
"data": {
"answer": "no hi",
"iteration_id": "iteration-1",
"title": "answer 4",
"type": "answer"
}, },
{
"data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer"},
"id": "answer-4", "id": "answer-4",
}, { },
{
"data": { "data": {
"instruction": "test1", "instruction": "test1",
"model": { "model": {
"completion_params": { "completion_params": {"temperature": 0.7},
"temperature": 0.7
},
"mode": "chat", "mode": "chat",
"name": "gpt-4o", "name": "gpt-4o",
"provider": "openai" "provider": "openai",
}, },
"parameters": [{ "parameters": [
"description": "test", {"description": "test", "name": "list_output", "required": False, "type": "array[string]"}
"name": "list_output", ],
"required": False,
"type": "array[string]"
}],
"query": ["sys", "query"], "query": ["sys", "query"],
"reasoning_mode": "prompt", "reasoning_mode": "prompt",
"title": "pe", "title": "pe",
"type": "parameter-extractor" "type": "parameter-extractor",
}, },
"id": "pe", "id": "pe",
}] },
],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
init_params = GraphInitParams( init_params = GraphInitParams(
tenant_id='1', tenant_id="1",
app_id='1', app_id="1",
workflow_type=WorkflowType.CHAT, workflow_type=WorkflowType.CHAT,
workflow_id='1', workflow_id="1",
graph_config=graph_config, graph_config=graph_config,
user_id='1', user_id="1",
user_from=UserFrom.ACCOUNT, user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER, invoke_from=InvokeFrom.DEBUGGER,
call_depth=0 call_depth=0,
) )
# construct variable pool # construct variable pool
pool = VariablePool(system_variables={ pool = VariablePool(
SystemVariableKey.QUERY: 'dify', system_variables={
SystemVariableKey.QUERY: "dify",
SystemVariableKey.FILES: [], SystemVariableKey.FILES: [],
SystemVariableKey.CONVERSATION_ID: 'abababa', SystemVariableKey.CONVERSATION_ID: "abababa",
SystemVariableKey.USER_ID: '1' SystemVariableKey.USER_ID: "1",
}, user_inputs={}, environment_variables=[]) },
pool.add(['pe', 'list_output'], ["dify-1", "dify-2"]) user_inputs={},
environment_variables=[],
)
pool.add(["pe", "list_output"], ["dify-1", "dify-2"])
iteration_node = IterationNode( iteration_node = IterationNode(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
graph_init_params=init_params, graph_init_params=init_params,
graph=graph, graph=graph,
graph_runtime_state=GraphRuntimeState( graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()),
variable_pool=pool,
start_at=time.perf_counter()
),
config={ config={
"data": { "data": {
"iterator_selector": ["pe", "list_output"], "iterator_selector": ["pe", "list_output"],
@ -181,23 +174,19 @@ def test_run():
"type": "iteration", "type": "iteration",
}, },
"id": "iteration-1", "id": "iteration-1",
} },
) )
def tt_generator(self): def tt_generator(self):
return NodeRunResult( return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED, status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={ inputs={"iterator_selector": "dify"},
'iterator_selector': 'dify' outputs={"output": "dify 123"},
},
outputs={
'output': 'dify 123'
}
) )
# print("") # print("")
with patch.object(TemplateTransformNode, '_run', new=tt_generator): with patch.object(TemplateTransformNode, "_run", new=tt_generator):
# execute node # execute node
result = iteration_node._run() result = iteration_node._run()
@ -214,45 +203,48 @@ def test_run():
def test_run_parallel(): def test_run_parallel():
graph_config = { graph_config = {
"edges": [{ "edges": [
{
"id": "start-source-pe-target", "id": "start-source-pe-target",
"source": "start", "source": "start",
"target": "pe", "target": "pe",
}, { },
{
"id": "iteration-1-source-answer-3-target", "id": "iteration-1-source-answer-3-target",
"source": "iteration-1", "source": "iteration-1",
"target": "answer-3", "target": "answer-3",
}, { },
{
"id": "tt-source-if-else-target", "id": "tt-source-if-else-target",
"source": "tt", "source": "tt",
"target": "if-else", "target": "if-else",
}, { },
{
"id": "tt-2-source-if-else-target", "id": "tt-2-source-if-else-target",
"source": "tt-2", "source": "tt-2",
"target": "if-else", "target": "if-else",
}, { },
{
"id": "if-else-true-answer-2-target", "id": "if-else-true-answer-2-target",
"source": "if-else", "source": "if-else",
"sourceHandle": "true", "sourceHandle": "true",
"target": "answer-2", "target": "answer-2",
}, { },
{
"id": "if-else-false-answer-4-target", "id": "if-else-false-answer-4-target",
"source": "if-else", "source": "if-else",
"sourceHandle": "false", "sourceHandle": "false",
"target": "answer-4", "target": "answer-4",
}, { },
{
"id": "pe-source-iteration-1-target", "id": "pe-source-iteration-1-target",
"source": "pe", "source": "pe",
"target": "iteration-1", "target": "iteration-1",
}],
"nodes": [{
"data": {
"title": "Start",
"type": "start",
"variables": []
}, },
"id": "start" ],
}, { "nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{
"data": { "data": {
"iterator_selector": ["pe", "list_output"], "iterator_selector": ["pe", "list_output"],
"output_selector": ["tt", "output"], "output_selector": ["tt", "output"],
@ -263,128 +255,117 @@ def test_run_parallel():
"type": "iteration", "type": "iteration",
}, },
"id": "iteration-1", "id": "iteration-1",
}, { },
{
"data": { "data": {
"answer": "{{#tt.output#}}", "answer": "{{#tt.output#}}",
"iteration_id": "iteration-1", "iteration_id": "iteration-1",
"title": "answer 2", "title": "answer 2",
"type": "answer" "type": "answer",
}, },
"id": "answer-2" "id": "answer-2",
}, { },
{
"data": { "data": {
"iteration_id": "iteration-1", "iteration_id": "iteration-1",
"start_node_in_iteration": True, "start_node_in_iteration": True,
"template": "{{ arg1 }} 123", "template": "{{ arg1 }} 123",
"title": "template transform", "title": "template transform",
"type": "template-transform", "type": "template-transform",
"variables": [{ "variables": [{"value_selector": ["sys", "query"], "variable": "arg1"}],
"value_selector": ["sys", "query"],
"variable": "arg1"
}]
}, },
"id": "tt", "id": "tt",
},{ },
{
"data": { "data": {
"iteration_id": "iteration-1", "iteration_id": "iteration-1",
"start_node_in_iteration": True, "start_node_in_iteration": True,
"template": "{{ arg1 }} 321", "template": "{{ arg1 }} 321",
"title": "template transform", "title": "template transform",
"type": "template-transform", "type": "template-transform",
"variables": [{ "variables": [{"value_selector": ["sys", "query"], "variable": "arg1"}],
"value_selector": ["sys", "query"],
"variable": "arg1"
}]
}, },
"id": "tt-2", "id": "tt-2",
}, {
"data": {
"answer": "{{#iteration-1.output#}}88888",
"title": "answer 3",
"type": "answer"
}, },
{
"data": {"answer": "{{#iteration-1.output#}}88888", "title": "answer 3", "type": "answer"},
"id": "answer-3", "id": "answer-3",
}, { },
{
"data": { "data": {
"conditions": [{ "conditions": [
{
"comparison_operator": "is", "comparison_operator": "is",
"id": "1721916275284", "id": "1721916275284",
"value": "hi", "value": "hi",
"variable_selector": ["sys", "query"] "variable_selector": ["sys", "query"],
}], }
],
"iteration_id": "iteration-1", "iteration_id": "iteration-1",
"logical_operator": "and", "logical_operator": "and",
"title": "if", "title": "if",
"type": "if-else" "type": "if-else",
}, },
"id": "if-else", "id": "if-else",
}, {
"data": {
"answer": "no hi",
"iteration_id": "iteration-1",
"title": "answer 4",
"type": "answer"
}, },
{
"data": {"answer": "no hi", "iteration_id": "iteration-1", "title": "answer 4", "type": "answer"},
"id": "answer-4", "id": "answer-4",
}, { },
{
"data": { "data": {
"instruction": "test1", "instruction": "test1",
"model": { "model": {
"completion_params": { "completion_params": {"temperature": 0.7},
"temperature": 0.7
},
"mode": "chat", "mode": "chat",
"name": "gpt-4o", "name": "gpt-4o",
"provider": "openai" "provider": "openai",
}, },
"parameters": [{ "parameters": [
"description": "test", {"description": "test", "name": "list_output", "required": False, "type": "array[string]"}
"name": "list_output", ],
"required": False,
"type": "array[string]"
}],
"query": ["sys", "query"], "query": ["sys", "query"],
"reasoning_mode": "prompt", "reasoning_mode": "prompt",
"title": "pe", "title": "pe",
"type": "parameter-extractor" "type": "parameter-extractor",
}, },
"id": "pe", "id": "pe",
}] },
],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
init_params = GraphInitParams( init_params = GraphInitParams(
tenant_id='1', tenant_id="1",
app_id='1', app_id="1",
workflow_type=WorkflowType.CHAT, workflow_type=WorkflowType.CHAT,
workflow_id='1', workflow_id="1",
graph_config=graph_config, graph_config=graph_config,
user_id='1', user_id="1",
user_from=UserFrom.ACCOUNT, user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER, invoke_from=InvokeFrom.DEBUGGER,
call_depth=0 call_depth=0,
) )
# construct variable pool # construct variable pool
pool = VariablePool(system_variables={ pool = VariablePool(
SystemVariableKey.QUERY: 'dify', system_variables={
SystemVariableKey.QUERY: "dify",
SystemVariableKey.FILES: [], SystemVariableKey.FILES: [],
SystemVariableKey.CONVERSATION_ID: 'abababa', SystemVariableKey.CONVERSATION_ID: "abababa",
SystemVariableKey.USER_ID: '1' SystemVariableKey.USER_ID: "1",
}, user_inputs={}, environment_variables=[]) },
pool.add(['pe', 'list_output'], ["dify-1", "dify-2"]) user_inputs={},
environment_variables=[],
)
pool.add(["pe", "list_output"], ["dify-1", "dify-2"])
iteration_node = IterationNode( iteration_node = IterationNode(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
graph_init_params=init_params, graph_init_params=init_params,
graph=graph, graph=graph,
graph_runtime_state=GraphRuntimeState( graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()),
variable_pool=pool,
start_at=time.perf_counter()
),
config={ config={
"data": { "data": {
"iterator_selector": ["pe", "list_output"], "iterator_selector": ["pe", "list_output"],
@ -395,23 +376,19 @@ def test_run_parallel():
"type": "iteration", "type": "iteration",
}, },
"id": "iteration-1", "id": "iteration-1",
} },
) )
def tt_generator(self): def tt_generator(self):
return NodeRunResult( return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED, status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={ inputs={"iterator_selector": "dify"},
'iterator_selector': 'dify' outputs={"output": "dify 123"},
},
outputs={
'output': 'dify 123'
}
) )
# print("") # print("")
with patch.object(TemplateTransformNode, '_run', new=tt_generator): with patch.object(TemplateTransformNode, "_run", new=tt_generator):
# execute node # execute node
result = iteration_node._run() result = iteration_node._run()

View File

@ -24,60 +24,47 @@ def test_execute_answer():
}, },
], ],
"nodes": [ "nodes": [
{"data": {"type": "start"}, "id": "start"},
{ {
"data": { "data": {
"type": "start" "title": "123",
"type": "answer",
"answer": "Today's weather is {{#start.weather#}}\n{{#llm.text#}}\n{{img}}\nFin.",
}, },
"id": "start" "id": "answer",
}, },
{ ],
"data": {
'title': '123',
'type': 'answer',
'answer': 'Today\'s weather is {{#start.weather#}}\n{{#llm.text#}}\n{{img}}\nFin.'
},
"id": "answer"
},
]
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
init_params = GraphInitParams( init_params = GraphInitParams(
tenant_id='1', tenant_id="1",
app_id='1', app_id="1",
workflow_type=WorkflowType.WORKFLOW, workflow_type=WorkflowType.WORKFLOW,
workflow_id='1', workflow_id="1",
graph_config=graph_config, graph_config=graph_config,
user_id='1', user_id="1",
user_from=UserFrom.ACCOUNT, user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER, invoke_from=InvokeFrom.DEBUGGER,
call_depth=0 call_depth=0,
) )
# construct variable pool # construct variable pool
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables={ system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"},
SystemVariableKey.FILES: [],
SystemVariableKey.USER_ID: 'aaa'
},
user_inputs={}, user_inputs={},
environment_variables=[], environment_variables=[],
conversation_variables=[], conversation_variables=[],
) )
variable_pool.add(['start', 'weather'], 'sunny') variable_pool.add(["start", "weather"], "sunny")
variable_pool.add(['llm', 'text'], 'You are a helpful AI.') variable_pool.add(["llm", "text"], "You are a helpful AI.")
node = AnswerNode( node = AnswerNode(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
graph_init_params=init_params, graph_init_params=init_params,
graph=graph, graph=graph,
graph_runtime_state=GraphRuntimeState( graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()),
variable_pool=variable_pool,
start_at=time.perf_counter()
),
config={ config={
"id": "answer", "id": "answer",
"data": { "data": {

View File

@ -15,65 +15,49 @@ from models.workflow import WorkflowNodeExecutionStatus, WorkflowType
def test_execute_if_else_result_true(): def test_execute_if_else_result_true():
graph_config = { graph_config = {"edges": [], "nodes": [{"data": {"type": "start"}, "id": "start"}]}
"edges": [],
"nodes": [
{
"data": {
"type": "start"
},
"id": "start"
}
]
}
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
init_params = GraphInitParams( init_params = GraphInitParams(
tenant_id='1', tenant_id="1",
app_id='1', app_id="1",
workflow_type=WorkflowType.WORKFLOW, workflow_type=WorkflowType.WORKFLOW,
workflow_id='1', workflow_id="1",
graph_config=graph_config, graph_config=graph_config,
user_id='1', user_id="1",
user_from=UserFrom.ACCOUNT, user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER, invoke_from=InvokeFrom.DEBUGGER,
call_depth=0 call_depth=0,
) )
# construct variable pool # construct variable pool
pool = VariablePool(system_variables={ pool = VariablePool(
SystemVariableKey.FILES: [], system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"}, user_inputs={}
SystemVariableKey.USER_ID: 'aaa' )
}, user_inputs={}) pool.add(["start", "array_contains"], ["ab", "def"])
pool.add(['start', 'array_contains'], ['ab', 'def']) pool.add(["start", "array_not_contains"], ["ac", "def"])
pool.add(['start', 'array_not_contains'], ['ac', 'def']) pool.add(["start", "contains"], "cabcde")
pool.add(['start', 'contains'], 'cabcde') pool.add(["start", "not_contains"], "zacde")
pool.add(['start', 'not_contains'], 'zacde') pool.add(["start", "start_with"], "abc")
pool.add(['start', 'start_with'], 'abc') pool.add(["start", "end_with"], "zzab")
pool.add(['start', 'end_with'], 'zzab') pool.add(["start", "is"], "ab")
pool.add(['start', 'is'], 'ab') pool.add(["start", "is_not"], "aab")
pool.add(['start', 'is_not'], 'aab') pool.add(["start", "empty"], "")
pool.add(['start', 'empty'], '') pool.add(["start", "not_empty"], "aaa")
pool.add(['start', 'not_empty'], 'aaa') pool.add(["start", "equals"], 22)
pool.add(['start', 'equals'], 22) pool.add(["start", "not_equals"], 23)
pool.add(['start', 'not_equals'], 23) pool.add(["start", "greater_than"], 23)
pool.add(['start', 'greater_than'], 23) pool.add(["start", "less_than"], 21)
pool.add(['start', 'less_than'], 21) pool.add(["start", "greater_than_or_equal"], 22)
pool.add(['start', 'greater_than_or_equal'], 22) pool.add(["start", "less_than_or_equal"], 21)
pool.add(['start', 'less_than_or_equal'], 21) pool.add(["start", "not_null"], "1212")
pool.add(['start', 'not_null'], '1212')
node = IfElseNode( node = IfElseNode(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
graph_init_params=init_params, graph_init_params=init_params,
graph=graph, graph=graph,
graph_runtime_state=GraphRuntimeState( graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()),
variable_pool=pool,
start_at=time.perf_counter()
),
config={ config={
"id": "if-else", "id": "if-else",
"data": { "data": {
@ -140,53 +124,44 @@ def test_execute_if_else_result_false():
}, },
], ],
"nodes": [ "nodes": [
{ {"data": {"type": "start"}, "id": "start"},
"data": {
"type": "start"
},
"id": "start"
},
{ {
"data": { "data": {
"type": "llm", "type": "llm",
}, },
"id": "llm" "id": "llm",
}, },
] ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
init_params = GraphInitParams( init_params = GraphInitParams(
tenant_id='1', tenant_id="1",
app_id='1', app_id="1",
workflow_type=WorkflowType.WORKFLOW, workflow_type=WorkflowType.WORKFLOW,
workflow_id='1', workflow_id="1",
graph_config=graph_config, graph_config=graph_config,
user_id='1', user_id="1",
user_from=UserFrom.ACCOUNT, user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER, invoke_from=InvokeFrom.DEBUGGER,
call_depth=0 call_depth=0,
) )
# construct variable pool # construct variable pool
pool = VariablePool(system_variables={ pool = VariablePool(
SystemVariableKey.FILES: [], system_variables={SystemVariableKey.FILES: [], SystemVariableKey.USER_ID: "aaa"},
SystemVariableKey.USER_ID: 'aaa' user_inputs={},
}, user_inputs={}, environment_variables=[]) environment_variables=[],
pool.add(['start', 'array_contains'], ['1ab', 'def']) )
pool.add(['start', 'array_not_contains'], ['ab', 'def']) pool.add(["start", "array_contains"], ["1ab", "def"])
pool.add(["start", "array_not_contains"], ["ab", "def"])
node = IfElseNode( node = IfElseNode(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
graph_init_params=init_params, graph_init_params=init_params,
graph=graph, graph=graph,
graph_runtime_state=GraphRuntimeState( graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()),
variable_pool=pool,
start_at=time.perf_counter()
),
config={ config={
"id": "if-else", "id": "if-else",
"data": { "data": {

View File

@ -27,35 +27,28 @@ def test_overwrite_string_variable():
}, },
], ],
"nodes": [ "nodes": [
{ {"data": {"type": "start"}, "id": "start"},
"data": {
"type": "start"
},
"id": "start"
},
{ {
"data": { "data": {
"type": "assigner", "type": "assigner",
}, },
"id": "assigner" "id": "assigner",
}, },
] ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
init_params = GraphInitParams( init_params = GraphInitParams(
tenant_id='1', tenant_id="1",
app_id='1', app_id="1",
workflow_type=WorkflowType.WORKFLOW, workflow_type=WorkflowType.WORKFLOW,
workflow_id='1', workflow_id="1",
graph_config=graph_config, graph_config=graph_config,
user_id='1', user_id="1",
user_from=UserFrom.ACCOUNT, user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER, invoke_from=InvokeFrom.DEBUGGER,
call_depth=0 call_depth=0,
) )
conversation_variable = StringVariable( conversation_variable = StringVariable(
@ -87,21 +80,18 @@ def test_overwrite_string_variable():
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
graph_init_params=init_params, graph_init_params=init_params,
graph=graph, graph=graph,
graph_runtime_state=GraphRuntimeState( graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()),
variable_pool=variable_pool,
start_at=time.perf_counter()
),
config={ config={
'id': 'node_id', "id": "node_id",
'data': { "data": {
'assigned_variable_selector': ['conversation', conversation_variable.name], "assigned_variable_selector": ["conversation", conversation_variable.name],
'write_mode': WriteMode.OVER_WRITE.value, "write_mode": WriteMode.OVER_WRITE.value,
'input_variable_selector': [DEFAULT_NODE_ID, input_variable.name], "input_variable_selector": [DEFAULT_NODE_ID, input_variable.name],
}, },
}, },
) )
with mock.patch('core.workflow.nodes.variable_assigner.node.update_conversation_variable') as mock_run: with mock.patch("core.workflow.nodes.variable_assigner.node.update_conversation_variable") as mock_run:
list(node.run()) list(node.run())
mock_run.assert_called_once() mock_run.assert_called_once()
@ -121,35 +111,28 @@ def test_append_variable_to_array():
}, },
], ],
"nodes": [ "nodes": [
{ {"data": {"type": "start"}, "id": "start"},
"data": {
"type": "start"
},
"id": "start"
},
{ {
"data": { "data": {
"type": "assigner", "type": "assigner",
}, },
"id": "assigner" "id": "assigner",
}, },
] ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
init_params = GraphInitParams( init_params = GraphInitParams(
tenant_id='1', tenant_id="1",
app_id='1', app_id="1",
workflow_type=WorkflowType.WORKFLOW, workflow_type=WorkflowType.WORKFLOW,
workflow_id='1', workflow_id="1",
graph_config=graph_config, graph_config=graph_config,
user_id='1', user_id="1",
user_from=UserFrom.ACCOUNT, user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER, invoke_from=InvokeFrom.DEBUGGER,
call_depth=0 call_depth=0,
) )
conversation_variable = ArrayStringVariable( conversation_variable = ArrayStringVariable(
@ -179,21 +162,18 @@ def test_append_variable_to_array():
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
graph_init_params=init_params, graph_init_params=init_params,
graph=graph, graph=graph,
graph_runtime_state=GraphRuntimeState( graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()),
variable_pool=variable_pool,
start_at=time.perf_counter()
),
config={ config={
'id': 'node_id', "id": "node_id",
'data': { "data": {
'assigned_variable_selector': ['conversation', conversation_variable.name], "assigned_variable_selector": ["conversation", conversation_variable.name],
'write_mode': WriteMode.APPEND.value, "write_mode": WriteMode.APPEND.value,
'input_variable_selector': [DEFAULT_NODE_ID, input_variable.name], "input_variable_selector": [DEFAULT_NODE_ID, input_variable.name],
}, },
}, },
) )
with mock.patch('core.workflow.nodes.variable_assigner.node.update_conversation_variable') as mock_run: with mock.patch("core.workflow.nodes.variable_assigner.node.update_conversation_variable") as mock_run:
list(node.run()) list(node.run())
mock_run.assert_called_once() mock_run.assert_called_once()
@ -212,35 +192,28 @@ def test_clear_array():
}, },
], ],
"nodes": [ "nodes": [
{ {"data": {"type": "start"}, "id": "start"},
"data": {
"type": "start"
},
"id": "start"
},
{ {
"data": { "data": {
"type": "assigner", "type": "assigner",
}, },
"id": "assigner" "id": "assigner",
}, },
] ],
} }
graph = Graph.init( graph = Graph.init(graph_config=graph_config)
graph_config=graph_config
)
init_params = GraphInitParams( init_params = GraphInitParams(
tenant_id='1', tenant_id="1",
app_id='1', app_id="1",
workflow_type=WorkflowType.WORKFLOW, workflow_type=WorkflowType.WORKFLOW,
workflow_id='1', workflow_id="1",
graph_config=graph_config, graph_config=graph_config,
user_id='1', user_id="1",
user_from=UserFrom.ACCOUNT, user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER, invoke_from=InvokeFrom.DEBUGGER,
call_depth=0 call_depth=0,
) )
conversation_variable = ArrayStringVariable( conversation_variable = ArrayStringVariable(
@ -250,7 +223,7 @@ def test_clear_array():
) )
variable_pool = VariablePool( variable_pool = VariablePool(
system_variables={SystemVariableKey.CONVERSATION_ID: 'conversation_id'}, system_variables={SystemVariableKey.CONVERSATION_ID: "conversation_id"},
user_inputs={}, user_inputs={},
environment_variables=[], environment_variables=[],
conversation_variables=[conversation_variable], conversation_variables=[conversation_variable],
@ -260,10 +233,7 @@ def test_clear_array():
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
graph_init_params=init_params, graph_init_params=init_params,
graph=graph, graph=graph,
graph_runtime_state=GraphRuntimeState( graph_runtime_state=GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()),
variable_pool=variable_pool,
start_at=time.perf_counter()
),
config={ config={
"id": "node_id", "id": "node_id",
"data": { "data": {
@ -274,7 +244,7 @@ def test_clear_array():
}, },
) )
with mock.patch('core.workflow.nodes.variable_assigner.node.update_conversation_variable') as mock_run: with mock.patch("core.workflow.nodes.variable_assigner.node.update_conversation_variable") as mock_run:
list(node.run()) list(node.run())
mock_run.assert_called_once() mock_run.assert_called_once()