From 4e3dc36e37db9aaae7ce8aff2b69d02692f4b7be Mon Sep 17 00:00:00 2001 From: StyleZhang Date: Tue, 27 Aug 2024 14:39:56 +0800 Subject: [PATCH] fix: workflow run edge status --- .../workflow/hooks/use-workflow-run.ts | 17 ++++++++--- web/app/components/workflow/types.ts | 1 + web/service/base.ts | 18 ++++++++++- web/types/workflow.ts | 30 +++++++++++++++++++ 4 files changed, 61 insertions(+), 5 deletions(-) diff --git a/web/app/components/workflow/hooks/use-workflow-run.ts b/web/app/components/workflow/hooks/use-workflow-run.ts index 96f6557fe0..af08baeceb 100644 --- a/web/app/components/workflow/hooks/use-workflow-run.ts +++ b/web/app/components/workflow/hooks/use-workflow-run.ts @@ -1,5 +1,6 @@ import { useCallback } from 'react' import { + getIncomers, useReactFlow, useStoreApi, } from 'reactflow' @@ -8,6 +9,7 @@ import { v4 as uuidV4 } from 'uuid' import { usePathname } from 'next/navigation' import { useWorkflowStore } from '../store' import { useNodesSyncDraft } from '../hooks' +import type { Node } from '../types' import { NodeRunningStatus, WorkflowRunningStatus, @@ -288,11 +290,12 @@ export const useWorkflowRun = () => { draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running }) setNodes(newNodes) + const incomeNodesId = getIncomers({ id: data.node_id } as Node, newNodes, edges).filter(node => node.data._runningStatus === NodeRunningStatus.Succeeded).map(node => node.id) const newEdges = produce(edges, (draft) => { - const edge = draft.find(edge => edge.target === data.node_id && edge.source === prevNodeId) - - if (edge) - edge.data = { ...edge.data, _runned: true } as any + draft.forEach((edge) => { + if (edge.target === data.node_id && incomeNodesId.includes(edge.source)) + edge.data = { ...edge.data, _runned: true } as any + }) }) setEdges(newEdges) } @@ -470,6 +473,12 @@ export const useWorkflowRun = () => { if (onIterationFinish) onIterationFinish(params) }, + onParallelBranchStarted: (params) => { + // console.log(params, 'parallel start') + }, + onParallelBranchFinished: (params) => { + // console.log(params, 'finished') + }, onTextChunk: (params) => { const { data: { text } } = params const { diff --git a/web/app/components/workflow/types.ts b/web/app/components/workflow/types.ts index c53a09e0b6..5fcd2b0873 100644 --- a/web/app/components/workflow/types.ts +++ b/web/app/components/workflow/types.ts @@ -55,6 +55,7 @@ export type CommonNodeType = { _iterationLength?: number _iterationIndex?: number _inParallelHovering?: boolean + start_node_in_iteration?: boolean isIterationStart?: boolean isInIteration?: boolean iteration_id?: string diff --git a/web/service/base.ts b/web/service/base.ts index bda83f1c8e..d2c71f408c 100644 --- a/web/service/base.ts +++ b/web/service/base.ts @@ -8,6 +8,8 @@ import type { IterationStartedResponse, NodeFinishedResponse, NodeStartedResponse, + ParallelBranchFinishedResponse, + ParallelBranchStartedResponse, TextChunkResponse, TextReplaceResponse, WorkflowFinishedResponse, @@ -59,6 +61,8 @@ export type IOnNodeFinished = (nodeFinished: NodeFinishedResponse) => void export type IOnIterationStarted = (workflowStarted: IterationStartedResponse) => void export type IOnIterationNexted = (workflowStarted: IterationNextedResponse) => void export type IOnIterationFinished = (workflowFinished: IterationFinishedResponse) => void +export type IOnParallelBranchStarted = (parallelBranchStarted: ParallelBranchStartedResponse) => void +export type IOnParallelBranchFinished = (parallelBranchFinished: ParallelBranchFinishedResponse) => void export type IOnTextChunk = (textChunk: TextChunkResponse) => void export type IOnTTSChunk = (messageId: string, audioStr: string, audioType?: string) => void export type IOnTTSEnd = (messageId: string, audioStr: string, audioType?: string) => void @@ -86,6 +90,8 @@ export type IOtherOptions = { onIterationStart?: IOnIterationStarted onIterationNext?: IOnIterationNexted onIterationFinish?: IOnIterationFinished + onParallelBranchStarted?: IOnParallelBranchStarted + onParallelBranchFinished?: IOnParallelBranchFinished onTextChunk?: IOnTextChunk onTTSChunk?: IOnTTSChunk onTTSEnd?: IOnTTSEnd @@ -139,6 +145,8 @@ const handleStream = ( onIterationStart?: IOnIterationStarted, onIterationNext?: IOnIterationNexted, onIterationFinish?: IOnIterationFinished, + onParallelBranchStarted?: IOnParallelBranchStarted, + onParallelBranchFinished?: IOnParallelBranchFinished, onTextChunk?: IOnTextChunk, onTTSChunk?: IOnTTSChunk, onTTSEnd?: IOnTTSEnd, @@ -228,6 +236,12 @@ const handleStream = ( else if (bufferObj.event === 'iteration_completed') { onIterationFinish?.(bufferObj as IterationFinishedResponse) } + else if (bufferObj.event === 'parallel_branch_started') { + onParallelBranchStarted?.(bufferObj as ParallelBranchStartedResponse) + } + else if (bufferObj.event === 'parallel_branch_finished') { + onParallelBranchFinished?.(bufferObj as ParallelBranchFinishedResponse) + } else if (bufferObj.event === 'text_chunk') { onTextChunk?.(bufferObj as TextChunkResponse) } @@ -488,6 +502,8 @@ export const ssePost = ( onIterationStart, onIterationNext, onIterationFinish, + onParallelBranchStarted, + onParallelBranchFinished, onTextChunk, onTTSChunk, onTTSEnd, @@ -544,7 +560,7 @@ export const ssePost = ( return } onData?.(str, isFirstMessage, moreInfo) - }, onCompleted, onThought, onMessageEnd, onMessageReplace, onFile, onWorkflowStarted, onWorkflowFinished, onNodeStarted, onNodeFinished, onIterationStart, onIterationNext, onIterationFinish, onTextChunk, onTTSChunk, onTTSEnd, onTextReplace) + }, onCompleted, onThought, onMessageEnd, onMessageReplace, onFile, onWorkflowStarted, onWorkflowFinished, onNodeStarted, onNodeFinished, onIterationStart, onIterationNext, onIterationFinish, onParallelBranchStarted, onParallelBranchFinished, onTextChunk, onTTSChunk, onTTSEnd, onTextReplace) }).catch((e) => { if (e.toString() !== 'AbortError: The user aborted a request.' && !e.toString().errorMessage.includes('TypeError: Cannot assign to read only property')) Toast.notify({ type: 'error', message: e }) diff --git a/web/types/workflow.ts b/web/types/workflow.ts index f7991bc4e0..73fa14e094 100644 --- a/web/types/workflow.ts +++ b/web/types/workflow.ts @@ -187,6 +187,36 @@ export type IterationFinishedResponse = { } } +export type ParallelBranchStartedResponse = { + task_id: string + workflow_run_id: string + event: string + data: { + parallel_id: string + parallel_start_node_id: string + parent_parallel_id: string + parent_parallel_start_node_id: string + iteration_id?: string + created_at: number + } +} + +export type ParallelBranchFinishedResponse = { + task_id: string + workflow_run_id: string + event: string + data: { + parallel_id: string + parallel_start_node_id: string + parent_parallel_id: string + parent_parallel_start_node_id: string + iteration_id?: string + status: string + created_at: number + error: string + } +} + export type TextChunkResponse = { task_id: string workflow_run_id: string