This commit is contained in:
jyong 2025-05-07 16:19:09 +08:00
parent a998022c12
commit 3f1363503b
7 changed files with 116 additions and 73 deletions

View File

@@ -95,7 +95,6 @@ class PluginDatasourceManager(BasePluginClient):
                 "data": {
                     "provider": datasource_provider_id.provider_name,
                     "datasource": datasource_name,
                     "credentials": credentials,
                     "datasource_parameters": datasource_parameters,
                 },

View File

@@ -1,7 +1,8 @@
 """Abstract interface for document loader implementations."""

 from abc import ABC, abstractmethod
-from typing import Optional
+from collections.abc import Mapping
+from typing import Any, Optional

 from configs import dify_config
 from core.model_manager import ModelInstance
@@ -33,6 +34,10 @@ class BaseIndexProcessor(ABC):
     def clean(self, dataset: Dataset, node_ids: Optional[list[str]], with_keywords: bool = True, **kwargs):
         raise NotImplementedError

+    @abstractmethod
+    def index(self, dataset: Dataset, document: Document, chunks: Mapping[str, Any]):
+        raise NotImplementedError
+
     @abstractmethod
     def retrieve(
         self,
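The second hunk makes `index` part of the processor contract. Below is a small standalone sketch of that contract, using stand-in names (`BaseIndexProcessorSketch`, `DemoProcessor`) rather than the real Dify classes, to show what every concrete index processor is now expected to implement.

from abc import ABC, abstractmethod
from collections.abc import Mapping
from typing import Any


class BaseIndexProcessorSketch(ABC):
    """Standalone mirror of the new contract; not the real Dify base class."""

    @abstractmethod
    def index(self, dataset: Any, document: Any, chunks: Mapping[str, Any]):
        raise NotImplementedError


class DemoProcessor(BaseIndexProcessorSketch):
    def index(self, dataset: Any, document: Any, chunks: Mapping[str, Any]):
        # A real processor validates `chunks`, stores segments, and builds the
        # vector or keyword index (see the processors later in this commit).
        print(f"indexing {len(chunks.get('general_chunk', []))} chunks into {dataset}")


DemoProcessor().index("my-dataset", "my-document", {"general_chunk": ["a", "b"]})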

View File

@@ -1,12 +1,14 @@
 """Paragraph index processor."""

 import uuid
-from typing import Any, Mapping, Optional
+from collections.abc import Mapping
+from typing import Any, Optional

 from core.rag.cleaner.clean_processor import CleanProcessor
 from core.rag.datasource.keyword.keyword_factory import Keyword
 from core.rag.datasource.retrieval_service import RetrievalService
 from core.rag.datasource.vdb.vector_factory import Vector
+from core.rag.docstore.dataset_docstore import DatasetDocumentStore
 from core.rag.extractor.entity.extract_setting import ExtractSetting
 from core.rag.extractor.extract_processor import ExtractProcessor
 from core.rag.index_processor.index_processor_base import BaseIndexProcessor
@@ -129,4 +131,24 @@ class ParagraphIndexProcessor(BaseIndexProcessor):

     def index(self, dataset: Dataset, document: Document, chunks: Mapping[str, Any]):
         paragraph = GeneralStructureChunk(**chunks)
-        pass
+        documents = []
+        for content in paragraph.general_chunk:
+            metadata = {
+                "dataset_id": dataset.id,
+                "document_id": document.id,
+                "doc_id": str(uuid.uuid4()),
+                "doc_hash": helper.generate_text_hash(content),
+            }
+            doc = Document(page_content=content, metadata=metadata)
+            documents.append(doc)
+        if documents:
+            # save node to document segment
+            doc_store = DatasetDocumentStore(dataset=dataset, user_id=document.created_by, document_id=document.id)
+            # add document segments
+            doc_store.add_documents(docs=documents, save_child=False)
+            if dataset.indexing_technique == "high_quality":
+                vector = Vector(dataset)
+                vector.create(documents)
+            elif dataset.indexing_technique == "economy":
+                keyword = Keyword(dataset)
+                keyword.add_texts(documents)
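`index` validates the incoming mapping with `GeneralStructureChunk`, so callers have to supply the shape defined in `entities.py` later in this commit. A hedged sketch of such a payload (field names come from `GeneralStructureChunk` and `FileInfo` in this diff; the values are invented):

# Hypothetical payload for ParagraphIndexProcessor.index(); values are invented.
chunks = {
    "general_chunk": [
        "First paragraph of the source document.",
        "Second paragraph of the source document.",
    ],
    "data_source_info": {"file_id": "example-file-id"},  # FileInfo variant of the union
}
# index_processor.index(dataset, document, chunks)  # invoked by the knowledge-index node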

View File

@@ -1,17 +1,20 @@
 """Paragraph index processor."""

 import uuid
-from typing import Optional
+from collections.abc import Mapping
+from typing import Any, Optional

 from configs import dify_config
 from core.model_manager import ModelInstance
 from core.rag.cleaner.clean_processor import CleanProcessor
 from core.rag.datasource.retrieval_service import RetrievalService
 from core.rag.datasource.vdb.vector_factory import Vector
+from core.rag.docstore.dataset_docstore import DatasetDocumentStore
 from core.rag.extractor.entity.extract_setting import ExtractSetting
 from core.rag.extractor.extract_processor import ExtractProcessor
 from core.rag.index_processor.index_processor_base import BaseIndexProcessor
 from core.rag.models.document import ChildDocument, Document
+from core.workflow.nodes.knowledge_index.entities import ParentChildStructureChunk
 from extensions.ext_database import db
 from libs import helper
 from models.dataset import ChildChunk, Dataset, DocumentSegment
@@ -202,3 +205,33 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
                     child_document.page_content = child_page_content
                     child_nodes.append(child_document)
         return child_nodes
+
+    def index(self, dataset: Dataset, document: Document, chunks: Mapping[str, Any]):
+        parent_childs = ParentChildStructureChunk(**chunks)
+        documents = []
+        for parent_child in parent_childs.parent_child_chunks:
+            metadata = {
+                "dataset_id": dataset.id,
+                "document_id": document.id,
+                "doc_id": str(uuid.uuid4()),
+                "doc_hash": helper.generate_text_hash(parent_child.parent_content),
+            }
+            child_documents = []
+            for child in parent_child.child_contents:
+                child_metadata = {
+                    "dataset_id": dataset.id,
+                    "document_id": document.id,
+                    "doc_id": str(uuid.uuid4()),
+                    "doc_hash": helper.generate_text_hash(child),
+                }
+                child_documents.append(ChildDocument(page_content=child, metadata=child_metadata))
+            doc = Document(page_content=parent_child.parent_content, metadata=metadata, children=child_documents)
+            documents.append(doc)
+        if documents:
+            # save node to document segment
+            doc_store = DatasetDocumentStore(dataset=dataset, user_id=document.created_by, document_id=document.id)
+            # add document segments
+            doc_store.add_documents(docs=documents, save_child=True)
+            if dataset.indexing_technique == "high_quality":
+                vector = Vector(dataset)
+                vector.create(documents)
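The parent-child variant validates against `ParentChildStructureChunk` and, unlike the paragraph processor, only builds a vector index for `high_quality` datasets; there is no economy/keyword branch here. A sketch of the expected payload, using the renamed `child_contents` field (values invented):

# Hypothetical payload for ParentChildIndexProcessor.index(); values are invented.
chunks = {
    "parent_child_chunks": [
        {
            "parent_content": "Full section text used as the parent chunk.",
            "child_contents": [
                "First sentence-level child chunk.",
                "Second sentence-level child chunk.",
            ],
        }
    ],
    "data_source_info": {"file_id": "example-file-id"},
}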

View File

@@ -1,10 +1,8 @@
-from collections.abc import Sequence
-from typing import Any, Literal, Optional, Union
+from typing import Literal, Optional, Union

-from pydantic import BaseModel, Field
+from pydantic import BaseModel

 from core.workflow.nodes.base import BaseNodeData
-from core.workflow.nodes.llm.entities import VisionConfig


 class RerankingModelConfig(BaseModel):
@@ -15,6 +13,7 @@ class RerankingModelConfig(BaseModel):
     provider: str
     model: str

 class VectorSetting(BaseModel):
     """
     Vector Setting.
@@ -32,6 +31,7 @@ class KeywordSetting(BaseModel):
     keyword_weight: float

 class WeightedScoreConfig(BaseModel):
     """
     Weighted score Config.
@@ -45,6 +45,7 @@ class EmbeddingSetting(BaseModel):
     """
     Embedding Setting.
     """

     embedding_provider_name: str
     embedding_model_name: str
@@ -61,6 +62,7 @@ class RetrievalSetting(BaseModel):
     """
     Retrieval Setting.
     """

     search_method: Literal["semantic_search", "keyword_search", "hybrid_search"]
     top_k: int
     score_threshold: Optional[float] = 0.5
@@ -70,49 +72,61 @@ class RetrievalSetting(BaseModel):
     reranking_model: Optional[RerankingModelConfig] = None
     weights: Optional[WeightedScoreConfig] = None

 class IndexMethod(BaseModel):
     """
     Knowledge Index Setting.
     """

     indexing_technique: Literal["high_quality", "economy"]
     embedding_setting: EmbeddingSetting
     economy_setting: EconomySetting

 class FileInfo(BaseModel):
     """
     File Info.
     """

     file_id: str

 class OnlineDocumentIcon(BaseModel):
     """
     Document Icon.
     """

     icon_url: str
     icon_type: str
     icon_emoji: str

 class OnlineDocumentInfo(BaseModel):
     """
     Online document info.
     """

     provider: str
     workspace_id: str
     page_id: str
     page_type: str
     icon: OnlineDocumentIcon

 class WebsiteInfo(BaseModel):
     """
     website import info.
     """

     provider: str
     url: str

 class GeneralStructureChunk(BaseModel):
     """
     General Structure Chunk.
     """

     general_chunk: list[str]
     data_source_info: Union[FileInfo, OnlineDocumentInfo, WebsiteInfo]
@@ -121,14 +135,16 @@ class ParentChildChunk(BaseModel):
     """
     Parent Child Chunk.
     """

     parent_content: str
-    child_content: list[str]
+    child_contents: list[str]

 class ParentChildStructureChunk(BaseModel):
     """
     Parent Child Structure Chunk.
     """

     parent_child_chunks: list[ParentChildChunk]
     data_source_info: Union[FileInfo, OnlineDocumentInfo, WebsiteInfo]
@@ -138,10 +154,10 @@ class KnowledgeIndexNodeData(BaseNodeData):
     Knowledge index Node Data.
     """

     type: str = "knowledge-index"
     dataset_id: str
+    document_id: str
     index_chunk_variable_selector: list[str]
     chunk_structure: Literal["general", "parent-child"]
     index_method: IndexMethod
     retrieval_setting: RetrievalSetting
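Because both index processors construct these models with `Model(**chunks)`, pydantic validation is what enforces the new shapes at runtime, including the `child_content` to `child_contents` rename. A self-contained sketch with a simplified stand-in model shows the effect of the rename:

from pydantic import BaseModel, ValidationError


class ParentChildChunkSketch(BaseModel):
    """Simplified stand-in for ParentChildChunk after this commit."""

    parent_content: str
    child_contents: list[str]


ok = ParentChildChunkSketch(parent_content="parent", child_contents=["c1", "c2"])
print(ok.child_contents)

try:
    # Payloads still using the old field name now fail validation.
    ParentChildChunkSketch(parent_content="parent", child_content=["c1"])
except ValidationError as e:
    print(e.errors()[0]["type"])  # "missing" under pydantic v2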

View File

@ -1,60 +1,22 @@
import json
import logging import logging
import re
import time import time
from collections import defaultdict from typing import Any, cast
from collections.abc import Mapping, Sequence
from typing import Any, Optional, cast
from sqlalchemy import Integer, and_, func, or_, text
from sqlalchemy import cast as sqlalchemy_cast
from core.app.app_config.entities import DatasetRetrieveConfigEntity
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
from core.entities.agent_entities import PlanningStrategy
from core.entities.model_entities import ModelStatus
from core.model_manager import ModelInstance, ModelManager
from core.model_runtime.entities.message_entities import PromptMessageRole
from core.model_runtime.entities.model_entities import ModelFeature, ModelType
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from core.prompt.simple_prompt_transform import ModelMode
from core.rag.datasource.retrieval_service import RetrievalService
from core.rag.entities.metadata_entities import Condition, MetadataCondition
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
from core.rag.retrieval.retrieval_methods import RetrievalMethod from core.rag.retrieval.retrieval_methods import RetrievalMethod
from core.variables import StringSegment
from core.variables.segments import ObjectSegment from core.variables.segments import ObjectSegment
from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.nodes.enums import NodeType from core.workflow.nodes.enums import NodeType
from core.workflow.nodes.event.event import ModelInvokeCompletedEvent
from core.workflow.nodes.knowledge_retrieval.template_prompts import (
METADATA_FILTER_ASSISTANT_PROMPT_1,
METADATA_FILTER_ASSISTANT_PROMPT_2,
METADATA_FILTER_COMPLETION_PROMPT,
METADATA_FILTER_SYSTEM_PROMPT,
METADATA_FILTER_USER_PROMPT_1,
METADATA_FILTER_USER_PROMPT_3,
)
from core.workflow.nodes.llm.entities import LLMNodeChatModelMessage, LLMNodeCompletionModelPromptTemplate
from core.workflow.nodes.llm.node import LLMNode from core.workflow.nodes.llm.node import LLMNode
from core.workflow.nodes.question_classifier.template_prompts import QUESTION_CLASSIFIER_USER_PROMPT_2
from extensions.ext_database import db from extensions.ext_database import db
from extensions.ext_redis import redis_client from extensions.ext_redis import redis_client
from libs.json_in_md_parser import parse_and_check_json_markdown from models.dataset import Dataset, Document, RateLimitLog
from models.dataset import Dataset, DatasetMetadata, Document, RateLimitLog
from models.workflow import WorkflowNodeExecutionStatus from models.workflow import WorkflowNodeExecutionStatus
from services.dataset_service import DatasetService, DocumentService from services.dataset_service import DocumentService
from services.feature_service import FeatureService from services.feature_service import FeatureService
from .entities import KnowledgeIndexNodeData, KnowledgeRetrievalNodeData, ModelConfig from .entities import KnowledgeIndexNodeData
from .exc import ( from .exc import (
InvalidModelTypeError,
KnowledgeIndexNodeError, KnowledgeIndexNodeError,
KnowledgeRetrievalNodeError,
ModelCredentialsNotInitializedError,
ModelNotExistError,
ModelNotSupportedError,
ModelQuotaExceededError,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -138,16 +100,15 @@ class KnowledgeIndexNode(LLMNode):
                 error_type=type(e).__name__,
             )

-    def _invoke_knowledge_index(self, node_data: KnowledgeIndexNodeData, document_id: str, chunks: list[any]) -> Any:
+    def _invoke_knowledge_index(self, node_data: KnowledgeIndexNodeData, chunks: list[Any]) -> Any:
         dataset = Dataset.query.filter_by(id=node_data.dataset_id).first()
         if not dataset:
             raise KnowledgeIndexNodeError(f"Dataset {node_data.dataset_id} not found.")

-        document = Document.query.filter_by(id=document_id).first()
+        document = Document.query.filter_by(id=node_data.document_id).first()
         if not document:
-            raise KnowledgeIndexNodeError(f"Document {document_id} not found.")
+            raise KnowledgeIndexNodeError(f"Document {node_data.document_id} not found.")

         DocumentService.invoke_knowledge_index(
             dataset=dataset,
             document=document,
@@ -156,5 +117,12 @@ class KnowledgeIndexNode(LLMNode):
             index_method=node_data.index_method,
             retrieval_setting=node_data.retrieval_setting,
         )
-        pass
+        return {
+            "dataset_id": dataset.id,
+            "dataset_name": dataset.name,
+            "document_id": document.id,
+            "document_name": document.name,
+            "created_at": document.created_at,
+            "display_status": document.indexing_status,
+        }
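The `_run` method is not part of the visible hunks, so the following is only an assumption about how the node plausibly feeds the selected chunk object into `_invoke_knowledge_index`. Attribute names such as `self.graph_runtime_state.variable_pool` and `self.node_data` are inferred from similar Dify nodes, not established by this diff.

    # Assumed wiring, not code from this commit's hunks.
    def _run(self) -> NodeRunResult:
        node_data = cast(KnowledgeIndexNodeData, self.node_data)
        variable = self.graph_runtime_state.variable_pool.get(node_data.index_chunk_variable_selector)
        if variable is None or not isinstance(variable, ObjectSegment):
            return NodeRunResult(
                status=WorkflowNodeExecutionStatus.FAILED,
                inputs={},
                error="Index chunk variable not found or not an object.",
            )
        try:
            outputs = self._invoke_knowledge_index(node_data=node_data, chunks=variable.value)
            return NodeRunResult(
                status=WorkflowNodeExecutionStatus.SUCCEEDED,
                inputs={"chunks": variable.value},
                outputs=outputs,
            )
        except KnowledgeIndexNodeError as e:
            return NodeRunResult(
                status=WorkflowNodeExecutionStatus.FAILED,
                inputs={},
                error=str(e),
                error_type=type(e).__name__,
            )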

View File

@@ -1441,11 +1441,7 @@ class DocumentService:
         index_method: IndexMethod,
         retrieval_setting: RetrievalSetting,
         chunk_structure: Literal["text_model", "hierarchical_model"],
-        original_document_id: str | None = None,
-        account: Account | Any,
-        created_from: str = "rag-pipline",
     ):
         if not dataset.indexing_technique:
             if index_method.indexing_technique not in Dataset.INDEXING_TECHNIQUE_LIST:
                 raise ValueError("Indexing technique is invalid")
@@ -1453,7 +1449,10 @@ class DocumentService:
             dataset.indexing_technique = index_method.indexing_technique
             if index_method.indexing_technique == "high_quality":
                 model_manager = ModelManager()
-                if index_method.embedding_setting.embedding_model and index_method.embedding_setting.embedding_model_provider:
+                if (
+                    index_method.embedding_setting.embedding_model
+                    and index_method.embedding_setting.embedding_model_provider
+                ):
                     dataset_embedding_model = index_method.embedding_setting.embedding_model
                     dataset_embedding_model_provider = index_method.embedding_setting.embedding_model_provider
                 else:
@@ -1478,15 +1477,16 @@ class DocumentService:
             }

         dataset.retrieval_model = (
-            retrieval_setting.model_dump()
-            if retrieval_setting
-            else default_retrieval_model
+            retrieval_setting.model_dump() if retrieval_setting else default_retrieval_model
         )  # type: ignore

         index_processor = IndexProcessorFactory(chunk_structure).init_index_processor()
         index_processor.index(dataset, document, chunks)

-        return documents, batch
+        # update document status
+        document.indexing_status = "completed"
+        document.completed_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
+        db.session.commit()

     @staticmethod
     def check_documents_upload_quota(count: int, features: FeatureModel):
         can_upload_size = features.documents_upload_quota.limit - features.documents_upload_quota.size
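With `original_document_id`, `account`, and `created_from` dropped, the call from the knowledge-index node reduces to roughly the following sketch. Only the `dataset`, `document`, `index_method`, and `retrieval_setting` keywords are visible in the hunks above; the `chunks` keyword and its value are assumptions made for illustration.

# Hedged sketch of the trimmed call site; parameter names outside the visible hunks are assumed.
DocumentService.invoke_knowledge_index(
    dataset=dataset,
    document=document,
    chunks={"general_chunk": ["..."], "data_source_info": {"file_id": "..."}},
    index_method=node_data.index_method,
    retrieval_setting=node_data.retrieval_setting,
    chunk_structure="text_model",  # or "hierarchical_model" for parent-child datasets
)
# On success the service marks the document completed and commits the session.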