diff --git a/api/apps/dataset_api.py b/api/apps/dataset_api.py
index cb115d0b5..b9d1d7d39 100644
--- a/api/apps/dataset_api.py
+++ b/api/apps/dataset_api.py
@@ -16,6 +16,7 @@ import os
 import pathlib
 import re
 import warnings
+from functools import partial
 from io import BytesIO
 
 from elasticsearch_dsl import Q
@@ -26,13 +27,12 @@ from httpx import HTTPError
 from api.contants import NAME_LENGTH_LIMIT
 from api.db import FileType, ParserType, FileSource, TaskStatus
 from api.db import StatusEnum
-from api.db.db_models import File, Task
+from api.db.db_models import File
 from api.db.services import duplicate_name
 from api.db.services.document_service import DocumentService
 from api.db.services.file2document_service import File2DocumentService
 from api.db.services.file_service import FileService
 from api.db.services.knowledgebase_service import KnowledgebaseService
-from api.db.services.task_service import TaskService
 from api.db.services.user_service import TenantService
 from api.settings import RetCode
 from api.utils import get_uuid
@@ -233,9 +233,10 @@ def update_dataset(dataset_id):
         if chunk_num == 0:
             dataset_updating_data["embd_id"] = req["embedding_model_id"]
         else:
-            return construct_json_result(code=RetCode.DATA_ERROR, message="You have already parsed the document in this "
-                                                                          "dataset, so you cannot change the embedding "
-                                                                          "model.")
+            return construct_json_result(code=RetCode.DATA_ERROR,
+                                         message="You have already parsed the document in this "
+                                                 "dataset, so you cannot change the embedding "
+                                                 "model.")
     # only if chunk_num is 0, the user can update the chunk_method
     if "chunk_method" in req:
         type_value = req["chunk_method"]
@@ -614,35 +615,39 @@ def download_document(dataset_id, document_id):
 # ----------------------------start parsing a document-----------------------------------------------------
 # helper method for parsing
-def dummy(prog=None, msg=""):
-    pass
+# callback method
+def doc_parse_callback(doc_id, prog=None, msg=""):
+    cancel = DocumentService.do_cancel(doc_id)
+    if cancel:
+        raise Exception("The parsing process has been cancelled!")
 
 
-def doc_parse(binary, doc_name, parser_name, tenant_id):
+def doc_parse(binary, doc_name, parser_name, tenant_id, doc_id):
     match parser_name:
         case "book":
-            book.chunk(doc_name, binary=binary, callback=dummy)
+            book.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "laws":
-            laws.chunk(doc_name, binary=binary, callback=dummy)
+            laws.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "manual":
-            manual.chunk(doc_name, binary=binary, callback=dummy)
+            manual.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "naive":
             # It's the mode by default, which is general in the front-end
-            naive.chunk(doc_name, binary=binary, callback=dummy)
+            naive.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "one":
-            one.chunk(doc_name, binary=binary, callback=dummy)
+            one.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "paper":
-            paper.chunk(doc_name, binary=binary, callback=dummy)
+            paper.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "picture":
-            picture.chunk(doc_name, binary=binary, tenant_id=tenant_id, lang="Chinese", callback=dummy)
+            picture.chunk(doc_name, binary=binary, tenant_id=tenant_id, lang="Chinese",
+                          callback=partial(doc_parse_callback, doc_id))
         case "presentation":
-            presentation.chunk(doc_name, binary=binary, callback=dummy)
+            presentation.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "qa":
-            qa.chunk(doc_name, binary=binary, callback=dummy)
+            qa.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "resume":
-            resume.chunk(doc_name, binary=binary, callback=dummy)
+            resume.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case "table":
-            table.chunk(doc_name, binary=binary, callback=dummy)
+            table.chunk(doc_name, binary=binary, callback=partial(doc_parse_callback, doc_id))
         case _:
             return False
 
@@ -658,13 +663,8 @@ def parse_document(dataset_id, document_id):
         if not exist:
             return construct_json_result(code=RetCode.DATA_ERROR,
                                          message=f"This dataset '{dataset_id}' cannot be found!")
-        message = ""
-        res = get_message_during_parsing_document(document_id, message)
-        if isinstance(res, str):
-            message += res
-            return construct_json_result(code=RetCode.SUCCESS, message=message)
-        else:
-            return res
+
+        return parsing_document_internal(document_id)
 
     except Exception as e:
         return construct_error_response(e)
@@ -680,34 +680,31 @@ def parse_documents(dataset_id):
         if not exist:
             return construct_json_result(code=RetCode.DATA_ERROR,
                                          message=f"This dataset '{dataset_id}' cannot be found!")
-
-        def process(doc_ids):
-            message = ""
-            # for loop
-            for id in doc_ids:
-                res = get_message_during_parsing_document(id, message)
-                if isinstance(res, str):
-                    message += res
-                else:
-                    return res
-            return construct_json_result(data=True, code=RetCode.SUCCESS, message=message)
-
-        # two conditions
-        if doc_ids:
-            return process(doc_ids)
-        else:
+        if not doc_ids:
             # documents inside the dataset
             docs, total = DocumentService.list_documents_in_dataset(dataset_id, 0, -1, "create_time",
                                                                     True, "")
             doc_ids = [doc["id"] for doc in docs]
-            return process(doc_ids)
+
+        message = ""
+        # for loop
+        for id in doc_ids:
+            res = parsing_document_internal(id)
+            res_body = res.json
+            if res_body["code"] == RetCode.SUCCESS:
+                message += res_body["message"]
+            else:
+                return res
+        return construct_json_result(data=True, code=RetCode.SUCCESS, message=message)
 
     except Exception as e:
         return construct_error_response(e)
 
-# helper method for getting message or response when parsing the document
-def get_message_during_parsing_document(id, message):
+# helper method for parsing the document
+def parsing_document_internal(id):
+    message = ""
     try:
         # Check whether there is this document
         exist, document = DocumentService.get_by_id(id)
@@ -736,7 +733,7 @@
             binary = MINIO.get(bucket, doc_name)
             parser_name = doc_attributes["parser_id"]
             if binary:
-                res = doc_parse(binary, doc_name, parser_name, tenant_id)
+                res = doc_parse(binary, doc_name, parser_name, tenant_id, doc_id)
                 if res is False:
                     message += f"The parser id: {parser_name} of the document {doc_id} is not supported; "
                 else:
@@ -744,10 +741,94 @@
             # failed in parsing
             if doc_attributes["status"] == TaskStatus.FAIL.value:
                 message += f"Failed in parsing the document: {doc_id}; "
-        return message
+        return construct_json_result(code=RetCode.SUCCESS, message=message)
     except Exception as e:
         return construct_error_response(e)
 
-# ----------------------------stop parsing-----------------------------------------------------
+
+
+# ----------------------------stop parsing a doc-----------------------------------------------------
+@manager.route("/<dataset_id>/documents/<document_id>/status", methods=["DELETE"])
+@login_required
+def stop_parsing_document(dataset_id, document_id):
+    try:
+        # valid dataset
+        exist, _ = KnowledgebaseService.get_by_id(dataset_id)
+        if not exist:
+            return construct_json_result(code=RetCode.DATA_ERROR,
+                                         message=f"This dataset '{dataset_id}' cannot be found!")
+
+        return stop_parsing_document_internal(document_id)
+
+    except Exception as e:
+        return construct_error_response(e)
+
+
+# ----------------------------stop parsing docs-----------------------------------------------------
+@manager.route("/<dataset_id>/documents/status", methods=["DELETE"])
+@login_required
+def stop_parsing_documents(dataset_id):
+    doc_ids = request.json["doc_ids"]
+    try:
+        # valid dataset?
+        exist, _ = KnowledgebaseService.get_by_id(dataset_id)
+        if not exist:
+            return construct_json_result(code=RetCode.DATA_ERROR,
+                                         message=f"This dataset '{dataset_id}' cannot be found!")
+        if not doc_ids:
+            # documents inside the dataset
+            docs, total = DocumentService.list_documents_in_dataset(dataset_id, 0, -1, "create_time",
+                                                                    True, "")
+            doc_ids = [doc["id"] for doc in docs]
+
+        message = ""
+        # for loop
+        for id in doc_ids:
+            res = stop_parsing_document_internal(id)
+            res_body = res.json
+            if res_body["code"] == RetCode.SUCCESS:
+                message += res_body["message"]
+            else:
+                return res
+        return construct_json_result(data=True, code=RetCode.SUCCESS, message=message)
+
+    except Exception as e:
+        return construct_error_response(e)
+
+
+# Helper method
+def stop_parsing_document_internal(document_id):
+    try:
+        # valid doc?
+        exist, doc = DocumentService.get_by_id(document_id)
+        if not exist:
+            return construct_json_result(message=f"This document '{document_id}' cannot be found!",
+                                         code=RetCode.ARGUMENT_ERROR)
+        doc_attributes = doc.to_dict()
+
+        # only when the status is parsing do we need to stop it
+        if doc_attributes["status"] == TaskStatus.RUNNING.value:
+            tenant_id = DocumentService.get_tenant_id(document_id)
+            if not tenant_id:
+                return construct_json_result(message="Tenant not found!", code=RetCode.AUTHENTICATION_ERROR)
+
+            # update successfully?
+            if not DocumentService.update_by_id(document_id, {"status": "2"}):  # cancel
+                return construct_json_result(
+                    code=RetCode.OPERATING_ERROR,
+                    message="There was an error while stopping the parsing of this document. "
+                            "Please check the status of the RAGFlow server and try again."
+                )
+
+            _, doc_attributes = DocumentService.get_by_id(document_id)
+            doc_attributes = doc_attributes.to_dict()
+
+            # failed to stop parsing
+            if doc_attributes["status"] == TaskStatus.RUNNING.value:
+                return construct_json_result(message=f"Failed to stop parsing the document: {document_id}; ",
+                                             code=RetCode.SUCCESS)
+        return construct_json_result(code=RetCode.SUCCESS, message="")
+    except Exception as e:
+        return construct_error_response(e)
+
 
 # ----------------------------show the status of the file-----------------------------------------------------
 @manager.route("/<dataset_id>/documents/<document_id>/status", methods=["GET"])
@@ -774,6 +855,7 @@ def show_parsing_status(dataset_id, document_id):
         )
     except Exception as e:
         return construct_error_response(e)
+
 # ----------------------------list the chunks of the file-----------------------------------------------------
 
 # -- --------------------------delete the chunk-----------------------------------------------------
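For context, `parse_documents` and `stop_parsing_documents` above share one aggregation pattern: call the per-document helper, accumulate its message on success, and propagate its response unchanged on the first failure. A minimal self-contained sketch of that pattern follows; `make_result` and `process_one` are hypothetical stand-ins for `construct_json_result` and the per-document helpers, and plain dicts stand in for Flask responses.

```python
# Sketch of the per-document aggregation pattern used by parse_documents()
# and stop_parsing_documents(). make_result and process_one are hypothetical
# stand-ins, not RAGFlow APIs.
SUCCESS, DATA_ERROR = 0, 102

def make_result(code, message="", data=None):
    return {"code": code, "message": message, "data": data}

def process_one(doc_id):
    # Pretend every document except "bad" succeeds with an empty message.
    if doc_id == "bad":
        return make_result(DATA_ERROR, f"This document '{doc_id}' cannot be found!")
    return make_result(SUCCESS)

def process_many(doc_ids):
    message = ""
    for doc_id in doc_ids:
        res = process_one(doc_id)
        if res["code"] == SUCCESS:
            message += res["message"]   # accumulate warnings from successes
        else:
            return res                  # propagate the first hard failure as-is
    return make_result(SUCCESS, message, data=True)

print(process_many(["a", "b"]))         # {'code': 0, 'message': '', 'data': True}
print(process_many(["a", "bad", "b"]))  # the DATA_ERROR response wins
```

Returning the first failing response unchanged keeps the bulk endpoint's error shape identical to the single-document endpoint's.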
diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py
index 6a54d22c7..f3195a502 100644
--- a/api/db/services/document_service.py
+++ b/api/db/services/document_service.py
@@ -333,6 +333,17 @@ class DocumentService(CommonService):
                                   cls.model.kb_id == kb_id).dicts())
 
+    @classmethod
+    @DB.connection_context()
+    def do_cancel(cls, doc_id):
+        try:
+            _, doc = DocumentService.get_by_id(doc_id)
+            return doc.run == TaskStatus.CANCEL.value or doc.progress < 0
+        except Exception:
+            pass
+        return False
+
+
 def queue_raptor_tasks(doc):
     def new_task():
         nonlocal doc
@@ -347,4 +358,4 @@ def queue_raptor_tasks(doc):
     task = new_task()
     bulk_insert_into_db(Task, [task], True)
     task["type"] = "raptor"
-    assert REDIS_CONN.queue_product(SVR_QUEUE_NAME, message=task), "Can't access Redis. Please check the Redis' status."
\ No newline at end of file
+    assert REDIS_CONN.queue_product(SVR_QUEUE_NAME, message=task), "Can't access Redis. Please check the Redis' status."
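The cancellation hook combines three pieces shown above: `DocumentService.do_cancel` reads the document's state, `doc_parse_callback` raises once that state flips to cancelled, and `functools.partial` binds the document id so the result still matches the `(prog, msg)` callback signature the chunkers expect. Below is a runnable sketch of the same mechanism, with an in-memory set standing in for the database check; `is_cancelled` and `chunk` are illustrative stand-ins, not RAGFlow APIs.

```python
from functools import partial

# In-memory stand-in for DocumentService.do_cancel(), which in the diff
# reports True once doc.run == TaskStatus.CANCEL.value or doc.progress < 0.
CANCELLED = {"doc-1"}

def is_cancelled(doc_id):
    return doc_id in CANCELLED

def doc_parse_callback(doc_id, prog=None, msg=""):
    # Same shape as the PR's callback: raising aborts the chunker mid-run.
    if is_cancelled(doc_id):
        raise Exception("The parsing process has been cancelled!")

def chunk(doc_name, callback):
    # Illustrative chunker: the real parsers (naive, book, laws, ...) invoke
    # callback(prog, msg) periodically, giving cancellation a chance to fire.
    for step in range(3):
        callback(prog=step / 3, msg=f"{doc_name}: step {step}")

# partial() binds doc_id up front, so the chunkers can keep calling the
# callback with only (prog, msg), exactly as they did with the old dummy().
chunk("ok.txt", callback=partial(doc_parse_callback, "doc-2"))  # completes
try:
    chunk("doomed.txt", callback=partial(doc_parse_callback, "doc-1"))
except Exception as e:
    print(e)  # The parsing process has been cancelled!
```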
diff --git a/docs/references/ragflow_api.md b/docs/references/ragflow_api.md
index b62477970..881ee8865 100644
--- a/docs/references/ragflow_api.md
+++ b/docs/references/ragflow_api.md
@@ -758,7 +758,7 @@ This method enables a specific document to start parsing for a specific user.
 
 ```json
 {
   "code": 102,
-  "message": "This dataset 'imagination.txt' cannot be found!"
+  "message": "This dataset 'imagination' cannot be found!"
 }
 ```
diff --git a/sdk/python/ragflow/ragflow.py b/sdk/python/ragflow/ragflow.py
index 3e8f94984..cc2927778 100644
--- a/sdk/python/ragflow/ragflow.py
+++ b/sdk/python/ragflow/ragflow.py
@@ -157,6 +157,17 @@ class RAGFlow:
         return res.json()
 
     # ----------------------------stop parsing-----------------------------------------------------
+    def stop_parsing_document(self, dataset_id, document_id):
+        endpoint = f"{self.dataset_url}/{dataset_id}/documents/{document_id}/status"
+        res = requests.delete(endpoint, headers=self.authorization_header)
+
+        return res.json()
+
+    def stop_parsing_documents(self, dataset_id, doc_ids=None):
+        endpoint = f"{self.dataset_url}/{dataset_id}/documents/status"
+        res = requests.delete(endpoint, headers=self.authorization_header, json={"doc_ids": doc_ids})
+
+        return res.json()
 
     # ----------------------------show the status of the file-----------------------------------------------------
     def show_parsing_status(self, dataset_id, document_id):
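The new SDK methods are used the same way the tests below exercise them. A sketch assuming a reachable RAGFlow server and placeholder credentials; the import path follows the file location in this diff, and the host and key are placeholders to substitute.

```python
from ragflow.ragflow import RAGFlow

# Placeholder credentials; substitute a real API key and host address.
ragflow = RAGFlow("<API_KEY>", "http://127.0.0.1:9380")

# Create a dataset, upload a file, and start parsing it.
dataset_id = ragflow.create_dataset("stop_parsing_demo")["data"]["dataset_id"]
uploaded = ragflow.upload_local_file(dataset_id, ["test_data/test.txt"])
doc_id = uploaded["data"][0]["id"]
ragflow.start_parsing_document(dataset_id, doc_id)

# Stop one document by id ...
print(ragflow.stop_parsing_document(dataset_id, doc_id))

# ... or stop every document in the dataset (doc_ids=None selects all).
print(ragflow.stop_parsing_documents(dataset_id))
```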
diff --git a/sdk/python/test/test_document.py b/sdk/python/test/test_document.py
index aef638aa4..efc2430d4 100644
--- a/sdk/python/test/test_document.py
+++ b/sdk/python/test/test_document.py
@@ -949,7 +949,126 @@ class TestFile(TestSdk):
         # parse
         res = ragflow.start_parsing_documents(created_res_id, doc_ids)
         assert res["code"] == RetCode.SUCCESS and res["message"] == ""
+
+    # ----------------------------stop parsing-----------------------------------------------------
+    def test_stop_parsing_document_with_success(self):
+        """
+        Test stopping the parsing of a document with success.
+        """
+        # create a dataset
+        ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
+        created_res = ragflow.create_dataset("test_stop_parsing_document_with_success")
+        created_res_id = created_res["data"]["dataset_id"]
+        # upload files
+        file_paths = ["test_data/lol.txt"]
+        uploading_res = ragflow.upload_local_file(created_res_id, file_paths)
+        # get the doc_id
+        data = uploading_res["data"][0]
+        doc_id = data["id"]
+        # parse file
+        res = ragflow.start_parsing_document(created_res_id, doc_id)
+        assert res["code"] == RetCode.SUCCESS and res["message"] == ""
+        res = ragflow.stop_parsing_document(created_res_id, doc_id)
+        assert res["code"] == RetCode.SUCCESS and res["message"] == ""
+
+    def test_stop_parsing_nonexistent_document(self):
+        """
+        Test stopping the parsing of a document that does not exist.
+        """
+        # create a dataset
+        ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
+        created_res = ragflow.create_dataset("test_stop_parsing_nonexistent_document")
+        created_res_id = created_res["data"]["dataset_id"]
+        res = ragflow.stop_parsing_document(created_res_id, "imagination.txt")
+        assert res["code"] == RetCode.ARGUMENT_ERROR and res["message"] == "This document 'imagination.txt' cannot be found!"
+
+    def test_stop_parsing_document_in_nonexistent_dataset(self):
+        """
+        Test stopping the parsing of a document whose dataset does not exist.
+        """
+        # create a dataset
+        ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
+        created_res = ragflow.create_dataset("test_stop_parsing_document_in_nonexistent_dataset")
+        created_res_id = created_res["data"]["dataset_id"]
+        # upload files
+        file_paths = ["test_data/test.txt"]
+        uploading_res = ragflow.upload_local_file(created_res_id, file_paths)
+        # get the doc_id
+        data = uploading_res["data"][0]
+        doc_id = data["id"]
+        # stop parsing
+        res = ragflow.stop_parsing_document("imagination", doc_id)
+        assert res["code"] == RetCode.DATA_ERROR and res["message"] == "This dataset 'imagination' cannot be found!"
+
+    # ------------------------stop parsing multiple documents----------------------------
+    def test_stop_parsing_documents_in_nonexistent_dataset(self):
+        """
+        Test stopping the parsing of documents whose dataset does not exist.
+        """
+        # create a dataset
+        ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
+        created_res = ragflow.create_dataset("test_stop_parsing_documents_in_nonexistent_dataset")
+        created_res_id = created_res["data"]["dataset_id"]
+        # upload files
+        file_paths = ["test_data/test.txt"]
+        uploading_res = ragflow.upload_local_file(created_res_id, file_paths)
+        # stop parsing
+        res = ragflow.stop_parsing_documents("imagination")
+        assert res["code"] == RetCode.DATA_ERROR and res["message"] == "This dataset 'imagination' cannot be found!"
+
+    def test_stop_parsing_multiple_documents(self):
+        """
+        Test stopping the parsing of multiple documents with success.
+        """
+        # create a dataset
+        ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
+        created_res = ragflow.create_dataset("test_stop_parsing_multiple_documents")
+        created_res_id = created_res["data"]["dataset_id"]
+        # upload files
+        file_paths = ["test_data/test.txt", "test_data/test1.txt"]
+        ragflow.upload_local_file(created_res_id, file_paths)
+        res = ragflow.start_parsing_documents(created_res_id)
+        assert res["code"] == RetCode.SUCCESS and res["data"] is True and res["message"] == ""
+
+        res = ragflow.stop_parsing_documents(created_res_id)
+        assert res["code"] == RetCode.SUCCESS and res["data"] is True and res["message"] == ""
+
+    def test_stop_parsing_multiple_documents_with_one_empty_file(self):
+        """
+        Test stopping the parsing of multiple documents, one of which is an empty file.
+        """
+        # create a dataset
+        ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
+        created_res = ragflow.create_dataset("test_stop_parsing_multiple_documents_with_one_empty_file")
+        created_res_id = created_res["data"]["dataset_id"]
+        # upload files
+        file_paths = ["test_data/test.txt", "test_data/test1.txt", "test_data/empty.txt"]
+        ragflow.upload_local_file(created_res_id, file_paths)
+        res = ragflow.start_parsing_documents(created_res_id)
+        assert res["code"] == RetCode.SUCCESS and res["message"] == "Empty data in the document: empty.txt; "
+        res = ragflow.stop_parsing_documents(created_res_id)
+        assert res["code"] == RetCode.SUCCESS and res["data"] is True and res["message"] == ""
+
+    def test_stop_parsing_multiple_specific_documents(self):
+        """
+        Test stopping the parsing of documents whose document ids are specified.
+        """
+        # create a dataset
+        ragflow = RAGFlow(API_KEY, HOST_ADDRESS)
+        created_res = ragflow.create_dataset("test_stop_parsing_multiple_specific_documents")
+        created_res_id = created_res["data"]["dataset_id"]
+        # upload files
+        file_paths = ["test_data/test.txt", "test_data/test1.txt"]
+        uploading_res = ragflow.upload_local_file(created_res_id, file_paths)
+        # get the doc_ids
+        data = uploading_res["data"]
+        doc_ids = []
+        for d in data:
+            doc_ids.append(d["id"])
+        res = ragflow.start_parsing_documents(created_res_id, doc_ids)
+        assert res["code"] == RetCode.SUCCESS and res["message"] == ""
+        res = ragflow.stop_parsing_documents(created_res_id, doc_ids)
+        assert res["code"] == RetCode.SUCCESS and res["data"] is True and res["message"] == ""
 
     # ----------------------------show the status of the file-----------------------------------------------------
     def test_show_status_with_success(self):
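Equivalently, the two DELETE endpoints can be called without the SDK. The sketch below assumes the base path mirrors the SDK's `dataset_url` (its exact value is not shown in this diff) and uses placeholder ids and token.

```python
import requests

# Placeholder values; the base path mirrors the SDK's dataset_url,
# whose exact value is not shown in this diff.
dataset_url = "http://127.0.0.1:9380/api/v1/dataset"
headers = {"Authorization": "<API_KEY>"}
dataset_id, document_id = "<dataset_id>", "<document_id>"

# DELETE /<dataset_id>/documents/<document_id>/status -- stop one document.
res = requests.delete(f"{dataset_url}/{dataset_id}/documents/{document_id}/status",
                      headers=headers)
print(res.json())

# DELETE /<dataset_id>/documents/status -- stop selected documents,
# or every document in the dataset when doc_ids is None.
res = requests.delete(f"{dataset_url}/{dataset_id}/documents/status",
                      headers=headers, json={"doc_ids": None})
print(res.json())
```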