diff --git a/api/app_factory.py b/api/app_factory.py
index 586f2ded9e..1c886ac5c7 100644
--- a/api/app_factory.py
+++ b/api/app_factory.py
@@ -54,7 +54,6 @@ def initialize_extensions(app: DifyApp):
ext_otel,
ext_proxy_fix,
ext_redis,
- ext_repositories,
ext_sentry,
ext_set_secretkey,
ext_storage,
@@ -75,7 +74,6 @@ def initialize_extensions(app: DifyApp):
ext_migrate,
ext_redis,
ext_storage,
- ext_repositories,
ext_celery,
ext_login,
ext_mail,
diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py
index fd0d7fafbd..4b0e64130b 100644
--- a/api/core/app/apps/advanced_chat/app_generator.py
+++ b/api/core/app/apps/advanced_chat/app_generator.py
@@ -25,7 +25,7 @@ from core.app.entities.task_entities import ChatbotAppBlockingResponse, ChatbotA
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager
from core.prompt.utils.get_thread_messages_length import get_thread_messages_length
-from core.workflow.repository import RepositoryFactory
+from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from extensions.ext_database import db
from factories import file_factory
@@ -163,12 +163,10 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
# Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
- workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository(
- params={
- "tenant_id": application_generate_entity.app_config.tenant_id,
- "app_id": application_generate_entity.app_config.app_id,
- "session_factory": session_factory,
- }
+ workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
+ session_factory=session_factory,
+ tenant_id=application_generate_entity.app_config.tenant_id,
+ app_id=application_generate_entity.app_config.app_id,
)
return self._generate(
@@ -231,12 +229,10 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
# Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
- workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository(
- params={
- "tenant_id": application_generate_entity.app_config.tenant_id,
- "app_id": application_generate_entity.app_config.app_id,
- "session_factory": session_factory,
- }
+ workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
+ session_factory=session_factory,
+ tenant_id=application_generate_entity.app_config.tenant_id,
+ app_id=application_generate_entity.app_config.app_id,
)
return self._generate(
@@ -297,12 +293,10 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
# Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
- workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository(
- params={
- "tenant_id": application_generate_entity.app_config.tenant_id,
- "app_id": application_generate_entity.app_config.app_id,
- "session_factory": session_factory,
- }
+ workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
+ session_factory=session_factory,
+ tenant_id=application_generate_entity.app_config.tenant_id,
+ app_id=application_generate_entity.app_config.app_id,
)
return self._generate(
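The same substitution repeats at every call site in this diff: the dict of factory params becomes plain keyword arguments on the concrete class. A minimal sketch of the new construction path, using only names taken from the hunks above (`app_config` stands in for `application_generate_entity.app_config`):

```python
from sqlalchemy.orm import sessionmaker

from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from extensions.ext_database import db

# Direct construction replaces RepositoryFactory.create_workflow_node_execution_repository().
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
    session_factory=session_factory,  # a sessionmaker here; the service hunks below pass db.engine instead
    tenant_id=app_config.tenant_id,   # required: the repository is tenant-scoped
    app_id=app_config.app_id,         # optional: the trace integrations below may omit it
)
```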
diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
index 1f4db54a9c..f71c49d112 100644
--- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py
+++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
@@ -9,7 +9,6 @@ from sqlalchemy import select
from sqlalchemy.orm import Session
from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY_YIELD_CPU_TIME
-from core.app.apps.advanced_chat.app_generator_tts_publisher import AppGeneratorTTSPublisher, AudioTrunk
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.entities.app_invoke_entities import (
AdvancedChatAppGenerateEntity,
@@ -58,7 +57,7 @@ from core.app.entities.task_entities import (
)
from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
from core.app.task_pipeline.message_cycle_manage import MessageCycleManage
-from core.app.task_pipeline.workflow_cycle_manage import WorkflowCycleManage
+from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.model_runtime.entities.llm_entities import LLMUsage
from core.model_runtime.utils.encoders import jsonable_encoder
from core.ops.ops_trace_manager import TraceQueueManager
@@ -66,6 +65,7 @@ from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.nodes import NodeType
from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
+from core.workflow.workflow_cycle_manager import WorkflowCycleManager
from events.message_event import message_was_created
from extensions.ext_database import db
from models import Conversation, EndUser, Message, MessageFile
@@ -113,7 +113,7 @@ class AdvancedChatAppGenerateTaskPipeline:
else:
raise NotImplementedError(f"User type not supported: {type(user)}")
- self._workflow_cycle_manager = WorkflowCycleManage(
+ self._workflow_cycle_manager = WorkflowCycleManager(
application_generate_entity=application_generate_entity,
workflow_system_variables={
SystemVariableKey.QUERY: message.query,
diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py
index 9c3d78a338..1d67671974 100644
--- a/api/core/app/apps/workflow/app_generator.py
+++ b/api/core/app/apps/workflow/app_generator.py
@@ -18,13 +18,13 @@ from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.app.apps.workflow.app_queue_manager import WorkflowAppQueueManager
from core.app.apps.workflow.app_runner import WorkflowAppRunner
from core.app.apps.workflow.generate_response_converter import WorkflowAppGenerateResponseConverter
-from core.app.apps.workflow.generate_task_pipeline import WorkflowAppGenerateTaskPipeline
from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity
from core.app.entities.task_entities import WorkflowAppBlockingResponse, WorkflowAppStreamResponse
from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager
-from core.workflow.repository import RepositoryFactory
+from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
+from core.workflow.workflow_app_generate_task_pipeline import WorkflowAppGenerateTaskPipeline
from extensions.ext_database import db
from factories import file_factory
from models import Account, App, EndUser, Workflow
@@ -138,12 +138,10 @@ class WorkflowAppGenerator(BaseAppGenerator):
# Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
- workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository(
- params={
- "tenant_id": application_generate_entity.app_config.tenant_id,
- "app_id": application_generate_entity.app_config.app_id,
- "session_factory": session_factory,
- }
+ workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
+ session_factory=session_factory,
+ tenant_id=application_generate_entity.app_config.tenant_id,
+ app_id=application_generate_entity.app_config.app_id,
)
return self._generate(
@@ -264,12 +262,10 @@ class WorkflowAppGenerator(BaseAppGenerator):
# Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
- workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository(
- params={
- "tenant_id": application_generate_entity.app_config.tenant_id,
- "app_id": application_generate_entity.app_config.app_id,
- "session_factory": session_factory,
- }
+ workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
+ session_factory=session_factory,
+ tenant_id=application_generate_entity.app_config.tenant_id,
+ app_id=application_generate_entity.app_config.app_id,
)
return self._generate(
@@ -329,12 +325,10 @@ class WorkflowAppGenerator(BaseAppGenerator):
# Create workflow node execution repository
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
- workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository(
- params={
- "tenant_id": application_generate_entity.app_config.tenant_id,
- "app_id": application_generate_entity.app_config.app_id,
- "session_factory": session_factory,
- }
+ workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
+ session_factory=session_factory,
+ tenant_id=application_generate_entity.app_config.tenant_id,
+ app_id=application_generate_entity.app_config.app_id,
)
return self._generate(
diff --git a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py
index 8c9c26d36e..a98a42f5df 100644
--- a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py
+++ b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py
@@ -9,7 +9,6 @@ from sqlalchemy import select
from sqlalchemy.orm import Session
from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY_YIELD_CPU_TIME
-from core.app.apps.advanced_chat.app_generator_tts_publisher import AppGeneratorTTSPublisher, AudioTrunk
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.entities.app_invoke_entities import (
AgentChatAppGenerateEntity,
@@ -45,6 +44,7 @@ from core.app.entities.task_entities import (
)
from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
from core.app.task_pipeline.message_cycle_manage import MessageCycleManage
+from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.model_manager import ModelInstance
from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage
from core.model_runtime.entities.message_entities import (
diff --git a/api/core/base/__init__.py b/api/core/base/__init__.py
new file mode 100644
index 0000000000..3f4bd3b771
--- /dev/null
+++ b/api/core/base/__init__.py
@@ -0,0 +1 @@
+# Core base package
diff --git a/api/core/base/tts/__init__.py b/api/core/base/tts/__init__.py
new file mode 100644
index 0000000000..37b6eeebb0
--- /dev/null
+++ b/api/core/base/tts/__init__.py
@@ -0,0 +1,6 @@
+from core.base.tts.app_generator_tts_publisher import AppGeneratorTTSPublisher, AudioTrunk
+
+__all__ = [
+ "AppGeneratorTTSPublisher",
+ "AudioTrunk",
+]
diff --git a/api/core/app/apps/advanced_chat/app_generator_tts_publisher.py b/api/core/base/tts/app_generator_tts_publisher.py
similarity index 100%
rename from api/core/app/apps/advanced_chat/app_generator_tts_publisher.py
rename to api/core/base/tts/app_generator_tts_publisher.py
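With the publisher moved under `core.base.tts` and re-exported from the new package `__init__`, the import used by the three task pipelines in this diff becomes:

```python
# Old location (removed): core.app.apps.advanced_chat.app_generator_tts_publisher
from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
```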
diff --git a/api/core/ops/langfuse_trace/langfuse_trace.py b/api/core/ops/langfuse_trace/langfuse_trace.py
index b229d244f7..c74617e558 100644
--- a/api/core/ops/langfuse_trace/langfuse_trace.py
+++ b/api/core/ops/langfuse_trace/langfuse_trace.py
@@ -29,7 +29,7 @@ from core.ops.langfuse_trace.entities.langfuse_trace_entity import (
UnitEnum,
)
from core.ops.utils import filter_none_values
-from core.workflow.repository.repository_factory import RepositoryFactory
+from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from extensions.ext_database import db
from models.model import EndUser
@@ -113,8 +113,8 @@ class LangFuseDataTrace(BaseTraceInstance):
# through workflow_run_id get all_nodes_execution using repository
session_factory = sessionmaker(bind=db.engine)
- workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository(
- params={"tenant_id": trace_info.tenant_id, "session_factory": session_factory},
+ workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
+ session_factory=session_factory, tenant_id=trace_info.tenant_id
)
# Get all executions for this workflow run
diff --git a/api/core/ops/langsmith_trace/langsmith_trace.py b/api/core/ops/langsmith_trace/langsmith_trace.py
index 78a51ff36e..d1e16d3152 100644
--- a/api/core/ops/langsmith_trace/langsmith_trace.py
+++ b/api/core/ops/langsmith_trace/langsmith_trace.py
@@ -28,7 +28,7 @@ from core.ops.langsmith_trace.entities.langsmith_trace_entity import (
LangSmithRunUpdateModel,
)
from core.ops.utils import filter_none_values, generate_dotted_order
-from core.workflow.repository.repository_factory import RepositoryFactory
+from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from extensions.ext_database import db
from models.model import EndUser, MessageFile
@@ -137,12 +137,8 @@ class LangSmithDataTrace(BaseTraceInstance):
# through workflow_run_id get all_nodes_execution using repository
session_factory = sessionmaker(bind=db.engine)
- workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository(
- params={
- "tenant_id": trace_info.tenant_id,
- "app_id": trace_info.metadata.get("app_id"),
- "session_factory": session_factory,
- },
+ workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
+ session_factory=session_factory, tenant_id=trace_info.tenant_id, app_id=trace_info.metadata.get("app_id")
)
# Get all executions for this workflow run
diff --git a/api/core/ops/opik_trace/opik_trace.py b/api/core/ops/opik_trace/opik_trace.py
index a14b5afb8e..1484041447 100644
--- a/api/core/ops/opik_trace/opik_trace.py
+++ b/api/core/ops/opik_trace/opik_trace.py
@@ -22,7 +22,7 @@ from core.ops.entities.trace_entity import (
TraceTaskName,
WorkflowTraceInfo,
)
-from core.workflow.repository.repository_factory import RepositoryFactory
+from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from extensions.ext_database import db
from models.model import EndUser, MessageFile
@@ -150,12 +150,8 @@ class OpikDataTrace(BaseTraceInstance):
# through workflow_run_id get all_nodes_execution using repository
session_factory = sessionmaker(bind=db.engine)
- workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository(
- params={
- "tenant_id": trace_info.tenant_id,
- "app_id": trace_info.metadata.get("app_id"),
- "session_factory": session_factory,
- },
+ workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository(
+ session_factory=session_factory, tenant_id=trace_info.tenant_id, app_id=trace_info.metadata.get("app_id")
)
# Get all executions for this workflow run
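All three trace integrations (Langfuse, LangSmith, Opik) now build the repository inline; Langfuse scopes by tenant only, while the other two also pass the app id from the trace metadata. Each hunk is then followed by a fetch of the run's node executions, which plausibly looks like this sketch (`workflow_run_id` as a field of `trace_info` is an assumption; `get_by_workflow_run` is the interface method exercised by the unit tests later in this diff):

```python
# Follow-up to the construction above: pull every node execution recorded
# for the traced run, using the repository built in the hunk.
executions = workflow_node_execution_repository.get_by_workflow_run(
    workflow_run_id=trace_info.workflow_run_id,  # assumed field name
)
```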
diff --git a/api/core/repositories/__init__.py b/api/core/repositories/__init__.py
index 5c70d50cde..6452317120 100644
--- a/api/core/repositories/__init__.py
+++ b/api/core/repositories/__init__.py
@@ -4,3 +4,9 @@ Repository implementations for data access.
This package contains concrete implementations of the repository interfaces
defined in the core.workflow.repository package.
"""
+
+from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository
+
+__all__ = [
+ "SQLAlchemyWorkflowNodeExecutionRepository",
+]
diff --git a/api/core/repositories/repository_registry.py b/api/core/repositories/repository_registry.py
deleted file mode 100644
index b66f3ba8e6..0000000000
--- a/api/core/repositories/repository_registry.py
+++ /dev/null
@@ -1,87 +0,0 @@
-"""
-Registry for repository implementations.
-
-This module is responsible for registering factory functions with the repository factory.
-"""
-
-import logging
-from collections.abc import Mapping
-from typing import Any
-
-from sqlalchemy.orm import sessionmaker
-
-from configs import dify_config
-from core.repositories.workflow_node_execution import SQLAlchemyWorkflowNodeExecutionRepository
-from core.workflow.repository.repository_factory import RepositoryFactory
-from extensions.ext_database import db
-
-logger = logging.getLogger(__name__)
-
-# Storage type constants
-STORAGE_TYPE_RDBMS = "rdbms"
-STORAGE_TYPE_HYBRID = "hybrid"
-
-
-def register_repositories() -> None:
- """
- Register repository factory functions with the RepositoryFactory.
-
- This function reads configuration settings to determine which repository
- implementations to register.
- """
- # Configure WorkflowNodeExecutionRepository factory based on configuration
- workflow_node_execution_storage = dify_config.WORKFLOW_NODE_EXECUTION_STORAGE
-
- # Check storage type and register appropriate implementation
- if workflow_node_execution_storage == STORAGE_TYPE_RDBMS:
- # Register SQLAlchemy implementation for RDBMS storage
- logger.info("Registering WorkflowNodeExecution repository with RDBMS storage")
- RepositoryFactory.register_workflow_node_execution_factory(create_workflow_node_execution_repository)
- elif workflow_node_execution_storage == STORAGE_TYPE_HYBRID:
- # Hybrid storage is not yet implemented
- raise NotImplementedError("Hybrid storage for WorkflowNodeExecution repository is not yet implemented")
- else:
- # Unknown storage type
- raise ValueError(
- f"Unknown storage type '{workflow_node_execution_storage}' for WorkflowNodeExecution repository. "
- f"Supported types: {STORAGE_TYPE_RDBMS}"
- )
-
-
-def create_workflow_node_execution_repository(params: Mapping[str, Any]) -> SQLAlchemyWorkflowNodeExecutionRepository:
- """
- Create a WorkflowNodeExecutionRepository instance using SQLAlchemy implementation.
-
- This factory function creates a repository for the RDBMS storage type.
-
- Args:
- params: Parameters for creating the repository, including:
- - tenant_id: Required. The tenant ID for multi-tenancy.
- - app_id: Optional. The application ID for filtering.
- - session_factory: Optional. A SQLAlchemy sessionmaker instance. If not provided,
- a new sessionmaker will be created using the global database engine.
-
- Returns:
- A WorkflowNodeExecutionRepository instance
-
- Raises:
- ValueError: If required parameters are missing
- """
- # Extract required parameters
- tenant_id = params.get("tenant_id")
- if tenant_id is None:
- raise ValueError("tenant_id is required for WorkflowNodeExecution repository with RDBMS storage")
-
- # Extract optional parameters
- app_id = params.get("app_id")
-
- # Use the session_factory from params if provided, otherwise create one using the global db engine
- session_factory = params.get("session_factory")
- if session_factory is None:
- # Create a sessionmaker using the same engine as the global db session
- session_factory = sessionmaker(bind=db.engine)
-
- # Create and return the repository
- return SQLAlchemyWorkflowNodeExecutionRepository(
- session_factory=session_factory, tenant_id=tenant_id, app_id=app_id
- )
diff --git a/api/core/repositories/workflow_node_execution/sqlalchemy_repository.py b/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py
similarity index 98%
rename from api/core/repositories/workflow_node_execution/sqlalchemy_repository.py
rename to api/core/repositories/sqlalchemy_workflow_node_execution_repository.py
index b1d37163a4..8bf2ab8761 100644
--- a/api/core/repositories/workflow_node_execution/sqlalchemy_repository.py
+++ b/api/core/repositories/sqlalchemy_workflow_node_execution_repository.py
@@ -10,13 +10,13 @@ from sqlalchemy import UnaryExpression, asc, delete, desc, select
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker
-from core.workflow.repository.workflow_node_execution_repository import OrderConfig
+from core.workflow.repository.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository
from models.workflow import WorkflowNodeExecution, WorkflowNodeExecutionStatus, WorkflowNodeExecutionTriggeredFrom
logger = logging.getLogger(__name__)
-class SQLAlchemyWorkflowNodeExecutionRepository:
+class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
"""
SQLAlchemy implementation of the WorkflowNodeExecutionRepository interface.
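Making the SQLAlchemy class explicitly subclass `WorkflowNodeExecutionRepository`, instead of conforming only structurally, means call sites can annotate against the interface and let a type checker verify the implementation. A small illustration (`purge` is a hypothetical helper, not part of this diff):

```python
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.repository import WorkflowNodeExecutionRepository


def purge(repository: WorkflowNodeExecutionRepository) -> None:
    # Any conforming implementation is accepted; clear() is the interface
    # method used by the app-deletion task later in this diff.
    repository.clear()
```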
diff --git a/api/core/repositories/workflow_node_execution/__init__.py b/api/core/repositories/workflow_node_execution/__init__.py
deleted file mode 100644
index 76e8282b7d..0000000000
--- a/api/core/repositories/workflow_node_execution/__init__.py
+++ /dev/null
@@ -1,9 +0,0 @@
-"""
-WorkflowNodeExecution repository implementations.
-"""
-
-from core.repositories.workflow_node_execution.sqlalchemy_repository import SQLAlchemyWorkflowNodeExecutionRepository
-
-__all__ = [
- "SQLAlchemyWorkflowNodeExecutionRepository",
-]
diff --git a/api/core/workflow/repository/__init__.py b/api/core/workflow/repository/__init__.py
index d91506e72f..672abb6583 100644
--- a/api/core/workflow/repository/__init__.py
+++ b/api/core/workflow/repository/__init__.py
@@ -6,10 +6,9 @@ for accessing and manipulating data, regardless of the underlying
storage mechanism.
"""
-from core.workflow.repository.repository_factory import RepositoryFactory
-from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
+from core.workflow.repository.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository
__all__ = [
- "RepositoryFactory",
+ "OrderConfig",
"WorkflowNodeExecutionRepository",
]
diff --git a/api/core/workflow/repository/repository_factory.py b/api/core/workflow/repository/repository_factory.py
deleted file mode 100644
index 45d6f5d842..0000000000
--- a/api/core/workflow/repository/repository_factory.py
+++ /dev/null
@@ -1,97 +0,0 @@
-"""
-Repository factory for creating repository instances.
-
-This module provides a simple factory interface for creating repository instances.
-It does not contain any implementation details or dependencies on specific repositories.
-"""
-
-from collections.abc import Callable, Mapping
-from typing import Any, Literal, Optional, cast
-
-from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
-
-# Type for factory functions - takes a dict of parameters and returns any repository type
-RepositoryFactoryFunc = Callable[[Mapping[str, Any]], Any]
-
-# Type for workflow node execution factory function
-WorkflowNodeExecutionFactoryFunc = Callable[[Mapping[str, Any]], WorkflowNodeExecutionRepository]
-
-# Repository type literals
-_RepositoryType = Literal["workflow_node_execution"]
-
-
-class RepositoryFactory:
- """
- Factory class for creating repository instances.
-
- This factory delegates the actual repository creation to implementation-specific
- factory functions that are registered with the factory at runtime.
- """
-
- # Dictionary to store factory functions
- _factory_functions: dict[str, RepositoryFactoryFunc] = {}
-
- @classmethod
- def _register_factory(cls, repository_type: _RepositoryType, factory_func: RepositoryFactoryFunc) -> None:
- """
- Register a factory function for a specific repository type.
- This is a private method and should not be called directly.
-
- Args:
- repository_type: The type of repository (e.g., 'workflow_node_execution')
- factory_func: A function that takes parameters and returns a repository instance
- """
- cls._factory_functions[repository_type] = factory_func
-
- @classmethod
- def _create_repository(cls, repository_type: _RepositoryType, params: Optional[Mapping[str, Any]] = None) -> Any:
- """
- Create a new repository instance with the provided parameters.
- This is a private method and should not be called directly.
-
- Args:
- repository_type: The type of repository to create
- params: A dictionary of parameters to pass to the factory function
-
- Returns:
- A new instance of the requested repository
-
- Raises:
- ValueError: If no factory function is registered for the repository type
- """
- if repository_type not in cls._factory_functions:
- raise ValueError(f"No factory function registered for repository type '{repository_type}'")
-
- # Use empty dict if params is None
- params = params or {}
-
- return cls._factory_functions[repository_type](params)
-
- @classmethod
- def register_workflow_node_execution_factory(cls, factory_func: WorkflowNodeExecutionFactoryFunc) -> None:
- """
- Register a factory function for the workflow node execution repository.
-
- Args:
- factory_func: A function that takes parameters and returns a WorkflowNodeExecutionRepository instance
- """
- cls._register_factory("workflow_node_execution", factory_func)
-
- @classmethod
- def create_workflow_node_execution_repository(
- cls, params: Optional[Mapping[str, Any]] = None
- ) -> WorkflowNodeExecutionRepository:
- """
- Create a new WorkflowNodeExecutionRepository instance with the provided parameters.
-
- Args:
- params: A dictionary of parameters to pass to the factory function
-
- Returns:
- A new instance of the WorkflowNodeExecutionRepository
-
- Raises:
- ValueError: If no factory function is registered for the workflow_node_execution repository type
- """
- # We can safely cast here because we've registered a WorkflowNodeExecutionFactoryFunc
- return cast(WorkflowNodeExecutionRepository, cls._create_repository("workflow_node_execution", params))
diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/workflow/workflow_app_generate_task_pipeline.py
similarity index 99%
rename from api/core/app/apps/workflow/generate_task_pipeline.py
rename to api/core/workflow/workflow_app_generate_task_pipeline.py
index 67cad9c998..10a2d8b38b 100644
--- a/api/core/app/apps/workflow/generate_task_pipeline.py
+++ b/api/core/workflow/workflow_app_generate_task_pipeline.py
@@ -6,7 +6,6 @@ from typing import Optional, Union
from sqlalchemy.orm import Session
from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY_YIELD_CPU_TIME
-from core.app.apps.advanced_chat.app_generator_tts_publisher import AppGeneratorTTSPublisher, AudioTrunk
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import (
InvokeFrom,
@@ -52,10 +51,11 @@ from core.app.entities.task_entities import (
WorkflowTaskState,
)
from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
-from core.app.task_pipeline.workflow_cycle_manage import WorkflowCycleManage
+from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
from core.ops.ops_trace_manager import TraceQueueManager
from core.workflow.enums import SystemVariableKey
from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
+from core.workflow.workflow_cycle_manager import WorkflowCycleManager
from extensions.ext_database import db
from models.account import Account
from models.enums import CreatedByRole
@@ -102,7 +102,7 @@ class WorkflowAppGenerateTaskPipeline:
else:
raise ValueError(f"Invalid user type: {type(user)}")
- self._workflow_cycle_manager = WorkflowCycleManage(
+ self._workflow_cycle_manager = WorkflowCycleManager(
application_generate_entity=application_generate_entity,
workflow_system_variables={
SystemVariableKey.FILES: application_generate_entity.files,
diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/workflow/workflow_cycle_manager.py
similarity index 99%
rename from api/core/app/task_pipeline/workflow_cycle_manage.py
rename to api/core/workflow/workflow_cycle_manager.py
index 09e2ee74e6..01d5db4303 100644
--- a/api/core/app/task_pipeline/workflow_cycle_manage.py
+++ b/api/core/workflow/workflow_cycle_manager.py
@@ -69,7 +69,7 @@ from models.workflow import (
)
-class WorkflowCycleManage:
+class WorkflowCycleManager:
def __init__(
self,
*,
diff --git a/api/extensions/ext_repositories.py b/api/extensions/ext_repositories.py
deleted file mode 100644
index b8cfea121b..0000000000
--- a/api/extensions/ext_repositories.py
+++ /dev/null
@@ -1,18 +0,0 @@
-"""
-Extension for initializing repositories.
-
-This extension registers repository implementations with the RepositoryFactory.
-"""
-
-from core.repositories.repository_registry import register_repositories
-from dify_app import DifyApp
-
-
-def init_app(_app: DifyApp) -> None:
- """
- Initialize repository implementations.
-
- Args:
- _app: The Flask application instance (unused)
- """
- register_repositories()
diff --git a/api/migrations/versions/2024_10_10_0516-bbadea11becb_add_name_and_size_to_tool_files.py b/api/migrations/versions/2024_10_10_0516-bbadea11becb_add_name_and_size_to_tool_files.py
index 5b5656e7ed..00f2b15802 100644
--- a/api/migrations/versions/2024_10_10_0516-bbadea11becb_add_name_and_size_to_tool_files.py
+++ b/api/migrations/versions/2024_10_10_0516-bbadea11becb_add_name_and_size_to_tool_files.py
@@ -5,45 +5,61 @@ Revises: 33f5fac87f29
Create Date: 2024-10-10 05:16:14.764268
"""
-from alembic import op
-import models as models
+
import sqlalchemy as sa
-from sqlalchemy.dialects import postgresql
+from alembic import op, context
# revision identifiers, used by Alembic.
-revision = 'bbadea11becb'
-down_revision = 'd8e744d88ed6'
+revision = "bbadea11becb"
+down_revision = "d8e744d88ed6"
branch_labels = None
depends_on = None
def upgrade():
+ def _has_name_or_size_column() -> bool:
+ # We cannot access the database in offline mode, so assume
+ # the "name" and "size" columns do not exist.
+ if context.is_offline_mode():
+            # Emit SQL comments to inform the user that the database schema cannot be inspected
+            # in offline mode, so the generated SQL may not accurately reflect the actual execution.
+ op.execute(
+ "-- Executing in offline mode, assuming the name and size columns do not exist.\n"
+ "-- The generated SQL may differ from what will actually be executed.\n"
+ "-- Please review the migration script carefully!"
+ )
+
+ return False
+ # Use SQLAlchemy inspector to get the columns of the 'tool_files' table
+ inspector = sa.inspect(conn)
+ columns = [col["name"] for col in inspector.get_columns("tool_files")]
+
+        # Report whether 'name' or 'size' already exists; if so, the caller skips this upgrade
+ if "name" in columns or "size" in columns:
+ return True
+ return False
+
# ### commands auto generated by Alembic - please adjust! ###
# Get the database connection
conn = op.get_bind()
- # Use SQLAlchemy inspector to get the columns of the 'tool_files' table
- inspector = sa.inspect(conn)
- columns = [col['name'] for col in inspector.get_columns('tool_files')]
-
- # If 'name' or 'size' columns already exist, exit the upgrade function
- if 'name' in columns or 'size' in columns:
+ if _has_name_or_size_column():
return
- with op.batch_alter_table('tool_files', schema=None) as batch_op:
- batch_op.add_column(sa.Column('name', sa.String(), nullable=True))
- batch_op.add_column(sa.Column('size', sa.Integer(), nullable=True))
+ with op.batch_alter_table("tool_files", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("name", sa.String(), nullable=True))
+ batch_op.add_column(sa.Column("size", sa.Integer(), nullable=True))
op.execute("UPDATE tool_files SET name = '' WHERE name IS NULL")
op.execute("UPDATE tool_files SET size = -1 WHERE size IS NULL")
- with op.batch_alter_table('tool_files', schema=None) as batch_op:
- batch_op.alter_column('name', existing_type=sa.String(), nullable=False)
- batch_op.alter_column('size', existing_type=sa.Integer(), nullable=False)
+ with op.batch_alter_table("tool_files", schema=None) as batch_op:
+ batch_op.alter_column("name", existing_type=sa.String(), nullable=False)
+ batch_op.alter_column("size", existing_type=sa.Integer(), nullable=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
- with op.batch_alter_table('tool_files', schema=None) as batch_op:
- batch_op.drop_column('size')
- batch_op.drop_column('name')
+ with op.batch_alter_table("tool_files", schema=None) as batch_op:
+ batch_op.drop_column("size")
+ batch_op.drop_column("name")
# ### end Alembic commands ###
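Both migration edits in this diff follow the same shape: schema inspection moves into a nested helper that bails out early when Alembic runs in offline (`--sql`) mode, where no database connection exists, and leaves the warning behind as SQL comments. A distilled, hypothetical version of that guard, assuming a standard Alembic environment:

```python
import sqlalchemy as sa
from alembic import context, op


def _column_exists(table_name: str, column_name: str) -> bool:
    """Report whether a column exists; conservatively False in offline mode."""
    if context.is_offline_mode():
        # Offline runs render SQL without connecting to a database, so
        # inspection is impossible; surface the caveat as SQL comments.
        op.execute(f"-- Offline mode: assuming {table_name}.{column_name} does not exist.")
        return False
    inspector = sa.inspect(op.get_bind())
    return column_name in [col["name"] for col in inspector.get_columns(table_name)]
```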
diff --git a/api/migrations/versions/2024_12_23_1154-d7999dfa4aae_remove_workflow_node_executions_retry_.py b/api/migrations/versions/2024_12_23_1154-d7999dfa4aae_remove_workflow_node_executions_retry_.py
index 07454b0917..adf6421e57 100644
--- a/api/migrations/versions/2024_12_23_1154-d7999dfa4aae_remove_workflow_node_executions_retry_.py
+++ b/api/migrations/versions/2024_12_23_1154-d7999dfa4aae_remove_workflow_node_executions_retry_.py
@@ -5,28 +5,38 @@ Revises: e1944c35e15e
Create Date: 2024-12-23 11:54:15.344543
"""
-from alembic import op
-import models as models
-import sqlalchemy as sa
+
+from alembic import op, context
from sqlalchemy import inspect
# revision identifiers, used by Alembic.
-revision = 'd7999dfa4aae'
-down_revision = 'e1944c35e15e'
+revision = "d7999dfa4aae"
+down_revision = "e1944c35e15e"
branch_labels = None
depends_on = None
def upgrade():
- # Check if column exists before attempting to remove it
- conn = op.get_bind()
- inspector = inspect(conn)
- has_column = 'retry_index' in [col['name'] for col in inspector.get_columns('workflow_node_executions')]
+ def _has_retry_index_column() -> bool:
+ if context.is_offline_mode():
+            # Emit SQL comments to inform the user that the database schema cannot be inspected
+            # in offline mode, so the generated SQL may not accurately reflect the actual execution.
+ op.execute(
+ '-- Executing in offline mode: assuming the "retry_index" column does not exist.\n'
+ "-- The generated SQL may differ from what will actually be executed.\n"
+ "-- Please review the migration script carefully!"
+ )
+ return False
+ conn = op.get_bind()
+ inspector = inspect(conn)
+ return "retry_index" in [col["name"] for col in inspector.get_columns("workflow_node_executions")]
+
+ has_column = _has_retry_index_column()
if has_column:
- with op.batch_alter_table('workflow_node_executions', schema=None) as batch_op:
- batch_op.drop_column('retry_index')
+ with op.batch_alter_table("workflow_node_executions", schema=None) as batch_op:
+ batch_op.drop_column("retry_index")
def downgrade():
diff --git a/api/models/tools.py b/api/models/tools.py
index 05604b9330..e027475e38 100644
--- a/api/models/tools.py
+++ b/api/models/tools.py
@@ -1,6 +1,6 @@
import json
from datetime import datetime
-from typing import Any, Optional, cast
+from typing import Any, cast
import sqlalchemy as sa
from deprecated import deprecated
@@ -304,8 +304,11 @@ class DeprecatedPublishedAppTool(Base):
db.UniqueConstraint("app_id", "user_id", name="unique_published_app_tool"),
)
+ id = db.Column(StringUUID, server_default=db.text("uuid_generate_v4()"))
# id of the app
app_id = db.Column(StringUUID, ForeignKey("apps.id"), nullable=False)
+
+ user_id: Mapped[str] = db.Column(StringUUID, nullable=False)
# who published this tool
description = db.Column(db.Text, nullable=False)
# llm_description of the tool, for LLM
@@ -325,34 +328,3 @@ class DeprecatedPublishedAppTool(Base):
@property
def description_i18n(self) -> I18nObject:
return I18nObject(**json.loads(self.description))
-
- id = db.Column(StringUUID, server_default=db.text("uuid_generate_v4()"))
- user_id: Mapped[str] = db.Column(StringUUID, nullable=False)
- tenant_id: Mapped[str] = db.Column(StringUUID, nullable=False)
- conversation_id: Mapped[Optional[str]] = db.Column(StringUUID, nullable=True)
- file_key: Mapped[str] = db.Column(db.String(255), nullable=False)
- mimetype: Mapped[str] = db.Column(db.String(255), nullable=False)
- original_url: Mapped[Optional[str]] = db.Column(db.String(2048), nullable=True)
- name: Mapped[str] = mapped_column(default="")
- size: Mapped[int] = mapped_column(default=-1)
-
- def __init__(
- self,
- *,
- user_id: str,
- tenant_id: str,
- conversation_id: Optional[str] = None,
- file_key: str,
- mimetype: str,
- original_url: Optional[str] = None,
- name: str,
- size: int,
- ):
- self.user_id = user_id
- self.tenant_id = tenant_id
- self.conversation_id = conversation_id
- self.file_key = file_key
- self.mimetype = mimetype
- self.original_url = original_url
- self.name = name
- self.size = size
diff --git a/api/services/workflow_run_service.py b/api/services/workflow_run_service.py
index f7c4f500a8..6d5b737962 100644
--- a/api/services/workflow_run_service.py
+++ b/api/services/workflow_run_service.py
@@ -2,7 +2,7 @@ import threading
from typing import Optional
import contexts
-from core.workflow.repository import RepositoryFactory
+from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.repository.workflow_node_execution_repository import OrderConfig
from extensions.ext_database import db
from libs.infinite_scroll_pagination import InfiniteScrollPagination
@@ -129,12 +129,8 @@ class WorkflowRunService:
return []
# Use the repository to get the node executions
- repository = RepositoryFactory.create_workflow_node_execution_repository(
- params={
- "tenant_id": app_model.tenant_id,
- "app_id": app_model.id,
- "session_factory": db.session.get_bind(),
- }
+ repository = SQLAlchemyWorkflowNodeExecutionRepository(
+ session_factory=db.engine, tenant_id=app_model.tenant_id, app_id=app_model.id
)
# Use the repository to get the node executions with ordering
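`OrderConfig`, newly re-exported from `core.workflow.repository`, carries the ordering that this comment refers to. The call that follows the hunk plausibly reads like the sketch below (the `index` field and the local variable names are illustrative assumptions):

```python
from core.workflow.repository.workflow_node_execution_repository import OrderConfig

# Hypothetical continuation of the hunk above: node executions for one run,
# ordered by execution index.
order_config = OrderConfig(order_by=["index"], order_direction="desc")
node_executions = repository.get_by_workflow_run(
    workflow_run_id=workflow_run.id,
    order_config=order_config,
)
```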
diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py
index ebe65e5d5f..331dba8bf1 100644
--- a/api/services/workflow_service.py
+++ b/api/services/workflow_service.py
@@ -11,6 +11,7 @@ from sqlalchemy.orm import Session
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
from core.model_runtime.utils.encoders import jsonable_encoder
+from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.variables import Variable
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.errors import WorkflowNodeRunFailedError
@@ -21,7 +22,6 @@ from core.workflow.nodes.enums import ErrorStrategy
from core.workflow.nodes.event import RunCompletedEvent
from core.workflow.nodes.event.types import NodeEvent
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
-from core.workflow.repository import RepositoryFactory
from core.workflow.workflow_entry import WorkflowEntry
from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
from extensions.ext_database import db
@@ -285,12 +285,8 @@ class WorkflowService:
workflow_node_execution.workflow_id = draft_workflow.id
# Use the repository to save the workflow node execution
- repository = RepositoryFactory.create_workflow_node_execution_repository(
- params={
- "tenant_id": app_model.tenant_id,
- "app_id": app_model.id,
- "session_factory": db.session.get_bind(),
- }
+ repository = SQLAlchemyWorkflowNodeExecutionRepository(
+ session_factory=db.engine, tenant_id=app_model.tenant_id, app_id=app_model.id
)
repository.save(workflow_node_execution)
diff --git a/api/tasks/remove_app_and_related_data_task.py b/api/tasks/remove_app_and_related_data_task.py
index dedf1c5334..d5a783396a 100644
--- a/api/tasks/remove_app_and_related_data_task.py
+++ b/api/tasks/remove_app_and_related_data_task.py
@@ -7,7 +7,7 @@ from celery import shared_task # type: ignore
from sqlalchemy import delete
from sqlalchemy.exc import SQLAlchemyError
-from core.workflow.repository import RepositoryFactory
+from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from extensions.ext_database import db
from models.dataset import AppDatasetJoin
from models.model import (
@@ -189,12 +189,8 @@ def _delete_app_workflow_runs(tenant_id: str, app_id: str):
def _delete_app_workflow_node_executions(tenant_id: str, app_id: str):
# Create a repository instance for WorkflowNodeExecution
- repository = RepositoryFactory.create_workflow_node_execution_repository(
- params={
- "tenant_id": tenant_id,
- "app_id": app_id,
- "session_factory": db.session.get_bind(),
- }
+ repository = SQLAlchemyWorkflowNodeExecutionRepository(
+ session_factory=db.engine, tenant_id=tenant_id, app_id=app_id
)
# Use the clear method to delete all records for this tenant_id and app_id
diff --git a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py
new file mode 100644
index 0000000000..6b00b203c4
--- /dev/null
+++ b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py
@@ -0,0 +1,348 @@
+import json
+import time
+from datetime import UTC, datetime
+from unittest.mock import MagicMock, patch
+
+import pytest
+from sqlalchemy.orm import Session
+
+from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom
+from core.app.entities.queue_entities import (
+ QueueNodeFailedEvent,
+ QueueNodeStartedEvent,
+ QueueNodeSucceededEvent,
+)
+from core.workflow.enums import SystemVariableKey
+from core.workflow.nodes import NodeType
+from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
+from core.workflow.workflow_cycle_manager import WorkflowCycleManager
+from models.enums import CreatedByRole
+from models.workflow import (
+ Workflow,
+ WorkflowNodeExecution,
+ WorkflowNodeExecutionStatus,
+ WorkflowRun,
+ WorkflowRunStatus,
+)
+
+
+@pytest.fixture
+def mock_app_generate_entity():
+ entity = MagicMock(spec=AdvancedChatAppGenerateEntity)
+ entity.inputs = {"query": "test query"}
+ entity.invoke_from = InvokeFrom.WEB_APP
+ # Create app_config as a separate mock
+ app_config = MagicMock()
+ app_config.tenant_id = "test-tenant-id"
+ app_config.app_id = "test-app-id"
+ entity.app_config = app_config
+ return entity
+
+
+@pytest.fixture
+def mock_workflow_system_variables():
+ return {
+ SystemVariableKey.QUERY: "test query",
+ SystemVariableKey.CONVERSATION_ID: "test-conversation-id",
+ SystemVariableKey.USER_ID: "test-user-id",
+ SystemVariableKey.APP_ID: "test-app-id",
+ SystemVariableKey.WORKFLOW_ID: "test-workflow-id",
+ SystemVariableKey.WORKFLOW_RUN_ID: "test-workflow-run-id",
+ }
+
+
+@pytest.fixture
+def mock_node_execution_repository():
+ repo = MagicMock(spec=WorkflowNodeExecutionRepository)
+ repo.get_by_node_execution_id.return_value = None
+ repo.get_running_executions.return_value = []
+ return repo
+
+
+@pytest.fixture
+def workflow_cycle_manager(mock_app_generate_entity, mock_workflow_system_variables, mock_node_execution_repository):
+ return WorkflowCycleManager(
+ application_generate_entity=mock_app_generate_entity,
+ workflow_system_variables=mock_workflow_system_variables,
+ workflow_node_execution_repository=mock_node_execution_repository,
+ )
+
+
+@pytest.fixture
+def mock_session():
+ session = MagicMock(spec=Session)
+ return session
+
+
+@pytest.fixture
+def mock_workflow():
+ workflow = MagicMock(spec=Workflow)
+ workflow.id = "test-workflow-id"
+ workflow.tenant_id = "test-tenant-id"
+ workflow.app_id = "test-app-id"
+ workflow.type = "chat"
+ workflow.version = "1.0"
+ workflow.graph = json.dumps({"nodes": [], "edges": []})
+ return workflow
+
+
+@pytest.fixture
+def mock_workflow_run():
+ workflow_run = MagicMock(spec=WorkflowRun)
+ workflow_run.id = "test-workflow-run-id"
+ workflow_run.tenant_id = "test-tenant-id"
+ workflow_run.app_id = "test-app-id"
+ workflow_run.workflow_id = "test-workflow-id"
+ workflow_run.status = WorkflowRunStatus.RUNNING
+ workflow_run.created_by_role = CreatedByRole.ACCOUNT
+ workflow_run.created_by = "test-user-id"
+ workflow_run.created_at = datetime.now(UTC).replace(tzinfo=None)
+ workflow_run.inputs_dict = {"query": "test query"}
+ workflow_run.outputs_dict = {"answer": "test answer"}
+ return workflow_run
+
+
+def test_init(
+ workflow_cycle_manager, mock_app_generate_entity, mock_workflow_system_variables, mock_node_execution_repository
+):
+ """Test initialization of WorkflowCycleManager"""
+ assert workflow_cycle_manager._workflow_run is None
+ assert workflow_cycle_manager._workflow_node_executions == {}
+ assert workflow_cycle_manager._application_generate_entity == mock_app_generate_entity
+ assert workflow_cycle_manager._workflow_system_variables == mock_workflow_system_variables
+ assert workflow_cycle_manager._workflow_node_execution_repository == mock_node_execution_repository
+
+
+def test_handle_workflow_run_start(workflow_cycle_manager, mock_session, mock_workflow):
+ """Test _handle_workflow_run_start method"""
+ # Mock session.scalar to return the workflow and max sequence
+ mock_session.scalar.side_effect = [mock_workflow, 5]
+
+ # Call the method
+ workflow_run = workflow_cycle_manager._handle_workflow_run_start(
+ session=mock_session,
+ workflow_id="test-workflow-id",
+ user_id="test-user-id",
+ created_by_role=CreatedByRole.ACCOUNT,
+ )
+
+ # Verify the result
+ assert workflow_run.tenant_id == mock_workflow.tenant_id
+ assert workflow_run.app_id == mock_workflow.app_id
+ assert workflow_run.workflow_id == mock_workflow.id
+ assert workflow_run.sequence_number == 6 # max_sequence + 1
+ assert workflow_run.status == WorkflowRunStatus.RUNNING
+ assert workflow_run.created_by_role == CreatedByRole.ACCOUNT
+ assert workflow_run.created_by == "test-user-id"
+
+ # Verify session.add was called
+ mock_session.add.assert_called_once_with(workflow_run)
+
+
+def test_handle_workflow_run_success(workflow_cycle_manager, mock_session, mock_workflow_run):
+ """Test _handle_workflow_run_success method"""
+ # Mock _get_workflow_run to return the mock_workflow_run
+ with patch.object(workflow_cycle_manager, "_get_workflow_run", return_value=mock_workflow_run):
+ # Call the method
+ result = workflow_cycle_manager._handle_workflow_run_success(
+ session=mock_session,
+ workflow_run_id="test-workflow-run-id",
+ start_at=time.perf_counter() - 10, # 10 seconds ago
+ total_tokens=100,
+ total_steps=5,
+ outputs={"answer": "test answer"},
+ )
+
+ # Verify the result
+ assert result == mock_workflow_run
+ assert result.status == WorkflowRunStatus.SUCCEEDED
+ assert result.outputs == json.dumps({"answer": "test answer"})
+ assert result.total_tokens == 100
+ assert result.total_steps == 5
+ assert result.finished_at is not None
+
+
+def test_handle_workflow_run_failed(workflow_cycle_manager, mock_session, mock_workflow_run):
+ """Test _handle_workflow_run_failed method"""
+ # Mock _get_workflow_run to return the mock_workflow_run
+ with patch.object(workflow_cycle_manager, "_get_workflow_run", return_value=mock_workflow_run):
+ # Mock get_running_executions to return an empty list
+ workflow_cycle_manager._workflow_node_execution_repository.get_running_executions.return_value = []
+
+ # Call the method
+ result = workflow_cycle_manager._handle_workflow_run_failed(
+ session=mock_session,
+ workflow_run_id="test-workflow-run-id",
+ start_at=time.perf_counter() - 10, # 10 seconds ago
+ total_tokens=50,
+ total_steps=3,
+ status=WorkflowRunStatus.FAILED,
+ error="Test error message",
+ )
+
+ # Verify the result
+ assert result == mock_workflow_run
+ assert result.status == WorkflowRunStatus.FAILED.value
+ assert result.error == "Test error message"
+ assert result.total_tokens == 50
+ assert result.total_steps == 3
+ assert result.finished_at is not None
+
+
+def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_run):
+ """Test _handle_node_execution_start method"""
+ # Create a mock event
+ event = MagicMock(spec=QueueNodeStartedEvent)
+ event.node_execution_id = "test-node-execution-id"
+ event.node_id = "test-node-id"
+ event.node_type = NodeType.LLM
+
+ # Create node_data as a separate mock
+ node_data = MagicMock()
+ node_data.title = "Test Node"
+ event.node_data = node_data
+
+ event.predecessor_node_id = "test-predecessor-node-id"
+ event.node_run_index = 1
+ event.parallel_mode_run_id = "test-parallel-mode-run-id"
+ event.in_iteration_id = "test-iteration-id"
+ event.in_loop_id = "test-loop-id"
+
+ # Call the method
+ result = workflow_cycle_manager._handle_node_execution_start(
+ workflow_run=mock_workflow_run,
+ event=event,
+ )
+
+ # Verify the result
+ assert result.tenant_id == mock_workflow_run.tenant_id
+ assert result.app_id == mock_workflow_run.app_id
+ assert result.workflow_id == mock_workflow_run.workflow_id
+ assert result.workflow_run_id == mock_workflow_run.id
+ assert result.node_execution_id == event.node_execution_id
+ assert result.node_id == event.node_id
+ assert result.node_type == event.node_type.value
+ assert result.title == event.node_data.title
+ assert result.status == WorkflowNodeExecutionStatus.RUNNING.value
+ assert result.created_by_role == mock_workflow_run.created_by_role
+ assert result.created_by == mock_workflow_run.created_by
+
+ # Verify save was called
+ workflow_cycle_manager._workflow_node_execution_repository.save.assert_called_once_with(result)
+
+ # Verify the node execution was added to the cache
+ assert workflow_cycle_manager._workflow_node_executions[event.node_execution_id] == result
+
+
+def test_get_workflow_run(workflow_cycle_manager, mock_session, mock_workflow_run):
+ """Test _get_workflow_run method"""
+ # Mock session.scalar to return the workflow run
+ mock_session.scalar.return_value = mock_workflow_run
+
+ # Call the method
+ result = workflow_cycle_manager._get_workflow_run(
+ session=mock_session,
+ workflow_run_id="test-workflow-run-id",
+ )
+
+ # Verify the result
+ assert result == mock_workflow_run
+ assert workflow_cycle_manager._workflow_run == mock_workflow_run
+
+
+def test_handle_workflow_node_execution_success(workflow_cycle_manager):
+ """Test _handle_workflow_node_execution_success method"""
+ # Create a mock event
+ event = MagicMock(spec=QueueNodeSucceededEvent)
+ event.node_execution_id = "test-node-execution-id"
+ event.inputs = {"input": "test input"}
+ event.process_data = {"process": "test process"}
+ event.outputs = {"output": "test output"}
+ event.execution_metadata = {"metadata": "test metadata"}
+ event.start_at = datetime.now(UTC).replace(tzinfo=None)
+
+ # Create a mock workflow node execution
+ node_execution = MagicMock(spec=WorkflowNodeExecution)
+ node_execution.node_execution_id = "test-node-execution-id"
+
+ # Mock _get_workflow_node_execution to return the mock node execution
+ with patch.object(workflow_cycle_manager, "_get_workflow_node_execution", return_value=node_execution):
+ # Call the method
+ result = workflow_cycle_manager._handle_workflow_node_execution_success(
+ event=event,
+ )
+
+ # Verify the result
+ assert result == node_execution
+ assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED.value
+ assert result.inputs == json.dumps(event.inputs)
+ assert result.process_data == json.dumps(event.process_data)
+ assert result.outputs == json.dumps(event.outputs)
+ assert result.finished_at is not None
+ assert result.elapsed_time is not None
+
+ # Verify update was called
+ workflow_cycle_manager._workflow_node_execution_repository.update.assert_called_once_with(node_execution)
+
+
+def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_session, mock_workflow_run):
+ """Test _handle_workflow_run_partial_success method"""
+ # Mock _get_workflow_run to return the mock_workflow_run
+ with patch.object(workflow_cycle_manager, "_get_workflow_run", return_value=mock_workflow_run):
+ # Call the method
+ result = workflow_cycle_manager._handle_workflow_run_partial_success(
+ session=mock_session,
+ workflow_run_id="test-workflow-run-id",
+ start_at=time.perf_counter() - 10, # 10 seconds ago
+ total_tokens=75,
+ total_steps=4,
+ outputs={"partial_answer": "test partial answer"},
+ exceptions_count=2,
+ )
+
+ # Verify the result
+ assert result == mock_workflow_run
+ assert result.status == WorkflowRunStatus.PARTIAL_SUCCEEDED.value
+ assert result.outputs == json.dumps({"partial_answer": "test partial answer"})
+ assert result.total_tokens == 75
+ assert result.total_steps == 4
+ assert result.exceptions_count == 2
+ assert result.finished_at is not None
+
+
+def test_handle_workflow_node_execution_failed(workflow_cycle_manager):
+ """Test _handle_workflow_node_execution_failed method"""
+ # Create a mock event
+ event = MagicMock(spec=QueueNodeFailedEvent)
+ event.node_execution_id = "test-node-execution-id"
+ event.inputs = {"input": "test input"}
+ event.process_data = {"process": "test process"}
+ event.outputs = {"output": "test output"}
+ event.execution_metadata = {"metadata": "test metadata"}
+ event.start_at = datetime.now(UTC).replace(tzinfo=None)
+ event.error = "Test error message"
+
+ # Create a mock workflow node execution
+ node_execution = MagicMock(spec=WorkflowNodeExecution)
+ node_execution.node_execution_id = "test-node-execution-id"
+
+ # Mock _get_workflow_node_execution to return the mock node execution
+ with patch.object(workflow_cycle_manager, "_get_workflow_node_execution", return_value=node_execution):
+ # Call the method
+ result = workflow_cycle_manager._handle_workflow_node_execution_failed(
+ event=event,
+ )
+
+ # Verify the result
+ assert result == node_execution
+ assert result.status == WorkflowNodeExecutionStatus.FAILED.value
+ assert result.error == "Test error message"
+ assert result.inputs == json.dumps(event.inputs)
+ assert result.process_data == json.dumps(event.process_data)
+ assert result.outputs == json.dumps(event.outputs)
+ assert result.finished_at is not None
+ assert result.elapsed_time is not None
+ assert result.execution_metadata == json.dumps(event.execution_metadata)
+
+ # Verify update was called
+ workflow_cycle_manager._workflow_node_execution_repository.update.assert_called_once_with(node_execution)
diff --git a/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py b/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py
index c16b453cba..9cda873e90 100644
--- a/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py
+++ b/api/tests/unit_tests/repositories/workflow_node_execution/test_sqlalchemy_repository.py
@@ -8,7 +8,7 @@ import pytest
from pytest_mock import MockerFixture
from sqlalchemy.orm import Session, sessionmaker
-from core.repositories.workflow_node_execution.sqlalchemy_repository import SQLAlchemyWorkflowNodeExecutionRepository
+from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.workflow.repository.workflow_node_execution_repository import OrderConfig
from models.workflow import WorkflowNodeExecution
@@ -80,7 +80,7 @@ def test_get_by_node_execution_id(repository, session, mocker: MockerFixture):
"""Test get_by_node_execution_id method."""
session_obj, _ = session
# Set up mock
- mock_select = mocker.patch("core.repositories.workflow_node_execution.sqlalchemy_repository.select")
+ mock_select = mocker.patch("core.repositories.sqlalchemy_workflow_node_execution_repository.select")
mock_stmt = mocker.MagicMock()
mock_select.return_value = mock_stmt
mock_stmt.where.return_value = mock_stmt
@@ -99,7 +99,7 @@ def test_get_by_workflow_run(repository, session, mocker: MockerFixture):
"""Test get_by_workflow_run method."""
session_obj, _ = session
# Set up mock
- mock_select = mocker.patch("core.repositories.workflow_node_execution.sqlalchemy_repository.select")
+ mock_select = mocker.patch("core.repositories.sqlalchemy_workflow_node_execution_repository.select")
mock_stmt = mocker.MagicMock()
mock_select.return_value = mock_stmt
mock_stmt.where.return_value = mock_stmt
@@ -120,7 +120,7 @@ def test_get_running_executions(repository, session, mocker: MockerFixture):
"""Test get_running_executions method."""
session_obj, _ = session
# Set up mock
- mock_select = mocker.patch("core.repositories.workflow_node_execution.sqlalchemy_repository.select")
+ mock_select = mocker.patch("core.repositories.sqlalchemy_workflow_node_execution_repository.select")
mock_stmt = mocker.MagicMock()
mock_select.return_value = mock_stmt
mock_stmt.where.return_value = mock_stmt
@@ -158,7 +158,7 @@ def test_clear(repository, session, mocker: MockerFixture):
"""Test clear method."""
session_obj, _ = session
# Set up mock
- mock_delete = mocker.patch("core.repositories.workflow_node_execution.sqlalchemy_repository.delete")
+ mock_delete = mocker.patch("core.repositories.sqlalchemy_workflow_node_execution_repository.delete")
mock_stmt = mocker.MagicMock()
mock_delete.return_value = mock_stmt
mock_stmt.where.return_value = mock_stmt
diff --git a/dev/mypy-check b/dev/mypy-check
index 24cfb06824..c043faffe6 100755
--- a/dev/mypy-check
+++ b/dev/mypy-check
@@ -2,6 +2,9 @@
set -x
+SCRIPT_DIR="$(dirname "$(realpath "$0")")"
+cd "$SCRIPT_DIR/.."
+
# run mypy checks
uv run --directory api --dev --with pip \
python -m mypy --install-types --non-interactive --cache-fine-grained --sqlite-cache .
diff --git a/dev/pytest/pytest_all_tests.sh b/dev/pytest/pytest_all_tests.sh
index f0c8a78548..30898b4fcf 100755
--- a/dev/pytest/pytest_all_tests.sh
+++ b/dev/pytest/pytest_all_tests.sh
@@ -1,6 +1,9 @@
#!/bin/bash
set -x
+SCRIPT_DIR="$(dirname "$(realpath "$0")")"
+cd "$SCRIPT_DIR/../.."
+
# ModelRuntime
dev/pytest/pytest_model_runtime.sh
diff --git a/dev/pytest/pytest_artifacts.sh b/dev/pytest/pytest_artifacts.sh
index d52acb2273..3086ef5cc4 100755
--- a/dev/pytest/pytest_artifacts.sh
+++ b/dev/pytest/pytest_artifacts.sh
@@ -1,4 +1,7 @@
#!/bin/bash
set -x
+SCRIPT_DIR="$(dirname "$(realpath "$0")")"
+cd "$SCRIPT_DIR/../.."
+
pytest api/tests/artifact_tests/
diff --git a/dev/pytest/pytest_model_runtime.sh b/dev/pytest/pytest_model_runtime.sh
index dc6c6ac627..2cbbbbfd81 100755
--- a/dev/pytest/pytest_model_runtime.sh
+++ b/dev/pytest/pytest_model_runtime.sh
@@ -1,6 +1,9 @@
#!/bin/bash
set -x
+SCRIPT_DIR="$(dirname "$(realpath "$0")")"
+cd "$SCRIPT_DIR/../.."
+
pytest api/tests/integration_tests/model_runtime/anthropic \
api/tests/integration_tests/model_runtime/azure_openai \
api/tests/integration_tests/model_runtime/openai api/tests/integration_tests/model_runtime/chatglm \
diff --git a/dev/pytest/pytest_tools.sh b/dev/pytest/pytest_tools.sh
index 0b1a8c9877..d10934626f 100755
--- a/dev/pytest/pytest_tools.sh
+++ b/dev/pytest/pytest_tools.sh
@@ -1,4 +1,7 @@
#!/bin/bash
set -x
+SCRIPT_DIR="$(dirname "$(realpath "$0")")"
+cd "$SCRIPT_DIR/../.."
+
pytest api/tests/integration_tests/tools
diff --git a/dev/pytest/pytest_unit_tests.sh b/dev/pytest/pytest_unit_tests.sh
index 2075596b7f..1a1819ca28 100755
--- a/dev/pytest/pytest_unit_tests.sh
+++ b/dev/pytest/pytest_unit_tests.sh
@@ -1,5 +1,8 @@
#!/bin/bash
set -x
+SCRIPT_DIR="$(dirname "$(realpath "$0")")"
+cd "$SCRIPT_DIR/../.."
+
# libs
pytest api/tests/unit_tests
diff --git a/dev/pytest/pytest_vdb.sh b/dev/pytest/pytest_vdb.sh
index dd03ca3514..7f617a9c05 100755
--- a/dev/pytest/pytest_vdb.sh
+++ b/dev/pytest/pytest_vdb.sh
@@ -1,6 +1,9 @@
#!/bin/bash
set -x
+SCRIPT_DIR="$(dirname "$(realpath "$0")")"
+cd "$SCRIPT_DIR/../.."
+
pytest api/tests/integration_tests/vdb/chroma \
api/tests/integration_tests/vdb/milvus \
api/tests/integration_tests/vdb/pgvecto_rs \
diff --git a/dev/pytest/pytest_workflow.sh b/dev/pytest/pytest_workflow.sh
index db8fdb2fb9..b63d49069f 100755
--- a/dev/pytest/pytest_workflow.sh
+++ b/dev/pytest/pytest_workflow.sh
@@ -1,4 +1,7 @@
#!/bin/bash
set -x
+SCRIPT_DIR="$(dirname "$(realpath "$0")")"
+cd "$SCRIPT_DIR/../.."
+
pytest api/tests/integration_tests/workflow
diff --git a/dev/reformat b/dev/reformat
index 53d7703fce..71cb6abb1e 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -2,6 +2,9 @@
set -x
+SCRIPT_DIR="$(dirname "$(realpath "$0")")"
+cd "$SCRIPT_DIR/.."
+
# run ruff linter
uv run --directory api --dev ruff check --fix ./
diff --git a/dev/sync-uv b/dev/sync-uv
index 7bc3bb22be..67a8133f5a 100755
--- a/dev/sync-uv
+++ b/dev/sync-uv
@@ -6,5 +6,8 @@ if ! command -v uv &> /dev/null; then
pip install uv
fi
+SCRIPT_DIR="$(dirname "$(realpath "$0")")"
+cd "$SCRIPT_DIR/.."
+
# check uv.lock in sync with pyproject.toml
uv lock --project api
diff --git a/docker/docker-compose-template.yaml b/docker/docker-compose-template.yaml
index bfbfe6c19a..487d358b24 100644
--- a/docker/docker-compose-template.yaml
+++ b/docker/docker-compose-template.yaml
@@ -188,7 +188,7 @@ services:
# ssrf_proxy server
# for more information, please refer to
- # https://docs.dify.ai/learn-more/faq/install-faq#id-18.-why-is-ssrf_proxy-needed
+ # https://docs.dify.ai/learn-more/faq/install-faq#18-why-is-ssrf-proxy-needed%3F
ssrf_proxy:
image: ubuntu/squid:latest
restart: always
diff --git a/docker/docker-compose.middleware.yaml b/docker/docker-compose.middleware.yaml
index 01c7573a95..498390b708 100644
--- a/docker/docker-compose.middleware.yaml
+++ b/docker/docker-compose.middleware.yaml
@@ -123,7 +123,7 @@ services:
# ssrf_proxy server
# for more information, please refer to
- # https://docs.dify.ai/learn-more/faq/install-faq#id-18.-why-is-ssrf_proxy-needed
+ # https://docs.dify.ai/learn-more/faq/install-faq#18-why-is-ssrf-proxy-needed%3F
ssrf_proxy:
image: ubuntu/squid:latest
restart: always
diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml
index 3ed0f60e96..aee36b3986 100644
--- a/docker/docker-compose.yaml
+++ b/docker/docker-compose.yaml
@@ -677,7 +677,7 @@ services:
# ssrf_proxy server
# for more information, please refer to
- # https://docs.dify.ai/learn-more/faq/install-faq#id-18.-why-is-ssrf_proxy-needed
+ # https://docs.dify.ai/learn-more/faq/install-faq#18-why-is-ssrf-proxy-needed%3F
ssrf_proxy:
image: ubuntu/squid:latest
restart: always
diff --git a/web/app/(commonLayout)/datasets/(datasetDetailLayout)/[datasetId]/layout-main.tsx b/web/app/(commonLayout)/datasets/(datasetDetailLayout)/[datasetId]/layout-main.tsx
index 078c7ebd8c..5619b1e445 100644
--- a/web/app/(commonLayout)/datasets/(datasetDetailLayout)/[datasetId]/layout-main.tsx
+++ b/web/app/(commonLayout)/datasets/(datasetDetailLayout)/[datasetId]/layout-main.tsx
@@ -98,7 +98,7 @@ const ExtraInfo = ({ isMobile, relatedApps, expand }: IExtraInfoProps) => {
className='mt-2 inline-flex cursor-pointer items-center text-xs text-text-accent'
href={
locale === LanguagesSupported[1]
- ? 'https://docs.dify.ai/v/zh-hans/guides/knowledge-base/integrate-knowledge-within-application'
+ ? 'https://docs.dify.ai/zh-hans/guides/knowledge-base/integrate-knowledge-within-application'
: 'https://docs.dify.ai/guides/knowledge-base/integrate-knowledge-within-application'
}
target='_blank' rel='noopener noreferrer'
diff --git a/web/app/components/app/configuration/config-prompt/conversation-history/history-panel.tsx b/web/app/components/app/configuration/config-prompt/conversation-history/history-panel.tsx
index 78e8d8f506..592c95261c 100644
--- a/web/app/components/app/configuration/config-prompt/conversation-history/history-panel.tsx
+++ b/web/app/components/app/configuration/config-prompt/conversation-history/history-panel.tsx
@@ -46,8 +46,8 @@ const HistoryPanel: FC