From e2bf18053cbd3f3a6fa9b665c1d7df231bf79888 Mon Sep 17 00:00:00 2001
From: Jyong <76649700+JohnJyong@users.noreply.github.com>
Date: Tue, 23 May 2023 22:54:59 +0800
Subject: [PATCH] Fix/dateset update rule (#177)

---
 api/services/dataset_service.py             |  8 ++-
 api/tasks/deal_dataset_vector_index_task.py | 75 +++++++++++++++++++++
 2 files changed, 82 insertions(+), 1 deletion(-)
 create mode 100644 api/tasks/deal_dataset_vector_index_task.py

diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py
index 39004c3437..36fe127cc5 100644
--- a/api/services/dataset_service.py
+++ b/api/services/dataset_service.py
@@ -18,6 +18,7 @@ from services.errors.account import NoPermissionError
 from services.errors.dataset import DatasetNameDuplicateError
 from services.errors.document import DocumentIndexingError
 from services.errors.file import FileNotExistsError
+from tasks.deal_dataset_vector_index_task import deal_dataset_vector_index_task
 from tasks.document_indexing_task import document_indexing_task
 
 
@@ -97,7 +98,12 @@ class DatasetService:
     def update_dataset(dataset_id, data, user):
         dataset = DatasetService.get_dataset(dataset_id)
         DatasetService.check_dataset_permission(dataset, user)
-
+        if dataset.indexing_technique != data['indexing_technique']:
+            # the indexing technique changed; update the vector index asynchronously
+            if data['indexing_technique'] == 'economy':
+                deal_dataset_vector_index_task.delay(dataset_id, 'remove')
+            elif data['indexing_technique'] == 'high_quality':
+                deal_dataset_vector_index_task.delay(dataset_id, 'add')
         filtered_data = {k: v for k, v in data.items() if v is not None or k == 'description'}
 
         filtered_data['updated_by'] = user.id
diff --git a/api/tasks/deal_dataset_vector_index_task.py b/api/tasks/deal_dataset_vector_index_task.py
new file mode 100644
index 0000000000..f5f9129558
--- /dev/null
+++ b/api/tasks/deal_dataset_vector_index_task.py
@@ -0,0 +1,75 @@
+import logging
+import time
+
+import click
+from celery import shared_task
+from llama_index.data_structs.node_v2 import DocumentRelationship, Node
+from core.index.vector_index import VectorIndex
+from extensions.ext_database import db
+from models.dataset import DocumentSegment, Document, Dataset
+
+
+@shared_task
+def deal_dataset_vector_index_task(dataset_id: str, action: str):
+    """
+    Asynchronously add a dataset's documents to, or remove them from, the vector index.
+    :param dataset_id: dataset id
+    :param action: 'add' or 'remove'
+    Usage: deal_dataset_vector_index_task.delay(dataset_id, action)
+    """
+    logging.info(click.style('Start deal dataset vector index: {}'.format(dataset_id), fg='green'))
+    start_at = time.perf_counter()
+
+    try:
+        dataset = Dataset.query.filter_by(
+            id=dataset_id
+        ).first()
+        if not dataset:
+            raise Exception('Dataset not found')
+        documents = Document.query.filter_by(dataset_id=dataset_id).all()
+        if documents:
+            vector_index = VectorIndex(dataset=dataset)
+            for document in documents:
+                # delete from vector index
+                if action == "remove":
+                    vector_index.del_doc(document.id)
+                elif action == "add":
+                    segments = db.session.query(DocumentSegment).filter(
+                        DocumentSegment.document_id == document.id,
+                        DocumentSegment.enabled == True
+                    ).order_by(DocumentSegment.position.asc()).all()
+
+                    nodes = []
+                    previous_node = None
+                    for segment in segments:
+                        relationships = {
+                            DocumentRelationship.SOURCE: document.id
+                        }
+
+                        if previous_node:
+                            relationships[DocumentRelationship.PREVIOUS] = previous_node.doc_id
+
+                            previous_node.relationships[DocumentRelationship.NEXT] = segment.index_node_id
+
+                        node = Node(
+                            doc_id=segment.index_node_id,
+                            doc_hash=segment.index_node_hash,
+                            text=segment.content,
+                            extra_info=None,
+                            node_info=None,
+                            relationships=relationships
+                        )
+
+                        previous_node = node
+                        nodes.append(node)
+                    # save vector index
+                    vector_index.add_nodes(
+                        nodes=nodes,
+                        duplicate_check=True
+                    )
+
+        end_at = time.perf_counter()
+        logging.info(
+            click.style('Deal dataset vector index: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green'))
+    except Exception:
+        logging.exception("Deal dataset vector index failed")
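
Reviewer note: a minimal usage sketch (not part of the patch) of how the new dispatch path would be exercised. The caller function, its name, and the surrounding request handling are assumptions; only DatasetService.update_dataset and the 'economy'/'high_quality' values come from the hunks above.

    # Hypothetical caller: switching a dataset's indexing technique.
    # update_dataset compares the stored indexing_technique with
    # data['indexing_technique']; on a change to 'high_quality' it enqueues
    # deal_dataset_vector_index_task.delay(dataset_id, 'add'), and on a
    # change to 'economy' it enqueues the 'remove' action.
    from services.dataset_service import DatasetService

    def switch_indexing_technique(dataset_id: str, technique: str, user):
        # technique is expected to be 'economy' or 'high_quality';
        # user must expose .id, which update_dataset stores as updated_by.
        DatasetService.update_dataset(dataset_id, {'indexing_technique': technique}, user)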