From 3340775052e930777718f38284b6bde9adc4da27 Mon Sep 17 00:00:00 2001 From: jyong <718720800@qq.com> Date: Mon, 14 Apr 2025 11:10:44 +0800 Subject: [PATCH] r2 --- api/controllers/console/datasets/pipeline.py | 1 + api/models/dataset.py | 24 +++- .../pipeline_template/__init__.py | 0 .../pipeline_template/customized/__init__.py | 0 .../customized/customized_retrieval.py | 105 ++++++++++++++++++ .../pipeline_template/database/__init__.py | 0 .../database/database_retrieval.py | 105 ++++++++++++++++++ .../pipeline_template_base.py | 17 +++ .../pipeline_template_factory.py | 21 ++++ .../pipeline_template_type.py | 7 ++ .../pipeline_template/remote/__init__.py | 0 .../remote/remote_retrieval.py | 70 ++++++++++++ api/services/rag_pipeline/rag_pipeline.py | 34 +++++- 13 files changed, 381 insertions(+), 3 deletions(-) create mode 100644 api/services/rag_pipeline/pipeline_template/__init__.py create mode 100644 api/services/rag_pipeline/pipeline_template/customized/__init__.py create mode 100644 api/services/rag_pipeline/pipeline_template/customized/customized_retrieval.py create mode 100644 api/services/rag_pipeline/pipeline_template/database/__init__.py create mode 100644 api/services/rag_pipeline/pipeline_template/database/database_retrieval.py create mode 100644 api/services/rag_pipeline/pipeline_template/pipeline_template_base.py create mode 100644 api/services/rag_pipeline/pipeline_template/pipeline_template_factory.py create mode 100644 api/services/rag_pipeline/pipeline_template/pipeline_template_type.py create mode 100644 api/services/rag_pipeline/pipeline_template/remote/__init__.py create mode 100644 api/services/rag_pipeline/pipeline_template/remote/remote_retrieval.py diff --git a/api/controllers/console/datasets/pipeline.py b/api/controllers/console/datasets/pipeline.py index 20a3df8a1b..5826fd8ab6 100644 --- a/api/controllers/console/datasets/pipeline.py +++ b/api/controllers/console/datasets/pipeline.py @@ -35,6 +35,7 @@ class PipelineTemplateListApi(Resource): @enterprise_license_required def get(self): type = request.args.get("type", default="built-in", type=str, choices=["built-in", "customized"]) + language = request.args.get("language", default="en-US", type=str) # get pipeline templates return response, 200 diff --git a/api/models/dataset.py b/api/models/dataset.py index 1e274a31f8..fb4908f936 100644 --- a/api/models/dataset.py +++ b/api/models/dataset.py @@ -1146,7 +1146,7 @@ class PipelineBuiltInTemplate(db.Model): # type: ignore[name-defined] ) id = db.Column(StringUUID, server_default=db.text("uuid_generate_v4()")) - app_id = db.Column(StringUUID, nullable=False) + pipeline_id = db.Column(StringUUID, nullable=False) name = db.Column(db.String(255), nullable=False) description = db.Column(db.Text, nullable=False) icon = db.Column(db.JSON, nullable=False) @@ -1168,7 +1168,7 @@ class PipelineCustomizedTemplate(db.Model): # type: ignore[name-defined] id = db.Column(StringUUID, server_default=db.text("uuid_generate_v4()")) tenant_id = db.Column(StringUUID, nullable=False) - app_id = db.Column(StringUUID, nullable=False) + pipeline_id = db.Column(StringUUID, nullable=False) name = db.Column(db.String(255), nullable=False) description = db.Column(db.Text, nullable=False) icon = db.Column(db.JSON, nullable=False) @@ -1177,3 +1177,23 @@ class PipelineCustomizedTemplate(db.Model): # type: ignore[name-defined] language = db.Column(db.String(255), nullable=False) created_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp()) updated_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp()) + + +class Pipeline(db.Model): # type: ignore[name-defined] + __tablename__ = "pipelines" + __table_args__ = ( + db.PrimaryKeyConstraint("id", name="pipeline_pkey"), + ) + + id = db.Column(StringUUID, server_default=db.text("uuid_generate_v4()")) + tenant_id: Mapped[str] = db.Column(StringUUID, nullable=False) + name = db.Column(db.String(255), nullable=False) + description = db.Column(db.Text, nullable=False, server_default=db.text("''::character varying")) + mode = db.Column(db.String(255), nullable=False) + workflow_id = db.Column(StringUUID, nullable=True) + is_public = db.Column(db.Boolean, nullable=False, server_default=db.text("false")) + is_published = db.Column(db.Boolean, nullable=False, server_default=db.text("false")) + created_by = db.Column(StringUUID, nullable=True) + created_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp()) + updated_by = db.Column(StringUUID, nullable=True) + updated_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp()) diff --git a/api/services/rag_pipeline/pipeline_template/__init__.py b/api/services/rag_pipeline/pipeline_template/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/services/rag_pipeline/pipeline_template/customized/__init__.py b/api/services/rag_pipeline/pipeline_template/customized/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/services/rag_pipeline/pipeline_template/customized/customized_retrieval.py b/api/services/rag_pipeline/pipeline_template/customized/customized_retrieval.py new file mode 100644 index 0000000000..fccf09ef0a --- /dev/null +++ b/api/services/rag_pipeline/pipeline_template/customized/customized_retrieval.py @@ -0,0 +1,105 @@ +from typing import Optional + +from constants.languages import languages +from extensions.ext_database import db +from models.model import App, RecommendedApp +from services.app_dsl_service import AppDslService +from services.rag_pipeline.pipeline_template.pipeline_template_base import PipelineTemplateRetrievalBase +from services.rag_pipeline.pipeline_template.pipeline_template_type import PipelineTemplateType + + +class CustomizedPipelineTemplateRetrieval(PipelineTemplateRetrievalBase): + """ + Retrieval recommended app from database + """ + + def get_pipeline_templates(self, language: str) -> dict: + result = self.fetch_pipeline_templates_from_db(language) + return result + + def get_pipeline_template_detail(self, pipeline_id: str): + result = self.fetch_pipeline_template_detail_from_db(pipeline_id) + return result + + def get_type(self) -> str: + return PipelineTemplateType.CUSTOMIZED + + @classmethod + def fetch_recommended_apps_from_db(cls, language: str) -> dict: + """ + Fetch recommended apps from db. + :param language: language + :return: + """ + recommended_apps = ( + db.session.query(RecommendedApp) + .filter(RecommendedApp.is_listed == True, RecommendedApp.language == language) + .all() + ) + + if len(recommended_apps) == 0: + recommended_apps = ( + db.session.query(RecommendedApp) + .filter(RecommendedApp.is_listed == True, RecommendedApp.language == languages[0]) + .all() + ) + + categories = set() + recommended_apps_result = [] + for recommended_app in recommended_apps: + app = recommended_app.app + if not app or not app.is_public: + continue + + site = app.site + if not site: + continue + + recommended_app_result = { + "id": recommended_app.id, + "app": recommended_app.app, + "app_id": recommended_app.app_id, + "description": site.description, + "copyright": site.copyright, + "privacy_policy": site.privacy_policy, + "custom_disclaimer": site.custom_disclaimer, + "category": recommended_app.category, + "position": recommended_app.position, + "is_listed": recommended_app.is_listed, + } + recommended_apps_result.append(recommended_app_result) + + categories.add(recommended_app.category) + + return {"recommended_apps": recommended_apps_result, "categories": sorted(categories)} + + @classmethod + def fetch_recommended_app_detail_from_db(cls, app_id: str) -> Optional[dict]: + """ + Fetch recommended app detail from db. + :param app_id: App ID + :return: + """ + # is in public recommended list + recommended_app = ( + db.session.query(RecommendedApp) + .filter(RecommendedApp.is_listed == True, RecommendedApp.app_id == app_id) + .first() + ) + + if not recommended_app: + return None + + # get app detail + app_model = db.session.query(App).filter(App.id == app_id).first() + if not app_model or not app_model.is_public: + return None + + return { + "id": app_model.id, + "name": app_model.name, + "icon": app_model.icon, + "icon_background": app_model.icon_background, + "mode": app_model.mode, + "export_data": AppDslService.export_dsl(app_model=app_model), + } diff --git a/api/services/rag_pipeline/pipeline_template/database/__init__.py b/api/services/rag_pipeline/pipeline_template/database/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/services/rag_pipeline/pipeline_template/database/database_retrieval.py b/api/services/rag_pipeline/pipeline_template/database/database_retrieval.py new file mode 100644 index 0000000000..3158e9f91c --- /dev/null +++ b/api/services/rag_pipeline/pipeline_template/database/database_retrieval.py @@ -0,0 +1,105 @@ +from typing import Optional + +from constants.languages import languages +from extensions.ext_database import db +from models.model import App, RecommendedApp +from services.app_dsl_service import AppDslService +from services.recommend_app.recommend_app_base import RecommendAppRetrievalBase +from services.recommend_app.recommend_app_type import RecommendAppType + + +class DatabasePipelineTemplateRetrieval(RecommendAppRetrievalBase): + """ + Retrieval recommended app from database + """ + + def get_recommended_apps_and_categories(self, language: str) -> dict: + result = self.fetch_recommended_apps_from_db(language) + return result + + def get_recommend_app_detail(self, app_id: str): + result = self.fetch_recommended_app_detail_from_db(app_id) + return result + + def get_type(self) -> str: + return RecommendAppType.DATABASE + + @classmethod + def fetch_recommended_apps_from_db(cls, language: str) -> dict: + """ + Fetch recommended apps from db. + :param language: language + :return: + """ + recommended_apps = ( + db.session.query(RecommendedApp) + .filter(RecommendedApp.is_listed == True, RecommendedApp.language == language) + .all() + ) + + if len(recommended_apps) == 0: + recommended_apps = ( + db.session.query(RecommendedApp) + .filter(RecommendedApp.is_listed == True, RecommendedApp.language == languages[0]) + .all() + ) + + categories = set() + recommended_apps_result = [] + for recommended_app in recommended_apps: + app = recommended_app.app + if not app or not app.is_public: + continue + + site = app.site + if not site: + continue + + recommended_app_result = { + "id": recommended_app.id, + "app": recommended_app.app, + "app_id": recommended_app.app_id, + "description": site.description, + "copyright": site.copyright, + "privacy_policy": site.privacy_policy, + "custom_disclaimer": site.custom_disclaimer, + "category": recommended_app.category, + "position": recommended_app.position, + "is_listed": recommended_app.is_listed, + } + recommended_apps_result.append(recommended_app_result) + + categories.add(recommended_app.category) + + return {"recommended_apps": recommended_apps_result, "categories": sorted(categories)} + + @classmethod + def fetch_recommended_app_detail_from_db(cls, app_id: str) -> Optional[dict]: + """ + Fetch recommended app detail from db. + :param app_id: App ID + :return: + """ + # is in public recommended list + recommended_app = ( + db.session.query(RecommendedApp) + .filter(RecommendedApp.is_listed == True, RecommendedApp.app_id == app_id) + .first() + ) + + if not recommended_app: + return None + + # get app detail + app_model = db.session.query(App).filter(App.id == app_id).first() + if not app_model or not app_model.is_public: + return None + + return { + "id": app_model.id, + "name": app_model.name, + "icon": app_model.icon, + "icon_background": app_model.icon_background, + "mode": app_model.mode, + "export_data": AppDslService.export_dsl(app_model=app_model), + } diff --git a/api/services/rag_pipeline/pipeline_template/pipeline_template_base.py b/api/services/rag_pipeline/pipeline_template/pipeline_template_base.py new file mode 100644 index 0000000000..45860f9d74 --- /dev/null +++ b/api/services/rag_pipeline/pipeline_template/pipeline_template_base.py @@ -0,0 +1,17 @@ +from abc import ABC, abstractmethod + + +class PipelineTemplateRetrievalBase(ABC): + """Interface for pipeline template retrieval.""" + + @abstractmethod + def get_pipeline_templates(self, language: str) -> dict: + raise NotImplementedError + + @abstractmethod + def get_pipeline_template_detail(self, pipeline_id: str): + raise NotImplementedError + + @abstractmethod + def get_type(self) -> str: + raise NotImplementedError diff --git a/api/services/rag_pipeline/pipeline_template/pipeline_template_factory.py b/api/services/rag_pipeline/pipeline_template/pipeline_template_factory.py new file mode 100644 index 0000000000..0348387119 --- /dev/null +++ b/api/services/rag_pipeline/pipeline_template/pipeline_template_factory.py @@ -0,0 +1,21 @@ +from services.rag_pipeline.pipeline_template.pipeline_template_base import PipelineTemplateRetrievalBase +from services.rag_pipeline.pipeline_template.pipeline_template_type import PipelineTemplateType + + + +class RecommendAppRetrievalFactory: + @staticmethod + def get_pipeline_template_factory(mode: str) -> type[PipelineTemplateRetrievalBase]: + match mode: + case PipelineTemplateType.REMOTE: + return RemotePipelineTemplateRetrieval + case PipelineTemplateType.CUSTOMIZED: + return DatabasePipelineTemplateRetrieval + case PipelineTemplateType.BUILTIN: + return BuildInPipelineTemplateRetrieval + case _: + raise ValueError(f"invalid fetch recommended apps mode: {mode}") + + @staticmethod + def get_buildin_recommend_app_retrieval(): + return BuildInRecommendAppRetrieval diff --git a/api/services/rag_pipeline/pipeline_template/pipeline_template_type.py b/api/services/rag_pipeline/pipeline_template/pipeline_template_type.py new file mode 100644 index 0000000000..98bc109a67 --- /dev/null +++ b/api/services/rag_pipeline/pipeline_template/pipeline_template_type.py @@ -0,0 +1,7 @@ +from enum import StrEnum + + +class PipelineTemplateType(StrEnum): + REMOTE = "remote" + BUILTIN = "builtin" + CUSTOMIZED = "customized" diff --git a/api/services/rag_pipeline/pipeline_template/remote/__init__.py b/api/services/rag_pipeline/pipeline_template/remote/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/services/rag_pipeline/pipeline_template/remote/remote_retrieval.py b/api/services/rag_pipeline/pipeline_template/remote/remote_retrieval.py new file mode 100644 index 0000000000..3ba6b0e64c --- /dev/null +++ b/api/services/rag_pipeline/pipeline_template/remote/remote_retrieval.py @@ -0,0 +1,70 @@ +import logging +from typing import Optional + +import requests + +from configs import dify_config +from services.recommend_app.buildin.buildin_retrieval import BuildInRecommendAppRetrieval +from services.recommend_app.recommend_app_type import RecommendAppType +from services.rag_pipeline.pipeline_template.pipeline_template_base import PipelineTemplateRetrievalBase +logger = logging.getLogger(__name__) + + +class RemotePipelineTemplateRetrieval(PipelineTemplateRetrievalBase): + """ + Retrieval recommended app from dify official + """ + + def get_pipeline_template_detail(self, pipeline_id: str): + try: + result = self.fetch_pipeline_template_detail_from_dify_official(pipeline_id) + except Exception as e: + logger.warning(f"fetch recommended app detail from dify official failed: {e}, switch to built-in.") + result = BuildInRecommendAppRetrieval.fetch_recommended_app_detail_from_builtin(pipeline_id) + return result + + def get_recommended_apps_and_categories(self, language: str) -> dict: + try: + result = self.fetch_recommended_apps_from_dify_official(language) + except Exception as e: + logger.warning(f"fetch recommended apps from dify official failed: {e}, switch to built-in.") + result = BuildInRecommendAppRetrieval.fetch_recommended_apps_from_builtin(language) + return result + + def get_type(self) -> str: + return RecommendAppType.REMOTE + + @classmethod + def fetch_recommended_app_detail_from_dify_official(cls, app_id: str) -> Optional[dict]: + """ + Fetch recommended app detail from dify official. + :param app_id: App ID + :return: + """ + domain = dify_config.HOSTED_FETCH_APP_TEMPLATES_REMOTE_DOMAIN + url = f"{domain}/apps/{app_id}" + response = requests.get(url, timeout=(3, 10)) + if response.status_code != 200: + return None + data: dict = response.json() + return data + + @classmethod + def fetch_recommended_apps_from_dify_official(cls, language: str) -> dict: + """ + Fetch recommended apps from dify official. + :param language: language + :return: + """ + domain = dify_config.HOSTED_FETCH_APP_TEMPLATES_REMOTE_DOMAIN + url = f"{domain}/apps?language={language}" + response = requests.get(url, timeout=(3, 10)) + if response.status_code != 200: + raise ValueError(f"fetch recommended apps failed, status code: {response.status_code}") + + result: dict = response.json() + + if "categories" in result: + result["categories"] = sorted(result["categories"]) + + return result diff --git a/api/services/rag_pipeline/rag_pipeline.py b/api/services/rag_pipeline/rag_pipeline.py index c6d1769679..c215606817 100644 --- a/api/services/rag_pipeline/rag_pipeline.py +++ b/api/services/rag_pipeline/rag_pipeline.py @@ -7,7 +7,7 @@ from typing import Any, List, Literal, Union from flask_login import current_user from models.dataset import PipelineBuiltInTemplate, PipelineCustomizedTemplate # type: ignore - +from configs import dify_config class RagPipelineService: @staticmethod @@ -18,3 +18,35 @@ class RagPipelineService: return PipelineBuiltInTemplate.query.all() else: return PipelineCustomizedTemplate.query.all() + + @staticmethod + def get_pipeline_templates(cls, type: Literal["built-in", "customized"] = "built-in", language: str) -> dict: + """ + Get pipeline templates. + :param type: type + :param language: language + :return: + """ + mode = dify_config.HOSTED_FETCH_APP_TEMPLATES_MODE + retrieval_instance = RecommendAppRetrievalFactory.get_recommend_app_factory(mode)() + result = retrieval_instance.get_recommended_apps_and_categories(language) + if not result.get("recommended_apps") and language != "en-US": + result = ( + RecommendAppRetrievalFactory.get_buildin_recommend_app_retrieval().fetch_recommended_apps_from_builtin( + "en-US" + ) + ) + + return result + + @classmethod + def get_recommend_app_detail(cls, app_id: str) -> Optional[dict]: + """ + Get recommend app detail. + :param app_id: app id + :return: + """ + mode = dify_config.HOSTED_FETCH_APP_TEMPLATES_MODE + retrieval_instance = RecommendAppRetrievalFactory.get_recommend_app_factory(mode)() + result: dict = retrieval_instance.get_recommend_app_detail(app_id) + return result