From a25cc4e8afba463b8b02404e738ea92c7f7f90ea Mon Sep 17 00:00:00 2001
From: jyong <718720800@qq.com>
Date: Tue, 6 May 2025 13:56:13 +0800
Subject: [PATCH] r2

---
 .../processor/paragraph_index_processor.py  |  7 +-
 .../knowledge_index/knowledge_index_node.py | 12 ++-
 api/services/dataset_service.py             | 85 ++-----------------
 3 files changed, 22 insertions(+), 82 deletions(-)

diff --git a/api/core/rag/index_processor/processor/paragraph_index_processor.py b/api/core/rag/index_processor/processor/paragraph_index_processor.py
index dca84b9041..79c2c16b90 100644
--- a/api/core/rag/index_processor/processor/paragraph_index_processor.py
+++ b/api/core/rag/index_processor/processor/paragraph_index_processor.py
@@ -1,7 +1,7 @@
 """Paragraph index processor."""
 
 import uuid
-from typing import Optional
+from typing import Any, Mapping, Optional
 
 from core.rag.cleaner.clean_processor import CleanProcessor
 from core.rag.datasource.keyword.keyword_factory import Keyword
@@ -125,3 +125,8 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
                 doc = Document(page_content=result.page_content, metadata=metadata)
                 docs.append(doc)
         return docs
+
+    def index(self, dataset: Dataset, document: Document, chunks: list[Mapping[str, Any]]):
+        for chunk in chunks:
+            # TODO: persist the chunk via GeneralDocument.create(...); left unimplemented in this revision
+            pass
diff --git a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py
index 543a170fa7..7aa6b9379f 100644
--- a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py
+++ b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py
@@ -43,7 +43,7 @@ from extensions.ext_redis import redis_client
 from libs.json_in_md_parser import parse_and_check_json_markdown
 from models.dataset import Dataset, DatasetMetadata, Document, RateLimitLog
 from models.workflow import WorkflowNodeExecutionStatus
-from services.dataset_service import DatasetService
+from services.dataset_service import DatasetService, DocumentService
 from services.feature_service import FeatureService
 
 from .entities import KnowledgeIndexNodeData, KnowledgeRetrievalNodeData, ModelConfig
@@ -139,14 +139,20 @@ class KnowledgeIndexNode(LLMNode):
             )
 
-    def _invoke_knowledge_index(self, node_data: KnowledgeIndexNodeData, chunks: list[any]) -> Any:
+    def _invoke_knowledge_index(self, node_data: KnowledgeIndexNodeData, document_id: str, chunks: list[Any]) -> Any:
         dataset = Dataset.query.filter_by(id=node_data.dataset_id).first()
         if not dataset:
             raise KnowledgeIndexNodeError(f"Dataset {node_data.dataset_id} not found.")
 
-        DatasetService.invoke_knowledge_index(
+        document = Document.query.filter_by(id=document_id).first()
+        if not document:
+            raise KnowledgeIndexNodeError(f"Document {document_id} not found.")
+
+        DocumentService.invoke_knowledge_index(
             dataset=dataset,
+            document=document,
             chunks=chunks,
+            chunk_structure=node_data.chunk_structure,
             index_method=node_data.index_method,
             retrieval_setting=node_data.retrieval_setting,
         )
 
diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py
index 3e14a92f36..60d3ccf131 100644
--- a/api/services/dataset_service.py
+++ b/api/services/dataset_service.py
@@ -6,7 +6,7 @@ import random
 import time
 import uuid
 from collections import Counter
-from typing import Any, Optional
+from typing import Any, Literal, Optional
 
 from flask_login import current_user  # type: ignore
 from sqlalchemy import func
@@ -20,6 +20,7 @@ from core.model_runtime.entities.model_entities import ModelType
 from core.plugin.entities.plugin import ModelProviderID
 from core.rag.index_processor.constant.built_in_field import BuiltInField
 from core.rag.index_processor.constant.index_type import IndexType
+from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
 from core.rag.retrieval.retrieval_methods import RetrievalMethod
 from core.workflow.nodes.knowledge_index.entities import IndexMethod, RetrievalSetting
 from events.dataset_event import dataset_was_deleted
@@ -1435,9 +1436,11 @@ class DocumentService:
     @staticmethod
     def invoke_knowledge_index(
         dataset: Dataset,
+        document: Document,
         chunks: list[Any],
         index_method: IndexMethod,
         retrieval_setting: RetrievalSetting,
+        chunk_structure: Literal["text_model", "hierarchical_model"],
         original_document_id: str | None = None,
         account: Account | Any,
         created_from: str = "rag-pipline",
@@ -1479,85 +1482,11 @@ class DocumentService:
             if retrieval_setting
             else default_retrieval_model
         )  # type: ignore
-
-        documents = []
-        if original_document_id:
-            document = DocumentService.update_document_with_dataset_id(dataset, knowledge_config, account)
-            documents.append(document)
-            batch = document.batch
-        else:
-            batch = time.strftime("%Y%m%d%H%M%S") + str(random.randint(100000, 999999))
-
-        lock_name = "add_document_lock_dataset_id_{}".format(dataset.id)
-        with redis_client.lock(lock_name, timeout=600):
-            position = DocumentService.get_documents_position(dataset.id)
-            document_ids = []
-            duplicate_document_ids = []
-            for chunk in chunks:
-                file = (
-                    db.session.query(UploadFile)
-                    .filter(UploadFile.tenant_id == dataset.tenant_id, UploadFile.id == file_id)
-                    .first()
-                )
-
-                # raise error if file not found
-                if not file:
-                    raise FileNotExistsError()
-
-                file_name = file.name
-                data_source_info = {
-                    "upload_file_id": file_id,
-                }
-                # check duplicate
-                if knowledge_config.duplicate:
-                    document = Document.query.filter_by(
-                        dataset_id=dataset.id,
-                        tenant_id=current_user.current_tenant_id,
-                        data_source_type="upload_file",
-                        enabled=True,
-                        name=file_name,
-                    ).first()
-                    if document:
-                        document.dataset_process_rule_id = dataset_process_rule.id  # type: ignore
-                        document.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
-                        document.created_from = created_from
-                        document.doc_form = knowledge_config.doc_form
-                        document.doc_language = knowledge_config.doc_language
-                        document.data_source_info = json.dumps(data_source_info)
-                        document.batch = batch
-                        document.indexing_status = "waiting"
-                        db.session.add(document)
-                        documents.append(document)
-                        duplicate_document_ids.append(document.id)
-                        continue
-                document = DocumentService.build_document(
-                    dataset,
-                    dataset_process_rule.id,  # type: ignore
-                    knowledge_config.data_source.info_list.data_source_type,  # type: ignore
-                    knowledge_config.doc_form,
-                    knowledge_config.doc_language,
-                    data_source_info,
-                    created_from,
-                    position,
-                    account,
-                    file_name,
-                    batch,
-                )
-                db.session.add(document)
-                db.session.flush()
-                document_ids.append(document.id)
-                documents.append(document)
-                position += 1
-
-            db.session.commit()
-
-            # trigger async task
-            if document_ids:
-                document_indexing_task.delay(dataset.id, document_ids)
-            if duplicate_document_ids:
-                duplicate_document_indexing_task.delay(dataset.id, duplicate_document_ids)
+        index_processor = IndexProcessorFactory(chunk_structure).init_index_processor()
+        index_processor.index(dataset, document, chunks)
 
         return documents, batch
 
+    @staticmethod
     def check_documents_upload_quota(count: int, features: FeatureModel):
         can_upload_size = features.documents_upload_quota.limit - features.documents_upload_quota.size
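
Usage sketch (not part of the patch): the call path this change introduces, written out end to end. All class and method names here come from the diff (IndexProcessorFactory, init_index_processor, index, Dataset, Document); the shape of the chunk payload is an assumption, since ParagraphIndexProcessor.index is left unimplemented in this revision.

    # Hypothetical driver code for the new indexing path; the "content" key is assumed, not defined by the patch.
    from core.rag.index_processor.index_processor_factory import IndexProcessorFactory

    chunks = [
        {"content": "First paragraph of the uploaded file."},
        {"content": "Second paragraph of the uploaded file."},
    ]

    # chunk_structure is "text_model" or "hierarchical_model", matching the new Literal parameter.
    index_processor = IndexProcessorFactory("text_model").init_index_processor()
    index_processor.index(dataset, document, chunks)  # dataset and document are rows fetched by the caller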