diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 4e0996f011..0884fac4a9 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -522,6 +522,7 @@ class WorkflowBasedAppRunner(AppRunner): status=event.status, data=event.data, metadata=event.metadata, + node_id=event.node_id, ) ) elif isinstance(event, ParallelBranchRunStartedEvent): diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 6dc67c5eea..3702326406 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -427,6 +427,7 @@ class QueueAgentLogEvent(AppQueueEvent): status: str data: Mapping[str, Any] metadata: Optional[Mapping[str, Any]] = None + node_id: str class QueueNodeRetryEvent(QueueNodeStartedEvent): diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 0708986a65..f23ee1b9fd 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -817,6 +817,7 @@ class AgentLogStreamResponse(StreamResponse): status: str data: Mapping[str, Any] metadata: Optional[Mapping[str, Any]] = None + node_id: str event: StreamEvent = StreamEvent.AGENT_LOG data: Data diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index 0dc2daa3bc..8204d0be85 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -959,5 +959,6 @@ class WorkflowCycleManage: status=event.status, data=event.data, metadata=event.metadata, + node_id=event.node_id, ), ) diff --git a/api/core/workflow/graph_engine/entities/event.py b/api/core/workflow/graph_engine/entities/event.py index caadb79cbd..689a07c4f6 100644 --- a/api/core/workflow/graph_engine/entities/event.py +++ b/api/core/workflow/graph_engine/entities/event.py @@ -268,6 +268,7 @@ class AgentLogEvent(BaseAgentEvent): status: str = Field(..., description="status") data: Mapping[str, Any] = Field(..., description="data") metadata: Optional[Mapping[str, Any]] = Field(default=None, description="metadata") + node_id: str = Field(..., description="agent node id") InNodeEvent = BaseNodeEvent | BaseParallelBranchEvent | BaseIterationEvent | BaseAgentEvent | BaseLoopEvent diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index f19aa5a179..9bb54da3a2 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -18,6 +18,7 @@ from core.workflow.entities.node_entities import AgentNodeStrategyInit, NodeRunM from core.workflow.entities.variable_pool import VariablePool, VariableValue from core.workflow.graph_engine.condition_handlers.condition_manager import ConditionManager from core.workflow.graph_engine.entities.event import ( + BaseAgentEvent, BaseIterationEvent, BaseLoopEvent, GraphEngineEvent, @@ -502,7 +503,7 @@ class GraphEngine: break yield event - if event.parallel_id == parallel_id: + if not isinstance(event, BaseAgentEvent) and event.parallel_id == parallel_id: if isinstance(event, ParallelBranchRunSucceededEvent): succeeded_count += 1 if succeeded_count == len(futures): diff --git a/api/core/workflow/nodes/tool/tool_node.py b/api/core/workflow/nodes/tool/tool_node.py index 8518db5d80..1e31a2c324 100644 --- a/api/core/workflow/nodes/tool/tool_node.py +++ b/api/core/workflow/nodes/tool/tool_node.py @@ -338,6 +338,7 @@ class ToolNode(BaseNode[ToolNodeData]): data=message.message.data, label=message.message.label, metadata=message.message.metadata, + node_id=self.node_id, ) # check if the agent log is already in the list