diff --git a/.github/workflows/api-tests.yml b/.github/workflows/api-tests.yml index aa7e68dbac..4ba91c467f 100644 --- a/.github/workflows/api-tests.yml +++ b/.github/workflows/api-tests.yml @@ -89,6 +89,5 @@ jobs: pgvecto-rs pgvector chroma - myscale - name: Test Vector Stores run: poetry run -C api bash dev/pytest/pytest_vdb.sh diff --git a/api/services/app_service.py b/api/services/app_service.py index f88e824b07..e433bb59bb 100644 --- a/api/services/app_service.py +++ b/api/services/app_service.py @@ -287,8 +287,12 @@ class AppService: """ db.session.delete(app) db.session.commit() + # Trigger asynchronous deletion of app and related data - remove_app_and_related_data_task.delay(app.id) + remove_app_and_related_data_task.delay( + tenant_id=app.tenant_id, + app_id=app.id + ) def get_app_meta(self, app_model: App) -> dict: """ diff --git a/api/tasks/remove_app_and_related_data_task.py b/api/tasks/remove_app_and_related_data_task.py index 117ce8d923..5aea8552da 100644 --- a/api/tasks/remove_app_and_related_data_task.py +++ b/api/tasks/remove_app_and_related_data_task.py @@ -3,7 +3,6 @@ import time import click from celery import shared_task -from sqlalchemy import select from sqlalchemy.exc import SQLAlchemyError from extensions.ext_database import db @@ -25,6 +24,7 @@ from models.model import ( RecommendedApp, Site, TagBinding, + TraceAppConfig, ) from models.tools import WorkflowToolProvider from models.web import PinnedConversation, SavedMessage @@ -32,122 +32,252 @@ from models.workflow import Workflow, WorkflowAppLog, WorkflowNodeExecution, Wor @shared_task(queue='app_deletion', bind=True, max_retries=3) -def remove_app_and_related_data_task(self, app_id: str): - logging.info(click.style(f'Start deleting app and related data: {app_id}', fg='green')) +def remove_app_and_related_data_task(self, tenant_id: str, app_id: str): + logging.info(click.style(f'Start deleting app and related data: {tenant_id}:{app_id}', fg='green')) start_at = time.perf_counter() try: - # Use a transaction to ensure all deletions succeed or none do - with db.session.begin_nested(): - # Delete related data - _delete_app_model_configs(app_id) - _delete_app_site(app_id) - _delete_app_api_tokens(app_id) - _delete_installed_apps(app_id) - _delete_recommended_apps(app_id) - _delete_app_annotation_data(app_id) - _delete_app_dataset_joins(app_id) - _delete_app_workflows(app_id) - _delete_app_conversations(app_id) - _delete_app_messages(app_id) - _delete_workflow_tool_providers(app_id) - _delete_app_tag_bindings(app_id) - _delete_end_users(app_id) - - # If we reach here, the transaction was successful - db.session.commit() + # Delete related data + _delete_app_model_configs(tenant_id, app_id) + _delete_app_site(tenant_id, app_id) + _delete_app_api_tokens(tenant_id, app_id) + _delete_installed_apps(tenant_id, app_id) + _delete_recommended_apps(tenant_id, app_id) + _delete_app_annotation_data(tenant_id, app_id) + _delete_app_dataset_joins(tenant_id, app_id) + _delete_app_workflows(tenant_id, app_id) + _delete_app_conversations(tenant_id, app_id) + _delete_app_messages(tenant_id, app_id) + _delete_workflow_tool_providers(tenant_id, app_id) + _delete_app_tag_bindings(tenant_id, app_id) + _delete_end_users(tenant_id, app_id) + _delete_trace_app_configs(tenant_id, app_id) end_at = time.perf_counter() logging.info(click.style(f'App and related data deleted: {app_id} latency: {end_at - start_at}', fg='green')) - except SQLAlchemyError as e: - db.session.rollback() logging.exception( click.style(f"Database error occurred while deleting app {app_id} and related data", fg='red')) raise self.retry(exc=e, countdown=60) # Retry after 60 seconds - except Exception as e: logging.exception(click.style(f"Error occurred while deleting app {app_id} and related data", fg='red')) raise self.retry(exc=e, countdown=60) # Retry after 60 seconds -def _delete_app_model_configs(app_id: str): - db.session.query(AppModelConfig).filter(AppModelConfig.app_id == app_id).delete() +def _delete_app_model_configs(tenant_id: str, app_id: str): + def del_model_config(model_config_id: str): + db.session.query(AppModelConfig).filter(AppModelConfig.id == model_config_id).delete(synchronize_session=False) + + _delete_records( + """select id from app_model_configs where app_id=:app_id limit 1000""", + {"app_id": app_id}, + del_model_config, + "app model config" + ) -def _delete_app_site(app_id: str): - db.session.query(Site).filter(Site.app_id == app_id).delete() +def _delete_app_site(tenant_id: str, app_id: str): + def del_site(site_id: str): + db.session.query(Site).filter(Site.id == site_id).delete(synchronize_session=False) + + _delete_records( + """select id from sites where app_id=:app_id limit 1000""", + {"app_id": app_id}, + del_site, + "site" + ) -def _delete_app_api_tokens(app_id: str): - db.session.query(ApiToken).filter(ApiToken.app_id == app_id).delete() +def _delete_app_api_tokens(tenant_id: str, app_id: str): + def del_api_token(api_token_id: str): + db.session.query(ApiToken).filter(ApiToken.id == api_token_id).delete(synchronize_session=False) + + _delete_records( + """select id from api_tokens where app_id=:app_id limit 1000""", + {"app_id": app_id}, + del_api_token, + "api token" + ) -def _delete_installed_apps(app_id: str): - db.session.query(InstalledApp).filter(InstalledApp.app_id == app_id).delete() +def _delete_installed_apps(tenant_id: str, app_id: str): + def del_installed_app(installed_app_id: str): + db.session.query(InstalledApp).filter(InstalledApp.id == installed_app_id).delete(synchronize_session=False) + + _delete_records( + """select id from installed_apps where tenant_id=:tenant_id and app_id=:app_id limit 1000""", + {"tenant_id": tenant_id, "app_id": app_id}, + del_installed_app, + "installed app" + ) -def _delete_recommended_apps(app_id: str): - db.session.query(RecommendedApp).filter(RecommendedApp.app_id == app_id).delete() +def _delete_recommended_apps(tenant_id: str, app_id: str): + def del_recommended_app(recommended_app_id: str): + db.session.query(RecommendedApp).filter(RecommendedApp.id == recommended_app_id).delete( + synchronize_session=False) + + _delete_records( + """select id from recommended_apps where app_id=:app_id limit 1000""", + {"app_id": app_id}, + del_recommended_app, + "recommended app" + ) -def _delete_app_annotation_data(app_id: str): - db.session.query(AppAnnotationHitHistory).filter(AppAnnotationHitHistory.app_id == app_id).delete() - db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.app_id == app_id).delete() +def _delete_app_annotation_data(tenant_id: str, app_id: str): + def del_annotation_hit_history(annotation_hit_history_id: str): + db.session.query(AppAnnotationHitHistory).filter( + AppAnnotationHitHistory.id == annotation_hit_history_id).delete(synchronize_session=False) + + _delete_records( + """select id from app_annotation_hit_histories where app_id=:app_id limit 1000""", + {"app_id": app_id}, + del_annotation_hit_history, + "annotation hit history" + ) + + def del_annotation_setting(annotation_setting_id: str): + db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.id == annotation_setting_id).delete( + synchronize_session=False) + + _delete_records( + """select id from app_annotation_settings where app_id=:app_id limit 1000""", + {"app_id": app_id}, + del_annotation_setting, + "annotation setting" + ) -def _delete_app_dataset_joins(app_id: str): - db.session.query(AppDatasetJoin).filter(AppDatasetJoin.app_id == app_id).delete() +def _delete_app_dataset_joins(tenant_id: str, app_id: str): + def del_dataset_join(dataset_join_id: str): + db.session.query(AppDatasetJoin).filter(AppDatasetJoin.id == dataset_join_id).delete(synchronize_session=False) + + _delete_records( + """select id from app_dataset_joins where app_id=:app_id limit 1000""", + {"app_id": app_id}, + del_dataset_join, + "dataset join" + ) -def _delete_app_workflows(app_id: str): - db.session.query(WorkflowRun).filter( - WorkflowRun.workflow_id.in_( - db.session.query(Workflow.id).filter(Workflow.app_id == app_id) - ) - ).delete(synchronize_session=False) - db.session.query(WorkflowNodeExecution).filter( - WorkflowNodeExecution.workflow_id.in_( - db.session.query(Workflow.id).filter(Workflow.app_id == app_id) - ) - ).delete(synchronize_session=False) - db.session.query(WorkflowAppLog).filter(WorkflowAppLog.app_id == app_id).delete(synchronize_session=False) - db.session.query(Workflow).filter(Workflow.app_id == app_id).delete(synchronize_session=False) +def _delete_app_workflows(tenant_id: str, app_id: str): + def del_workflow(workflow_id: str): + db.session.query(WorkflowRun).filter(WorkflowRun.workflow_id == workflow_id).delete(synchronize_session=False) + db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.workflow_id == workflow_id).delete( + synchronize_session=False) + db.session.query(WorkflowAppLog).filter(WorkflowAppLog.workflow_id == workflow_id).delete( + synchronize_session=False) + db.session.query(Workflow).filter(Workflow.id == workflow_id).delete(synchronize_session=False) + + _delete_records( + """select id from workflows where tenant_id=:tenant_id and app_id=:app_id limit 1000""", + {"tenant_id": tenant_id, "app_id": app_id}, + del_workflow, + "workflow" + ) -def _delete_app_conversations(app_id: str): - db.session.query(PinnedConversation).filter( - PinnedConversation.conversation_id.in_( - db.session.query(Conversation.id).filter(Conversation.app_id == app_id) - ) - ).delete(synchronize_session=False) - db.session.query(Conversation).filter(Conversation.app_id == app_id).delete() +def _delete_app_conversations(tenant_id: str, app_id: str): + def del_conversation(conversation_id: str): + db.session.query(PinnedConversation).filter(PinnedConversation.conversation_id == conversation_id).delete( + synchronize_session=False) + db.session.query(Conversation).filter(Conversation.id == conversation_id).delete(synchronize_session=False) + + _delete_records( + """select id from conversations where app_id=:app_id limit 1000""", + {"app_id": app_id}, + del_conversation, + "conversation" + ) -def _delete_app_messages(app_id: str): - message_ids = select(Message.id).filter(Message.app_id == app_id).scalar_subquery() - db.session.query(MessageFeedback).filter(MessageFeedback.message_id.in_(message_ids)).delete( - synchronize_session=False) - db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id.in_(message_ids)).delete( - synchronize_session=False) - db.session.query(MessageChain).filter(MessageChain.message_id.in_(message_ids)).delete(synchronize_session=False) - db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id.in_(message_ids)).delete( - synchronize_session=False) - db.session.query(MessageFile).filter(MessageFile.message_id.in_(message_ids)).delete(synchronize_session=False) - db.session.query(SavedMessage).filter(SavedMessage.message_id.in_(message_ids)).delete(synchronize_session=False) - db.session.query(Message).filter(Message.app_id == app_id).delete(synchronize_session=False) +def _delete_app_messages(tenant_id: str, app_id: str): + def del_message(message_id: str): + db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message_id).delete( + synchronize_session=False) + db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message_id).delete( + synchronize_session=False) + db.session.query(MessageChain).filter(MessageChain.message_id == message_id).delete( + synchronize_session=False) + db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message_id).delete( + synchronize_session=False) + db.session.query(MessageFile).filter(MessageFile.message_id == message_id).delete(synchronize_session=False) + db.session.query(SavedMessage).filter(SavedMessage.message_id == message_id).delete( + synchronize_session=False) + db.session.query(Message).filter(Message.id == message_id).delete() + + _delete_records( + """select id from messages where app_id=:app_id limit 1000""", + {"app_id": app_id}, + del_message, + "message" + ) -def _delete_workflow_tool_providers(app_id: str): - db.session.query(WorkflowToolProvider).filter( - WorkflowToolProvider.app_id == app_id - ).delete(synchronize_session=False) +def _delete_workflow_tool_providers(tenant_id: str, app_id: str): + def del_tool_provider(tool_provider_id: str): + db.session.query(WorkflowToolProvider).filter(WorkflowToolProvider.id == tool_provider_id).delete( + synchronize_session=False) + + _delete_records( + """select id from tool_workflow_providers where tenant_id=:tenant_id and app_id=:app_id limit 1000""", + {"tenant_id": tenant_id, "app_id": app_id}, + del_tool_provider, + "tool workflow provider" + ) -def _delete_app_tag_bindings(app_id: str): - db.session.query(TagBinding).filter( - TagBinding.target_id == app_id - ).delete(synchronize_session=False) +def _delete_app_tag_bindings(tenant_id: str, app_id: str): + def del_tag_binding(tag_binding_id: str): + db.session.query(TagBinding).filter(TagBinding.id == tag_binding_id).delete(synchronize_session=False) + + _delete_records( + """select id from tag_bindings where tenant_id=:tenant_id and target_id=:app_id limit 1000""", + {"tenant_id": tenant_id, "app_id": app_id}, + del_tag_binding, + "tag binding" + ) -def _delete_end_users(app_id: str): - db.session.query(EndUser).filter(EndUser.app_id == app_id).delete() +def _delete_end_users(tenant_id: str, app_id: str): + def del_end_user(end_user_id: str): + db.session.query(EndUser).filter(EndUser.id == end_user_id).delete(synchronize_session=False) + + _delete_records( + """select id from end_users where tenant_id=:tenant_id and app_id=:app_id limit 1000""", + {"tenant_id": tenant_id, "app_id": app_id}, + del_end_user, + "end user" + ) + + +def _delete_trace_app_configs(tenant_id: str, app_id: str): + def del_trace_app_config(trace_app_config_id: str): + db.session.query(TraceAppConfig).filter(TraceAppConfig.id == trace_app_config_id).delete( + synchronize_session=False) + + _delete_records( + """select id from trace_app_config where app_id=:app_id limit 1000""", + {"app_id": app_id}, + del_trace_app_config, + "trace app config" + ) + + +def _delete_records(query_sql: str, params: dict, delete_func: callable, name: str) -> None: + while True: + with db.engine.begin() as conn: + rs = conn.execute(db.text(query_sql), params) + if rs.rowcount == 0: + break + + for i in rs: + record_id = str(i.id) + try: + delete_func(record_id) + db.session.commit() + logging.info(click.style(f"Deleted {name} {record_id}", fg='green')) + except Exception: + logging.exception(f"Error occurred while deleting {name} {record_id}") + continue + rs.close() diff --git a/dev/pytest/pytest_vdb.sh b/dev/pytest/pytest_vdb.sh index cb8aae6740..c954c528fb 100755 --- a/dev/pytest/pytest_vdb.sh +++ b/dev/pytest/pytest_vdb.sh @@ -3,7 +3,6 @@ set -x pytest api/tests/integration_tests/vdb/chroma \ api/tests/integration_tests/vdb/milvus \ - api/tests/integration_tests/vdb/myscale \ api/tests/integration_tests/vdb/pgvecto_rs \ api/tests/integration_tests/vdb/pgvector \ api/tests/integration_tests/vdb/qdrant \