fix(workflow_entry): Support receive File and FileList in single step run. (#10947)

Signed-off-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: JzoNg <jzongcode@gmail.com>
This commit is contained in:
-LAN- 2024-11-25 12:46:50 +08:00 committed by GitHub
parent 79a35c2fe6
commit 3eb51d85da
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 75 additions and 95 deletions

View File

@ -127,7 +127,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
conversation_id=conversation.id if conversation else None, conversation_id=conversation.id if conversation else None,
inputs=conversation.inputs inputs=conversation.inputs
if conversation if conversation
else self._prepare_user_inputs(user_inputs=inputs, app_config=app_config), else self._prepare_user_inputs(user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.id),
query=query, query=query,
files=file_objs, files=file_objs,
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL, parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,

View File

@ -134,7 +134,7 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
conversation_id=conversation.id if conversation else None, conversation_id=conversation.id if conversation else None,
inputs=conversation.inputs inputs=conversation.inputs
if conversation if conversation
else self._prepare_user_inputs(user_inputs=inputs, app_config=app_config), else self._prepare_user_inputs(user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.id),
query=query, query=query,
files=file_objs, files=file_objs,
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL, parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,

View File

@ -1,4 +1,4 @@
from collections.abc import Mapping from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any, Optional from typing import TYPE_CHECKING, Any, Optional
from core.app.app_config.entities import VariableEntityType from core.app.app_config.entities import VariableEntityType
@ -6,7 +6,7 @@ from core.file import File, FileUploadConfig
from factories import file_factory from factories import file_factory
if TYPE_CHECKING: if TYPE_CHECKING:
from core.app.app_config.entities import AppConfig, VariableEntity from core.app.app_config.entities import VariableEntity
class BaseAppGenerator: class BaseAppGenerator:
@ -14,23 +14,23 @@ class BaseAppGenerator:
self, self,
*, *,
user_inputs: Optional[Mapping[str, Any]], user_inputs: Optional[Mapping[str, Any]],
app_config: "AppConfig", variables: Sequence["VariableEntity"],
tenant_id: str,
) -> Mapping[str, Any]: ) -> Mapping[str, Any]:
user_inputs = user_inputs or {} user_inputs = user_inputs or {}
# Filter input variables from form configuration, handle required fields, default values, and option values # Filter input variables from form configuration, handle required fields, default values, and option values
variables = app_config.variables
user_inputs = { user_inputs = {
var.variable: self._validate_inputs(value=user_inputs.get(var.variable), variable_entity=var) var.variable: self._validate_inputs(value=user_inputs.get(var.variable), variable_entity=var)
for var in variables for var in variables
} }
user_inputs = {k: self._sanitize_value(v) for k, v in user_inputs.items()} user_inputs = {k: self._sanitize_value(v) for k, v in user_inputs.items()}
# Convert files in inputs to File # Convert files in inputs to File
entity_dictionary = {item.variable: item for item in app_config.variables} entity_dictionary = {item.variable: item for item in variables}
# Convert single file to File # Convert single file to File
files_inputs = { files_inputs = {
k: file_factory.build_from_mapping( k: file_factory.build_from_mapping(
mapping=v, mapping=v,
tenant_id=app_config.tenant_id, tenant_id=tenant_id,
config=FileUploadConfig( config=FileUploadConfig(
allowed_file_types=entity_dictionary[k].allowed_file_types, allowed_file_types=entity_dictionary[k].allowed_file_types,
allowed_file_extensions=entity_dictionary[k].allowed_file_extensions, allowed_file_extensions=entity_dictionary[k].allowed_file_extensions,
@ -44,7 +44,7 @@ class BaseAppGenerator:
file_list_inputs = { file_list_inputs = {
k: file_factory.build_from_mappings( k: file_factory.build_from_mappings(
mappings=v, mappings=v,
tenant_id=app_config.tenant_id, tenant_id=tenant_id,
config=FileUploadConfig( config=FileUploadConfig(
allowed_file_types=entity_dictionary[k].allowed_file_types, allowed_file_types=entity_dictionary[k].allowed_file_types,
allowed_file_extensions=entity_dictionary[k].allowed_file_extensions, allowed_file_extensions=entity_dictionary[k].allowed_file_extensions,

View File

@ -132,7 +132,7 @@ class ChatAppGenerator(MessageBasedAppGenerator):
conversation_id=conversation.id if conversation else None, conversation_id=conversation.id if conversation else None,
inputs=conversation.inputs inputs=conversation.inputs
if conversation if conversation
else self._prepare_user_inputs(user_inputs=inputs, app_config=app_config), else self._prepare_user_inputs(user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.id),
query=query, query=query,
files=file_objs, files=file_objs,
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL, parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,

View File

@ -113,7 +113,9 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
app_config=app_config, app_config=app_config,
model_conf=ModelConfigConverter.convert(app_config), model_conf=ModelConfigConverter.convert(app_config),
file_upload_config=file_extra_config, file_upload_config=file_extra_config,
inputs=self._prepare_user_inputs(user_inputs=inputs, app_config=app_config), inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.id
),
query=query, query=query,
files=file_objs, files=file_objs,
user_id=user.id, user_id=user.id,

View File

@ -96,7 +96,9 @@ class WorkflowAppGenerator(BaseAppGenerator):
task_id=str(uuid.uuid4()), task_id=str(uuid.uuid4()),
app_config=app_config, app_config=app_config,
file_upload_config=file_extra_config, file_upload_config=file_extra_config,
inputs=self._prepare_user_inputs(user_inputs=inputs, app_config=app_config), inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
),
files=system_files, files=system_files,
user_id=user.id, user_id=user.id,
stream=stream, stream=stream,

View File

@ -43,7 +43,6 @@ from core.workflow.graph_engine.entities.event import (
) )
from core.workflow.graph_engine.entities.graph import Graph from core.workflow.graph_engine.entities.graph import Graph
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType
from core.workflow.nodes.iteration import IterationNodeData
from core.workflow.nodes.node_mapping import node_type_classes_mapping from core.workflow.nodes.node_mapping import node_type_classes_mapping
from core.workflow.workflow_entry import WorkflowEntry from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db from extensions.ext_database import db
@ -160,8 +159,6 @@ class WorkflowBasedAppRunner(AppRunner):
user_inputs=user_inputs, user_inputs=user_inputs,
variable_pool=variable_pool, variable_pool=variable_pool,
tenant_id=workflow.tenant_id, tenant_id=workflow.tenant_id,
node_type=node_type,
node_data=IterationNodeData(**iteration_node_config.get("data", {})),
) )
return graph, variable_pool return graph, variable_pool

View File

@ -36,7 +36,7 @@ class NodeRunResult(BaseModel):
inputs: Optional[Mapping[str, Any]] = None # node inputs inputs: Optional[Mapping[str, Any]] = None # node inputs
process_data: Optional[dict[str, Any]] = None # process data process_data: Optional[dict[str, Any]] = None # process data
outputs: Optional[dict[str, Any]] = None # node outputs outputs: Optional[Mapping[str, Any]] = None # node outputs
metadata: Optional[dict[NodeRunMetadataKey, Any]] = None # node metadata metadata: Optional[dict[NodeRunMetadataKey, Any]] = None # node metadata
llm_usage: Optional[LLMUsage] = None # llm usage llm_usage: Optional[LLMUsage] = None # llm usage

View File

@ -5,10 +5,9 @@ from collections.abc import Generator, Mapping, Sequence
from typing import Any, Optional, cast from typing import Any, Optional, cast
from configs import dify_config from configs import dify_config
from core.app.app_config.entities import FileUploadConfig
from core.app.apps.base_app_queue_manager import GenerateTaskStoppedError from core.app.apps.base_app_queue_manager import GenerateTaskStoppedError
from core.app.entities.app_invoke_entities import InvokeFrom from core.app.entities.app_invoke_entities import InvokeFrom
from core.file.models import File, FileTransferMethod, ImageConfig from core.file.models import File
from core.workflow.callbacks import WorkflowCallback from core.workflow.callbacks import WorkflowCallback
from core.workflow.entities.variable_pool import VariablePool from core.workflow.entities.variable_pool import VariablePool
from core.workflow.errors import WorkflowNodeRunFailedError from core.workflow.errors import WorkflowNodeRunFailedError
@ -18,9 +17,8 @@ from core.workflow.graph_engine.entities.graph_init_params import GraphInitParam
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.graph_engine.graph_engine import GraphEngine from core.workflow.graph_engine.graph_engine import GraphEngine
from core.workflow.nodes import NodeType from core.workflow.nodes import NodeType
from core.workflow.nodes.base import BaseNode, BaseNodeData from core.workflow.nodes.base import BaseNode
from core.workflow.nodes.event import NodeEvent from core.workflow.nodes.event import NodeEvent
from core.workflow.nodes.llm import LLMNodeData
from core.workflow.nodes.node_mapping import node_type_classes_mapping from core.workflow.nodes.node_mapping import node_type_classes_mapping
from factories import file_factory from factories import file_factory
from models.enums import UserFrom from models.enums import UserFrom
@ -115,7 +113,12 @@ class WorkflowEntry:
@classmethod @classmethod
def single_step_run( def single_step_run(
cls, workflow: Workflow, node_id: str, user_id: str, user_inputs: dict cls,
*,
workflow: Workflow,
node_id: str,
user_id: str,
user_inputs: dict,
) -> tuple[BaseNode, Generator[NodeEvent | InNodeEvent, None, None]]: ) -> tuple[BaseNode, Generator[NodeEvent | InNodeEvent, None, None]]:
""" """
Single step run workflow node Single step run workflow node
@ -135,13 +138,9 @@ class WorkflowEntry:
raise ValueError("nodes not found in workflow graph") raise ValueError("nodes not found in workflow graph")
# fetch node config from node id # fetch node config from node id
node_config = None try:
for node in nodes: node_config = next(filter(lambda node: node["id"] == node_id, nodes))
if node.get("id") == node_id: except StopIteration:
node_config = node
break
if not node_config:
raise ValueError("node id not found in workflow graph") raise ValueError("node id not found in workflow graph")
# Get node class # Get node class
@ -153,11 +152,7 @@ class WorkflowEntry:
raise ValueError(f"Node class not found for node type {node_type}") raise ValueError(f"Node class not found for node type {node_type}")
# init variable pool # init variable pool
variable_pool = VariablePool( variable_pool = VariablePool(environment_variables=workflow.environment_variables)
system_variables={},
user_inputs={},
environment_variables=workflow.environment_variables,
)
# init graph # init graph
graph = Graph.init(graph_config=workflow.graph_dict) graph = Graph.init(graph_config=workflow.graph_dict)
@ -183,28 +178,24 @@ class WorkflowEntry:
try: try:
# variable selector to variable mapping # variable selector to variable mapping
try: variable_mapping = node_cls.extract_variable_selector_to_variable_mapping(
variable_mapping = node_cls.extract_variable_selector_to_variable_mapping( graph_config=workflow.graph_dict, config=node_config
graph_config=workflow.graph_dict, config=node_config
)
except NotImplementedError:
variable_mapping = {}
cls.mapping_user_inputs_to_variable_pool(
variable_mapping=variable_mapping,
user_inputs=user_inputs,
variable_pool=variable_pool,
tenant_id=workflow.tenant_id,
node_type=node_type,
node_data=node_instance.node_data,
) )
except NotImplementedError:
variable_mapping = {}
cls.mapping_user_inputs_to_variable_pool(
variable_mapping=variable_mapping,
user_inputs=user_inputs,
variable_pool=variable_pool,
tenant_id=workflow.tenant_id,
)
try:
# run node # run node
generator = node_instance.run() generator = node_instance.run()
return node_instance, generator
except Exception as e: except Exception as e:
raise WorkflowNodeRunFailedError(node_instance=node_instance, error=str(e)) raise WorkflowNodeRunFailedError(node_instance=node_instance, error=str(e))
return node_instance, generator
@staticmethod @staticmethod
def handle_special_values(value: Optional[Mapping[str, Any]]) -> Mapping[str, Any] | None: def handle_special_values(value: Optional[Mapping[str, Any]]) -> Mapping[str, Any] | None:
@ -231,12 +222,11 @@ class WorkflowEntry:
@classmethod @classmethod
def mapping_user_inputs_to_variable_pool( def mapping_user_inputs_to_variable_pool(
cls, cls,
*,
variable_mapping: Mapping[str, Sequence[str]], variable_mapping: Mapping[str, Sequence[str]],
user_inputs: dict, user_inputs: dict,
variable_pool: VariablePool, variable_pool: VariablePool,
tenant_id: str, tenant_id: str,
node_type: NodeType,
node_data: BaseNodeData,
) -> None: ) -> None:
for node_variable, variable_selector in variable_mapping.items(): for node_variable, variable_selector in variable_mapping.items():
# fetch node id and variable key from node_variable # fetch node id and variable key from node_variable
@ -254,40 +244,21 @@ class WorkflowEntry:
# fetch variable node id from variable selector # fetch variable node id from variable selector
variable_node_id = variable_selector[0] variable_node_id = variable_selector[0]
variable_key_list = variable_selector[1:] variable_key_list = variable_selector[1:]
variable_key_list = cast(list[str], variable_key_list) variable_key_list = list(variable_key_list)
# get input value # get input value
input_value = user_inputs.get(node_variable) input_value = user_inputs.get(node_variable)
if not input_value: if not input_value:
input_value = user_inputs.get(node_variable_key) input_value = user_inputs.get(node_variable_key)
# FIXME: temp fix for image type if isinstance(input_value, dict) and "type" in input_value and "transfer_method" in input_value:
if node_type == NodeType.LLM: input_value = file_factory.build_from_mapping(mapping=input_value, tenant_id=tenant_id)
new_value = [] if (
if isinstance(input_value, list): isinstance(input_value, list)
node_data = cast(LLMNodeData, node_data) and all(isinstance(item, dict) for item in input_value)
and all("type" in item and "transfer_method" in item for item in input_value)
detail = node_data.vision.configs.detail if node_data.vision.configs else None ):
input_value = file_factory.build_from_mappings(mappings=input_value, tenant_id=tenant_id)
for item in input_value:
if isinstance(item, dict) and "type" in item and item["type"] == "image":
transfer_method = FileTransferMethod.value_of(item.get("transfer_method"))
mapping = {
"id": item.get("id"),
"transfer_method": transfer_method,
"upload_file_id": item.get("upload_file_id"),
"url": item.get("url"),
}
config = FileUploadConfig(image_config=ImageConfig(detail=detail) if detail else None)
file = file_factory.build_from_mapping(
mapping=mapping,
tenant_id=tenant_id,
config=config,
)
new_value.append(file)
if new_value:
input_value = new_value
# append variable and value to variable pool # append variable and value to variable pool
variable_pool.add([variable_node_id] + variable_key_list, input_value) variable_pool.add([variable_node_id] + variable_key_list, input_value)

View File

@ -86,12 +86,9 @@ def build_from_mapping(
def build_from_mappings( def build_from_mappings(
*, *,
mappings: Sequence[Mapping[str, Any]], mappings: Sequence[Mapping[str, Any]],
config: FileUploadConfig | None, config: FileUploadConfig | None = None,
tenant_id: str, tenant_id: str,
) -> Sequence[File]: ) -> Sequence[File]:
if not config:
return []
files = [ files = [
build_from_mapping( build_from_mapping(
mapping=mapping, mapping=mapping,
@ -102,13 +99,14 @@ def build_from_mappings(
] ]
if ( if (
config
# If image config is set. # If image config is set.
config.image_config and config.image_config
# And the number of image files exceeds the maximum limit # And the number of image files exceeds the maximum limit
and sum(1 for _ in (filter(lambda x: x.type == FileType.IMAGE, files))) > config.image_config.number_limits and sum(1 for _ in (filter(lambda x: x.type == FileType.IMAGE, files))) > config.image_config.number_limits
): ):
raise ValueError(f"Number of image files exceeds the maximum limit {config.image_config.number_limits}") raise ValueError(f"Number of image files exceeds the maximum limit {config.image_config.number_limits}")
if config.number_limits and len(files) > config.number_limits: if config and config.number_limits and len(files) > config.number_limits:
raise ValueError(f"Number of files exceeds the maximum limit {config.number_limits}") raise ValueError(f"Number of files exceeds the maximum limit {config.number_limits}")
return files return files

View File

@ -262,13 +262,17 @@ class WorkflowService:
if run_succeeded and node_run_result: if run_succeeded and node_run_result:
# create workflow node execution # create workflow node execution
workflow_node_execution.inputs = json.dumps(node_run_result.inputs) if node_run_result.inputs else None inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None
workflow_node_execution.process_data = ( process_data = (
json.dumps(node_run_result.process_data) if node_run_result.process_data else None WorkflowEntry.handle_special_values(node_run_result.process_data)
) if node_run_result.process_data
workflow_node_execution.outputs = ( else None
json.dumps(jsonable_encoder(node_run_result.outputs)) if node_run_result.outputs else None
) )
outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None
workflow_node_execution.inputs = json.dumps(inputs)
workflow_node_execution.process_data = json.dumps(process_data)
workflow_node_execution.outputs = json.dumps(outputs)
workflow_node_execution.execution_metadata = ( workflow_node_execution.execution_metadata = (
json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None
) )
@ -303,10 +307,10 @@ class WorkflowService:
new_app = workflow_converter.convert_to_workflow( new_app = workflow_converter.convert_to_workflow(
app_model=app_model, app_model=app_model,
account=account, account=account,
name=args.get("name"), name=args.get("name", "Default Name"),
icon_type=args.get("icon_type"), icon_type=args.get("icon_type", "emoji"),
icon=args.get("icon"), icon=args.get("icon", "🤖"),
icon_background=args.get("icon_background"), icon_background=args.get("icon_background", "#FFEAD5"),
) )
return new_app return new_app

View File

@ -16,6 +16,7 @@ import { InputVarType, NodeRunningStatus } from '@/app/components/workflow/types
import ResultPanel from '@/app/components/workflow/run/result-panel' import ResultPanel from '@/app/components/workflow/run/result-panel'
import Toast from '@/app/components/base/toast' import Toast from '@/app/components/base/toast'
import { TransferMethod } from '@/types/app' import { TransferMethod } from '@/types/app'
import { getProcessedFiles } from '@/app/components/base/file-uploader/utils'
const i18nPrefix = 'workflow.singleRun' const i18nPrefix = 'workflow.singleRun'
@ -39,6 +40,11 @@ function formatValue(value: string | any, type: InputVarType) {
return JSON.parse(item) return JSON.parse(item)
}) })
} }
if (type === InputVarType.multiFiles)
return getProcessedFiles(value)
if (type === InputVarType.singleFile)
return getProcessedFiles([value])[0]
return value return value
} }