Feat: Add Duplicate ID Check and Update Deletion Logic (#6376)

- Implement the `check_duplicate_ids` function in `api_utils.py`; it returns
the unique IDs together with an error message for each duplicate ID.
- Use `check_duplicate_ids` in `dataset.py` and `doc.py` to detect and
handle duplicate IDs before processing.
- Update the deletion operations so that deleting datasets and documents
returns error messages for any duplicate IDs (see the sketch below).
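
A minimal, self-contained sketch of the resulting endpoint behavior. The
`delete_datasets` function below is an illustrative stand-in, not the actual
ragflow handler (which lives in `dataset.py` and responds through `get_result`
/ `get_error_data_result`), but the message format mirrors the diff:

```python
# Hypothetical stand-in for the dataset delete endpoint: count duplicates,
# process each unique ID once, and report a partial success when duplicates
# were dropped from the request.
def delete_datasets(ids):
    id_count = {}
    for id_value in ids:
        id_count[id_value] = id_count.get(id_value, 0) + 1
    duplicate_messages = [
        f"Duplicate dataset ids: {id_value}" for id_value, n in id_count.items() if n > 1
    ]

    success_count = 0
    for _id in set(ids):      # each unique ID is handled exactly once
        success_count += 1    # stand-in for the real deletion work

    if duplicate_messages:
        return f"Partially deleted {success_count} datasets with {len(duplicate_messages)} errors"
    return "success"


print(delete_datasets(["kb_1", "kb_2", "kb_1"]))
# Partially deleted 2 datasets with 1 errors
```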


### What problem does this PR solve?

Close https://github.com/infiniflow/ragflow/issues/6234

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Co-authored-by: wenju.li <wenju.li@deepctr.cn>
Co-authored-by: Kevin Hu <kevinhu.sh@gmail.com>
Authored by liwenju0 on 2025-03-21 14:05:17 +08:00, committed by GitHub.
Commit efdfb39a33, parent 7cc5603a82.
3 changed files with 86 additions and 10 deletions

File: `dataset.py`

@@ -30,7 +30,7 @@ from api.utils.api_utils import (
     token_required,
     get_error_data_result,
     valid,
-    get_parser_config, valid_parser_config, dataset_readonly_fields,
+    get_parser_config, valid_parser_config, dataset_readonly_fields,check_duplicate_ids
 )
@@ -237,7 +237,7 @@ def delete(tenant_id):
     if not req:
         ids = None
     else:
-        ids = set(req.get("ids"))
+        ids = req.get("ids")
     if not ids:
         id_list = []
         kbs = KnowledgebaseService.query(tenant_id=tenant_id)
@@ -245,6 +245,9 @@ def delete(tenant_id):
             id_list.append(kb.id)
     else:
         id_list = ids
+    unique_id_list, duplicate_messages = check_duplicate_ids(id_list, "dataset")
+    id_list = unique_id_list
     for id in id_list:
         kbs = KnowledgebaseService.query(id=id, tenant_id=tenant_id)
         if not kbs:
@@ -276,6 +279,11 @@ def delete(tenant_id):
             )
         else:
             return get_error_data_result(message="; ".join(errors))
+    if duplicate_messages:
+        if success_count > 0:
+            return get_result(message=f"Partially deleted {success_count} datasets with {len(duplicate_messages)} errors", data={"success_count": success_count, "errors": duplicate_messages},)
+        else:
+            return get_error_data_result(message=";".join(duplicate_messages))
     return get_result(code=settings.RetCode.SUCCESS)
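
For context on the `set(req.get("ids"))` to `req.get("ids")` change above: keeping
the raw list is what lets `check_duplicate_ids` see and report duplicates that the
old `set()` call silently collapsed. A tiny sketch (request payload made up):

```python
# Made-up request payload, illustrating why the raw list is kept.
req = {"ids": ["kb_1", "kb_1", "kb_2"]}

old_ids = set(req.get("ids"))  # {"kb_1", "kb_2"}: the duplicate vanishes silently
new_ids = req.get("ids")       # ["kb_1", "kb_1", "kb_2"]: the duplicate stays visible

print(len(old_ids), len(new_ids))  # 2 3
```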

File: `doc.py`

@@ -36,7 +36,7 @@ from api.db.services.document_service import DocumentService
 from api.db.services.file2document_service import File2DocumentService
 from api.db.services.file_service import FileService
 from api.db.services.knowledgebase_service import KnowledgebaseService
-from api.utils.api_utils import construct_json_result, get_parser_config
+from api.utils.api_utils import construct_json_result, get_parser_config, check_duplicate_ids
 from rag.nlp import search
 from rag.prompts import keyword_extraction
 from rag.app.tag import label_question
@@ -584,7 +584,7 @@ def delete(tenant_id, dataset_id):
     if not req:
         doc_ids = None
     else:
-        doc_ids = set(req.get("ids"))
+        doc_ids = req.get("ids")
     if not doc_ids:
         doc_list = []
         docs = DocumentService.query(kb_id=dataset_id)
@@ -592,11 +592,16 @@ def delete(tenant_id, dataset_id):
             doc_list.append(doc.id)
     else:
         doc_list = doc_ids
+    unique_doc_ids, duplicate_messages = check_duplicate_ids(doc_list, "document")
+    doc_list = unique_doc_ids
     root_folder = FileService.get_root_folder(tenant_id)
     pf_id = root_folder["id"]
     FileService.init_knowledgebase_docs(pf_id, tenant_id)
     errors = ""
     not_found = []
+    success_count = 0
     for doc_id in doc_list:
         try:
             e, doc = DocumentService.get_by_id(doc_id)
@@ -624,6 +629,7 @@ def delete(tenant_id, dataset_id):
             File2DocumentService.delete_by_document_id(doc_id)
             STORAGE_IMPL.rm(b, n)
+            success_count += 1
         except Exception as e:
             errors += str(e)
@@ -633,6 +639,12 @@ def delete(tenant_id, dataset_id):
     if errors:
         return get_result(message=errors, code=settings.RetCode.SERVER_ERROR)
+    if duplicate_messages:
+        if success_count > 0:
+            return get_result(message=f"Partially deleted {success_count} datasets with {len(duplicate_messages)} errors", data={"success_count": success_count, "errors": duplicate_messages},)
+        else:
+            return get_error_data_result(message=";".join(duplicate_messages))
     return get_result()
@@ -680,8 +692,13 @@ def parse(tenant_id, dataset_id):
     req = request.json
     if not req.get("document_ids"):
         return get_error_data_result("`document_ids` is required")
+    doc_list = req.get("document_ids")
+    unique_doc_ids, duplicate_messages = check_duplicate_ids(doc_list, "document")
+    doc_list = unique_doc_ids
     not_found = []
-    for id in set(req["document_ids"]):
+    success_count = 0
+    for id in doc_list:
         doc = DocumentService.query(id=id, kb_id=dataset_id)
         if not doc:
             not_found.append(id)
@@ -701,9 +718,14 @@ def parse(tenant_id, dataset_id):
         doc["tenant_id"] = tenant_id
         bucket, name = File2DocumentService.get_storage_address(doc_id=doc["id"])
         queue_tasks(doc, bucket, name, 0)
+        success_count += 1
     if not_found:
         return get_result(message=f"Documents not found: {not_found}", code=settings.RetCode.DATA_ERROR)
+    if duplicate_messages:
+        if success_count > 0:
+            return get_result(message=f"Partially parsed {success_count} documents with {len(duplicate_messages)} errors", data={"success_count": success_count, "errors": duplicate_messages},)
+        else:
+            return get_error_data_result(message=";".join(duplicate_messages))
     return get_result()
@@ -750,9 +772,15 @@ def stop_parsing(tenant_id, dataset_id):
     if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
         return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
     req = request.json
     if not req.get("document_ids"):
         return get_error_data_result("`document_ids` is required")
-    for id in req["document_ids"]:
+    doc_list = req.get("document_ids")
+    unique_doc_ids, duplicate_messages = check_duplicate_ids(doc_list, "document")
+    doc_list = unique_doc_ids
+    success_count = 0
+    for id in doc_list:
         doc = DocumentService.query(id=id, kb_id=dataset_id)
         if not doc:
             return get_error_data_result(message=f"You don't own the document {id}.")
@@ -763,6 +791,12 @@ def stop_parsing(tenant_id, dataset_id):
         info = {"run": "2", "progress": 0, "chunk_num": 0}
         DocumentService.update_by_id(id, info)
         settings.docStoreConn.delete({"doc_id": doc[0].id}, search.index_name(tenant_id), dataset_id)
+        success_count += 1
+    if duplicate_messages:
+        if success_count > 0:
+            return get_result(message=f"Partially stopped {success_count} documents with {len(duplicate_messages)} errors", data={"success_count": success_count, "errors": duplicate_messages},)
+        else:
+            return get_error_data_result(message=";".join(duplicate_messages))
     return get_result()
@@ -1125,12 +1159,15 @@ def rm_chunk(tenant_id, dataset_id, document_id):
     req = request.json
     condition = {"doc_id": document_id}
     if "chunk_ids" in req:
-        condition["id"] = req["chunk_ids"]
+        unique_chunk_ids, duplicate_messages = check_duplicate_ids(req["chunk_ids"], "chunk")
+        condition["id"] = unique_chunk_ids
     chunk_number = settings.docStoreConn.delete(condition, search.index_name(tenant_id), dataset_id)
     if chunk_number != 0:
         DocumentService.decrement_chunk_num(document_id, dataset_id, 1, chunk_number, 0)
-    if "chunk_ids" in req and chunk_number != len(req["chunk_ids"]):
-        return get_error_data_result(message=f"rm_chunk deleted chunks {chunk_number}, expect {len(req['chunk_ids'])}")
+    if "chunk_ids" in req and chunk_number != len(unique_chunk_ids):
+        return get_error_data_result(message=f"rm_chunk deleted chunks {chunk_number}, expect {len(unique_chunk_ids)}")
+    if duplicate_messages:
+        return get_result(message=f"Partially deleted {chunk_number} chunks with {len(duplicate_messages)} errors", data={"success_count": chunk_number, "errors": duplicate_messages},)
     return get_result(message=f"deleted {chunk_number} chunks")
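
The `rm_chunk` hunk above also switches the sanity check from `len(req["chunk_ids"])`
to `len(unique_chunk_ids)`: with duplicates in the request, the raw length over-counts
and a correct deletion would be reported as a mismatch. A small illustration with
made-up chunk IDs:

```python
# Made-up chunk IDs: the doc store deletes each chunk once, so the deleted
# count should be compared against the deduplicated list.
chunk_ids = ["c1", "c2", "c2"]
unique_chunk_ids = list(set(chunk_ids))
chunk_number = 2  # what a delete over these IDs would report

print(chunk_number != len(chunk_ids))         # True  -> the old check would flag a false mismatch
print(chunk_number != len(unique_chunk_ids))  # False -> the new check passes
```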

File: `api_utils.py`

@@ -411,3 +411,34 @@ def valid_parser_config(parser_config):
     assert 0 <= parser_config.get("topn_tags", 0) < 10, "topn_tags should be in range from 0 to 10"
     assert isinstance(parser_config.get("html4excel", False), bool), "html4excel should be True or False"
     assert isinstance(parser_config.get("delimiter", ""), str), "delimiter should be str"
+
+
+def check_duplicate_ids(ids, id_type="item"):
+    """
+    Check for duplicate IDs in a list and return unique IDs and error messages.
+
+    Args:
+        ids (list): List of IDs to check for duplicates
+        id_type (str): Type of ID for error messages (e.g., 'document', 'dataset', 'chunk')
+
+    Returns:
+        tuple: (unique_ids, error_messages)
+            - unique_ids (list): List of unique IDs
+            - error_messages (list): List of error messages for duplicate IDs
+    """
+    id_count = {}
+    duplicate_messages = []
+
+    # Count occurrences of each ID
+    for id_value in ids:
+        id_count[id_value] = id_count.get(id_value, 0) + 1
+
+    # Check for duplicates
+    for id_value, count in id_count.items():
+        if count > 1:
+            duplicate_messages.append(f"Duplicate {id_type} ids: {id_value}")
+
+    # Return unique IDs and error messages
+    return list(set(ids)), duplicate_messages
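
A short usage sketch for the helper above, assuming it runs inside the ragflow
project where `api.utils.api_utils` is importable:

```python
from api.utils.api_utils import check_duplicate_ids

unique_ids, errors = check_duplicate_ids(["doc_1", "doc_2", "doc_1"], "document")
print(sorted(unique_ids))  # ['doc_1', 'doc_2']  (a set is used, so input order is not preserved)
print(errors)              # ['Duplicate document ids: doc_1']
```

Callers treat an empty error list as "no duplicates"; a non-empty list is joined
into the partial-success or error responses shown in the endpoints above.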