diff --git a/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py b/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py
index 7e0115e4e7..b213449be5 100644
--- a/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py
+++ b/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py
@@ -4,8 +4,8 @@ SQLAlchemy implementation of the WorkflowNodeExecutionRepository.
 
 import json
 import logging
-from collections.abc import Mapping, Sequence
-from typing import Any, Optional, Union, cast
+from collections.abc import Sequence
+from typing import Optional, Union
 
 from sqlalchemy import UnaryExpression, asc, delete, desc, select
 from sqlalchemy.engine import Engine
@@ -86,8 +86,8 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
         self._creator_user_role = CreatorUserRole.ACCOUNT if isinstance(user, Account) else CreatorUserRole.END_USER
 
         # Initialize in-memory cache for node executions
-        # Key: node_execution_id, Value: NodeExecution
-        self._node_execution_cache: dict[str, NodeExecution] = {}
+        # Key: node_execution_id, Value: WorkflowNodeExecution (DB model)
+        self._node_execution_cache: dict[str, WorkflowNodeExecution] = {}
 
     def _to_domain_model(self, db_model: WorkflowNodeExecution) -> NodeExecution:
         """
@@ -103,7 +103,10 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
         inputs = db_model.inputs_dict
         process_data = db_model.process_data_dict
         outputs = db_model.outputs_dict
-        metadata = db_model.execution_metadata_dict
+        if db_model.execution_metadata_dict:
+            metadata = {NodeRunMetadataKey(k): v for k, v in db_model.execution_metadata_dict.items()}
+        else:
+            metadata = {}
 
         # Convert status to domain enum
         status = NodeExecutionStatus(db_model.status)
@@ -124,12 +127,7 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
             status=status,
             error=db_model.error,
             elapsed_time=db_model.elapsed_time,
-            # FIXME(QuantumGhost): a temporary workaround for the following type check failure in Python 3.11.
-            # However, this problem is not occurred in Python 3.12.
-            #
-            # A case of this error is:
-            # https://github.com/langgenius/dify/actions/runs/15112698604/job/42475659482?pr=19737#step:9:24
-            metadata=cast(Mapping[NodeRunMetadataKey, Any] | None, metadata),
+            metadata=metadata,
             created_at=db_model.created_at,
             finished_at=db_model.finished_at,
         )
@@ -211,7 +209,7 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
         # Only cache if we have a node_execution_id to use as the cache key
         if db_model.node_execution_id:
             logger.debug(f"Updating cache for node_execution_id: {db_model.node_execution_id}")
-            self._node_execution_cache[db_model.node_execution_id] = execution
+            self._node_execution_cache[db_model.node_execution_id] = db_model
 
     def get_by_node_execution_id(self, node_execution_id: str) -> Optional[NodeExecution]:
         """
@@ -229,7 +227,9 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
         # First check the cache
        if node_execution_id in self._node_execution_cache:
             logger.debug(f"Cache hit for node_execution_id: {node_execution_id}")
-            return self._node_execution_cache[node_execution_id]
+            # Convert cached DB model to domain model
+            cached_db_model = self._node_execution_cache[node_execution_id]
+            return self._to_domain_model(cached_db_model)
 
         # If not in cache, query the database
         logger.debug(f"Cache miss for node_execution_id: {node_execution_id}, querying database")
@@ -244,26 +244,25 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
             db_model = session.scalar(stmt)
 
             if db_model:
-                # Convert to domain model
-                domain_model = self._to_domain_model(db_model)
+                # Add DB model to cache
+                self._node_execution_cache[node_execution_id] = db_model
 
-                # Add to cache
-                self._node_execution_cache[node_execution_id] = domain_model
-
-                return domain_model
+                # Convert to domain model and return
+                return self._to_domain_model(db_model)
 
             return None
 
-    def get_by_workflow_run(
+    def get_db_models_by_workflow_run(
         self,
         workflow_run_id: str,
         order_config: Optional[OrderConfig] = None,
-    ) -> Sequence[NodeExecution]:
+    ) -> Sequence[WorkflowNodeExecution]:
         """
-        Retrieve all NodeExecution instances for a specific workflow run.
+        Retrieve all WorkflowNodeExecution database models for a specific workflow run.
 
-        This method always queries the database to ensure complete and ordered results,
-        but updates the cache with any retrieved executions.
+        This method directly returns database models without converting to domain models,
+        which is useful when you need to access database-specific fields like triggered_from.
+        It also updates the in-memory cache with the retrieved models.
 
         Args:
             workflow_run_id: The workflow run ID
@@ -272,7 +271,7 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
                 order_config.order_direction: Direction to order ("asc" or "desc")
 
         Returns:
-            A list of NodeExecution instances
+            A list of WorkflowNodeExecution database models
         """
         with self._session_factory() as session:
             stmt = select(WorkflowNodeExecution).where(
@@ -301,16 +300,43 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
 
             db_models = session.scalars(stmt).all()
 
-            # Convert database models to domain models and update cache
-            domain_models = []
+            # Update the cache with the retrieved DB models
             for model in db_models:
-                domain_model = self._to_domain_model(model)
-                # Update cache if node_execution_id is present
-                if domain_model.node_execution_id:
-                    self._node_execution_cache[domain_model.node_execution_id] = domain_model
-                domain_models.append(domain_model)
+                if model.node_execution_id:
+                    self._node_execution_cache[model.node_execution_id] = model
 
-            return domain_models
+            return db_models
+
+    def get_by_workflow_run(
+        self,
+        workflow_run_id: str,
+        order_config: Optional[OrderConfig] = None,
+    ) -> Sequence[NodeExecution]:
+        """
+        Retrieve all NodeExecution instances for a specific workflow run.
+
+        This method always queries the database to ensure complete and ordered results,
+        but updates the cache with any retrieved executions.
+
+        Args:
+            workflow_run_id: The workflow run ID
+            order_config: Optional configuration for ordering results
+                order_config.order_by: List of fields to order by (e.g., ["index", "created_at"])
+                order_config.order_direction: Direction to order ("asc" or "desc")
+
+        Returns:
+            A list of NodeExecution instances
+        """
+        # Get the database models using the new method
+        db_models = self.get_db_models_by_workflow_run(workflow_run_id, order_config)
+
+        # Convert database models to domain models
+        domain_models = []
+        for model in db_models:
+            domain_model = self._to_domain_model(model)
+            domain_models.append(domain_model)
+
+        return domain_models
 
     def get_running_executions(self, workflow_run_id: str) -> Sequence[NodeExecution]:
         """
@@ -340,10 +366,12 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
 
             domain_models = []
             for model in db_models:
-                domain_model = self._to_domain_model(model)
                 # Update cache if node_execution_id is present
-                if domain_model.node_execution_id:
-                    self._node_execution_cache[domain_model.node_execution_id] = domain_model
+                if model.node_execution_id:
+                    self._node_execution_cache[model.node_execution_id] = model
+
+                # Convert to domain model
+                domain_model = self._to_domain_model(model)
                 domain_models.append(domain_model)
 
             return domain_models
diff --git a/api/services/workflow_run_service.py b/api/services/workflow_run_service.py
index 3efdb3a79d..21366a4552 100644
--- a/api/services/workflow_run_service.py
+++ b/api/services/workflow_run_service.py
@@ -15,6 +15,7 @@ from models import (
     WorkflowRun,
     WorkflowRunTriggeredFrom,
 )
+from models.workflow import WorkflowNodeExecutionTriggeredFrom
 
 
 class WorkflowRunService:
@@ -140,14 +141,13 @@ class WorkflowRunService:
             session_factory=db.engine,
             user=user,
             app_id=app_model.id,
-            triggered_from=None,
+            triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
         )
 
-        # Use the repository to get the node executions with ordering
+        # Use the repository to get the database models directly
        order_config = OrderConfig(order_by=["index"], order_direction="desc")
-        node_executions = repository.get_by_workflow_run(workflow_run_id=run_id, order_config=order_config)
-
-        # Convert domain models to database models
-        workflow_node_executions = [repository.to_db_model(node_execution) for node_execution in node_executions]
+        workflow_node_executions = repository.get_db_models_by_workflow_run(
+            workflow_run_id=run_id, order_config=order_config
+        )
 
         return workflow_node_executions
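
A minimal usage sketch of the two retrieval paths after this change. Import paths for the repository module and OrderConfig are inferred from the file paths in the diff and may not be exact; db, user, app_model, and run_id stand in for the service's local values and are assumptions, not part of the diff.

    # Assumed module paths; only WorkflowNodeExecutionTriggeredFrom's import is shown in the diff.
    from core.repositories.sqlalchemy_workflow_node_execution_repository import (
        SQLAlchemyWorkflowNodeExecutionRepository,
    )
    from models.workflow import WorkflowNodeExecutionTriggeredFrom

    # Constructor arguments mirror the service code above; db, user, app_model, run_id are placeholders.
    repository = SQLAlchemyWorkflowNodeExecutionRepository(
        session_factory=db.engine,
        user=user,
        app_id=app_model.id,
        triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
    )
    order_config = OrderConfig(order_by=["index"], order_direction="desc")

    # DB-model path: returns WorkflowNodeExecution rows directly (DB-only fields such as
    # triggered_from stay accessible) and warms the in-memory cache with DB models.
    db_models = repository.get_db_models_by_workflow_run(
        workflow_run_id=run_id, order_config=order_config
    )

    # Domain-model path: same query via get_db_models_by_workflow_run, then converted
    # with _to_domain_model for callers that want NodeExecution instances.
    domain_models = repository.get_by_workflow_run(
        workflow_run_id=run_id, order_config=order_config
    )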