From f414d241c1a30777cda97072851f1343da5f1f56 Mon Sep 17 00:00:00 2001 From: Novice <857526207@qq.com> Date: Mon, 11 Nov 2024 14:47:52 +0800 Subject: [PATCH] Feat/iteration single run time (#10512) --- api/core/app/apps/workflow_app_runner.py | 1 + api/core/app/entities/queue_entities.py | 3 ++ api/core/app/entities/task_entities.py | 1 + .../task_pipeline/workflow_cycle_manage.py | 1 + api/core/workflow/entities/node_entities.py | 1 + .../workflow/graph_engine/entities/event.py | 2 + .../nodes/iteration/iteration_node.py | 22 +++++++- .../workflow/hooks/use-workflow-run.ts | 3 ++ .../components/workflow/nodes/_base/node.tsx | 2 +- .../workflow/panel/workflow-preview.tsx | 8 ++- web/app/components/workflow/run/index.tsx | 9 ++-- .../workflow/run/iteration-result-panel.tsx | 54 ++++++++++++++----- web/app/components/workflow/run/node.tsx | 6 +-- .../components/workflow/run/tracing-panel.tsx | 4 +- web/i18n/en-US/workflow.ts | 6 +-- web/types/workflow.ts | 7 +++ 16 files changed, 101 insertions(+), 29 deletions(-) diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 9a01e8a253..2872390d46 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -361,6 +361,7 @@ class WorkflowBasedAppRunner(AppRunner): node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps, output=event.pre_iteration_output, parallel_mode_run_id=event.parallel_mode_run_id, + duration=event.duration, ) ) elif isinstance(event, (IterationRunSucceededEvent | IterationRunFailedEvent)): diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index f1542ec5d8..69bc0d7f9e 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -111,6 +111,7 @@ class QueueIterationNextEvent(AppQueueEvent): """iteratoin run in parallel mode run id""" node_run_index: int output: Optional[Any] = None # output for the current iteration + duration: Optional[float] = None @field_validator("output", mode="before") @classmethod @@ -307,6 +308,8 @@ class QueueNodeSucceededEvent(AppQueueEvent): execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None error: Optional[str] = None + """single iteration duration map""" + iteration_duration_map: Optional[dict[str, float]] = None class QueueNodeInIterationFailedEvent(AppQueueEvent): diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 7e9aad54be..03cc6941a8 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -434,6 +434,7 @@ class IterationNodeNextStreamResponse(StreamResponse): parallel_id: Optional[str] = None parallel_start_node_id: Optional[str] = None parallel_mode_run_id: Optional[str] = None + duration: Optional[float] = None event: StreamEvent = StreamEvent.ITERATION_NEXT workflow_run_id: str diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index b89edf9079..042339969f 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -624,6 +624,7 @@ class WorkflowCycleManage: parallel_id=event.parallel_id, parallel_start_node_id=event.parallel_start_node_id, parallel_mode_run_id=event.parallel_mode_run_id, + duration=event.duration, ), ) diff --git a/api/core/workflow/entities/node_entities.py b/api/core/workflow/entities/node_entities.py index 7e10cddc71..a747266661 100644 --- a/api/core/workflow/entities/node_entities.py +++ b/api/core/workflow/entities/node_entities.py @@ -24,6 +24,7 @@ class NodeRunMetadataKey(str, Enum): PARENT_PARALLEL_ID = "parent_parallel_id" PARENT_PARALLEL_START_NODE_ID = "parent_parallel_start_node_id" PARALLEL_MODE_RUN_ID = "parallel_mode_run_id" + ITERATION_DURATION_MAP = "iteration_duration_map" # single iteration duration if iteration node runs class NodeRunResult(BaseModel): diff --git a/api/core/workflow/graph_engine/entities/event.py b/api/core/workflow/graph_engine/entities/event.py index bacea191dd..3736e632c3 100644 --- a/api/core/workflow/graph_engine/entities/event.py +++ b/api/core/workflow/graph_engine/entities/event.py @@ -148,6 +148,7 @@ class IterationRunStartedEvent(BaseIterationEvent): class IterationRunNextEvent(BaseIterationEvent): index: int = Field(..., description="index") pre_iteration_output: Optional[Any] = Field(None, description="pre iteration output") + duration: Optional[float] = Field(None, description="duration") class IterationRunSucceededEvent(BaseIterationEvent): @@ -156,6 +157,7 @@ class IterationRunSucceededEvent(BaseIterationEvent): outputs: Optional[dict[str, Any]] = None metadata: Optional[dict[str, Any]] = None steps: int = 0 + iteration_duration_map: Optional[dict[str, float]] = None class IterationRunFailedEvent(BaseIterationEvent): diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index e5863d771b..941ebde7a9 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -156,6 +156,7 @@ class IterationNode(BaseNode[IterationNodeData]): index=0, pre_iteration_output=None, ) + iter_run_map: dict[str, float] = {} outputs: list[Any] = [None] * len(iterator_list_value) try: if self.node_data.is_parallel: @@ -175,6 +176,7 @@ class IterationNode(BaseNode[IterationNodeData]): iteration_graph, index, item, + iter_run_map, ) future.add_done_callback(thread_pool.task_done_callback) futures.append(future) @@ -213,6 +215,7 @@ class IterationNode(BaseNode[IterationNodeData]): start_at, graph_engine, iteration_graph, + iter_run_map, ) if self.node_data.error_handle_mode == ErrorHandleMode.REMOVE_ABNORMAL_OUTPUT: outputs = [output for output in outputs if output is not None] @@ -230,7 +233,9 @@ class IterationNode(BaseNode[IterationNodeData]): yield RunCompletedEvent( run_result=NodeRunResult( - status=WorkflowNodeExecutionStatus.SUCCEEDED, outputs={"output": jsonable_encoder(outputs)} + status=WorkflowNodeExecutionStatus.SUCCEEDED, + outputs={"output": jsonable_encoder(outputs)}, + metadata={NodeRunMetadataKey.ITERATION_DURATION_MAP: iter_run_map}, ) ) except IterationNodeError as e: @@ -356,15 +361,19 @@ class IterationNode(BaseNode[IterationNodeData]): start_at: datetime, graph_engine: "GraphEngine", iteration_graph: Graph, + iter_run_map: dict[str, float], parallel_mode_run_id: Optional[str] = None, ) -> Generator[NodeEvent | InNodeEvent, None, None]: """ run single iteration """ + iter_start_at = datetime.now(timezone.utc).replace(tzinfo=None) + try: rst = graph_engine.run() # get current iteration index current_index = variable_pool.get([self.node_id, "index"]).value + iteration_run_id = parallel_mode_run_id if parallel_mode_run_id is not None else f"{current_index}" next_index = int(current_index) + 1 if current_index is None: @@ -431,6 +440,8 @@ class IterationNode(BaseNode[IterationNodeData]): variable_pool.add([self.node_id, "index"], next_index) if next_index < len(iterator_list_value): variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) + duration = (datetime.now(timezone.utc).replace(tzinfo=None) - iter_start_at).total_seconds() + iter_run_map[iteration_run_id] = duration yield IterationRunNextEvent( iteration_id=self.id, iteration_node_id=self.node_id, @@ -439,6 +450,7 @@ class IterationNode(BaseNode[IterationNodeData]): index=next_index, parallel_mode_run_id=parallel_mode_run_id, pre_iteration_output=None, + duration=duration, ) return elif self.node_data.error_handle_mode == ErrorHandleMode.REMOVE_ABNORMAL_OUTPUT: @@ -449,6 +461,8 @@ class IterationNode(BaseNode[IterationNodeData]): if next_index < len(iterator_list_value): variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) + duration = (datetime.now(timezone.utc).replace(tzinfo=None) - iter_start_at).total_seconds() + iter_run_map[iteration_run_id] = duration yield IterationRunNextEvent( iteration_id=self.id, iteration_node_id=self.node_id, @@ -457,6 +471,7 @@ class IterationNode(BaseNode[IterationNodeData]): index=next_index, parallel_mode_run_id=parallel_mode_run_id, pre_iteration_output=None, + duration=duration, ) return elif self.node_data.error_handle_mode == ErrorHandleMode.TERMINATED: @@ -485,6 +500,8 @@ class IterationNode(BaseNode[IterationNodeData]): if next_index < len(iterator_list_value): variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) + duration = (datetime.now(timezone.utc).replace(tzinfo=None) - iter_start_at).total_seconds() + iter_run_map[iteration_run_id] = duration yield IterationRunNextEvent( iteration_id=self.id, iteration_node_id=self.node_id, @@ -493,6 +510,7 @@ class IterationNode(BaseNode[IterationNodeData]): index=next_index, parallel_mode_run_id=parallel_mode_run_id, pre_iteration_output=jsonable_encoder(current_iteration_output) if current_iteration_output else None, + duration=duration, ) except IterationNodeError as e: @@ -528,6 +546,7 @@ class IterationNode(BaseNode[IterationNodeData]): iteration_graph: Graph, index: int, item: Any, + iter_run_map: dict[str, float], ) -> Generator[NodeEvent | InNodeEvent, None, None]: """ run single iteration in parallel mode @@ -546,6 +565,7 @@ class IterationNode(BaseNode[IterationNodeData]): start_at=start_at, graph_engine=graph_engine_copy, iteration_graph=iteration_graph, + iter_run_map=iter_run_map, parallel_mode_run_id=parallel_mode_run_id, ): q.put(event) diff --git a/web/app/components/workflow/hooks/use-workflow-run.ts b/web/app/components/workflow/hooks/use-workflow-run.ts index 26654ef71e..eab3535505 100644 --- a/web/app/components/workflow/hooks/use-workflow-run.ts +++ b/web/app/components/workflow/hooks/use-workflow-run.ts @@ -445,6 +445,7 @@ export const useWorkflowRun = () => { ...data, status: NodeRunningStatus.Running, details: [], + iterDurationMap: {}, } as any) })) @@ -496,6 +497,8 @@ export const useWorkflowRun = () => { setWorkflowRunningData(produce(workflowRunningData!, (draft) => { const iteration = draft.tracing!.find(trace => trace.node_id === data.node_id) if (iteration) { + if (iteration.iterDurationMap && data.duration) + iteration.iterDurationMap[data.parallel_mode_run_id ?? `${data.index - 1}`] = data.duration if (iteration.details!.length >= iteration.metadata.iterator_length!) return } diff --git a/web/app/components/workflow/nodes/_base/node.tsx b/web/app/components/workflow/nodes/_base/node.tsx index e864c419e2..c5b78c5c21 100644 --- a/web/app/components/workflow/nodes/_base/node.tsx +++ b/web/app/components/workflow/nodes/_base/node.tsx @@ -193,7 +193,7 @@ const BaseNode: FC = ({ { data._iterationLength && data._iterationIndex && data._runningStatus === NodeRunningStatus.Running && (
- {data._iterationIndex}/{data._iterationLength} + {data._iterationIndex > data._iterationLength ? data._iterationLength : data._iterationIndex}/{data._iterationLength}
) } diff --git a/web/app/components/workflow/panel/workflow-preview.tsx b/web/app/components/workflow/panel/workflow-preview.tsx index 361f9d6bf4..d560c0b2cb 100644 --- a/web/app/components/workflow/panel/workflow-preview.tsx +++ b/web/app/components/workflow/panel/workflow-preview.tsx @@ -28,7 +28,7 @@ import IterationResultPanel from '../run/iteration-result-panel' import InputsPanel from './inputs-panel' import cn from '@/utils/classnames' import Loading from '@/app/components/base/loading' -import type { NodeTracing } from '@/types/workflow' +import type { IterationDurationMap, NodeTracing } from '@/types/workflow' const WorkflowPreview = () => { const { t } = useTranslation() @@ -53,12 +53,14 @@ const WorkflowPreview = () => { }, [workflowRunningData]) const [iterationRunResult, setIterationRunResult] = useState([]) + const [iterDurationMap, setIterDurationMap] = useState({}) const [isShowIterationDetail, { setTrue: doShowIterationDetail, setFalse: doHideIterationDetail, }] = useBoolean(false) - const handleShowIterationDetail = useCallback((detail: NodeTracing[][]) => { + const handleShowIterationDetail = useCallback((detail: NodeTracing[][], iterationDurationMap: IterationDurationMap) => { + setIterDurationMap(iterationDurationMap) setIterationRunResult(detail) doShowIterationDetail() }, [doShowIterationDetail]) @@ -72,6 +74,7 @@ const WorkflowPreview = () => { list={iterationRunResult} onHide={doHideIterationDetail} onBack={doHideIterationDetail} + iterDurationMap={iterDurationMap} /> ) @@ -94,6 +97,7 @@ const WorkflowPreview = () => { list={iterationRunResult} onHide={doHideIterationDetail} onBack={doHideIterationDetail} + iterDurationMap={iterDurationMap} /> ) : ( diff --git a/web/app/components/workflow/run/index.tsx b/web/app/components/workflow/run/index.tsx index 89db43fa35..5267cf257d 100644 --- a/web/app/components/workflow/run/index.tsx +++ b/web/app/components/workflow/run/index.tsx @@ -13,7 +13,7 @@ import cn from '@/utils/classnames' import { ToastContext } from '@/app/components/base/toast' import Loading from '@/app/components/base/loading' import { fetchRunDetail, fetchTracingList } from '@/service/log' -import type { NodeTracing } from '@/types/workflow' +import type { IterationDurationMap, NodeTracing } from '@/types/workflow' import type { WorkflowRunDetailResponse } from '@/models/log' import { useStore as useAppStore } from '@/app/components/app/store' @@ -172,15 +172,17 @@ const RunPanel: FC = ({ hideResult, activeTab = 'RESULT', runID, getRe }, [loading]) const [iterationRunResult, setIterationRunResult] = useState([]) + const [iterDurationMap, setIterDurationMap] = useState({}) const [isShowIterationDetail, { setTrue: doShowIterationDetail, setFalse: doHideIterationDetail, }] = useBoolean(false) - const handleShowIterationDetail = useCallback((detail: NodeTracing[][]) => { + const handleShowIterationDetail = useCallback((detail: NodeTracing[][], iterDurationMap: IterationDurationMap) => { setIterationRunResult(detail) doShowIterationDetail() - }, [doShowIterationDetail]) + setIterDurationMap(iterDurationMap) + }, [doShowIterationDetail, setIterationRunResult, setIterDurationMap]) if (isShowIterationDetail) { return ( @@ -189,6 +191,7 @@ const RunPanel: FC = ({ hideResult, activeTab = 'RESULT', runID, getRe list={iterationRunResult} onHide={doHideIterationDetail} onBack={doHideIterationDetail} + iterDurationMap={iterDurationMap} /> ) diff --git a/web/app/components/workflow/run/iteration-result-panel.tsx b/web/app/components/workflow/run/iteration-result-panel.tsx index c4cd909f2e..b13eadec99 100644 --- a/web/app/components/workflow/run/iteration-result-panel.tsx +++ b/web/app/components/workflow/run/iteration-result-panel.tsx @@ -6,12 +6,14 @@ import { RiArrowRightSLine, RiCloseLine, RiErrorWarningLine, + RiLoader2Line, } from '@remixicon/react' import { ArrowNarrowLeft } from '../../base/icons/src/vender/line/arrows' +import { NodeRunningStatus } from '../types' import TracingPanel from './tracing-panel' import { Iteration } from '@/app/components/base/icons/src/vender/workflow' import cn from '@/utils/classnames' -import type { NodeTracing } from '@/types/workflow' +import type { IterationDurationMap, NodeTracing } from '@/types/workflow' const i18nPrefix = 'workflow.singleRun' type Props = { @@ -19,6 +21,7 @@ type Props = { onHide: () => void onBack: () => void noWrap?: boolean + iterDurationMap?: IterationDurationMap } const IterationResultPanel: FC = ({ @@ -26,6 +29,7 @@ const IterationResultPanel: FC = ({ onHide, onBack, noWrap, + iterDurationMap, }) => { const { t } = useTranslation() const [expandedIterations, setExpandedIterations] = useState>({}) @@ -36,6 +40,40 @@ const IterationResultPanel: FC = ({ [index]: !prev[index], })) }, []) + const countIterDuration = (iteration: NodeTracing[], iterDurationMap: IterationDurationMap): string => { + const IterRunIndex = iteration[0].execution_metadata.iteration_index as number + const iterRunId = iteration[0].execution_metadata.parallel_mode_run_id + const iterItem = iterDurationMap[iterRunId || IterRunIndex] + const duration = iterItem + return `${(duration && duration > 0.01) ? duration.toFixed(2) : 0.01}s` + } + const iterationStatusShow = (index: number, iteration: NodeTracing[], iterDurationMap?: IterationDurationMap) => { + const hasFailed = iteration.some(item => item.status === NodeRunningStatus.Failed) + const isRunning = iteration.some(item => item.status === NodeRunningStatus.Running) + const hasDurationMap = iterDurationMap && Object.keys(iterDurationMap).length !== 0 + + if (hasFailed) + return + + if (isRunning) + return + + return ( + <> + {hasDurationMap && ( +
+ {countIterDuration(iteration, iterDurationMap)} +
+ )} + + + ) + } const main = ( <> @@ -72,19 +110,7 @@ const IterationResultPanel: FC = ({ {t(`${i18nPrefix}.iteration`)} {index + 1} - { - iteration.some(item => item.status === 'failed') - ? ( - - ) - : (< RiArrowRightSLine className={ - cn( - 'w-4 h-4 text-text-tertiary transition-transform duration-200 flex-shrink-0', - expandedIterations[index] && 'transform rotate-90', - )} /> - ) - } - + {iterationStatusShow(index, iteration, iterDurationMap)} {expandedIterations[index] &&
void + onShowIterationDetail?: (detail: NodeTracing[][], iterDurationMap: IterationDurationMap) => void notShowIterationNav?: boolean justShowIterationNavArrow?: boolean } @@ -90,7 +90,7 @@ const NodePanel: FC = ({ const handleOnShowIterationDetail = (e: React.MouseEvent) => { e.stopPropagation() e.nativeEvent.stopImmediatePropagation() - onShowIterationDetail?.(nodeInfo.details || []) + onShowIterationDetail?.(nodeInfo.details || [], nodeInfo?.iterDurationMap || nodeInfo.execution_metadata?.iteration_duration_map || {}) } return (
diff --git a/web/app/components/workflow/run/tracing-panel.tsx b/web/app/components/workflow/run/tracing-panel.tsx index 613c10198d..57b3a5cf5f 100644 --- a/web/app/components/workflow/run/tracing-panel.tsx +++ b/web/app/components/workflow/run/tracing-panel.tsx @@ -16,11 +16,11 @@ import NodePanel from './node' import { BlockEnum, } from '@/app/components/workflow/types' -import type { NodeTracing } from '@/types/workflow' +import type { IterationDurationMap, NodeTracing } from '@/types/workflow' type TracingPanelProps = { list: NodeTracing[] - onShowIterationDetail?: (detail: NodeTracing[][]) => void + onShowIterationDetail?: (detail: NodeTracing[][], iterDurationMap: IterationDurationMap) => void className?: string hideNodeInfo?: boolean hideNodeProcessDetail?: boolean diff --git a/web/i18n/en-US/workflow.ts b/web/i18n/en-US/workflow.ts index 7bfad01f23..b3fece702a 100644 --- a/web/i18n/en-US/workflow.ts +++ b/web/i18n/en-US/workflow.ts @@ -569,9 +569,9 @@ const translation = { MaxParallelismDesc: 'The maximum parallelism is used to control the number of tasks executed simultaneously in a single iteration.', errorResponseMethod: 'Error response method', ErrorMethod: { - operationTerminated: 'terminated', - continueOnError: 'continue on error', - removeAbnormalOutput: 'remove abnormal output', + operationTerminated: 'Terminated', + continueOnError: 'Continue on Error', + removeAbnormalOutput: 'Remove Abnormal Output', }, answerNodeWarningDesc: 'Parallel mode warning: Answer nodes, conversation variable assignments, and persistent read/write operations within iterations may cause exceptions.', }, diff --git a/web/types/workflow.ts b/web/types/workflow.ts index 3c0675b605..34b08e878e 100644 --- a/web/types/workflow.ts +++ b/web/types/workflow.ts @@ -33,6 +33,7 @@ export type NodeTracing = { parent_parallel_id?: string parent_parallel_start_node_id?: string parallel_mode_run_id?: string + iteration_duration_map?: IterationDurationMap } metadata: { iterator_length: number @@ -44,6 +45,7 @@ export type NodeTracing = { name: string email: string } + iterDurationMap?: IterationDurationMap finished_at: number extras?: any expand?: boolean // for UI @@ -207,7 +209,10 @@ export type IterationNextResponse = { parallel_mode_run_id: string execution_metadata: { parallel_id?: string + iteration_index: number + parallel_mode_run_id?: string } + duration?: number } } @@ -323,3 +328,5 @@ export type ConversationVariableResponse = { total: number page: number } + +export type IterationDurationMap = Record