diff --git a/api/schedule/clean_unused_datasets_task.py b/api/schedule/clean_unused_datasets_task.py
index 2033791ace..9cbee6e81e 100644
--- a/api/schedule/clean_unused_datasets_task.py
+++ b/api/schedule/clean_unused_datasets_task.py
@@ -2,6 +2,7 @@ import datetime
 import time
 
 import click
+from sqlalchemy import func
 from werkzeug.exceptions import NotFound
 
 import app
@@ -20,10 +21,46 @@ def clean_unused_datasets_task():
     page = 1
     while True:
         try:
-            datasets = db.session.query(Dataset).filter(Dataset.created_at < thirty_days_ago) \
-                .order_by(Dataset.created_at.desc()).paginate(page=page, per_page=50)
+            # Subquery for counting new documents
+            document_subquery_new = db.session.query(
+                Document.dataset_id,
+                func.count(Document.id).label('document_count')
+            ).filter(
+                Document.indexing_status == 'completed',
+                Document.enabled == True,
+                Document.archived == False,
+                Document.updated_at > thirty_days_ago
+            ).group_by(Document.dataset_id).subquery()
+
+            # Subquery for counting old documents
+            document_subquery_old = db.session.query(
+                Document.dataset_id,
+                func.count(Document.id).label('document_count')
+            ).filter(
+                Document.indexing_status == 'completed',
+                Document.enabled == True,
+                Document.archived == False,
+                Document.updated_at < thirty_days_ago
+            ).group_by(Document.dataset_id).subquery()
+
+            # Main query with join and filter
+            datasets = (db.session.query(Dataset)
+                        .outerjoin(
+                            document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id
+                        ).outerjoin(
+                            document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id
+                        ).filter(
+                            Dataset.created_at < thirty_days_ago,
+                            func.coalesce(document_subquery_new.c.document_count, 0) == 0,
+                            func.coalesce(document_subquery_old.c.document_count, 0) > 0
+                        ).order_by(
+                            Dataset.created_at.desc()
+                        ).paginate(page=page, per_page=50))
+
         except NotFound:
             break
+        if datasets.items is None or len(datasets.items) == 0:
+            break
         page += 1
         for dataset in datasets:
             dataset_query = db.session.query(DatasetQuery).filter(
@@ -31,31 +68,23 @@ def clean_unused_datasets_task():
                 DatasetQuery.dataset_id == dataset.id
             ).all()
             if not dataset_query or len(dataset_query) == 0:
-                documents = db.session.query(Document).filter(
-                    Document.dataset_id == dataset.id,
-                    Document.indexing_status == 'completed',
-                    Document.enabled == True,
-                    Document.archived == False,
-                    Document.updated_at > thirty_days_ago
-                ).all()
-                if not documents or len(documents) == 0:
-                    try:
-                        # remove index
-                        index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
-                        index_processor.clean(dataset, None)
+                try:
+                    # remove index
+                    index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
+                    index_processor.clean(dataset, None)
 
-                        # update document
-                        update_params = {
-                            Document.enabled: False
-                        }
+                    # update document
+                    update_params = {
+                        Document.enabled: False
+                    }
 
-                        Document.query.filter_by(dataset_id=dataset.id).update(update_params)
-                        db.session.commit()
-                        click.echo(click.style('Cleaned unused dataset {} from db success!'.format(dataset.id),
-                                               fg='green'))
-                    except Exception as e:
-                        click.echo(
-                            click.style('clean dataset index error: {} {}'.format(e.__class__.__name__, str(e)),
-                                        fg='red'))
+                    Document.query.filter_by(dataset_id=dataset.id).update(update_params)
+                    db.session.commit()
+                    click.echo(click.style('Cleaned unused dataset {} from db success!'.format(dataset.id),
+                                           fg='green'))
+                except Exception as e:
+                    click.echo(
+                        click.style('clean dataset index error: {} {}'.format(e.__class__.__name__, str(e)),
+                                    fg='red'))
     end_at = time.perf_counter()
     click.echo(click.style('Cleaned unused dataset from db success latency: {}'.format(end_at - start_at),
                            fg='green'))
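
For reference, the core of the change is that the per-dataset "does it still have recent documents" check moves from a per-row Python query into the main SQL statement: each count subquery is outer-joined onto `Dataset`, and `func.coalesce(..., 0)` turns the `NULL` produced when a dataset has no matching subquery row into `0` so it can be filtered on. Below is a minimal, self-contained sketch of that outer-join + coalesce pattern; the simplified `Dataset`/`Document` models and the in-memory SQLite engine are illustration-only stand-ins, not the real Dify models or session setup.

```python
import datetime

from sqlalchemy import Boolean, Column, DateTime, Integer, String, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Dataset(Base):
    __tablename__ = 'datasets'
    id = Column(Integer, primary_key=True)
    created_at = Column(DateTime)


class Document(Base):
    __tablename__ = 'documents'
    id = Column(Integer, primary_key=True)
    dataset_id = Column(Integer)
    indexing_status = Column(String)
    enabled = Column(Boolean)
    archived = Column(Boolean)
    updated_at = Column(DateTime)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

with Session(engine) as session:
    thirty_days_ago = datetime.datetime.utcnow() - datetime.timedelta(days=30)

    # Per-dataset count of completed documents updated within the last 30 days.
    recent_docs = session.query(
        Document.dataset_id,
        func.count(Document.id).label('document_count')
    ).filter(
        Document.indexing_status == 'completed',
        Document.enabled.is_(True),
        Document.archived.is_(False),
        Document.updated_at > thirty_days_ago
    ).group_by(Document.dataset_id).subquery()

    # Outer join keeps datasets with no matching subquery row; coalesce maps the
    # resulting NULL count to 0 so "no recent documents" can be expressed as == 0.
    stale_datasets = session.query(Dataset).outerjoin(
        recent_docs, Dataset.id == recent_docs.c.dataset_id
    ).filter(
        Dataset.created_at < thirty_days_ago,
        func.coalesce(recent_docs.c.document_count, 0) == 0
    ).all()
    print(len(stale_datasets))
```

The actual task uses two such subqueries (recent and old documents) so that only datasets that once had indexed documents, but none updated recently, are selected for cleanup.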