dify/api/services/app_dsl_service.py
2024-11-22 15:05:04 +08:00

486 lines
17 KiB
Python

import logging
import uuid
from enum import Enum
from typing import Optional
from uuid import uuid4
import yaml
from packaging import version
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.helper import ssrf_proxy
from events.app_event import app_model_config_was_updated, app_was_created
from extensions.ext_redis import redis_client
from factories import variable_factory
from models import Account, App, AppMode
from models.model import AppModelConfig
from services.workflow_service import WorkflowService
logger = logging.getLogger(__name__)
IMPORT_INFO_REDIS_KEY_PREFIX = "app_import_info:"
IMPORT_INFO_REDIS_EXPIRY = 180 # 3 minutes
CURRENT_DSL_VERSION = "0.2.0"
class ImportMode(str, Enum):
YAML_CONTENT = "yaml-content"
YAML_URL = "yaml-url"
class ImportStatus(str, Enum):
COMPLETED = "completed"
COMPLETED_WITH_WARNINGS = "completed-with-warnings"
PENDING = "pending"
FAILED = "failed"
class Import(BaseModel):
id: str
status: ImportStatus
app_id: Optional[str] = None
current_dsl_version: str = CURRENT_DSL_VERSION
imported_dsl_version: str = ""
error: str = ""
def _check_version_compatibility(imported_version: str) -> ImportStatus:
"""Determine import status based on version comparison"""
try:
current_ver = version.parse(CURRENT_DSL_VERSION)
imported_ver = version.parse(imported_version)
except version.InvalidVersion:
return ImportStatus.FAILED
# Compare major version and minor version
if current_ver.major != imported_ver.major or current_ver.minor != imported_ver.minor:
return ImportStatus.PENDING
if current_ver.micro != imported_ver.micro:
return ImportStatus.COMPLETED_WITH_WARNINGS
return ImportStatus.COMPLETED
class PendingData(BaseModel):
import_mode: str
yaml_content: str
name: str | None
description: str | None
icon_type: str | None
icon: str | None
icon_background: str | None
app_id: str | None
class AppDslService:
def __init__(self, session: Session):
self._session = session
def import_app(
self,
*,
account: Account,
import_mode: str,
yaml_content: Optional[str] = None,
yaml_url: Optional[str] = None,
name: Optional[str] = None,
description: Optional[str] = None,
icon_type: Optional[str] = None,
icon: Optional[str] = None,
icon_background: Optional[str] = None,
app_id: Optional[str] = None,
) -> Import:
"""Import an app from YAML content or URL."""
import_id = str(uuid.uuid4())
# Validate import mode
try:
mode = ImportMode(import_mode)
except ValueError:
raise ValueError(f"Invalid import_mode: {import_mode}")
# Get YAML content
content = ""
if mode == ImportMode.YAML_URL:
if not yaml_url:
return Import(
id=import_id,
status=ImportStatus.FAILED,
error="yaml_url is required when import_mode is yaml-url",
)
try:
max_size = 10 * 1024 * 1024 # 10MB
response = ssrf_proxy.get(yaml_url.strip(), follow_redirects=True, timeout=(10, 10))
response.raise_for_status()
content = response.content
if len(content) > max_size:
return Import(
id=import_id,
status=ImportStatus.FAILED,
error="File size exceeds the limit of 10MB",
)
if not content:
return Import(
id=import_id,
status=ImportStatus.FAILED,
error="Empty content from url",
)
try:
content = content.decode("utf-8")
except UnicodeDecodeError as e:
return Import(
id=import_id,
status=ImportStatus.FAILED,
error=f"Error decoding content: {e}",
)
except Exception as e:
return Import(
id=import_id,
status=ImportStatus.FAILED,
error=f"Error fetching YAML from URL: {str(e)}",
)
elif mode == ImportMode.YAML_CONTENT:
if not yaml_content:
return Import(
id=import_id,
status=ImportStatus.FAILED,
error="yaml_content is required when import_mode is yaml-content",
)
content = yaml_content
# Process YAML content
try:
# Parse YAML to validate format
data = yaml.safe_load(content)
if not isinstance(data, dict):
return Import(
id=import_id,
status=ImportStatus.FAILED,
error="Invalid YAML format: content must be a mapping",
)
# Validate and fix DSL version
if not data.get("version"):
data["version"] = "0.1.0"
if not data.get("kind") or data.get("kind") != "app":
data["kind"] = "app"
imported_version = data.get("version", "0.1.0")
status = _check_version_compatibility(imported_version)
# Extract app data
app_data = data.get("app")
if not app_data:
return Import(
id=import_id,
status=ImportStatus.FAILED,
error="Missing app data in YAML content",
)
# If app_id is provided, check if it exists
app = None
if app_id:
stmt = select(App).where(App.id == app_id, App.tenant_id == account.current_tenant_id)
app = self._session.scalar(stmt)
if not app:
return Import(
id=import_id,
status=ImportStatus.FAILED,
error="App not found",
)
if app.mode not in [AppMode.WORKFLOW.value, AppMode.ADVANCED_CHAT.value]:
return Import(
id=import_id,
status=ImportStatus.FAILED,
error="Only workflow or advanced chat apps can be overwritten",
)
# If major version mismatch, store import info in Redis
if status == ImportStatus.PENDING:
panding_data = PendingData(
import_mode=import_mode,
yaml_content=content,
name=name,
description=description,
icon_type=icon_type,
icon=icon,
icon_background=icon_background,
app_id=app_id,
)
redis_client.setex(
f"{IMPORT_INFO_REDIS_KEY_PREFIX}{import_id}",
IMPORT_INFO_REDIS_EXPIRY,
panding_data.model_dump_json(),
)
return Import(
id=import_id,
status=status,
app_id=app_id,
imported_dsl_version=imported_version,
)
# Create or update app
app = self._create_or_update_app(
app=app,
data=data,
account=account,
name=name,
description=description,
icon_type=icon_type,
icon=icon,
icon_background=icon_background,
)
return Import(
id=import_id,
status=status,
app_id=app.id,
imported_dsl_version=imported_version,
)
except yaml.YAMLError as e:
return Import(
id=import_id,
status=ImportStatus.FAILED,
error=f"Invalid YAML format: {str(e)}",
)
except Exception as e:
logger.exception("Failed to import app")
return Import(
id=import_id,
status=ImportStatus.FAILED,
error=str(e),
)
def confirm_import(self, *, import_id: str, account: Account) -> Import:
"""
Confirm an import that requires confirmation
"""
redis_key = f"{IMPORT_INFO_REDIS_KEY_PREFIX}{import_id}"
pending_data = redis_client.get(redis_key)
if not pending_data:
return Import(
id=import_id,
status=ImportStatus.FAILED,
error="Import information expired or does not exist",
)
try:
if not isinstance(pending_data, str | bytes):
return Import(
id=import_id,
status=ImportStatus.FAILED,
error="Invalid import information",
)
pending_data = PendingData.model_validate_json(pending_data)
data = yaml.safe_load(pending_data.yaml_content)
app = None
if pending_data.app_id:
stmt = select(App).where(App.id == pending_data.app_id, App.tenant_id == account.current_tenant_id)
app = self._session.scalar(stmt)
# Create or update app
app = self._create_or_update_app(
app=app,
data=data,
account=account,
name=pending_data.name,
description=pending_data.description,
icon_type=pending_data.icon_type,
icon=pending_data.icon,
icon_background=pending_data.icon_background,
)
# Delete import info from Redis
redis_client.delete(redis_key)
return Import(
id=import_id,
status=ImportStatus.COMPLETED,
app_id=app.id,
current_dsl_version=CURRENT_DSL_VERSION,
imported_dsl_version=data.get("version", "0.1.0"),
)
except Exception as e:
logger.exception("Error confirming import")
return Import(
id=import_id,
status=ImportStatus.FAILED,
error=str(e),
)
def _create_or_update_app(
self,
*,
app: Optional[App],
data: dict,
account: Account,
name: Optional[str] = None,
description: Optional[str] = None,
icon_type: Optional[str] = None,
icon: Optional[str] = None,
icon_background: Optional[str] = None,
) -> App:
"""Create a new app or update an existing one."""
app_data = data.get("app", {})
app_mode = AppMode(app_data["mode"])
# Set icon type
icon_type_value = icon_type or app_data.get("icon_type")
if icon_type_value in ["emoji", "link"]:
icon_type = icon_type_value
else:
icon_type = "emoji"
icon = icon or str(app_data.get("icon", ""))
if app:
# Update existing app
app.name = name or app_data.get("name", app.name)
app.description = description or app_data.get("description", app.description)
app.icon_type = icon_type
app.icon = icon
app.icon_background = icon_background or app_data.get("icon_background", app.icon_background)
app.updated_by = account.id
else:
# Create new app
app = App()
app.id = str(uuid4())
app.tenant_id = account.current_tenant_id
app.mode = app_mode.value
app.name = name or app_data.get("name", "")
app.description = description or app_data.get("description", "")
app.icon_type = icon_type
app.icon = icon
app.icon_background = icon_background or app_data.get("icon_background", "#FFFFFF")
app.enable_site = True
app.enable_api = True
app.use_icon_as_answer_icon = app_data.get("use_icon_as_answer_icon", False)
app.created_by = account.id
app.updated_by = account.id
self._session.add(app)
self._session.commit()
app_was_created.send(app, account=account)
# Initialize app based on mode
if app_mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
workflow_data = data.get("workflow")
if not workflow_data or not isinstance(workflow_data, dict):
raise ValueError("Missing workflow data for workflow/advanced chat app")
environment_variables_list = workflow_data.get("environment_variables", [])
environment_variables = [
variable_factory.build_variable_from_mapping(obj) for obj in environment_variables_list
]
conversation_variables_list = workflow_data.get("conversation_variables", [])
conversation_variables = [
variable_factory.build_variable_from_mapping(obj) for obj in conversation_variables_list
]
workflow_service = WorkflowService()
current_draft_workflow = workflow_service.get_draft_workflow(app_model=app)
if current_draft_workflow:
unique_hash = current_draft_workflow.unique_hash
else:
unique_hash = None
workflow_service.sync_draft_workflow(
app_model=app,
graph=workflow_data.get("graph", {}),
features=workflow_data.get("features", {}),
unique_hash=unique_hash,
account=account,
environment_variables=environment_variables,
conversation_variables=conversation_variables,
)
elif app_mode in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.COMPLETION}:
# Initialize model config
model_config = data.get("model_config")
if not model_config or not isinstance(model_config, dict):
raise ValueError("Missing model_config for chat/agent-chat/completion app")
# Initialize or update model config
if not app.app_model_config:
app_model_config = AppModelConfig().from_model_config_dict(model_config)
app_model_config.id = str(uuid4())
app_model_config.app_id = app.id
app_model_config.created_by = account.id
app_model_config.updated_by = account.id
app.app_model_config_id = app_model_config.id
self._session.add(app_model_config)
app_model_config_was_updated.send(app, app_model_config=app_model_config)
else:
raise ValueError("Invalid app mode")
return app
@classmethod
def export_dsl(cls, app_model: App, include_secret: bool = False) -> str:
"""
Export app
:param app_model: App instance
:return:
"""
app_mode = AppMode.value_of(app_model.mode)
export_data = {
"version": CURRENT_DSL_VERSION,
"kind": "app",
"app": {
"name": app_model.name,
"mode": app_model.mode,
"icon": "🤖" if app_model.icon_type == "image" else app_model.icon,
"icon_background": "#FFEAD5" if app_model.icon_type == "image" else app_model.icon_background,
"description": app_model.description,
"use_icon_as_answer_icon": app_model.use_icon_as_answer_icon,
},
}
if app_mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
cls._append_workflow_export_data(
export_data=export_data, app_model=app_model, include_secret=include_secret
)
else:
cls._append_model_config_export_data(export_data, app_model)
return yaml.dump(export_data, allow_unicode=True)
@classmethod
def _append_workflow_export_data(cls, *, export_data: dict, app_model: App, include_secret: bool) -> None:
"""
Append workflow export data
:param export_data: export data
:param app_model: App instance
"""
workflow_service = WorkflowService()
workflow = workflow_service.get_draft_workflow(app_model)
if not workflow:
raise ValueError("Missing draft workflow configuration, please check.")
export_data["workflow"] = workflow.to_dict(include_secret=include_secret)
@classmethod
def _append_model_config_export_data(cls, export_data: dict, app_model: App) -> None:
"""
Append model config export data
:param export_data: export data
:param app_model: App instance
"""
app_model_config = app_model.app_model_config
if not app_model_config:
raise ValueError("Missing app configuration, please check.")
export_data["model_config"] = app_model_config.to_dict()