diff --git a/api/tasks/remove_app_and_related_data_task.py b/api/tasks/remove_app_and_related_data_task.py index ae6f923ca8..4e527bbaed 100644 --- a/api/tasks/remove_app_and_related_data_task.py +++ b/api/tasks/remove_app_and_related_data_task.py @@ -4,16 +4,12 @@ from collections.abc import Callable import click from celery import shared_task # type: ignore -from sqlalchemy import delete, select +from sqlalchemy import delete from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy.orm import Session -from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from extensions.ext_database import db from models import ( - Account, ApiToken, - App, AppAnnotationHitHistory, AppAnnotationSetting, AppDatasetJoin, @@ -34,7 +30,7 @@ from models import ( ) from models.tools import WorkflowToolProvider from models.web import PinnedConversation, SavedMessage -from models.workflow import ConversationVariable, Workflow, WorkflowAppLog, WorkflowRun +from models.workflow import ConversationVariable, Workflow, WorkflowAppLog, WorkflowNodeExecution, WorkflowRun @shared_task(queue="app_deletion", bind=True, max_retries=3) @@ -191,31 +187,18 @@ def _delete_app_workflow_runs(tenant_id: str, app_id: str): def _delete_app_workflow_node_executions(tenant_id: str, app_id: str): - # Get app's owner - with Session(db.engine, expire_on_commit=False) as session: - stmt = select(Account).where(Account.id == App.created_by).where(App.id == app_id) - user = session.scalar(stmt) - - if user is None: - errmsg = ( - f"Failed to delete workflow node executions for tenant {tenant_id} and app {app_id}, app's owner not found" + def del_workflow_node_execution(workflow_node_execution_id: str): + db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.id == workflow_node_execution_id).delete( + synchronize_session=False ) - logging.error(errmsg) - raise ValueError(errmsg) - # Create a repository instance for WorkflowNodeExecution - repository = SQLAlchemyWorkflowNodeExecutionRepository( - session_factory=db.engine, - user=user, - app_id=app_id, - triggered_from=None, + _delete_records( + """select id from workflow_node_executions where tenant_id=:tenant_id and app_id=:app_id limit 1000""", + {"tenant_id": tenant_id, "app_id": app_id}, + del_workflow_node_execution, + "workflow node execution", ) - # Use the clear method to delete all records for this tenant_id and app_id - repository.clear() - - logging.info(click.style(f"Deleted workflow node executions for tenant {tenant_id} and app {app_id}", fg="green")) - def _delete_app_workflow_app_logs(tenant_id: str, app_id: str): def del_workflow_app_log(workflow_app_log_id: str):