From bd678f9ca17f8d02312ad52bda83560bc717d127 Mon Sep 17 00:00:00 2001
From: Jyong <76649700+JohnJyong@users.noreply.github.com>
Date: Wed, 16 Oct 2024 22:24:50 +0800
Subject: [PATCH] add clean 7 days datasets (#9424)

---
 api/schedule/clean_unused_datasets_task.py | 82 ++++++++++++++++++++++
 1 file changed, 82 insertions(+)

diff --git a/api/schedule/clean_unused_datasets_task.py b/api/schedule/clean_unused_datasets_task.py
index 3d799bfd4e..826842e52b 100644
--- a/api/schedule/clean_unused_datasets_task.py
+++ b/api/schedule/clean_unused_datasets_task.py
@@ -9,7 +9,9 @@ import app
 from configs import dify_config
 from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
 from extensions.ext_database import db
+from extensions.ext_redis import redis_client
 from models.dataset import Dataset, DatasetQuery, Document
+from services.feature_service import FeatureService
 
 
 @app.celery.task(queue="dataset")
@@ -18,6 +20,7 @@ def clean_unused_datasets_task():
     clean_days = dify_config.CLEAN_DAY_SETTING
     start_at = time.perf_counter()
     thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)
+    seven_days_ago = datetime.datetime.now() - datetime.timedelta(days=7)
     page = 1
     while True:
         try:
@@ -88,5 +91,84 @@ def clean_unused_datasets_task():
                 click.echo(
                     click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red")
                 )
+    page = 1
+    while True:
+        try:
+            # Subquery for counting new documents
+            document_subquery_new = (
+                db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
+                .filter(
+                    Document.indexing_status == "completed",
+                    Document.enabled == True,
+                    Document.archived == False,
+                    Document.updated_at > seven_days_ago,
+                )
+                .group_by(Document.dataset_id)
+                .subquery()
+            )
+
+            # Subquery for counting old documents
+            document_subquery_old = (
+                db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
+                .filter(
+                    Document.indexing_status == "completed",
+                    Document.enabled == True,
+                    Document.archived == False,
+                    Document.updated_at < seven_days_ago,
+                )
+                .group_by(Document.dataset_id)
+                .subquery()
+            )
+
+            # Main query with join and filter
+            datasets = (
+                db.session.query(Dataset)
+                .outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id)
+                .outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id)
+                .filter(
+                    Dataset.created_at < seven_days_ago,
+                    func.coalesce(document_subquery_new.c.document_count, 0) == 0,
+                    func.coalesce(document_subquery_old.c.document_count, 0) > 0,
+                )
+                .order_by(Dataset.created_at.desc())
+                .paginate(page=page, per_page=50)
+            )
+
+        except NotFound:
+            break
+        if datasets.items is None or len(datasets.items) == 0:
+            break
+        page += 1
+        for dataset in datasets:
+            dataset_query = (
+                db.session.query(DatasetQuery)
+                .filter(DatasetQuery.created_at > seven_days_ago, DatasetQuery.dataset_id == dataset.id)
+                .all()
+            )
+            if not dataset_query or len(dataset_query) == 0:
+                try:
+                    features_cache_key = f"features:{dataset.tenant_id}"
+                    plan = redis_client.get(features_cache_key)
+                    if plan is None:
+                        features = FeatureService.get_features(dataset.tenant_id)
+                        redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
+                        plan = features.billing.subscription.plan
+                    if plan == "sandbox":
+                        # remove index
+                        index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
+                        index_processor.clean(dataset, None)
+
+                        # update document
+                        update_params = {Document.enabled: False}
+
+                        Document.query.filter_by(dataset_id=dataset.id).update(update_params)
+                        db.session.commit()
+                        click.echo(
+                            click.style("Cleaned unused dataset {} from db success!".format(dataset.id), fg="green")
+                        )
+                except Exception as e:
+                    click.echo(
+                        click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red")
+                    )
     end_at = time.perf_counter()
     click.echo(click.style("Cleaned unused dataset from db success latency: {}".format(end_at - start_at), fg="green"))
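
Note on the selection logic: the two document subqueries and the per-dataset
DatasetQuery check together pick out datasets created more than 7 days ago that
still hold older indexed documents but have seen no document updates and no
queries in the last 7 days. For readability, here is that rule restated as a
plain in-memory predicate; this is an illustrative paraphrase (the is_stale
helper and its documents/queries inputs are hypothetical), not code from the
patch:

    import datetime

    def is_stale(dataset, documents, queries, now=None):
        # A dataset qualifies for cleanup when it is older than 7 days,
        # none of its completed, enabled, unarchived documents were updated
        # within the last 7 days, at least one such document is older than
        # that, and it received no queries in the last 7 days.
        cutoff = (now or datetime.datetime.now()) - datetime.timedelta(days=7)
        live = [
            d for d in documents
            if d.indexing_status == "completed" and d.enabled and not d.archived
        ]
        return (
            dataset.created_at < cutoff
            and not any(d.updated_at > cutoff for d in live)
            and any(d.updated_at < cutoff for d in live)
            and not any(q.created_at > cutoff for q in queries)
        )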
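
Note on the plan lookup: the task reads the tenant's billing plan with a
cache-aside pattern, trying Redis first, falling back to FeatureService on a
miss, and caching the result for 600 seconds. A minimal standalone sketch of
that pattern follows; the get_tenant_plan helper, the fetch_plan callback, and
the redis.Redis(decode_responses=True) client are illustrative assumptions,
not part of the patch:

    import redis

    # Illustrative cache-aside sketch of the plan lookup in the task above.
    # Assumes decode_responses=True so GET returns str, matching a
    # plan == "sandbox" string comparison; Dify's own redis_client may be
    # configured differently.
    r = redis.Redis(decode_responses=True)

    def get_tenant_plan(tenant_id: str, fetch_plan) -> str:
        cache_key = f"features:{tenant_id}"
        plan = r.get(cache_key)  # cache hit: the cached plan string
        if plan is None:  # cache miss: ask the feature service
            # e.g. fetch_plan = lambda t: FeatureService.get_features(t).billing.subscription.plan
            plan = fetch_plan(tenant_id)
            r.setex(cache_key, 600, plan)  # cache for 10 minutes, as in the patch
        return plan

The 600-second TTL bounds how long a tenant's plan change can go unnoticed by
the cleanup task while still avoiding a FeatureService call per dataset.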