fix: correct output order in parallel mode for iteration nodes (#10323)

This commit is contained in:
Novice 2024-11-08 15:32:40 +08:00 committed by GitHub
parent aa3da0e24c
commit a7dbe58c85
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 7 additions and 5 deletions

View File

@ -156,7 +156,7 @@ class IterationNode(BaseNode[IterationNodeData]):
index=0, index=0,
pre_iteration_output=None, pre_iteration_output=None,
) )
outputs: list[Any] = [] outputs: list[Any] = [None] * len(iterator_list_value)
try: try:
if self.node_data.is_parallel: if self.node_data.is_parallel:
futures: list[Future] = [] futures: list[Future] = []
@ -214,6 +214,8 @@ class IterationNode(BaseNode[IterationNodeData]):
graph_engine, graph_engine,
iteration_graph, iteration_graph,
) )
if self.node_data.error_handle_mode == ErrorHandleMode.REMOVE_ABNORMAL_OUTPUT:
outputs = [output for output in outputs if output is not None]
yield IterationRunSucceededEvent( yield IterationRunSucceededEvent(
iteration_id=self.id, iteration_id=self.id,
iteration_node_id=self.node_id, iteration_node_id=self.node_id,
@ -425,7 +427,7 @@ class IterationNode(BaseNode[IterationNodeData]):
yield NodeInIterationFailedEvent( yield NodeInIterationFailedEvent(
**metadata_event.model_dump(), **metadata_event.model_dump(),
) )
outputs.insert(current_index, None) outputs[current_index] = None
variable_pool.add([self.node_id, "index"], next_index) variable_pool.add([self.node_id, "index"], next_index)
if next_index < len(iterator_list_value): if next_index < len(iterator_list_value):
variable_pool.add([self.node_id, "item"], iterator_list_value[next_index]) variable_pool.add([self.node_id, "item"], iterator_list_value[next_index])
@ -473,7 +475,7 @@ class IterationNode(BaseNode[IterationNodeData]):
yield metadata_event yield metadata_event
current_iteration_output = variable_pool.get(self.node_data.output_selector).value current_iteration_output = variable_pool.get(self.node_data.output_selector).value
outputs.insert(current_index, current_iteration_output) outputs[current_index] = current_iteration_output
# remove all nodes outputs from variable pool # remove all nodes outputs from variable pool
for node_id in iteration_graph.node_ids: for node_id in iteration_graph.node_ids:
variable_pool.remove([node_id]) variable_pool.remove([node_id])

View File

@ -569,8 +569,8 @@ const translation = {
errorResponseMethod: 'Error response method', errorResponseMethod: 'Error response method',
ErrorMethod: { ErrorMethod: {
operationTerminated: 'terminated', operationTerminated: 'terminated',
continueOnError: 'continue-on-error', continueOnError: 'continue on error',
removeAbnormalOutput: 'remove-abnormal-output', removeAbnormalOutput: 'remove abnormal output',
}, },
answerNodeWarningDesc: 'Parallel mode warning: Answer nodes, conversation variable assignments, and persistent read/write operations within iterations may cause exceptions.', answerNodeWarningDesc: 'Parallel mode warning: Answer nodes, conversation variable assignments, and persistent read/write operations within iterations may cause exceptions.',
}, },