jyong 2025-05-06 13:56:13 +08:00
parent 3c386c63a6
commit a25cc4e8af
3 changed files with 22 additions and 82 deletions

View File

@@ -1,7 +1,7 @@
"""Paragraph index processor."""
import uuid
from typing import Optional
from typing import Any, Mapping, Optional
from core.rag.cleaner.clean_processor import CleanProcessor
from core.rag.datasource.keyword.keyword_factory import Keyword
@@ -125,3 +125,8 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
doc = Document(page_content=result.page_content, metadata=metadata)
docs.append(doc)
return docs
def index(self, dataset: Dataset, document: Document, chunks: list[Mapping[str, Any]]):
    for chunk in chunks:
        # WIP in this commit: per-chunk persistence (the GeneralDocument.create call) is still stubbed out
        pass
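For illustration only, a completed version of this stub might turn each chunk mapping into a RAG Document and hand the batch to the processor's existing load() path; the "content"/"metadata" keys and the load() call are assumptions, not part of this commit:

    def index(self, dataset: Dataset, document: Document, chunks: list[Mapping[str, Any]]):
        docs = []
        for chunk in chunks:
            # Assumed chunk shape: {"content": str, "metadata": dict}
            metadata = dict(chunk.get("metadata") or {})
            metadata.setdefault("doc_id", str(uuid.uuid4()))
            docs.append(Document(page_content=chunk.get("content", ""), metadata=metadata))
        # Delegate to the processor's existing vector/keyword indexing routine
        self.load(dataset, docs, with_keywords=True)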

View File

@@ -43,7 +43,7 @@ from extensions.ext_redis import redis_client
from libs.json_in_md_parser import parse_and_check_json_markdown
from models.dataset import Dataset, DatasetMetadata, Document, RateLimitLog
from models.workflow import WorkflowNodeExecutionStatus
from services.dataset_service import DatasetService
from services.dataset_service import DatasetService, DocumentService
from services.feature_service import FeatureService
from .entities import KnowledgeIndexNodeData, KnowledgeRetrievalNodeData, ModelConfig
@@ -139,14 +139,20 @@ class KnowledgeIndexNode(LLMNode):
)
def _invoke_knowledge_index(self, node_data: KnowledgeIndexNodeData, chunks: list[Any]) -> Any:
def _invoke_knowledge_index(self, node_data: KnowledgeIndexNodeData, document_id: str, chunks: list[Any]) -> Any:
dataset = Dataset.query.filter_by(id=node_data.dataset_id).first()
if not dataset:
raise KnowledgeIndexNodeError(f"Dataset {node_data.dataset_id} not found.")
DatasetService.invoke_knowledge_index(
document = Document.query.filter_by(id=document_id).first()
if not document:
raise KnowledgeIndexNodeError(f"Document {document_id} not found.")
DocumentService.invoke_knowledge_index(
dataset=dataset,
document=document,
chunks=chunks,
chunk_structure=node_data.chunk_structure,
index_method=node_data.index_method,
retrieval_setting=node_data.retrieval_setting,
)
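A minimal usage sketch for the updated helper, assuming the document id and chunk payload are resolved from the node's inputs (the variable names below are illustrative and not taken from this commit):

    document_id = node_inputs["document_id"]  # assumed lookup; how the id reaches the node is not shown in this diff
    chunks = node_inputs["chunks"]            # assumed chunk payload
    self._invoke_knowledge_index(node_data=node_data, document_id=document_id, chunks=chunks)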

View File

@@ -6,7 +6,7 @@ import random
import time
import uuid
from collections import Counter
from typing import Any, Optional
from typing import Any, Literal, Optional
from flask_login import current_user # type: ignore
from sqlalchemy import func
@@ -20,6 +20,7 @@ from core.model_runtime.entities.model_entities import ModelType
from core.plugin.entities.plugin import ModelProviderID
from core.rag.index_processor.constant.built_in_field import BuiltInField
from core.rag.index_processor.constant.index_type import IndexType
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from core.workflow.nodes.knowledge_index.entities import IndexMethod, RetrievalSetting
from events.dataset_event import dataset_was_deleted
@@ -1435,9 +1436,11 @@ class DocumentService:
@staticmethod
def invoke_knowledge_index(
dataset: Dataset,
document: Document,
chunks: list[Any],
index_method: IndexMethod,
retrieval_setting: RetrievalSetting,
chunk_structure: Literal["text_model", "hierarchical_model"],
account: Account | Any,
original_document_id: str | None = None,
created_from: str = "rag-pipline",
@@ -1479,85 +1482,11 @@
if retrieval_setting
else default_retrieval_model
) # type: ignore
documents = []
if original_document_id:
document = DocumentService.update_document_with_dataset_id(dataset, knowledge_config, account)
documents.append(document)
batch = document.batch
else:
batch = time.strftime("%Y%m%d%H%M%S") + str(random.randint(100000, 999999))
lock_name = "add_document_lock_dataset_id_{}".format(dataset.id)
with redis_client.lock(lock_name, timeout=600):
position = DocumentService.get_documents_position(dataset.id)
document_ids = []
duplicate_document_ids = []
for chunk in chunks:
file = (
db.session.query(UploadFile)
.filter(UploadFile.tenant_id == dataset.tenant_id, UploadFile.id == file_id)
.first()
)
# raise error if file not found
if not file:
raise FileNotExistsError()
file_name = file.name
data_source_info = {
"upload_file_id": file_id,
}
# check duplicate
if knowledge_config.duplicate:
document = Document.query.filter_by(
dataset_id=dataset.id,
tenant_id=current_user.current_tenant_id,
data_source_type="upload_file",
enabled=True,
name=file_name,
).first()
if document:
document.dataset_process_rule_id = dataset_process_rule.id # type: ignore
document.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
document.created_from = created_from
document.doc_form = knowledge_config.doc_form
document.doc_language = knowledge_config.doc_language
document.data_source_info = json.dumps(data_source_info)
document.batch = batch
document.indexing_status = "waiting"
db.session.add(document)
documents.append(document)
duplicate_document_ids.append(document.id)
continue
document = DocumentService.build_document(
dataset,
dataset_process_rule.id, # type: ignore
knowledge_config.data_source.info_list.data_source_type, # type: ignore
knowledge_config.doc_form,
knowledge_config.doc_language,
data_source_info,
created_from,
position,
account,
file_name,
batch,
)
db.session.add(document)
db.session.flush()
document_ids.append(document.id)
documents.append(document)
position += 1
db.session.commit()
# trigger async task
if document_ids:
document_indexing_task.delay(dataset.id, document_ids)
if duplicate_document_ids:
duplicate_document_indexing_task.delay(dataset.id, duplicate_document_ids)
index_processor = IndexProcessorFactory(chunk_structure).init_index_processor()
index_processor.index(dataset, document, chunks)
return documents, batch
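With the document-building branch above removed, the service no longer creates Document rows here or queues document_indexing_task / duplicate_document_indexing_task; it hands the already-prepared chunks straight to the processor selected by chunk_structure. A condensed sketch of that dispatch (the literal-to-processor mapping is an assumption based on the existing IndexType constants, not something shown in this diff):

    # "text_model"         -> paragraph-style index processor (assumed)
    # "hierarchical_model" -> parent/child-style index processor (assumed)
    index_processor = IndexProcessorFactory(chunk_structure).init_index_processor()
    index_processor.index(dataset, document, chunks)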
@staticmethod
def check_documents_upload_quota(count: int, features: FeatureModel):
can_upload_size = features.documents_upload_quota.limit - features.documents_upload_quota.size