From 3f1363503b4ea92b1f16d30ccfedf1fac7302054 Mon Sep 17 00:00:00 2001 From: jyong <718720800@qq.com> Date: Wed, 7 May 2025 16:19:09 +0800 Subject: [PATCH] r2 --- api/core/plugin/impl/datasource.py | 1 - .../index_processor/index_processor_base.py | 7 +- .../processor/paragraph_index_processor.py | 26 ++++++- .../processor/parent_child_index_processor.py | 35 +++++++++- .../nodes/knowledge_index/entities.py | 32 ++++++--- .../knowledge_index/knowledge_index_node.py | 68 +++++-------------- api/services/dataset_service.py | 20 +++--- 7 files changed, 116 insertions(+), 73 deletions(-) diff --git a/api/core/plugin/impl/datasource.py b/api/core/plugin/impl/datasource.py index 029f752d25..c69fa2fe32 100644 --- a/api/core/plugin/impl/datasource.py +++ b/api/core/plugin/impl/datasource.py @@ -95,7 +95,6 @@ class PluginDatasourceManager(BasePluginClient): "data": { "provider": datasource_provider_id.provider_name, "datasource": datasource_name, - "credentials": credentials, "datasource_parameters": datasource_parameters, }, diff --git a/api/core/rag/index_processor/index_processor_base.py b/api/core/rag/index_processor/index_processor_base.py index 2bcd1c79bb..d796c9fd24 100644 --- a/api/core/rag/index_processor/index_processor_base.py +++ b/api/core/rag/index_processor/index_processor_base.py @@ -1,7 +1,8 @@ """Abstract interface for document loader implementations.""" from abc import ABC, abstractmethod -from typing import Optional +from collections.abc import Mapping +from typing import Any, Optional from configs import dify_config from core.model_manager import ModelInstance @@ -33,6 +34,10 @@ class BaseIndexProcessor(ABC): def clean(self, dataset: Dataset, node_ids: Optional[list[str]], with_keywords: bool = True, **kwargs): raise NotImplementedError + @abstractmethod + def index(self, dataset: Dataset, document: Document, chunks: Mapping[str, Any]): + raise NotImplementedError + @abstractmethod def retrieve( self, diff --git a/api/core/rag/index_processor/processor/paragraph_index_processor.py b/api/core/rag/index_processor/processor/paragraph_index_processor.py index 7c2031258e..43d201af73 100644 --- a/api/core/rag/index_processor/processor/paragraph_index_processor.py +++ b/api/core/rag/index_processor/processor/paragraph_index_processor.py @@ -1,12 +1,14 @@ """Paragraph index processor.""" import uuid -from typing import Any, Mapping, Optional +from collections.abc import Mapping +from typing import Any, Optional from core.rag.cleaner.clean_processor import CleanProcessor from core.rag.datasource.keyword.keyword_factory import Keyword from core.rag.datasource.retrieval_service import RetrievalService from core.rag.datasource.vdb.vector_factory import Vector +from core.rag.docstore.dataset_docstore import DatasetDocumentStore from core.rag.extractor.entity.extract_setting import ExtractSetting from core.rag.extractor.extract_processor import ExtractProcessor from core.rag.index_processor.index_processor_base import BaseIndexProcessor @@ -129,4 +131,24 @@ class ParagraphIndexProcessor(BaseIndexProcessor): def index(self, dataset: Dataset, document: Document, chunks: Mapping[str, Any]): paragraph = GeneralStructureChunk(**chunks) - pass + documents = [] + for content in paragraph.general_chunk: + metadata = { + "dataset_id": dataset.id, + "document_id": document.id, + "doc_id": str(uuid.uuid4()), + "doc_hash": helper.generate_text_hash(content), + } + doc = Document(page_content=content, metadata=metadata) + documents.append(doc) + if documents: + # save node to document segment + doc_store 
= DatasetDocumentStore(dataset=dataset, user_id=document.created_by, document_id=document.id) + # add document segments + doc_store.add_documents(docs=documents, save_child=False) + if dataset.indexing_technique == "high_quality": + vector = Vector(dataset) + vector.create(documents) + elif dataset.indexing_technique == "economy": + keyword = Keyword(dataset) + keyword.add_texts(documents) diff --git a/api/core/rag/index_processor/processor/parent_child_index_processor.py b/api/core/rag/index_processor/processor/parent_child_index_processor.py index 1cde5e1c8f..ce64bb2a54 100644 --- a/api/core/rag/index_processor/processor/parent_child_index_processor.py +++ b/api/core/rag/index_processor/processor/parent_child_index_processor.py @@ -1,17 +1,20 @@ """Paragraph index processor.""" import uuid -from typing import Optional +from collections.abc import Mapping +from typing import Any, Optional from configs import dify_config from core.model_manager import ModelInstance from core.rag.cleaner.clean_processor import CleanProcessor from core.rag.datasource.retrieval_service import RetrievalService from core.rag.datasource.vdb.vector_factory import Vector +from core.rag.docstore.dataset_docstore import DatasetDocumentStore from core.rag.extractor.entity.extract_setting import ExtractSetting from core.rag.extractor.extract_processor import ExtractProcessor from core.rag.index_processor.index_processor_base import BaseIndexProcessor from core.rag.models.document import ChildDocument, Document +from core.workflow.nodes.knowledge_index.entities import ParentChildStructureChunk from extensions.ext_database import db from libs import helper from models.dataset import ChildChunk, Dataset, DocumentSegment @@ -202,3 +205,33 @@ class ParentChildIndexProcessor(BaseIndexProcessor): child_document.page_content = child_page_content child_nodes.append(child_document) return child_nodes + + def index(self, dataset: Dataset, document: Document, chunks: Mapping[str, Any]): + parent_childs = ParentChildStructureChunk(**chunks) + documents = [] + for parent_child in parent_childs.parent_child_chunks: + metadata = { + "dataset_id": dataset.id, + "document_id": document.id, + "doc_id": str(uuid.uuid4()), + "doc_hash": helper.generate_text_hash(parent_child.parent_content), + } + child_documents = [] + for child in parent_child.child_contents: + child_metadata = { + "dataset_id": dataset.id, + "document_id": document.id, + "doc_id": str(uuid.uuid4()), + "doc_hash": helper.generate_text_hash(child), + } + child_documents.append(ChildDocument(page_content=child, metadata=child_metadata)) + doc = Document(page_content=parent_child.parent_content, metadata=metadata, children=child_documents) + documents.append(doc) + if documents: + # save node to document segment + doc_store = DatasetDocumentStore(dataset=dataset, user_id=document.created_by, document_id=document.id) + # add document segments + doc_store.add_documents(docs=documents, save_child=True) + if dataset.indexing_technique == "high_quality": + vector = Vector(dataset) + vector.create(documents) diff --git a/api/core/workflow/nodes/knowledge_index/entities.py b/api/core/workflow/nodes/knowledge_index/entities.py index a87032dba6..635748799b 100644 --- a/api/core/workflow/nodes/knowledge_index/entities.py +++ b/api/core/workflow/nodes/knowledge_index/entities.py @@ -1,10 +1,8 @@ -from collections.abc import Sequence -from typing import Any, Literal, Optional, Union +from typing import Literal, Optional, Union -from pydantic import BaseModel, Field +from pydantic 
import BaseModel from core.workflow.nodes.base import BaseNodeData -from core.workflow.nodes.llm.entities import VisionConfig class RerankingModelConfig(BaseModel): @@ -15,6 +13,7 @@ class RerankingModelConfig(BaseModel): provider: str model: str + class VectorSetting(BaseModel): """ Vector Setting. @@ -32,6 +31,7 @@ class KeywordSetting(BaseModel): keyword_weight: float + class WeightedScoreConfig(BaseModel): """ Weighted score Config. @@ -45,6 +45,7 @@ class EmbeddingSetting(BaseModel): """ Embedding Setting. """ + embedding_provider_name: str embedding_model_name: str @@ -61,6 +62,7 @@ class RetrievalSetting(BaseModel): """ Retrieval Setting. """ + search_method: Literal["semantic_search", "keyword_search", "hybrid_search"] top_k: int score_threshold: Optional[float] = 0.5 @@ -70,49 +72,61 @@ class RetrievalSetting(BaseModel): reranking_model: Optional[RerankingModelConfig] = None weights: Optional[WeightedScoreConfig] = None + class IndexMethod(BaseModel): """ Knowledge Index Setting. """ + indexing_technique: Literal["high_quality", "economy"] embedding_setting: EmbeddingSetting economy_setting: EconomySetting + class FileInfo(BaseModel): """ File Info. """ + file_id: str + class OnlineDocumentIcon(BaseModel): """ Document Icon. """ + icon_url: str icon_type: str icon_emoji: str + class OnlineDocumentInfo(BaseModel): """ Online document info. """ + provider: str workspace_id: str page_id: str page_type: str icon: OnlineDocumentIcon + class WebsiteInfo(BaseModel): """ website import info. """ - provider: str + + provider: str url: str + class GeneralStructureChunk(BaseModel): """ General Structure Chunk. """ + general_chunk: list[str] data_source_info: Union[FileInfo, OnlineDocumentInfo, WebsiteInfo] @@ -121,14 +135,16 @@ class ParentChildChunk(BaseModel): """ Parent Child Chunk. """ + parent_content: str - child_content: list[str] + child_contents: list[str] class ParentChildStructureChunk(BaseModel): """ Parent Child Structure Chunk. """ + parent_child_chunks: list[ParentChildChunk] data_source_info: Union[FileInfo, OnlineDocumentInfo, WebsiteInfo] @@ -138,10 +154,10 @@ class KnowledgeIndexNodeData(BaseNodeData): Knowledge index Node Data. 
""" - type: str = "knowledge-index" + type: str = "knowledge-index" dataset_id: str + document_id: str index_chunk_variable_selector: list[str] chunk_structure: Literal["general", "parent-child"] index_method: IndexMethod retrieval_setting: RetrievalSetting - diff --git a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py index 7aa6b9379f..5f9ac78097 100644 --- a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py +++ b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py @@ -1,60 +1,22 @@ -import json import logging -import re import time -from collections import defaultdict -from collections.abc import Mapping, Sequence -from typing import Any, Optional, cast +from typing import Any, cast -from sqlalchemy import Integer, and_, func, or_, text -from sqlalchemy import cast as sqlalchemy_cast - -from core.app.app_config.entities import DatasetRetrieveConfigEntity -from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity -from core.entities.agent_entities import PlanningStrategy -from core.entities.model_entities import ModelStatus -from core.model_manager import ModelInstance, ModelManager -from core.model_runtime.entities.message_entities import PromptMessageRole -from core.model_runtime.entities.model_entities import ModelFeature, ModelType -from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel -from core.prompt.simple_prompt_transform import ModelMode -from core.rag.datasource.retrieval_service import RetrievalService -from core.rag.entities.metadata_entities import Condition, MetadataCondition -from core.rag.retrieval.dataset_retrieval import DatasetRetrieval from core.rag.retrieval.retrieval_methods import RetrievalMethod -from core.variables import StringSegment from core.variables.segments import ObjectSegment from core.workflow.entities.node_entities import NodeRunResult from core.workflow.nodes.enums import NodeType -from core.workflow.nodes.event.event import ModelInvokeCompletedEvent -from core.workflow.nodes.knowledge_retrieval.template_prompts import ( - METADATA_FILTER_ASSISTANT_PROMPT_1, - METADATA_FILTER_ASSISTANT_PROMPT_2, - METADATA_FILTER_COMPLETION_PROMPT, - METADATA_FILTER_SYSTEM_PROMPT, - METADATA_FILTER_USER_PROMPT_1, - METADATA_FILTER_USER_PROMPT_3, -) -from core.workflow.nodes.llm.entities import LLMNodeChatModelMessage, LLMNodeCompletionModelPromptTemplate from core.workflow.nodes.llm.node import LLMNode -from core.workflow.nodes.question_classifier.template_prompts import QUESTION_CLASSIFIER_USER_PROMPT_2 from extensions.ext_database import db from extensions.ext_redis import redis_client -from libs.json_in_md_parser import parse_and_check_json_markdown -from models.dataset import Dataset, DatasetMetadata, Document, RateLimitLog +from models.dataset import Dataset, Document, RateLimitLog from models.workflow import WorkflowNodeExecutionStatus -from services.dataset_service import DatasetService, DocumentService +from services.dataset_service import DocumentService from services.feature_service import FeatureService -from .entities import KnowledgeIndexNodeData, KnowledgeRetrievalNodeData, ModelConfig +from .entities import KnowledgeIndexNodeData from .exc import ( - InvalidModelTypeError, KnowledgeIndexNodeError, - KnowledgeRetrievalNodeError, - ModelCredentialsNotInitializedError, - ModelNotExistError, - ModelNotSupportedError, - ModelQuotaExceededError, ) logger = logging.getLogger(__name__) @@ 
-138,16 +100,15 @@ class KnowledgeIndexNode(LLMNode):
                 error_type=type(e).__name__,
             )
 
-
-    def _invoke_knowledge_index(self, node_data: KnowledgeIndexNodeData, document_id: str, chunks: list[any]) -> Any:
+    def _invoke_knowledge_index(self, node_data: KnowledgeIndexNodeData, chunks: dict[str, Any]) -> Any:
         dataset = Dataset.query.filter_by(id=node_data.dataset_id).first()
         if not dataset:
             raise KnowledgeIndexNodeError(f"Dataset {node_data.dataset_id} not found.")
-
-        document = Document.query.filter_by(id=document_id).first()
+
+        document = Document.query.filter_by(id=node_data.document_id).first()
         if not document:
-            raise KnowledgeIndexNodeError(f"Document {document_id} not found.")
-
+            raise KnowledgeIndexNodeError(f"Document {node_data.document_id} not found.")
+
         DocumentService.invoke_knowledge_index(
             dataset=dataset,
             document=document,
@@ -156,5 +117,12 @@ class KnowledgeIndexNode(LLMNode):
             index_method=node_data.index_method,
             retrieval_setting=node_data.retrieval_setting,
         )
-
-        pass
+
+        return {
+            "dataset_id": dataset.id,
+            "dataset_name": dataset.name,
+            "document_id": document.id,
+            "document_name": document.name,
+            "created_at": document.created_at,
+            "display_status": document.indexing_status,
+        }
diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py
index 60d3ccf131..af1c1028cf 100644
--- a/api/services/dataset_service.py
+++ b/api/services/dataset_service.py
@@ -1441,11 +1441,7 @@ class DocumentService:
         index_method: IndexMethod,
         retrieval_setting: RetrievalSetting,
         chunk_structure: Literal["text_model", "hierarchical_model"],
-        original_document_id: str | None = None,
-        account: Account | Any,
-        created_from: str = "rag-pipline",
     ):
-
         if not dataset.indexing_technique:
             if index_method.indexing_technique not in Dataset.INDEXING_TECHNIQUE_LIST:
                 raise ValueError("Indexing technique is invalid")
@@ -1453,7 +1449,10 @@ class DocumentService:
             dataset.indexing_technique = index_method.indexing_technique
             if index_method.indexing_technique == "high_quality":
                 model_manager = ModelManager()
-                if index_method.embedding_setting.embedding_model and index_method.embedding_setting.embedding_model_provider:
+                if (
+                    index_method.embedding_setting.embedding_model
+                    and index_method.embedding_setting.embedding_model_provider
+                ):
                     dataset_embedding_model = index_method.embedding_setting.embedding_model
                     dataset_embedding_model_provider = index_method.embedding_setting.embedding_model_provider
                 else:
@@ -1478,15 +1477,16 @@ class DocumentService:
                 }
 
                 dataset.retrieval_model = (
-                    retrieval_setting.model_dump()
-                    if retrieval_setting
-                    else default_retrieval_model
+                    retrieval_setting.model_dump() if retrieval_setting else default_retrieval_model
                 )  # type: ignore
 
         index_processor = IndexProcessorFactory(chunk_structure).init_index_processor()
         index_processor.index(dataset, document, chunks)
-        return documents, batch
-
+        # update document status
+        document.indexing_status = "completed"
+        document.completed_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
+        db.session.commit()
+
     @staticmethod
     def check_documents_upload_quota(count: int, features: FeatureModel):
         can_upload_size = features.documents_upload_quota.limit - features.documents_upload_quota.size