"""Archive and purge expired logs for free-plan (sandbox) tenants.

For each sandbox tenant, this walks messages, conversations, workflow node
executions and workflow runs older than ``days`` days, dumps each batch to
object storage as JSON (so the data is recoverable), then deletes the rows.
Tenants are processed concurrently on a thread pool, discovered either from
an explicit id list or by scanning tenants in creation-time windows sized to
roughly 100 tenants per window.
"""

import datetime
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor

import click
from flask import Flask, current_app
from sqlalchemy.orm import Session

from configs import dify_config
from core.model_runtime.utils.encoders import jsonable_encoder
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.account import Tenant
from models.model import App, Conversation, Message
from models.workflow import WorkflowNodeExecution, WorkflowRun
from services.billing_service import BillingService

logger = logging.getLogger(__name__)


class ClearFreePlanTenantExpiredLogs:
    @classmethod
    def process_tenant(cls, flask_app: Flask, tenant_id: str, days: int, batch: int) -> None:
        """Archive-then-delete expired log rows for a single tenant.

        Four row families are processed, each in its own batched loop:
        messages (by ``created_at``), conversations (by ``updated_at``),
        workflow node executions and workflow runs (by ``created_at``).
        Each batch is serialized to object storage under
        ``free_plan_tenant_expired_logs/<tenant_id>/...`` before the rows
        are deleted, and each loop runs until no expired rows remain.

        :param flask_app: application whose context provides db/storage.
        :param tenant_id: tenant whose logs are purged.
        :param days: rows older than this many days are considered expired.
        :param batch: maximum rows fetched/deleted per round trip.
        """
        with flask_app.app_context():
            apps = db.session.query(App).filter(App.tenant_id == tenant_id).all()
            app_ids = [app.id for app in apps]

            # Single stable cutoff: computed once so every batch of every loop
            # uses the same expiry boundary (the original recomputed "now"
            # per iteration, letting the boundary drift while looping).
            cutoff = datetime.datetime.now() - datetime.timedelta(days=days)

            while True:
                with Session(db.engine).no_autoflush as session:
                    messages = (
                        session.query(Message)
                        .filter(
                            Message.app_id.in_(app_ids),
                            Message.created_at < cutoff,
                        )
                        .limit(batch)
                        .all()
                    )
                    if len(messages) == 0:
                        break

                    # Archive the batch before deleting it.
                    storage.save(
                        f"free_plan_tenant_expired_logs/"
                        f"{tenant_id}/messages/{datetime.datetime.now().strftime('%Y-%m-%d')}"
                        f"-{time.time()}.json",
                        json.dumps(
                            jsonable_encoder(
                                [message.to_dict() for message in messages],
                            ),
                        ).encode("utf-8"),
                    )

                    message_ids = [message.id for message in messages]

                    # delete messages
                    session.query(Message).filter(
                        Message.id.in_(message_ids),
                    ).delete(synchronize_session=False)

                    session.commit()

                    click.echo(
                        click.style(
                            f"[{datetime.datetime.now()}] Processed {len(message_ids)} messages for tenant {tenant_id} "
                        )
                    )

            while True:
                with Session(db.engine).no_autoflush as session:
                    # NOTE: conversations expire on updated_at (last activity),
                    # not created_at, unlike the other three row families.
                    conversations = (
                        session.query(Conversation)
                        .filter(
                            Conversation.app_id.in_(app_ids),
                            Conversation.updated_at < cutoff,
                        )
                        .limit(batch)
                        .all()
                    )

                    if len(conversations) == 0:
                        break

                    storage.save(
                        f"free_plan_tenant_expired_logs/"
                        f"{tenant_id}/conversations/{datetime.datetime.now().strftime('%Y-%m-%d')}"
                        f"-{time.time()}.json",
                        json.dumps(
                            jsonable_encoder(
                                [conversation.to_dict() for conversation in conversations],
                            ),
                        ).encode("utf-8"),
                    )

                    conversation_ids = [conversation.id for conversation in conversations]
                    session.query(Conversation).filter(
                        Conversation.id.in_(conversation_ids),
                    ).delete(synchronize_session=False)
                    session.commit()

                    click.echo(
                        click.style(
                            f"[{datetime.datetime.now()}] Processed {len(conversation_ids)}"
                            f" conversations for tenant {tenant_id}"
                        )
                    )

            while True:
                with Session(db.engine).no_autoflush as session:
                    workflow_node_executions = (
                        session.query(WorkflowNodeExecution)
                        .filter(
                            WorkflowNodeExecution.tenant_id == tenant_id,
                            WorkflowNodeExecution.created_at < cutoff,
                        )
                        .limit(batch)
                        .all()
                    )

                    if len(workflow_node_executions) == 0:
                        break

                    # save workflow node executions
                    storage.save(
                        f"free_plan_tenant_expired_logs/"
                        f"{tenant_id}/workflow_node_executions/{datetime.datetime.now().strftime('%Y-%m-%d')}"
                        f"-{time.time()}.json",
                        json.dumps(
                            jsonable_encoder(workflow_node_executions),
                        ).encode("utf-8"),
                    )

                    workflow_node_execution_ids = [
                        workflow_node_execution.id for workflow_node_execution in workflow_node_executions
                    ]

                    # delete workflow node executions
                    session.query(WorkflowNodeExecution).filter(
                        WorkflowNodeExecution.id.in_(workflow_node_execution_ids),
                    ).delete(synchronize_session=False)
                    session.commit()

                    click.echo(
                        click.style(
                            f"[{datetime.datetime.now()}] Processed {len(workflow_node_execution_ids)}"
                            f" workflow node executions for tenant {tenant_id}"
                        )
                    )

            while True:
                with Session(db.engine).no_autoflush as session:
                    # BUG FIX: the original queried every WorkflowRun for the
                    # tenant with no age filter and no limit — it deleted runs
                    # that were not expired and loaded them all in one go.
                    # Constrain to expired rows and page by `batch`, matching
                    # the three loops above.
                    workflow_runs = (
                        session.query(WorkflowRun)
                        .filter(
                            WorkflowRun.tenant_id == tenant_id,
                            WorkflowRun.created_at < cutoff,
                        )
                        .limit(batch)
                        .all()
                    )

                    if len(workflow_runs) == 0:
                        break

                    # save workflow runs
                    storage.save(
                        f"free_plan_tenant_expired_logs/"
                        f"{tenant_id}/workflow_runs/{datetime.datetime.now().strftime('%Y-%m-%d')}"
                        f"-{time.time()}.json",
                        json.dumps(
                            jsonable_encoder(
                                [workflow_run.to_dict() for workflow_run in workflow_runs],
                            ),
                        ).encode("utf-8"),
                    )

                    workflow_run_ids = [workflow_run.id for workflow_run in workflow_runs]

                    # delete workflow runs
                    session.query(WorkflowRun).filter(
                        WorkflowRun.id.in_(workflow_run_ids),
                    ).delete(synchronize_session=False)
                    session.commit()

                    click.echo(
                        click.style(
                            f"[{datetime.datetime.now()}] Processed {len(workflow_run_ids)}"
                            f" workflow runs for tenant {tenant_id}"
                        )
                    )

    @classmethod
    def process(cls, days: int, batch: int, tenant_ids: list[str]):
        """
        Clear free plan tenant expired logs.

        When ``tenant_ids`` is given, only those tenants are processed.
        Otherwise all tenants are scanned by ``created_at`` windows starting
        from a fixed epoch, with the window width adapted so each window
        yields roughly 100 tenants. Billing is consulted per tenant (when
        enabled) so only "sandbox"-plan tenants are purged.

        :param days: rows older than this many days are purged.
        :param batch: per-query row limit passed to :meth:`process_tenant`.
        :param tenant_ids: explicit tenant ids, or empty to scan all tenants.
        """

        click.echo(click.style("Clearing free plan tenant expired logs", fg="white"))
        ended_at = datetime.datetime.now()
        # Fixed scan origin: presumably the earliest tenant creation date in
        # production — TODO confirm; tenants created before it are skipped.
        started_at = datetime.datetime(2023, 4, 3, 8, 59, 24)
        current_time = started_at

        with Session(db.engine) as session:
            total_tenant_count = session.query(Tenant.id).count()

        click.echo(click.style(f"Total tenant count: {total_tenant_count}", fg="white"))

        handled_tenant_count = 0

        thread_pool = ThreadPoolExecutor(max_workers=10)

        def process_tenant(flask_app: Flask, tenant_id: str) -> None:
            # Worker wrapper: filters to sandbox tenants, isolates failures,
            # and tracks progress. Runs on the pool, so it needs the app
            # object (not the proxy) to push its own context.
            try:
                if (
                    not dify_config.BILLING_ENABLED
                    or BillingService.get_info(tenant_id)["subscription"]["plan"] == "sandbox"
                ):
                    # only process sandbox tenant
                    cls.process_tenant(flask_app, tenant_id, days, batch)
            except Exception:
                logger.exception(f"Failed to process tenant {tenant_id}")
            finally:
                nonlocal handled_tenant_count
                handled_tenant_count += 1
                if handled_tenant_count % 100 == 0:
                    click.echo(
                        click.style(
                            f"[{datetime.datetime.now()}] "
                            f"Processed {handled_tenant_count} tenants "
                            f"({(handled_tenant_count / total_tenant_count) * 100:.1f}%), "
                            f"{handled_tenant_count}/{total_tenant_count}",
                            fg="green",
                        )
                    )

        futures = []

        if tenant_ids:
            for tenant_id in tenant_ids:
                futures.append(
                    thread_pool.submit(
                        process_tenant,
                        current_app._get_current_object(),  # type: ignore[attr-defined]
                        tenant_id,
                    )
                )
        else:
            while current_time < ended_at:
                click.echo(
                    click.style(f"Current time: {current_time}, Started at: {datetime.datetime.now()}", fg="white")
                )
                # Initial interval of 1 day, will be dynamically adjusted based on tenant count
                interval = datetime.timedelta(days=1)
                # Process tenants in this batch
                with Session(db.engine) as session:
                    # Calculate tenant count in next batch with current interval
                    # Try different intervals until we find one with a reasonable tenant count
                    test_intervals = [
                        datetime.timedelta(days=1),
                        datetime.timedelta(hours=12),
                        datetime.timedelta(hours=6),
                        datetime.timedelta(hours=3),
                        datetime.timedelta(hours=1),
                    ]

                    for test_interval in test_intervals:
                        tenant_count = (
                            session.query(Tenant.id)
                            .filter(Tenant.created_at.between(current_time, current_time + test_interval))
                            .count()
                        )
                        if tenant_count <= 100:
                            interval = test_interval
                            break
                    else:
                        # If all intervals have too many tenants, use minimum interval
                        interval = datetime.timedelta(hours=1)

                    # Adjust interval to target ~100 tenants per batch
                    if tenant_count > 0:
                        # Scale interval based on ratio to target count
                        interval = min(
                            datetime.timedelta(days=1),  # Max 1 day
                            max(
                                datetime.timedelta(hours=1),  # Min 1 hour
                                interval * (100 / tenant_count),  # Scale to target 100
                            ),
                        )

                    batch_end = min(current_time + interval, ended_at)

                    rs = (
                        session.query(Tenant.id)
                        .filter(Tenant.created_at.between(current_time, batch_end))
                        .order_by(Tenant.created_at)
                    )

                    # Submit every tenant in this window. (The original
                    # wrapped the id extraction in a no-op try/except and
                    # collected an unused `tenants` list — removed.)
                    for row in rs:
                        tenant_id = str(row.id)
                        futures.append(
                            thread_pool.submit(
                                process_tenant,
                                current_app._get_current_object(),  # type: ignore[attr-defined]
                                tenant_id,
                            )
                        )

                current_time = batch_end

        # wait for all threads to finish
        for future in futures:
            future.result()

        # Release the pool's worker threads (the original leaked them).
        thread_pool.shutdown(wait=True)