fix: remove the unused retry index field (#11903)

Signed-off-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: Novice Lee <novicelee@NoviPro.local>
Co-authored-by: -LAN- <laipz8200@outlook.com>
Novice authored on 2024-12-23 14:32:11 +08:00 (committed by GitHub)
Parent: 4b1e13e982 · Commit: c1aa55f3ea
16 changed files with 256 additions and 268 deletions

---

@@ -180,7 +180,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
             else:
                 continue
-        raise Exception("Queue listening stopped unexpectedly.")
+        raise ValueError("queue listening stopped unexpectedly.")

     def _to_stream_response(
         self, generator: Generator[StreamResponse, None, None]
@@ -291,9 +291,27 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
                 yield self._workflow_start_to_stream_response(
                     task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
                 )
+            elif isinstance(
+                event,
+                QueueNodeRetryEvent,
+            ):
+                if not workflow_run:
+                    raise ValueError("workflow run not initialized.")
+                workflow_node_execution = self._handle_workflow_node_execution_retried(
+                    workflow_run=workflow_run, event=event
+                )
+
+                response = self._workflow_node_retry_to_stream_response(
+                    event=event,
+                    task_id=self._application_generate_entity.task_id,
+                    workflow_node_execution=workflow_node_execution,
+                )
+
+                if response:
+                    yield response
             elif isinstance(event, QueueNodeStartedEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)
@@ -331,63 +349,48 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
                 if response:
                     yield response
-            elif isinstance(
-                event,
-                QueueNodeRetryEvent,
-            ):
-                workflow_node_execution = self._handle_workflow_node_execution_retried(
-                    workflow_run=workflow_run, event=event
-                )
-
-                response = self._workflow_node_retry_to_stream_response(
-                    event=event,
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_node_execution=workflow_node_execution,
-                )
-
-                if response:
-                    yield response
             elif isinstance(event, QueueParallelBranchRunStartedEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 yield self._workflow_parallel_branch_start_to_stream_response(
                     task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
                 )
             elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 yield self._workflow_parallel_branch_finished_to_stream_response(
                     task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
                 )
             elif isinstance(event, QueueIterationStartEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 yield self._workflow_iteration_start_to_stream_response(
                     task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
                 )
             elif isinstance(event, QueueIterationNextEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 yield self._workflow_iteration_next_to_stream_response(
                     task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
                 )
             elif isinstance(event, QueueIterationCompletedEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 yield self._workflow_iteration_completed_to_stream_response(
                     task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
                 )
             elif isinstance(event, QueueWorkflowSucceededEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 if not graph_runtime_state:
-                    raise Exception("Graph runtime state not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 workflow_run = self._handle_workflow_run_success(
                     workflow_run=workflow_run,
@@ -406,10 +409,10 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
                     self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
             elif isinstance(event, QueueWorkflowPartialSuccessEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 if not graph_runtime_state:
-                    raise Exception("Graph runtime state not initialized.")
+                    raise ValueError("graph runtime state not initialized.")

                 workflow_run = self._handle_workflow_run_partial_success(
                     workflow_run=workflow_run,
@@ -429,10 +432,10 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
                     self._queue_manager.publish(QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE)
             elif isinstance(event, QueueWorkflowFailedEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 if not graph_runtime_state:
-                    raise Exception("Graph runtime state not initialized.")
+                    raise ValueError("graph runtime state not initialized.")

                 workflow_run = self._handle_workflow_run_failed(
                     workflow_run=workflow_run,
@@ -522,7 +525,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
                 yield self._message_replace_to_stream_response(answer=event.text)
             elif isinstance(event, QueueAdvancedChatMessageEndEvent):
                 if not graph_runtime_state:
-                    raise Exception("Graph runtime state not initialized.")
+                    raise ValueError("graph runtime state not initialized.")

                 output_moderation_answer = self._handle_output_moderation_when_task_finished(self._task_state.answer)
                 if output_moderation_answer:

---

@@ -155,7 +155,7 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
             else:
                 continue
-        raise Exception("Queue listening stopped unexpectedly.")
+        raise ValueError("queue listening stopped unexpectedly.")

     def _to_stream_response(
         self, generator: Generator[StreamResponse, None, None]
@@ -218,7 +218,7 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
                     break
                 else:
                     yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id)
-            except Exception as e:
+            except Exception:
                 logger.exception(f"Fails to get audio trunk, task_id: {task_id}")
                 break
         if tts_publisher:
@@ -254,9 +254,27 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
                 yield self._workflow_start_to_stream_response(
                     task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
                 )
+            elif isinstance(
+                event,
+                QueueNodeRetryEvent,
+            ):
+                if not workflow_run:
+                    raise ValueError("workflow run not initialized.")
+                workflow_node_execution = self._handle_workflow_node_execution_retried(
+                    workflow_run=workflow_run, event=event
+                )
+
+                response = self._workflow_node_retry_to_stream_response(
+                    event=event,
+                    task_id=self._application_generate_entity.task_id,
+                    workflow_node_execution=workflow_node_execution,
+                )
+
+                if response:
+                    yield response
             elif isinstance(event, QueueNodeStartedEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)
@@ -289,64 +307,48 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
                 )
                 if node_failed_response:
                     yield node_failed_response
-            elif isinstance(
-                event,
-                QueueNodeRetryEvent,
-            ):
-                workflow_node_execution = self._handle_workflow_node_execution_retried(
-                    workflow_run=workflow_run, event=event
-                )
-
-                response = self._workflow_node_retry_to_stream_response(
-                    event=event,
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_node_execution=workflow_node_execution,
-                )
-
-                if response:
-                    yield response
             elif isinstance(event, QueueParallelBranchRunStartedEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 yield self._workflow_parallel_branch_start_to_stream_response(
                     task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
                 )
             elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 yield self._workflow_parallel_branch_finished_to_stream_response(
                     task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
                 )
             elif isinstance(event, QueueIterationStartEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 yield self._workflow_iteration_start_to_stream_response(
                     task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
                 )
             elif isinstance(event, QueueIterationNextEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 yield self._workflow_iteration_next_to_stream_response(
                     task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
                 )
             elif isinstance(event, QueueIterationCompletedEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 yield self._workflow_iteration_completed_to_stream_response(
                     task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event
                 )
             elif isinstance(event, QueueWorkflowSucceededEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 if not graph_runtime_state:
-                    raise Exception("Graph runtime state not initialized.")
+                    raise ValueError("graph runtime state not initialized.")

                 workflow_run = self._handle_workflow_run_success(
                     workflow_run=workflow_run,
@@ -366,10 +368,10 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
                 )
             elif isinstance(event, QueueWorkflowPartialSuccessEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 if not graph_runtime_state:
-                    raise Exception("Graph runtime state not initialized.")
+                    raise ValueError("graph runtime state not initialized.")

                 workflow_run = self._handle_workflow_run_partial_success(
                     workflow_run=workflow_run,
@@ -390,10 +392,10 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
                 )
             elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent):
                 if not workflow_run:
-                    raise Exception("Workflow run not initialized.")
+                    raise ValueError("workflow run not initialized.")

                 if not graph_runtime_state:
-                    raise Exception("Graph runtime state not initialized.")
+                    raise ValueError("graph runtime state not initialized.")

                 workflow_run = self._handle_workflow_run_failed(
                     workflow_run=workflow_run,
                     start_at=graph_runtime_state.start_at,

---

@@ -188,6 +188,41 @@ class WorkflowBasedAppRunner(AppRunner):
             )
         elif isinstance(event, GraphRunFailedEvent):
             self._publish_event(QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count))
+        elif isinstance(event, NodeRunRetryEvent):
+            node_run_result = event.route_node_state.node_run_result
+            if node_run_result:
+                inputs = node_run_result.inputs
+                process_data = node_run_result.process_data
+                outputs = node_run_result.outputs
+                execution_metadata = node_run_result.metadata
+            else:
+                inputs = {}
+                process_data = {}
+                outputs = {}
+                execution_metadata = {}
+            self._publish_event(
+                QueueNodeRetryEvent(
+                    node_execution_id=event.id,
+                    node_id=event.node_id,
+                    node_type=event.node_type,
+                    node_data=event.node_data,
+                    parallel_id=event.parallel_id,
+                    parallel_start_node_id=event.parallel_start_node_id,
+                    parent_parallel_id=event.parent_parallel_id,
+                    parent_parallel_start_node_id=event.parent_parallel_start_node_id,
+                    start_at=event.start_at,
+                    node_run_index=event.route_node_state.index,
+                    predecessor_node_id=event.predecessor_node_id,
+                    in_iteration_id=event.in_iteration_id,
+                    parallel_mode_run_id=event.parallel_mode_run_id,
+                    inputs=inputs,
+                    process_data=process_data,
+                    outputs=outputs,
+                    error=event.error,
+                    execution_metadata=execution_metadata,
+                    retry_index=event.retry_index,
+                )
+            )
         elif isinstance(event, NodeRunStartedEvent):
             self._publish_event(
                 QueueNodeStartedEvent(
@@ -207,6 +242,17 @@ class WorkflowBasedAppRunner(AppRunner):
                 )
             )
         elif isinstance(event, NodeRunSucceededEvent):
+            node_run_result = event.route_node_state.node_run_result
+            if node_run_result:
+                inputs = node_run_result.inputs
+                process_data = node_run_result.process_data
+                outputs = node_run_result.outputs
+                execution_metadata = node_run_result.metadata
+            else:
+                inputs = {}
+                process_data = {}
+                outputs = {}
+                execution_metadata = {}
             self._publish_event(
                 QueueNodeSucceededEvent(
                     node_execution_id=event.id,
@@ -218,18 +264,10 @@ class WorkflowBasedAppRunner(AppRunner):
                     parent_parallel_id=event.parent_parallel_id,
                     parent_parallel_start_node_id=event.parent_parallel_start_node_id,
                     start_at=event.route_node_state.start_at,
-                    inputs=event.route_node_state.node_run_result.inputs
-                    if event.route_node_state.node_run_result
-                    else {},
-                    process_data=event.route_node_state.node_run_result.process_data
-                    if event.route_node_state.node_run_result
-                    else {},
-                    outputs=event.route_node_state.node_run_result.outputs
-                    if event.route_node_state.node_run_result
-                    else {},
-                    execution_metadata=event.route_node_state.node_run_result.metadata
-                    if event.route_node_state.node_run_result
-                    else {},
+                    inputs=inputs,
+                    process_data=process_data,
+                    outputs=outputs,
+                    execution_metadata=execution_metadata,
                     in_iteration_id=event.in_iteration_id,
                 )
             )
@@ -422,36 +460,6 @@ class WorkflowBasedAppRunner(AppRunner):
                     error=event.error if isinstance(event, IterationRunFailedEvent) else None,
                 )
             )
-        elif isinstance(event, NodeRunRetryEvent):
-            self._publish_event(
-                QueueNodeRetryEvent(
-                    node_execution_id=event.id,
-                    node_id=event.node_id,
-                    node_type=event.node_type,
-                    node_data=event.node_data,
-                    parallel_id=event.parallel_id,
-                    parallel_start_node_id=event.parallel_start_node_id,
-                    parent_parallel_id=event.parent_parallel_id,
-                    parent_parallel_start_node_id=event.parent_parallel_start_node_id,
-                    start_at=event.start_at,
-                    inputs=event.route_node_state.node_run_result.inputs
-                    if event.route_node_state.node_run_result
-                    else {},
-                    process_data=event.route_node_state.node_run_result.process_data
-                    if event.route_node_state.node_run_result
-                    else {},
-                    outputs=event.route_node_state.node_run_result.outputs
-                    if event.route_node_state.node_run_result
-                    else {},
-                    error=event.error,
-                    execution_metadata=event.route_node_state.node_run_result.metadata
-                    if event.route_node_state.node_run_result
-                    else {},
-                    in_iteration_id=event.in_iteration_id,
-                    retry_index=event.retry_index,
-                    start_index=event.start_index,
-                )
-            )

     def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
         """

---

@@ -1,3 +1,4 @@
+from collections.abc import Mapping
 from datetime import datetime
 from enum import Enum, StrEnum
 from typing import Any, Optional
@@ -85,9 +86,9 @@ class QueueIterationStartEvent(AppQueueEvent):
     start_at: datetime

     node_run_index: int
-    inputs: Optional[dict[str, Any]] = None
+    inputs: Optional[Mapping[str, Any]] = None
     predecessor_node_id: Optional[str] = None
-    metadata: Optional[dict[str, Any]] = None
+    metadata: Optional[Mapping[str, Any]] = None


 class QueueIterationNextEvent(AppQueueEvent):
@@ -139,9 +140,9 @@ class QueueIterationCompletedEvent(AppQueueEvent):
     start_at: datetime

     node_run_index: int
-    inputs: Optional[dict[str, Any]] = None
-    outputs: Optional[dict[str, Any]] = None
-    metadata: Optional[dict[str, Any]] = None
+    inputs: Optional[Mapping[str, Any]] = None
+    outputs: Optional[Mapping[str, Any]] = None
+    metadata: Optional[Mapping[str, Any]] = None
     steps: int = 0

     error: Optional[str] = None
@@ -304,9 +305,9 @@ class QueueNodeSucceededEvent(AppQueueEvent):
     """iteration id if node is in iteration"""
     start_at: datetime

-    inputs: Optional[dict[str, Any]] = None
-    process_data: Optional[dict[str, Any]] = None
-    outputs: Optional[dict[str, Any]] = None
+    inputs: Optional[Mapping[str, Any]] = None
+    process_data: Optional[Mapping[str, Any]] = None
+    outputs: Optional[Mapping[str, Any]] = None
     execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None

     error: Optional[str] = None
@@ -314,35 +315,18 @@ class QueueNodeSucceededEvent(AppQueueEvent):
     iteration_duration_map: Optional[dict[str, float]] = None


-class QueueNodeRetryEvent(AppQueueEvent):
+class QueueNodeRetryEvent(QueueNodeStartedEvent):
     """QueueNodeRetryEvent entity"""

     event: QueueEvent = QueueEvent.RETRY
-    node_execution_id: str
-    node_id: str
-    node_type: NodeType
-    node_data: BaseNodeData
-    parallel_id: Optional[str] = None
-    """parallel id if node is in parallel"""
-    parallel_start_node_id: Optional[str] = None
-    """parallel start node id if node is in parallel"""
-    parent_parallel_id: Optional[str] = None
-    """parent parallel id if node is in parallel"""
-    parent_parallel_start_node_id: Optional[str] = None
-    """parent parallel start node id if node is in parallel"""
-    in_iteration_id: Optional[str] = None
-    """iteration id if node is in iteration"""
-    start_at: datetime

-    inputs: Optional[dict[str, Any]] = None
-    process_data: Optional[dict[str, Any]] = None
-    outputs: Optional[dict[str, Any]] = None
-    execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None
+    inputs: Optional[Mapping[str, Any]] = None
+    process_data: Optional[Mapping[str, Any]] = None
+    outputs: Optional[Mapping[str, Any]] = None
+    execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None

     error: str
     retry_index: int  # retry index
-    start_index: int  # start index


 class QueueNodeInIterationFailedEvent(AppQueueEvent):
@@ -368,10 +352,10 @@ class QueueNodeInIterationFailedEvent(AppQueueEvent):
     """iteration id if node is in iteration"""
     start_at: datetime

-    inputs: Optional[dict[str, Any]] = None
-    process_data: Optional[dict[str, Any]] = None
-    outputs: Optional[dict[str, Any]] = None
-    execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None
+    inputs: Optional[Mapping[str, Any]] = None
+    process_data: Optional[Mapping[str, Any]] = None
+    outputs: Optional[Mapping[str, Any]] = None
+    execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None

     error: str
@@ -399,10 +383,10 @@ class QueueNodeExceptionEvent(AppQueueEvent):
     """iteration id if node is in iteration"""
     start_at: datetime

-    inputs: Optional[dict[str, Any]] = None
-    process_data: Optional[dict[str, Any]] = None
-    outputs: Optional[dict[str, Any]] = None
-    execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None
+    inputs: Optional[Mapping[str, Any]] = None
+    process_data: Optional[Mapping[str, Any]] = None
+    outputs: Optional[Mapping[str, Any]] = None
+    execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None

     error: str
@@ -430,10 +414,10 @@ class QueueNodeFailedEvent(AppQueueEvent):
     """iteration id if node is in iteration"""
     start_at: datetime

-    inputs: Optional[dict[str, Any]] = None
-    process_data: Optional[dict[str, Any]] = None
-    outputs: Optional[dict[str, Any]] = None
-    execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None
+    inputs: Optional[Mapping[str, Any]] = None
+    process_data: Optional[Mapping[str, Any]] = None
+    outputs: Optional[Mapping[str, Any]] = None
+    execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None

     error: str
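
The entities change above collapses `QueueNodeRetryEvent` into a subclass of `QueueNodeStartedEvent`, so the identity and parallel/iteration placement fields are declared once and only the retry payload remains; the parallel `dict` to `Mapping` switch signals that consumers should treat these payloads as read-only. A minimal sketch of the inheritance pattern with an abbreviated field set (not the real classes):

    from collections.abc import Mapping
    from typing import Any, Optional

    from pydantic import BaseModel

    class QueueNodeStartedEvent(BaseModel):
        # Shared identity/placement fields, declared once (abbreviated).
        node_execution_id: str
        node_id: str
        predecessor_node_id: Optional[str] = None
        in_iteration_id: Optional[str] = None

    class QueueNodeRetryEvent(QueueNodeStartedEvent):
        # Retry-specific payload only; everything else is inherited.
        inputs: Optional[Mapping[str, Any]] = None
        process_data: Optional[Mapping[str, Any]] = None
        outputs: Optional[Mapping[str, Any]] = None
        error: str
        retry_index: int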

---

@@ -445,6 +445,7 @@ class WorkflowCycleManage:
         workflow_node_execution.workflow_id = workflow_run.workflow_id
         workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value
         workflow_node_execution.workflow_run_id = workflow_run.id
+        workflow_node_execution.predecessor_node_id = event.predecessor_node_id
         workflow_node_execution.node_execution_id = event.node_execution_id
         workflow_node_execution.node_id = event.node_id
         workflow_node_execution.node_type = event.node_type.value
@@ -461,9 +462,11 @@ class WorkflowCycleManage:
         workflow_node_execution.execution_metadata = json.dumps(
             {
                 NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
+                NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id,
+                NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
             }
         )
-        workflow_node_execution.index = event.start_index
+        workflow_node_execution.index = event.node_run_index

         db.session.add(workflow_node_execution)
         db.session.commit()

---

@@ -45,6 +45,7 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
     )

     retries = 0
+    stream = kwargs.pop("stream", False)
     while retries <= max_retries:
         try:
             if dify_config.SSRF_PROXY_ALL_URL:
@@ -64,11 +65,12 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
         except httpx.RequestError as e:
             logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}")
+            if max_retries == 0:
+                raise

             retries += 1
             if retries <= max_retries:
                 time.sleep(BACKOFF_FACTOR * (2 ** (retries - 1)))

     raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}")
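
For context, `make_request` retries with exponential backoff, and the two added lines make it re-raise immediately when retries are disabled. A simplified, self-contained model of that control flow (the `BACKOFF_FACTOR` value is assumed for illustration; `send` stands in for the proxied HTTP call):

    import time

    BACKOFF_FACTOR = 0.5  # assumed value for illustration

    def request_with_backoff(send, max_retries: int):
        # Simplified model of make_request's retry loop after this change.
        retries = 0
        while retries <= max_retries:
            try:
                return send()
            except ConnectionError:
                if max_retries == 0:
                    raise  # retries disabled: surface the error immediately
                retries += 1
                if retries <= max_retries:
                    # Exponential backoff: 0.5s, 1s, 2s, ...
                    time.sleep(BACKOFF_FACTOR * (2 ** (retries - 1)))
        raise RuntimeError(f"Reached maximum retries ({max_retries})")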

---

@@ -33,7 +33,7 @@ class GraphRunSucceededEvent(BaseGraphEvent):

 class GraphRunFailedEvent(BaseGraphEvent):
     error: str = Field(..., description="failed reason")
-    exceptions_count: Optional[int] = Field(description="exception count", default=0)
+    exceptions_count: int = Field(description="exception count", default=0)


 class GraphRunPartialSucceededEvent(BaseGraphEvent):
@@ -97,11 +97,10 @@ class NodeInIterationFailedEvent(BaseNodeEvent):
     error: str = Field(..., description="error")


-class NodeRunRetryEvent(BaseNodeEvent):
+class NodeRunRetryEvent(NodeRunStartedEvent):
     error: str = Field(..., description="error")
     retry_index: int = Field(..., description="which retry attempt is about to be performed")
     start_at: datetime = Field(..., description="retry start time")
-    start_index: int = Field(..., description="retry start index")


 ###########################################

---

@@ -641,7 +641,6 @@ class GraphEngine:
                         run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED
                     if node_instance.should_retry and retries < max_retries:
                         retries += 1
-                        self.graph_runtime_state.node_run_steps += 1
                         route_node_state.node_run_result = run_result
                         yield NodeRunRetryEvent(
                             id=node_instance.id,
@@ -649,14 +648,14 @@ class GraphEngine:
                             node_type=node_instance.node_type,
                             node_data=node_instance.node_data,
                             route_node_state=route_node_state,
-                            error=run_result.error,
-                            retry_index=retries,
+                            predecessor_node_id=node_instance.previous_node_id,
                             parallel_id=parallel_id,
                             parallel_start_node_id=parallel_start_node_id,
                             parent_parallel_id=parent_parallel_id,
                             parent_parallel_start_node_id=parent_parallel_start_node_id,
+                            error=run_result.error,
+                            retry_index=retries,
                             start_at=retry_start_at,
-                            start_index=self.graph_runtime_state.node_run_steps,
                         )
                         time.sleep(retry_interval)
                         continue
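
Net effect in the engine: a retry advances only the local `retry_index` (the global `node_run_steps` counter is no longer bumped), and the event now carries `predecessor_node_id` rather than a start index. A condensed, runnable model of the loop (simplified names, not the real engine API):

    import time
    from dataclasses import dataclass

    @dataclass
    class RunResult:
        ok: bool
        error: str | None = None

    def run_with_retries(run_once, max_retries: int, retry_interval: float) -> RunResult:
        retries = 0
        while True:
            result = run_once()
            if not result.ok and retries < max_retries:
                retries += 1
                # Stands in for `yield NodeRunRetryEvent(retry_index=retries, ...)`.
                print(f"retry {retries}/{max_retries}: {result.error}")
                time.sleep(retry_interval)
                continue
            return result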

---

@@ -39,15 +39,9 @@ class RunRetryEvent(BaseModel):
     start_at: datetime = Field(..., description="Retry start time")


-class SingleStepRetryEvent(BaseModel):
+class SingleStepRetryEvent(NodeRunResult):
     """Single step retry event"""

     status: str = WorkflowNodeExecutionStatus.RETRY.value
-    inputs: dict | None = Field(..., description="input")
-    error: str = Field(..., description="error")
-    outputs: dict = Field(..., description="output")
-    retry_index: int = Field(..., description="Retry attempt number")
-    error: str = Field(..., description="error")
+
     elapsed_time: float = Field(..., description="elapsed time")
-    execution_metadata: dict | None = Field(..., description="execution metadata")

---

@@ -249,9 +249,7 @@ class Executor:
         # request_args = {k: v for k, v in request_args.items() if v is not None}
         try:
             response = getattr(ssrf_proxy, self.method)(**request_args)
-        except httpx.RequestError as e:
-            raise HttpRequestNodeError(str(e))
-        except ssrf_proxy.MaxRetriesExceededError as e:
+        except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e:
             raise HttpRequestNodeError(str(e))
         return response

---

@@ -82,13 +82,15 @@ workflow_run_detail_fields = {
 }

 retry_event_field = {
+    "elapsed_time": fields.Float,
+    "status": fields.String,
+    "inputs": fields.Raw(attribute="inputs"),
+    "process_data": fields.Raw(attribute="process_data"),
+    "outputs": fields.Raw(attribute="outputs"),
+    "metadata": fields.Raw(attribute="metadata"),
+    "llm_usage": fields.Raw(attribute="llm_usage"),
     "error": fields.String,
     "retry_index": fields.Integer,
-    "inputs": fields.Raw(attribute="inputs"),
-    "elapsed_time": fields.Float,
-    "execution_metadata": fields.Raw(attribute="execution_metadata_dict"),
-    "status": fields.String,
-    "outputs": fields.Raw(attribute="outputs"),
 }
@@ -112,7 +114,6 @@ workflow_run_node_execution_fields = {
     "created_by_account": fields.Nested(simple_account_fields, attribute="created_by_account", allow_null=True),
     "created_by_end_user": fields.Nested(simple_end_user_fields, attribute="created_by_end_user", allow_null=True),
     "finished_at": TimestampField,
-    "retry_events": fields.List(fields.Nested(retry_event_field)),
 }

 workflow_run_node_execution_list_fields = {

---

@@ -1,9 +1,7 @@
 """add retry_index field to node-execution model

 Revision ID: e1944c35e15e
 Revises: 11b07f66c737
 Create Date: 2024-12-20 06:28:30.287197
-
-
 """
 from alembic import op
 import models as models
@@ -19,15 +17,21 @@ depends_on = None

 def upgrade():
     # ### commands auto generated by Alembic - please adjust! ###
-    with op.batch_alter_table('workflow_node_executions', schema=None) as batch_op:
-        batch_op.add_column(sa.Column('retry_index', sa.Integer(), server_default=sa.text('0'), nullable=True))
+    # We don't need these fields anymore, but this file is already merged into the main branch,
+    # so we need to keep this file for the sake of history, and this change will be reverted in the next migration.
+    # with op.batch_alter_table('workflow_node_executions', schema=None) as batch_op:
+    #     batch_op.add_column(sa.Column('retry_index', sa.Integer(), server_default=sa.text('0'), nullable=True))
+    pass
     # ### end Alembic commands ###


 def downgrade():
     # ### commands auto generated by Alembic - please adjust! ###
-    with op.batch_alter_table('workflow_node_executions', schema=None) as batch_op:
-        batch_op.drop_column('retry_index')
+    # with op.batch_alter_table('workflow_node_executions', schema=None) as batch_op:
+    #     batch_op.drop_column('retry_index')
+    pass
     # ### end Alembic commands ###

---

@@ -0,0 +1,34 @@
+"""remove workflow_node_executions.retry_index if exists
+
+Revision ID: d7999dfa4aae
+Revises: e1944c35e15e
+Create Date: 2024-12-23 11:54:15.344543
+
+"""
+from alembic import op
+import models as models
+import sqlalchemy as sa
+from sqlalchemy import inspect
+
+
+# revision identifiers, used by Alembic.
+revision = 'd7999dfa4aae'
+down_revision = 'e1944c35e15e'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # Check if column exists before attempting to remove it
+    conn = op.get_bind()
+    inspector = inspect(conn)
+    has_column = 'retry_index' in [col['name'] for col in inspector.get_columns('workflow_node_executions')]
+
+    if has_column:
+        with op.batch_alter_table('workflow_node_executions', schema=None) as batch_op:
+            batch_op.drop_column('retry_index')
+
+
+def downgrade():
+    # No downgrade needed as we don't want to restore the column
+    pass
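
Together with the neutered e1944c35e15e above, this forms an idempotent pair: fresh databases never gain the column, while databases that already ran the old migration have it dropped. The existence check can be sketched standalone like so (the in-memory engine and table are stand-ins for Alembic's bound connection):

    import sqlalchemy as sa
    from sqlalchemy import create_engine, inspect

    # Stand-in schema for illustration only.
    engine = create_engine("sqlite:///:memory:")
    meta = sa.MetaData()
    sa.Table("workflow_node_executions", meta, sa.Column("id", sa.Integer, primary_key=True))
    meta.create_all(engine)

    def column_exists(table: str, column: str) -> bool:
        # The same check upgrade() performs before drop_column.
        return column in [col["name"] for col in inspect(engine).get_columns(table)]

    print(column_exists("workflow_node_executions", "retry_index"))  # False on a fresh schema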

---

@@ -4,7 +4,7 @@ import uuid
 from collections.abc import Mapping
 from datetime import datetime
 from enum import Enum, StrEnum
-from typing import Any, Literal, Optional
+from typing import TYPE_CHECKING, Any, Literal, Optional

 import sqlalchemy as sa
 from flask import request
@@ -24,6 +24,9 @@ from .account import Account, Tenant
 from .engine import db
 from .types import StringUUID

+if TYPE_CHECKING:
+    from .workflow import Workflow
+

 class DifySetup(db.Model):
     __tablename__ = "dify_setups"

---

@@ -641,7 +641,6 @@ class WorkflowNodeExecution(db.Model):
     created_by_role = db.Column(db.String(255), nullable=False)
     created_by = db.Column(StringUUID, nullable=False)
     finished_at = db.Column(db.DateTime)
-    retry_index = db.Column(db.Integer, server_default=db.text("0"))

     @property
     def created_by_account(self):

---

@@ -15,7 +15,6 @@ from core.workflow.nodes.base.entities import BaseNodeData
 from core.workflow.nodes.base.node import BaseNode
 from core.workflow.nodes.enums import ErrorStrategy
 from core.workflow.nodes.event import RunCompletedEvent
-from core.workflow.nodes.event.event import SingleStepRetryEvent
 from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
 from core.workflow.workflow_entry import WorkflowEntry
 from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
@@ -221,14 +220,8 @@ class WorkflowService:
         # run draft workflow node
         start_at = time.perf_counter()

-        retries = 0
-        max_retries = 0
-        should_retry = True
-        retry_events = []
-
         try:
-            while retries <= max_retries and should_retry:
-                retry_start_at = time.perf_counter()
             node_instance, generator = WorkflowEntry.single_step_run(
                 workflow=draft_workflow,
                 node_id=node_id,
@@ -236,10 +229,6 @@ class WorkflowService:
                 user_id=account.id,
             )
             node_instance = cast(BaseNode[BaseNodeData], node_instance)
-            max_retries = (
-                node_instance.node_data.retry_config.max_retries if node_instance.node_data.retry_config else 0
-            )
-            retry_interval = node_instance.node_data.retry_config.retry_interval_seconds
             node_run_result: NodeRunResult | None = None
             for event in generator:
                 if isinstance(event, RunCompletedEvent):
@@ -252,40 +241,7 @@ class WorkflowService:
             if not node_run_result:
                 raise ValueError("Node run failed with no run result")
             # single step debug mode error handling return
-            if node_run_result.status == WorkflowNodeExecutionStatus.FAILED:
-                if (
-                    retries == max_retries
-                    and node_instance.node_type == NodeType.HTTP_REQUEST
-                    and node_run_result.outputs
-                    and not node_instance.should_continue_on_error
-                ):
-                    node_run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED
-                    should_retry = False
-                else:
-                    if node_instance.should_retry:
-                        node_run_result.status = WorkflowNodeExecutionStatus.RETRY
-                        retries += 1
-                        node_run_result.retry_index = retries
-                        retry_events.append(
-                            SingleStepRetryEvent(
-                                inputs=WorkflowEntry.handle_special_values(node_run_result.inputs)
-                                if node_run_result.inputs
-                                else None,
-                                error=node_run_result.error,
-                                outputs=WorkflowEntry.handle_special_values(node_run_result.outputs)
-                                if node_run_result.outputs
-                                else None,
-                                retry_index=node_run_result.retry_index,
-                                elapsed_time=time.perf_counter() - retry_start_at,
-                                execution_metadata=WorkflowEntry.handle_special_values(node_run_result.metadata)
-                                if node_run_result.metadata
-                                else None,
-                            )
-                        )
-                        time.sleep(retry_interval)
-                    else:
-                        should_retry = False
-            if node_instance.should_continue_on_error:
+            if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error:
                 node_error_args = {
                     "status": WorkflowNodeExecutionStatus.EXCEPTION,
                     "error": node_run_result.error,
@@ -362,7 +318,6 @@ class WorkflowService:
         db.session.add(workflow_node_execution)
         db.session.commit()
-        workflow_node_execution.retry_events = retry_events

         return workflow_node_execution