fix: workflow run edge status

This commit is contained in:
StyleZhang 2024-08-27 14:39:56 +08:00
parent b9f34f679f
commit 4e3dc36e37
4 changed files with 61 additions and 5 deletions

View File

@ -1,5 +1,6 @@
import { useCallback } from 'react' import { useCallback } from 'react'
import { import {
getIncomers,
useReactFlow, useReactFlow,
useStoreApi, useStoreApi,
} from 'reactflow' } from 'reactflow'
@ -8,6 +9,7 @@ import { v4 as uuidV4 } from 'uuid'
import { usePathname } from 'next/navigation' import { usePathname } from 'next/navigation'
import { useWorkflowStore } from '../store' import { useWorkflowStore } from '../store'
import { useNodesSyncDraft } from '../hooks' import { useNodesSyncDraft } from '../hooks'
import type { Node } from '../types'
import { import {
NodeRunningStatus, NodeRunningStatus,
WorkflowRunningStatus, WorkflowRunningStatus,
@ -288,11 +290,12 @@ export const useWorkflowRun = () => {
draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
}) })
setNodes(newNodes) setNodes(newNodes)
const incomeNodesId = getIncomers({ id: data.node_id } as Node, newNodes, edges).filter(node => node.data._runningStatus === NodeRunningStatus.Succeeded).map(node => node.id)
const newEdges = produce(edges, (draft) => { const newEdges = produce(edges, (draft) => {
const edge = draft.find(edge => edge.target === data.node_id && edge.source === prevNodeId) draft.forEach((edge) => {
if (edge.target === data.node_id && incomeNodesId.includes(edge.source))
if (edge) edge.data = { ...edge.data, _runned: true } as any
edge.data = { ...edge.data, _runned: true } as any })
}) })
setEdges(newEdges) setEdges(newEdges)
} }
@ -470,6 +473,12 @@ export const useWorkflowRun = () => {
if (onIterationFinish) if (onIterationFinish)
onIterationFinish(params) onIterationFinish(params)
}, },
onParallelBranchStarted: (params) => {
// console.log(params, 'parallel start')
},
onParallelBranchFinished: (params) => {
// console.log(params, 'finished')
},
onTextChunk: (params) => { onTextChunk: (params) => {
const { data: { text } } = params const { data: { text } } = params
const { const {

View File

@ -55,6 +55,7 @@ export type CommonNodeType<T = {}> = {
_iterationLength?: number _iterationLength?: number
_iterationIndex?: number _iterationIndex?: number
_inParallelHovering?: boolean _inParallelHovering?: boolean
start_node_in_iteration?: boolean
isIterationStart?: boolean isIterationStart?: boolean
isInIteration?: boolean isInIteration?: boolean
iteration_id?: string iteration_id?: string

View File

@ -8,6 +8,8 @@ import type {
IterationStartedResponse, IterationStartedResponse,
NodeFinishedResponse, NodeFinishedResponse,
NodeStartedResponse, NodeStartedResponse,
ParallelBranchFinishedResponse,
ParallelBranchStartedResponse,
TextChunkResponse, TextChunkResponse,
TextReplaceResponse, TextReplaceResponse,
WorkflowFinishedResponse, WorkflowFinishedResponse,
@ -59,6 +61,8 @@ export type IOnNodeFinished = (nodeFinished: NodeFinishedResponse) => void
export type IOnIterationStarted = (workflowStarted: IterationStartedResponse) => void export type IOnIterationStarted = (workflowStarted: IterationStartedResponse) => void
export type IOnIterationNexted = (workflowStarted: IterationNextedResponse) => void export type IOnIterationNexted = (workflowStarted: IterationNextedResponse) => void
export type IOnIterationFinished = (workflowFinished: IterationFinishedResponse) => void export type IOnIterationFinished = (workflowFinished: IterationFinishedResponse) => void
export type IOnParallelBranchStarted = (parallelBranchStarted: ParallelBranchStartedResponse) => void
export type IOnParallelBranchFinished = (parallelBranchFinished: ParallelBranchFinishedResponse) => void
export type IOnTextChunk = (textChunk: TextChunkResponse) => void export type IOnTextChunk = (textChunk: TextChunkResponse) => void
export type IOnTTSChunk = (messageId: string, audioStr: string, audioType?: string) => void export type IOnTTSChunk = (messageId: string, audioStr: string, audioType?: string) => void
export type IOnTTSEnd = (messageId: string, audioStr: string, audioType?: string) => void export type IOnTTSEnd = (messageId: string, audioStr: string, audioType?: string) => void
@ -86,6 +90,8 @@ export type IOtherOptions = {
onIterationStart?: IOnIterationStarted onIterationStart?: IOnIterationStarted
onIterationNext?: IOnIterationNexted onIterationNext?: IOnIterationNexted
onIterationFinish?: IOnIterationFinished onIterationFinish?: IOnIterationFinished
onParallelBranchStarted?: IOnParallelBranchStarted
onParallelBranchFinished?: IOnParallelBranchFinished
onTextChunk?: IOnTextChunk onTextChunk?: IOnTextChunk
onTTSChunk?: IOnTTSChunk onTTSChunk?: IOnTTSChunk
onTTSEnd?: IOnTTSEnd onTTSEnd?: IOnTTSEnd
@ -139,6 +145,8 @@ const handleStream = (
onIterationStart?: IOnIterationStarted, onIterationStart?: IOnIterationStarted,
onIterationNext?: IOnIterationNexted, onIterationNext?: IOnIterationNexted,
onIterationFinish?: IOnIterationFinished, onIterationFinish?: IOnIterationFinished,
onParallelBranchStarted?: IOnParallelBranchStarted,
onParallelBranchFinished?: IOnParallelBranchFinished,
onTextChunk?: IOnTextChunk, onTextChunk?: IOnTextChunk,
onTTSChunk?: IOnTTSChunk, onTTSChunk?: IOnTTSChunk,
onTTSEnd?: IOnTTSEnd, onTTSEnd?: IOnTTSEnd,
@ -228,6 +236,12 @@ const handleStream = (
else if (bufferObj.event === 'iteration_completed') { else if (bufferObj.event === 'iteration_completed') {
onIterationFinish?.(bufferObj as IterationFinishedResponse) onIterationFinish?.(bufferObj as IterationFinishedResponse)
} }
else if (bufferObj.event === 'parallel_branch_started') {
onParallelBranchStarted?.(bufferObj as ParallelBranchStartedResponse)
}
else if (bufferObj.event === 'parallel_branch_finished') {
onParallelBranchFinished?.(bufferObj as ParallelBranchFinishedResponse)
}
else if (bufferObj.event === 'text_chunk') { else if (bufferObj.event === 'text_chunk') {
onTextChunk?.(bufferObj as TextChunkResponse) onTextChunk?.(bufferObj as TextChunkResponse)
} }
@ -488,6 +502,8 @@ export const ssePost = (
onIterationStart, onIterationStart,
onIterationNext, onIterationNext,
onIterationFinish, onIterationFinish,
onParallelBranchStarted,
onParallelBranchFinished,
onTextChunk, onTextChunk,
onTTSChunk, onTTSChunk,
onTTSEnd, onTTSEnd,
@ -544,7 +560,7 @@ export const ssePost = (
return return
} }
onData?.(str, isFirstMessage, moreInfo) onData?.(str, isFirstMessage, moreInfo)
}, onCompleted, onThought, onMessageEnd, onMessageReplace, onFile, onWorkflowStarted, onWorkflowFinished, onNodeStarted, onNodeFinished, onIterationStart, onIterationNext, onIterationFinish, onTextChunk, onTTSChunk, onTTSEnd, onTextReplace) }, onCompleted, onThought, onMessageEnd, onMessageReplace, onFile, onWorkflowStarted, onWorkflowFinished, onNodeStarted, onNodeFinished, onIterationStart, onIterationNext, onIterationFinish, onParallelBranchStarted, onParallelBranchFinished, onTextChunk, onTTSChunk, onTTSEnd, onTextReplace)
}).catch((e) => { }).catch((e) => {
if (e.toString() !== 'AbortError: The user aborted a request.' && !e.toString().errorMessage.includes('TypeError: Cannot assign to read only property')) if (e.toString() !== 'AbortError: The user aborted a request.' && !e.toString().errorMessage.includes('TypeError: Cannot assign to read only property'))
Toast.notify({ type: 'error', message: e }) Toast.notify({ type: 'error', message: e })

View File

@ -187,6 +187,36 @@ export type IterationFinishedResponse = {
} }
} }
export type ParallelBranchStartedResponse = {
task_id: string
workflow_run_id: string
event: string
data: {
parallel_id: string
parallel_start_node_id: string
parent_parallel_id: string
parent_parallel_start_node_id: string
iteration_id?: string
created_at: number
}
}
export type ParallelBranchFinishedResponse = {
task_id: string
workflow_run_id: string
event: string
data: {
parallel_id: string
parallel_start_node_id: string
parent_parallel_id: string
parent_parallel_start_node_id: string
iteration_id?: string
status: string
created_at: number
error: string
}
}
export type TextChunkResponse = { export type TextChunkResponse = {
task_id: string task_id: string
workflow_run_id: string workflow_run_id: string