From 3eb51d85dafc0c55dac55de6a972796e628d4eed Mon Sep 17 00:00:00 2001
From: -LAN-
Date: Mon, 25 Nov 2024 12:46:50 +0800
Subject: [PATCH] fix(workflow_entry): Support receive File and FileList in
 single step run. (#10947)

Signed-off-by: -LAN-
Co-authored-by: JzoNg
---
 .../app/apps/advanced_chat/app_generator.py   |  2 +-
 api/core/app/apps/agent_chat/app_generator.py |  2 +-
 api/core/app/apps/base_app_generator.py       | 14 +--
 api/core/app/apps/chat/app_generator.py       |  2 +-
 api/core/app/apps/completion/app_generator.py |  4 +-
 api/core/app/apps/workflow/app_generator.py   |  4 +-
 api/core/app/apps/workflow_app_runner.py      |  3 -
 api/core/workflow/entities/node_entities.py   |  2 +-
 api/core/workflow/workflow_entry.py           | 97 +++++++------------
 api/factories/file_factory.py                 | 10 +-
 api/services/workflow_service.py              | 24 +++--
 .../components/before-run-form/index.tsx      |  6 ++
 12 files changed, 75 insertions(+), 95 deletions(-)

diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py
index 00e5a74732..c099b29c97 100644
--- a/api/core/app/apps/advanced_chat/app_generator.py
+++ b/api/core/app/apps/advanced_chat/app_generator.py
@@ -127,7 +127,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
             conversation_id=conversation.id if conversation else None,
             inputs=conversation.inputs
             if conversation
-            else self._prepare_user_inputs(user_inputs=inputs, app_config=app_config),
+            else self._prepare_user_inputs(user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.id),
             query=query,
             files=file_objs,
             parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
diff --git a/api/core/app/apps/agent_chat/app_generator.py b/api/core/app/apps/agent_chat/app_generator.py
index d1564a260e..ae0a01e8a7 100644
--- a/api/core/app/apps/agent_chat/app_generator.py
+++ b/api/core/app/apps/agent_chat/app_generator.py
@@ -134,7 +134,7 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
             conversation_id=conversation.id if conversation else None,
             inputs=conversation.inputs
             if conversation
-            else self._prepare_user_inputs(user_inputs=inputs, app_config=app_config),
+            else self._prepare_user_inputs(user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.id),
             query=query,
             files=file_objs,
             parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
diff --git a/api/core/app/apps/base_app_generator.py b/api/core/app/apps/base_app_generator.py
index 2c78d95778..85b7aced55 100644
--- a/api/core/app/apps/base_app_generator.py
+++ b/api/core/app/apps/base_app_generator.py
@@ -1,4 +1,4 @@
-from collections.abc import Mapping
+from collections.abc import Mapping, Sequence
 from typing import TYPE_CHECKING, Any, Optional
 
 from core.app.app_config.entities import VariableEntityType
@@ -6,7 +6,7 @@ from core.file import File, FileUploadConfig
 from factories import file_factory
 
 if TYPE_CHECKING:
-    from core.app.app_config.entities import AppConfig, VariableEntity
+    from core.app.app_config.entities import VariableEntity
 
 
 class BaseAppGenerator:
@@ -14,23 +14,23 @@ class BaseAppGenerator:
         self,
         *,
         user_inputs: Optional[Mapping[str, Any]],
-        app_config: "AppConfig",
+        variables: Sequence["VariableEntity"],
+        tenant_id: str,
     ) -> Mapping[str, Any]:
         user_inputs = user_inputs or {}
         # Filter input variables from form configuration, handle required fields, default values, and option values
-        variables = app_config.variables
         user_inputs = {
             var.variable: self._validate_inputs(value=user_inputs.get(var.variable), variable_entity=var)
             for var in variables
         }
         user_inputs = {k: self._sanitize_value(v) for k, v in user_inputs.items()}
         # Convert files in inputs to File
-        entity_dictionary = {item.variable: item for item in app_config.variables}
+        entity_dictionary = {item.variable: item for item in variables}
         # Convert single file to File
         files_inputs = {
             k: file_factory.build_from_mapping(
                 mapping=v,
-                tenant_id=app_config.tenant_id,
+                tenant_id=tenant_id,
                 config=FileUploadConfig(
                     allowed_file_types=entity_dictionary[k].allowed_file_types,
                     allowed_file_extensions=entity_dictionary[k].allowed_file_extensions,
@@ -44,7 +44,7 @@ class BaseAppGenerator:
         file_list_inputs = {
             k: file_factory.build_from_mappings(
                 mappings=v,
-                tenant_id=app_config.tenant_id,
+                tenant_id=tenant_id,
                 config=FileUploadConfig(
                     allowed_file_types=entity_dictionary[k].allowed_file_types,
                     allowed_file_extensions=entity_dictionary[k].allowed_file_extensions,
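With this refactor, `_prepare_user_inputs` no longer needs the whole `AppConfig`; callers hand it the variable definitions and the tenant id directly, which is what lets the same file-conversion logic be reached from code paths that have no full app config. A rough caller-side sketch of the new shape (the `raw_inputs`, `app_config`, and `app_model` names here are illustrative, not lines from this patch):

    # Build validated inputs (including File / list[File] values) from raw user input,
    # passing only the pieces the helper actually needs.
    inputs = self._prepare_user_inputs(
        user_inputs=raw_inputs,              # mapping of variable name -> raw value
        variables=app_config.variables,      # Sequence[VariableEntity] from the app config
        tenant_id=app_model.tenant_id,       # tenant that owns any referenced uploads
    )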
diff --git a/api/core/app/apps/chat/app_generator.py b/api/core/app/apps/chat/app_generator.py
index e683dfef3f..cf6aae34f8 100644
--- a/api/core/app/apps/chat/app_generator.py
+++ b/api/core/app/apps/chat/app_generator.py
@@ -132,7 +132,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
             conversation_id=conversation.id if conversation else None,
             inputs=conversation.inputs
             if conversation
-            else self._prepare_user_inputs(user_inputs=inputs, app_config=app_config),
+            else self._prepare_user_inputs(user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.id),
             query=query,
             files=file_objs,
             parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
diff --git a/api/core/app/apps/completion/app_generator.py b/api/core/app/apps/completion/app_generator.py
index 22ee8b0967..70434f24c1 100644
--- a/api/core/app/apps/completion/app_generator.py
+++ b/api/core/app/apps/completion/app_generator.py
@@ -113,7 +113,9 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
             app_config=app_config,
             model_conf=ModelConfigConverter.convert(app_config),
             file_upload_config=file_extra_config,
-            inputs=self._prepare_user_inputs(user_inputs=inputs, app_config=app_config),
+            inputs=self._prepare_user_inputs(
+                user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.id
+            ),
             query=query,
             files=file_objs,
             user_id=user.id,
diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py
index 65da39b220..31efe43412 100644
--- a/api/core/app/apps/workflow/app_generator.py
+++ b/api/core/app/apps/workflow/app_generator.py
@@ -96,7 +96,9 @@ class WorkflowAppGenerator(BaseAppGenerator):
             task_id=str(uuid.uuid4()),
             app_config=app_config,
             file_upload_config=file_extra_config,
-            inputs=self._prepare_user_inputs(user_inputs=inputs, app_config=app_config),
+            inputs=self._prepare_user_inputs(
+                user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
+            ),
             files=system_files,
             user_id=user.id,
             stream=stream,
diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py
index 2872390d46..1cf72ae79e 100644
--- a/api/core/app/apps/workflow_app_runner.py
+++ b/api/core/app/apps/workflow_app_runner.py
@@ -43,7 +43,6 @@ from core.workflow.graph_engine.entities.event import (
 )
 from core.workflow.graph_engine.entities.graph import Graph
 from core.workflow.nodes import NodeType
-from core.workflow.nodes.iteration import IterationNodeData
 from core.workflow.nodes.node_mapping import node_type_classes_mapping
 from core.workflow.workflow_entry import WorkflowEntry
 from extensions.ext_database import db
@@ -160,8 +159,6 @@ class WorkflowBasedAppRunner(AppRunner):
             user_inputs=user_inputs,
             variable_pool=variable_pool,
             tenant_id=workflow.tenant_id,
-            node_type=node_type,
-            node_data=IterationNodeData(**iteration_node_config.get("data", {})),
         )
 
         return graph, variable_pool
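In the iteration debug path above, the runner now calls the mapping helper without any node-specific arguments; whether a value is converted to a `File` is decided from the value itself rather than from the node type. A minimal sketch of the slimmed-down call (the surrounding variable names are illustrative):

    # Sketch: only the mapping, the raw user inputs, the pool and the tenant are needed now.
    WorkflowEntry.mapping_user_inputs_to_variable_pool(
        variable_mapping=variable_mapping,   # e.g. {"start.query": ["start", "query"]}
        user_inputs=user_inputs,
        variable_pool=variable_pool,
        tenant_id=workflow.tenant_id,
    )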
diff --git a/api/core/workflow/entities/node_entities.py b/api/core/workflow/entities/node_entities.py
index 1ac64e94ef..e174d3baa0 100644
--- a/api/core/workflow/entities/node_entities.py
+++ b/api/core/workflow/entities/node_entities.py
@@ -36,7 +36,7 @@ class NodeRunResult(BaseModel):
 
     inputs: Optional[Mapping[str, Any]] = None  # node inputs
     process_data: Optional[dict[str, Any]] = None  # process data
-    outputs: Optional[dict[str, Any]] = None  # node outputs
+    outputs: Optional[Mapping[str, Any]] = None  # node outputs
     metadata: Optional[dict[NodeRunMetadataKey, Any]] = None  # node metadata
     llm_usage: Optional[LLMUsage] = None  # llm usage
 
diff --git a/api/core/workflow/workflow_entry.py b/api/core/workflow/workflow_entry.py
index 84b251223f..6f7b143ad6 100644
--- a/api/core/workflow/workflow_entry.py
+++ b/api/core/workflow/workflow_entry.py
@@ -5,10 +5,9 @@ from collections.abc import Generator, Mapping, Sequence
 from typing import Any, Optional, cast
 
 from configs import dify_config
-from core.app.app_config.entities import FileUploadConfig
 from core.app.apps.base_app_queue_manager import GenerateTaskStoppedError
 from core.app.entities.app_invoke_entities import InvokeFrom
-from core.file.models import File, FileTransferMethod, ImageConfig
+from core.file.models import File
 from core.workflow.callbacks import WorkflowCallback
 from core.workflow.entities.variable_pool import VariablePool
 from core.workflow.errors import WorkflowNodeRunFailedError
@@ -18,9 +17,8 @@ from core.workflow.graph_engine.entities.graph_init_params import GraphInitParam
 from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
 from core.workflow.graph_engine.graph_engine import GraphEngine
 from core.workflow.nodes import NodeType
-from core.workflow.nodes.base import BaseNode, BaseNodeData
+from core.workflow.nodes.base import BaseNode
 from core.workflow.nodes.event import NodeEvent
-from core.workflow.nodes.llm import LLMNodeData
 from core.workflow.nodes.node_mapping import node_type_classes_mapping
 from factories import file_factory
 from models.enums import UserFrom
@@ -115,7 +113,12 @@ class WorkflowEntry:
 
     @classmethod
     def single_step_run(
-        cls, workflow: Workflow, node_id: str, user_id: str, user_inputs: dict
+        cls,
+        *,
+        workflow: Workflow,
+        node_id: str,
+        user_id: str,
+        user_inputs: dict,
     ) -> tuple[BaseNode, Generator[NodeEvent | InNodeEvent, None, None]]:
         """
         Single step run workflow node
@@ -135,13 +138,9 @@
             raise ValueError("nodes not found in workflow graph")
 
         # fetch node config from node id
-        node_config = None
-        for node in nodes:
-            if node.get("id") == node_id:
-                node_config = node
-                break
-
-        if not node_config:
+        try:
+            node_config = next(filter(lambda node: node["id"] == node_id, nodes))
+        except StopIteration:
             raise ValueError("node id not found in workflow graph")
 
         # Get node class
@@ -153,11 +152,7 @@
             raise ValueError(f"Node class not found for node type {node_type}")
 
         # init variable pool
-        variable_pool = VariablePool(
-            system_variables={},
-            user_inputs={},
-            environment_variables=workflow.environment_variables,
-        )
+        variable_pool = VariablePool(environment_variables=workflow.environment_variables)
 
         # init graph
         graph = Graph.init(graph_config=workflow.graph_dict)
@@ -183,28 +178,24 @@
 
         try:
             # variable selector to variable mapping
-            try:
-                variable_mapping = node_cls.extract_variable_selector_to_variable_mapping(
-                    graph_config=workflow.graph_dict, config=node_config
-                )
-            except NotImplementedError:
-                variable_mapping = {}
-
-            cls.mapping_user_inputs_to_variable_pool(
-                variable_mapping=variable_mapping,
-                user_inputs=user_inputs,
-                variable_pool=variable_pool,
-                tenant_id=workflow.tenant_id,
-                node_type=node_type,
-                node_data=node_instance.node_data,
+            variable_mapping = node_cls.extract_variable_selector_to_variable_mapping(
+                graph_config=workflow.graph_dict, config=node_config
             )
+        except NotImplementedError:
+            variable_mapping = {}
 
+        cls.mapping_user_inputs_to_variable_pool(
+            variable_mapping=variable_mapping,
+            user_inputs=user_inputs,
+            variable_pool=variable_pool,
+            tenant_id=workflow.tenant_id,
+        )
+
+        try:
             # run node
             generator = node_instance.run()
-
-            return node_instance, generator
         except Exception as e:
             raise WorkflowNodeRunFailedError(node_instance=node_instance, error=str(e))
+        return node_instance, generator
 
     @staticmethod
     def handle_special_values(value: Optional[Mapping[str, Any]]) -> Mapping[str, Any] | None:
@@ -231,12 +222,11 @@
     @classmethod
     def mapping_user_inputs_to_variable_pool(
         cls,
+        *,
         variable_mapping: Mapping[str, Sequence[str]],
        user_inputs: dict,
         variable_pool: VariablePool,
         tenant_id: str,
-        node_type: NodeType,
-        node_data: BaseNodeData,
     ) -> None:
         for node_variable, variable_selector in variable_mapping.items():
             # fetch node id and variable key from node_variable
@@ -254,40 +244,21 @@
             # fetch variable node id from variable selector
             variable_node_id = variable_selector[0]
             variable_key_list = variable_selector[1:]
-            variable_key_list = cast(list[str], variable_key_list)
+            variable_key_list = list(variable_key_list)
 
             # get input value
             input_value = user_inputs.get(node_variable)
             if not input_value:
                 input_value = user_inputs.get(node_variable_key)
 
-            # FIXME: temp fix for image type
-            if node_type == NodeType.LLM:
-                new_value = []
-                if isinstance(input_value, list):
-                    node_data = cast(LLMNodeData, node_data)
-
-                    detail = node_data.vision.configs.detail if node_data.vision.configs else None
-
-                    for item in input_value:
-                        if isinstance(item, dict) and "type" in item and item["type"] == "image":
-                            transfer_method = FileTransferMethod.value_of(item.get("transfer_method"))
-                            mapping = {
-                                "id": item.get("id"),
-                                "transfer_method": transfer_method,
-                                "upload_file_id": item.get("upload_file_id"),
-                                "url": item.get("url"),
-                            }
-                            config = FileUploadConfig(image_config=ImageConfig(detail=detail) if detail else None)
-                            file = file_factory.build_from_mapping(
-                                mapping=mapping,
-                                tenant_id=tenant_id,
-                                config=config,
-                            )
-                            new_value.append(file)
-
-                    if new_value:
-                        input_value = new_value
+            if isinstance(input_value, dict) and "type" in input_value and "transfer_method" in input_value:
+                input_value = file_factory.build_from_mapping(mapping=input_value, tenant_id=tenant_id)
+            if (
+                isinstance(input_value, list)
+                and all(isinstance(item, dict) for item in input_value)
+                and all("type" in item and "transfer_method" in item for item in input_value)
+            ):
+                input_value = file_factory.build_from_mappings(mappings=input_value, tenant_id=tenant_id)
 
             # append variable and value to variable pool
             variable_pool.add([variable_node_id] + variable_key_list, input_value)
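The substantive change above is that single-step run no longer special-cases LLM vision images: any input value that looks like a file mapping, or a list of such mappings, is turned into `File` objects through `file_factory` before it is written to the variable pool. A sketch of what the entry point now accepts (node id, variable key, and field values are made up for illustration):

    # A single-file variable arrives as a mapping carrying "type" and "transfer_method"...
    single_file = {
        "type": "image",
        "transfer_method": "local_file",
        "upload_file_id": "<uploaded-file-id>",
    }
    # ...and a file-list variable arrives as a list of such mappings.
    file_list = [single_file]

    node_instance, generator = WorkflowEntry.single_step_run(
        workflow=workflow,                            # Workflow record being debugged
        node_id="1711528915811",                      # illustrative node id from the graph
        user_id=current_user.id,
        user_inputs={"1711528915811.files": file_list},  # key format follows the node's variable mapping
    )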
diff --git a/api/factories/file_factory.py b/api/factories/file_factory.py
index 1c368a22ca..ad8dba8190 100644
--- a/api/factories/file_factory.py
+++ b/api/factories/file_factory.py
@@ -86,12 +86,9 @@ def build_from_mapping(
 def build_from_mappings(
     *,
     mappings: Sequence[Mapping[str, Any]],
-    config: FileUploadConfig | None,
+    config: FileUploadConfig | None = None,
     tenant_id: str,
 ) -> Sequence[File]:
-    if not config:
-        return []
-
     files = [
         build_from_mapping(
             mapping=mapping,
@@ -102,13 +99,14 @@ def build_from_mappings(
     ]
 
     if (
+        config
         # If image config is set.
-        config.image_config
+        and config.image_config
         # And the number of image files exceeds the maximum limit
         and sum(1 for _ in (filter(lambda x: x.type == FileType.IMAGE, files))) > config.image_config.number_limits
     ):
         raise ValueError(f"Number of image files exceeds the maximum limit {config.image_config.number_limits}")
-    if config.number_limits and len(files) > config.number_limits:
+    if config and config.number_limits and len(files) > config.number_limits:
         raise ValueError(f"Number of files exceeds the maximum limit {config.number_limits}")
 
     return files
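Previously `build_from_mappings` returned an empty list whenever no upload config was supplied, which silently dropped files on code paths that have no `FileUploadConfig` (such as single-step run). With `config` now optional, files are always built and the count limits are only enforced when a config is actually passed. A small sketch of both call styles (the `file_mappings` and `tenant_id` values are placeholders):

    from core.file import FileUploadConfig
    from factories.file_factory import build_from_mappings

    # Without a config: files are built and no limits are checked.
    files = build_from_mappings(mappings=file_mappings, tenant_id=tenant_id)

    # With a config: the image-count and total-count limits are enforced as before.
    files = build_from_mappings(
        mappings=file_mappings,
        tenant_id=tenant_id,
        config=FileUploadConfig(number_limits=5),
    )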
diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py
index fde8673ff5..aa2babd7f7 100644
--- a/api/services/workflow_service.py
+++ b/api/services/workflow_service.py
@@ -262,13 +262,17 @@ class WorkflowService:
 
         if run_succeeded and node_run_result:
             # create workflow node execution
-            workflow_node_execution.inputs = json.dumps(node_run_result.inputs) if node_run_result.inputs else None
-            workflow_node_execution.process_data = (
-                json.dumps(node_run_result.process_data) if node_run_result.process_data else None
-            )
-            workflow_node_execution.outputs = (
-                json.dumps(jsonable_encoder(node_run_result.outputs)) if node_run_result.outputs else None
+            inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None
+            process_data = (
+                WorkflowEntry.handle_special_values(node_run_result.process_data)
+                if node_run_result.process_data
+                else None
             )
+            outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None
+
+            workflow_node_execution.inputs = json.dumps(inputs)
+            workflow_node_execution.process_data = json.dumps(process_data)
+            workflow_node_execution.outputs = json.dumps(outputs)
             workflow_node_execution.execution_metadata = (
                 json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None
             )
@@ -303,10 +307,10 @@ class WorkflowService:
         new_app = workflow_converter.convert_to_workflow(
             app_model=app_model,
             account=account,
-            name=args.get("name"),
-            icon_type=args.get("icon_type"),
-            icon=args.get("icon"),
-            icon_background=args.get("icon_background"),
+            name=args.get("name", "Default Name"),
+            icon_type=args.get("icon_type", "emoji"),
+            icon=args.get("icon", "🤖"),
+            icon_background=args.get("icon_background", "#FFEAD5"),
         )
 
         return new_app
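Node run records can now contain `File` objects in their inputs, process data and outputs, so the service normalises them with `WorkflowEntry.handle_special_values` before serialising, rather than relying on `jsonable_encoder` for outputs only. Roughly (a simplified sketch, not the full method):

    # Sketch: convert special values (e.g. File objects) into plain JSON-safe data first,
    # then store the serialised result on the execution record.
    outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
    workflow_node_execution.outputs = json.dumps(outputs)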
diff --git a/web/app/components/workflow/nodes/_base/components/before-run-form/index.tsx b/web/app/components/workflow/nodes/_base/components/before-run-form/index.tsx
index 6a3da3cf24..79d9c5b4dd 100644
--- a/web/app/components/workflow/nodes/_base/components/before-run-form/index.tsx
+++ b/web/app/components/workflow/nodes/_base/components/before-run-form/index.tsx
@@ -16,6 +16,7 @@ import { InputVarType, NodeRunningStatus } from '@/app/components/workflow/types
 import ResultPanel from '@/app/components/workflow/run/result-panel'
 import Toast from '@/app/components/base/toast'
 import { TransferMethod } from '@/types/app'
+import { getProcessedFiles } from '@/app/components/base/file-uploader/utils'
 
 const i18nPrefix = 'workflow.singleRun'
 
@@ -39,6 +40,11 @@ function formatValue(value: string | any, type: InputVarType) {
       return JSON.parse(item)
     })
   }
+  if (type === InputVarType.multiFiles)
+    return getProcessedFiles(value)
+
+  if (type === InputVarType.singleFile)
+    return getProcessedFiles([value])[0]
 
   return value
 }
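On the web side, `formatValue` now routes single-file and multi-file variables through `getProcessedFiles`, so the single-step run request carries plain file mappings instead of the uploader's internal objects. That payload shape is what the backend branch in workflow_entry.py matches on; roughly (variable key and field values are illustrative):

    # Sketch of the JSON body a single-step node run might send for a file-list variable.
    payload = {
        "inputs": {
            "image_list": [
                {
                    "type": "image",
                    "transfer_method": "remote_url",
                    "url": "https://example.com/cat.png",
                },
            ],
        },
    }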