From 026f0bfce9a31cc2512998aee987be21ce059112 Mon Sep 17 00:00:00 2001 From: Jyong <76649700+JohnJyong@users.noreply.github.com> Date: Wed, 19 Jul 2023 21:30:25 +0800 Subject: [PATCH] Feat/clean vector dataset (#605) --- api/commands.py | 70 ++++++++++++++++++-- api/config.py | 4 +- api/controllers/console/datasets/datasets.py | 1 - 3 files changed, 69 insertions(+), 6 deletions(-) diff --git a/api/commands.py b/api/commands.py index 4eed5de047..67b4e3984d 100644 --- a/api/commands.py +++ b/api/commands.py @@ -2,6 +2,7 @@ import datetime import logging import random import string +import time import click from flask import current_app @@ -13,7 +14,7 @@ from libs.helper import email as email_validate from extensions.ext_database import db from libs.rsa import generate_key_pair from models.account import InvitationCode, Tenant -from models.dataset import Dataset +from models.dataset import Dataset, DatasetQuery, Document, DocumentSegment from models.model import Account import secrets import base64 @@ -172,7 +173,7 @@ def recreate_all_dataset_indexes(): page = 1 while True: try: - datasets = db.session.query(Dataset).filter(Dataset.indexing_technique == 'high_quality')\ + datasets = db.session.query(Dataset).filter(Dataset.indexing_technique == 'high_quality') \ .order_by(Dataset.created_at.desc()).paginate(page=page, per_page=50) except NotFound: break @@ -188,12 +189,70 @@ def recreate_all_dataset_indexes(): else: click.echo('passed.') except Exception as e: - click.echo(click.style('Recreate dataset index error: {} {}'.format(e.__class__.__name__, str(e)), fg='red')) + click.echo( + click.style('Recreate dataset index error: {} {}'.format(e.__class__.__name__, str(e)), fg='red')) continue click.echo(click.style('Congratulations! Recreate {} dataset indexes.'.format(recreate_count), fg='green')) +@click.command('clean-unused-dataset-indexes', help='Clean unused dataset indexes.') +def clean_unused_dataset_indexes(): + click.echo(click.style('Start clean unused dataset indexes.', fg='green')) + clean_days = int(current_app.config.get('CLEAN_DAY_SETTING')) + start_at = time.perf_counter() + thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days) + page = 1 + while True: + try: + datasets = db.session.query(Dataset).filter(Dataset.created_at < thirty_days_ago) \ + .order_by(Dataset.created_at.desc()).paginate(page=page, per_page=50) + except NotFound: + break + page += 1 + for dataset in datasets: + dataset_query = db.session.query(DatasetQuery).filter( + DatasetQuery.created_at > thirty_days_ago, + DatasetQuery.dataset_id == dataset.id + ).all() + if not dataset_query or len(dataset_query) == 0: + documents = db.session.query(Document).filter( + Document.dataset_id == dataset.id, + Document.indexing_status == 'completed', + Document.enabled == True, + Document.archived == False, + Document.updated_at > thirty_days_ago + ).all() + if not documents or len(documents) == 0: + try: + all_documents = db.session.query(Document).filter( + Document.dataset_id == dataset.id, + Document.indexing_status == 'completed', + Document.enabled == True, + Document.archived == False, + ).all() + if all_documents and len(all_documents)>0: + update_params = { + Document.enabled: False + } + + Document.query.filter_by(dataset_id=dataset.id).update(update_params) + db.session.commit() + # remove index + vector_index = IndexBuilder.get_index(dataset, 'high_quality') + kw_index = IndexBuilder.get_index(dataset, 'economy') + # delete from vector index + if vector_index: + vector_index.delete() + kw_index.delete() + except Exception as e: + click.echo( + click.style('clean dataset index error: {} {}'.format(e.__class__.__name__, str(e)), + fg='red')) + end_at = time.perf_counter() + click.echo(click.style('Cleaned unused dataset from db success latency: {}'.format(end_at - start_at), fg='green')) + + @click.command('sync-anthropic-hosted-providers', help='Sync anthropic hosted providers.') def sync_anthropic_hosted_providers(): click.echo(click.style('Start sync anthropic hosted providers.', fg='green')) @@ -218,7 +277,9 @@ def sync_anthropic_hosted_providers(): ) count += 1 except Exception as e: - click.echo(click.style('Sync tenant anthropic hosted provider error: {} {}'.format(e.__class__.__name__, str(e)), fg='red')) + click.echo(click.style( + 'Sync tenant anthropic hosted provider error: {} {}'.format(e.__class__.__name__, str(e)), + fg='red')) continue click.echo(click.style('Congratulations! Synced {} anthropic hosted providers.'.format(count), fg='green')) @@ -231,3 +292,4 @@ def register_commands(app): app.cli.add_command(reset_encrypt_key_pair) app.cli.add_command(recreate_all_dataset_indexes) app.cli.add_command(sync_anthropic_hosted_providers) + app.cli.add_command(clean_unused_dataset_indexes) diff --git a/api/config.py b/api/config.py index 4ee1826d3f..205a82fa29 100644 --- a/api/config.py +++ b/api/config.py @@ -53,7 +53,8 @@ DEFAULTS = { 'DEFAULT_LLM_PROVIDER': 'openai', 'OPENAI_HOSTED_QUOTA_LIMIT': 200, 'ANTHROPIC_HOSTED_QUOTA_LIMIT': 1000, - 'TENANT_DOCUMENT_COUNT': 100 + 'TENANT_DOCUMENT_COUNT': 100, + 'CLEAN_DAY_SETTING': 30 } @@ -215,6 +216,7 @@ class Config: self.NOTION_INTEGRATION_TOKEN = get_env('NOTION_INTEGRATION_TOKEN') self.TENANT_DOCUMENT_COUNT = get_env('TENANT_DOCUMENT_COUNT') + self.CLEAN_DAY_SETTING = get_env('CLEAN_DAY_SETTING') class CloudEditionConfig(Config): diff --git a/api/controllers/console/datasets/datasets.py b/api/controllers/console/datasets/datasets.py index b7898d9667..4c880e5243 100644 --- a/api/controllers/console/datasets/datasets.py +++ b/api/controllers/console/datasets/datasets.py @@ -3,7 +3,6 @@ from flask import request from flask_login import login_required, current_user from flask_restful import Resource, reqparse, fields, marshal, marshal_with from werkzeug.exceptions import NotFound, Forbidden - import services from controllers.console import api from controllers.console.datasets.error import DatasetNameDuplicateError