From b56d2b739b37c7c291dec6ca436af6a80208b2db Mon Sep 17 00:00:00 2001 From: Novice Lee Date: Wed, 8 Jan 2025 07:41:17 +0800 Subject: [PATCH] feat: add fc agent mode support --- api/.env.example | 3 +- api/core/agent/strategy/plugin.py | 4 +- .../task_pipeline/workflow_cycle_manage.py | 7 +- api/core/plugin/entities/parameters.py | 3 +- api/core/plugin/manager/agent.py | 2 +- api/core/workflow/nodes/agent/agent_node.py | 100 ++++++++++++++++-- api/core/workflow/nodes/agent/entities.py | 67 +----------- api/factories/agent_factory.py | 4 +- 8 files changed, 107 insertions(+), 83 deletions(-) diff --git a/api/.env.example b/api/.env.example index 810ad282a5..43d4916ebd 100644 --- a/api/.env.example +++ b/api/.env.example @@ -420,10 +420,11 @@ POSITION_PROVIDER_EXCLUDES= # Plugin configuration PLUGIN_API_KEY=lYkiYYT6owG+71oLerGzA7GXCgOT++6ovaezWAjpCjf+Sjc3ZtU+qUEi+vRjI/+XbV1AaFy691iy+kGDv2Jvy0/eAh8Y1 -PLUGIN_DAEMON_URL=http://127.0.0.1:5002 +PLUGIN_API_URL=http://127.0.0.1:5002 PLUGIN_REMOTE_INSTALL_PORT=5003 PLUGIN_REMOTE_INSTALL_HOST=localhost PLUGIN_MAX_PACKAGE_SIZE=15728640 +INNER_API_KEY=QaHbTe77CtuXmsfyhR7+vRjI/+XbV1AaFy691iy+kGDv2Jvy0/eAh8Y1 INNER_API_KEY_FOR_PLUGIN=QaHbTe77CtuXmsfyhR7+vRjI/+XbV1AaFy691iy+kGDv2Jvy0/eAh8Y1 # Marketplace configuration diff --git a/api/core/agent/strategy/plugin.py b/api/core/agent/strategy/plugin.py index dc4cfaf34e..a4b25f46e6 100644 --- a/api/core/agent/strategy/plugin.py +++ b/api/core/agent/strategy/plugin.py @@ -14,12 +14,10 @@ class PluginAgentStrategy(BaseAgentStrategy): """ tenant_id: str - plugin_unique_identifier: str declaration: AgentStrategyEntity - def __init__(self, tenant_id: str, plugin_unique_identifier: str, declaration: AgentStrategyEntity): + def __init__(self, tenant_id: str, declaration: AgentStrategyEntity): self.tenant_id = tenant_id - self.plugin_unique_identifier = plugin_unique_identifier self.declaration = declaration def get_parameters(self) -> Sequence[AgentStrategyParameter]: diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index e218735eec..8427926de8 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -255,10 +255,9 @@ class WorkflowCycleManage: for workflow_node_execution in running_workflow_node_executions: workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value workflow_node_execution.error = error - workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None) - workflow_node_execution.elapsed_time = ( - workflow_node_execution.finished_at - workflow_node_execution.created_at - ).total_seconds() + finish_at = datetime.now(UTC).replace(tzinfo=None) + workflow_node_execution.finished_at = finish_at + workflow_node_execution.elapsed_time = (finish_at - workflow_node_execution.created_at).total_seconds() if trace_manager: trace_manager.add_trace_task( diff --git a/api/core/plugin/entities/parameters.py b/api/core/plugin/entities/parameters.py index 10b24bde35..7bfa616c11 100644 --- a/api/core/plugin/entities/parameters.py +++ b/api/core/plugin/entities/parameters.py @@ -136,7 +136,8 @@ def cast_parameter_value(typ: enum.StrEnum, value: Any, /): return value case _: return str(value) - + except ValueError: + raise except Exception: raise ValueError(f"The tool parameter value {value} is not in correct type of {as_normal_type(typ)}.") diff --git a/api/core/plugin/manager/agent.py b/api/core/plugin/manager/agent.py index 10db8b5581..50172f12f2 100644 --- a/api/core/plugin/manager/agent.py +++ b/api/core/plugin/manager/agent.py @@ -92,7 +92,7 @@ class PluginAgentManager(BasePluginManager): response = self._request_with_plugin_daemon_response_stream( "POST", - f"plugin/{tenant_id}/dispatch/agent/invoke", + f"plugin/{tenant_id}/dispatch/agent_strategy/invoke", AgentInvokeMessage, data={ "user_id": user_id, diff --git a/api/core/workflow/nodes/agent/agent_node.py b/api/core/workflow/nodes/agent/agent_node.py index 2595965ea4..798d6a550d 100644 --- a/api/core/workflow/nodes/agent/agent_node.py +++ b/api/core/workflow/nodes/agent/agent_node.py @@ -1,15 +1,23 @@ -from collections.abc import Generator, Sequence +from ast import literal_eval +from collections.abc import Generator, Mapping, Sequence from typing import Any, cast +from core.agent.entities import AgentToolEntity from core.agent.plugin_entities import AgentStrategyParameter +from core.model_manager import ModelManager +from core.model_runtime.entities.model_entities import ModelType from core.plugin.manager.exc import PluginDaemonClientSideError +from core.tools.entities.tool_entities import ToolProviderType +from core.tools.tool_manager import ToolManager from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.variable_pool import VariablePool from core.workflow.enums import SystemVariableKey from core.workflow.nodes.agent.entities import AgentNodeData +from core.workflow.nodes.base.entities import BaseNodeData from core.workflow.nodes.enums import NodeType from core.workflow.nodes.event.event import RunCompletedEvent from core.workflow.nodes.tool.tool_node import ToolNode +from core.workflow.utils.variable_template_parser import VariableTemplateParser from factories.agent_factory import get_plugin_agent_strategy from models.workflow import WorkflowNodeExecutionStatus @@ -31,7 +39,6 @@ class AgentNode(ToolNode): try: strategy = get_plugin_agent_strategy( tenant_id=self.tenant_id, - plugin_unique_identifier=node_data.plugin_unique_identifier, agent_strategy_provider_name=node_data.agent_strategy_provider_name, agent_strategy_name=node_data.agent_strategy_name, ) @@ -48,12 +55,12 @@ class AgentNode(ToolNode): agent_parameters = strategy.get_parameters() # get parameters - parameters = self._generate_parameters( + parameters = self._generate_agent_parameters( agent_parameters=agent_parameters, variable_pool=self.graph_runtime_state.variable_pool, node_data=node_data, ) - parameters_for_log = self._generate_parameters( + parameters_for_log = self._generate_agent_parameters( agent_parameters=agent_parameters, variable_pool=self.graph_runtime_state.variable_pool, node_data=node_data, @@ -78,6 +85,7 @@ class AgentNode(ToolNode): error=f"Failed to invoke agent: {str(e)}", ) ) + return try: # convert tool messages @@ -91,7 +99,7 @@ class AgentNode(ToolNode): ) ) - def _generate_parameters( + def _generate_agent_parameters( self, *, agent_parameters: Sequence[AgentStrategyParameter], @@ -130,6 +138,86 @@ class AgentNode(ToolNode): parameter_value = segment_group.log if for_log else segment_group.text else: raise ValueError(f"Unknown agent input type '{agent_input.type}'") - result[parameter_name] = parameter_value + value = parameter_value.strip() + if (parameter_value.startswith("{") and parameter_value.endswith("}")) or ( + parameter_value.startswith("[") and parameter_value.endswith("]") + ): + value = literal_eval(parameter_value) # transform string to python object + if parameter.type == "array[tools]": + value = cast(list[dict[str, Any]], value) + value = [tool for tool in value if tool.get("enabled", False)] + + if not for_log: + if parameter.type == "array[tools]": + value = cast(list[dict[str, Any]], value) + tool_value = [] + for tool in value: + entity = AgentToolEntity( + provider_id=tool.get("provider_name", ""), + provider_type=ToolProviderType.BUILT_IN, + tool_name=tool.get("tool_name", ""), + tool_parameters=tool.get("parameters", {}), + plugin_unique_identifier=tool.get("plugin_unique_identifier", None), + ) + + extra = tool.get("extra", {}) + + tool_runtime = ToolManager.get_agent_tool_runtime( + self.tenant_id, self.app_id, entity, self.invoke_from + ) + if tool_runtime.entity.description: + tool_runtime.entity.description.llm = ( + extra.get("descrption", "") or tool_runtime.entity.description.llm + ) + + tool_value.append(tool_runtime.entity.model_dump(mode="json")) + value = tool_value + if parameter.type == "model-selector": + value = cast(dict[str, Any], value) + model_instance = ModelManager().get_model_instance( + tenant_id=self.tenant_id, + provider=value.get("provider", ""), + model_type=ModelType(value.get("model_type", "")), + model=value.get("model", ""), + ) + models = model_instance.model_type_instance.plugin_model_provider.declaration.models + finded_model = next((model for model in models if model.model == value.get("model", "")), None) + + value["entity"] = finded_model.model_dump(mode="json") if finded_model else None + + result[parameter_name] = value + + return result + + @classmethod + def _extract_variable_selector_to_variable_mapping( + cls, + *, + graph_config: Mapping[str, Any], + node_id: str, + node_data: BaseNodeData, + ) -> Mapping[str, Sequence[str]]: + """ + Extract variable selector to variable mapping + :param graph_config: graph config + :param node_id: node id + :param node_data: node data + :return: + """ + node_data = cast(AgentNodeData, node_data) + result = {} + for parameter_name in node_data.agent_parameters: + input = node_data.agent_parameters[parameter_name] + if input.type == "mixed": + assert isinstance(input.value, str) + selectors = VariableTemplateParser(input.value).extract_variable_selectors() + for selector in selectors: + result[selector.variable] = selector.value_selector + elif input.type == "variable": + result[parameter_name] = input.value + elif input.type == "constant": + pass + + result = {node_id + "." + key: value for key, value in result.items()} return result diff --git a/api/core/workflow/nodes/agent/entities.py b/api/core/workflow/nodes/agent/entities.py index c2cfc3136a..066dcd5666 100644 --- a/api/core/workflow/nodes/agent/entities.py +++ b/api/core/workflow/nodes/agent/entities.py @@ -1,81 +1,18 @@ from typing import Any, Literal, Union -from pydantic import BaseModel, ValidationInfo, field_validator +from pydantic import BaseModel from core.tools.entities.tool_entities import ToolSelector from core.workflow.nodes.base.entities import BaseNodeData -class AgentEntity(BaseModel): +class AgentNodeData(BaseNodeData): agent_strategy_provider_name: str # redundancy agent_strategy_name: str agent_strategy_label: str # redundancy - agent_configurations: dict[str, Any] - plugin_unique_identifier: str - @field_validator("agent_configurations", mode="before") - @classmethod - def validate_agent_configurations(cls, value, values: ValidationInfo): - if not isinstance(value, dict): - raise ValueError("agent_configurations must be a dictionary") - - for key in values.data.get("agent_configurations", {}): - value = values.data.get("agent_configurations", {}).get(key) - if isinstance(value, dict): - # convert dict to ToolSelector - return ToolSelector(**value) - elif isinstance(value, ToolSelector): - return value - elif isinstance(value, list): - # convert list[ToolSelector] to ToolSelector - if all(isinstance(val, dict) for val in value): - return [ToolSelector(**val) for val in value] - elif all(isinstance(val, ToolSelector) for val in value): - return value - else: - raise ValueError("value must be a list of ToolSelector") - else: - raise ValueError("value must be a dictionary or ToolSelector") - - return value - - -class AgentNodeData(BaseNodeData, AgentEntity): class AgentInput(BaseModel): - # TODO: check this type value: Union[list[str], list[ToolSelector], Any] type: Literal["mixed", "variable", "constant"] - @field_validator("type", mode="before") - @classmethod - def check_type(cls, value, validation_info: ValidationInfo): - typ = value - value = validation_info.data.get("value") - if typ == "mixed" and not isinstance(value, str): - raise ValueError("value must be a string") - elif typ == "variable": - if not isinstance(value, list): - raise ValueError("value must be a list") - for val in value: - if not isinstance(val, str): - raise ValueError("value must be a list of strings") - elif typ == "constant": - if isinstance(value, list): - # convert dict to ToolSelector - if all(isinstance(val, dict) for val in value) or all( - isinstance(val, ToolSelector) for val in value - ): - return value - else: - raise ValueError("value must be a list of ToolSelector") - elif isinstance(value, dict): - # convert dict to ToolSelector - return ToolSelector(**value) - elif isinstance(value, ToolSelector): - return value - else: - raise ValueError("value must be a list of ToolSelector") - - return typ - agent_parameters: dict[str, AgentInput] diff --git a/api/factories/agent_factory.py b/api/factories/agent_factory.py index b22e5d6e5c..4b2d2cc769 100644 --- a/api/factories/agent_factory.py +++ b/api/factories/agent_factory.py @@ -3,13 +3,13 @@ from core.plugin.manager.agent import PluginAgentManager def get_plugin_agent_strategy( - tenant_id: str, plugin_unique_identifier: str, agent_strategy_provider_name: str, agent_strategy_name: str + tenant_id: str, agent_strategy_provider_name: str, agent_strategy_name: str ) -> PluginAgentStrategy: # TODO: use contexts to cache the agent provider manager = PluginAgentManager() agent_provider = manager.fetch_agent_strategy_provider(tenant_id, agent_strategy_provider_name) for agent_strategy in agent_provider.declaration.strategies: if agent_strategy.identity.name == agent_strategy_name: - return PluginAgentStrategy(tenant_id, plugin_unique_identifier, agent_strategy) + return PluginAgentStrategy(tenant_id, agent_strategy) raise ValueError(f"Agent strategy {agent_strategy_name} not found")