diff --git a/sdk/python/test/libs/utils/__init__.py b/sdk/python/test/libs/utils/__init__.py
index 86e9a8b6c..c6c384812 100644
--- a/sdk/python/test/libs/utils/__init__.py
+++ b/sdk/python/test/libs/utils/__init__.py
@@ -15,7 +15,9 @@
 #
 import base64
+import functools
 import hashlib
+import time
 from pathlib import Path
 
 
 
@@ -35,3 +37,24 @@ def compare_by_hash(file1, file2, algorithm="sha256"):
         return hash_func.hexdigest()
 
     return _calc_hash(file1) == _calc_hash(file2)
+
+
+def wait_for(timeout=10, interval=1, error_msg="Timeout"):
+    """Poll the decorated predicate every `interval` seconds until it returns True;
+    fail the test with `error_msg` once `timeout` seconds have elapsed."""
+    def decorator(func):
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            start_time = time.time()
+            while True:
+                result = func(*args, **kwargs)
+                if result is True:
+                    return result
+                elapsed = time.time() - start_time
+                if elapsed > timeout:
+                    assert False, error_msg
+                time.sleep(interval)
+
+        return wrapper
+
+    return decorator
diff --git a/sdk/python/test/test_http_api/common.py b/sdk/python/test/test_http_api/common.py
index 07be57931..205d9020c 100644
--- a/sdk/python/test/test_http_api/common.py
+++ b/sdk/python/test/test_http_api/common.py
@@ -25,6 +25,7 @@ HEADERS = {"Content-Type": "application/json"}
 HOST_ADDRESS = os.getenv("HOST_ADDRESS", "http://127.0.0.1:9380")
 DATASETS_API_URL = "/api/v1/datasets"
 FILE_API_URL = "/api/v1/datasets/{dataset_id}/documents"
+FILE_CHUNK_API_URL = "/api/v1/datasets/{dataset_id}/chunks"
 INVALID_API_TOKEN = "invalid_key_123"
 DATASET_NAME_LIMIT = 128
 
@@ -32,7 +33,7 @@ DOCUMENT_NAME_LIMIT = 128
 
 
 # DATASET MANAGEMENT
-def create_dataset(auth, payload):
+def create_dataset(auth, payload=None):
     res = requests.post(
         url=f"{HOST_ADDRESS}{DATASETS_API_URL}",
         headers=HEADERS,
@@ -52,7 +53,7 @@ def list_dataset(auth, params=None):
     return res.json()
 
 
-def update_dataset(auth, dataset_id, payload):
+def update_dataset(auth, dataset_id, payload=None):
     res = requests.put(
         url=f"{HOST_ADDRESS}{DATASETS_API_URL}/{dataset_id}",
         headers=HEADERS,
@@ -146,7 +147,7 @@ def list_documnet(auth, dataset_id, params=None):
     return res.json()
 
 
-def update_documnet(auth, dataset_id, document_id, payload):
+def update_documnet(auth, dataset_id, document_id, payload=None):
     url = f"{HOST_ADDRESS}{FILE_API_URL}/{document_id}".format(dataset_id=dataset_id)
     res = requests.put(url=url, headers=HEADERS, auth=auth, json=payload)
     return res.json()
@@ -156,3 +157,9 @@ def delete_documnet(auth, dataset_id, payload=None):
     url = f"{HOST_ADDRESS}{FILE_API_URL}".format(dataset_id=dataset_id)
     res = requests.delete(url=url, headers=HEADERS, auth=auth, json=payload)
     return res.json()
+
+
+def parse_documnet(auth, dataset_id, payload=None):
+    url = f"{HOST_ADDRESS}{FILE_CHUNK_API_URL}".format(dataset_id=dataset_id)
+    res = requests.post(url=url, headers=HEADERS, auth=auth, json=payload)
+    return res.json()
diff --git a/sdk/python/test/test_http_api/test_dataset_mangement/test_delete_dataset.py b/sdk/python/test/test_http_api/test_dataset_mangement/test_delete_dataset.py
index 0e7bc7386..3ac8fc707 100644
--- a/sdk/python/test/test_http_api/test_dataset_mangement/test_delete_dataset.py
+++ b/sdk/python/test/test_http_api/test_dataset_mangement/test_delete_dataset.py
@@ -116,6 +116,16 @@ class TestDatasetDeletion:
         assert res["code"] == 102
         assert res["message"] == f"You don't own the dataset {ids[0]}"
 
+    def test_duplicate_deletion(self, get_http_api_auth):
+        ids = create_datasets(get_http_api_auth, 1)
+        res = delete_dataset(get_http_api_auth, {"ids": ids + ids})
+        assert res["code"] == 0
+        assert res["data"]["success_count"] == 1
+
+        res = list_dataset(get_http_api_auth)
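+        # the duplicated IDs are deleted only once, so no datasets should remain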
+        assert len(res["data"]) == 0
+
     def test_concurrent_deletion(self, get_http_api_auth):
         ids = create_datasets(get_http_api_auth, 100)
 
diff --git a/sdk/python/test/test_http_api/test_file_management_within_dataset/test_delete_documents.py b/sdk/python/test/test_http_api/test_file_management_within_dataset/test_delete_documents.py
index 3fd51bb9f..3be7de82e 100644
--- a/sdk/python/test/test_http_api/test_file_management_within_dataset/test_delete_documents.py
+++ b/sdk/python/test/test_http_api/test_file_management_within_dataset/test_delete_documents.py
@@ -146,6 +146,19 @@ class TestDocumentDeletion:
         assert res["code"] in [102, 500]
         #assert res["message"] == "Document not found!"
 
+    @pytest.mark.xfail(reason="issues/6234")
+    def test_duplicate_deletion(self, get_http_api_auth, tmp_path):
+        ids = create_datasets(get_http_api_auth, 1)
+        document_ids = batch_upload_documents(get_http_api_auth, ids[0], 1, tmp_path)
+        res = delete_documnet(
+            get_http_api_auth, ids[0], {"ids": document_ids + document_ids}
+        )
+        assert res["code"] == 0
+
+        res = list_documnet(get_http_api_auth, ids[0])
+        assert len(res["data"]["docs"]) == 0
+        assert res["data"]["total"] == 0
+
     def test_concurrent_deletion(self, get_http_api_auth, tmp_path):
         documnets_num = 100
         ids = create_datasets(get_http_api_auth, 1)
diff --git a/sdk/python/test/test_http_api/test_file_management_within_dataset/test_parse_documents.py b/sdk/python/test/test_http_api/test_file_management_within_dataset/test_parse_documents.py
new file mode 100644
index 000000000..decbe513d
--- /dev/null
+++ b/sdk/python/test/test_http_api/test_file_management_within_dataset/test_parse_documents.py
@@ -0,0 +1,294 @@
+#
+# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from concurrent.futures import ThreadPoolExecutor
+
+import pytest
+from common import (
+    INVALID_API_TOKEN,
+    batch_upload_documents,
+    create_datasets,
+    list_documnet,
+    parse_documnet,
+)
+from libs.auth import RAGFlowHttpApiAuth
+from libs.utils import wait_for
+
+
+# Every parsed document should end up run == "DONE" with its progress fields populated.
+def validate_document_details(auth, dataset_id, document_ids):
+    for document_id in document_ids:
+        res = list_documnet(auth, dataset_id, params={"id": document_id})
+        doc = res["data"]["docs"][0]
+        assert doc["run"] == "DONE"
+        assert len(doc["process_begin_at"]) > 0
+        assert doc["process_duation"] > 0
+        assert doc["progress"] > 0
+        assert "Task done" in doc["progress_msg"]
+
+
+class TestAuthorization:
+    @pytest.mark.parametrize(
+        "auth, expected_code, expected_message",
+        [
+            (None, 0, "`Authorization` can't be empty"),
+            (
+                RAGFlowHttpApiAuth(INVALID_API_TOKEN),
+                109,
+                "Authentication error: API key is invalid!",
+            ),
+        ],
+    )
+    def test_invalid_auth(
+        self, get_http_api_auth, tmp_path, auth, expected_code, expected_message
+    ):
+        ids = create_datasets(get_http_api_auth, 1)
+        document_ids = batch_upload_documents(get_http_api_auth, ids[0], 1, tmp_path)
+        res = parse_documnet(auth, ids[0], {"document_ids": document_ids[0]})
+        assert res["code"] == expected_code
+        assert res["message"] == expected_message
+
+
+class TestDatasetParse:
+    @pytest.mark.parametrize(
+        "payload, expected_code, expected_message",
+        [
+            pytest.param(
+                None,
+                102,
+                """AttributeError("\'NoneType\' object has no attribute \'get\'")""",
+                marks=pytest.mark.xfail,
+            ),
+            ({"document_ids": []}, 102, "`document_ids` is required"),
+            (
+                {"document_ids": ["invalid_id"]},
+                102,
+                "You don't own the document invalid_id.",
+            ),
+            (
+                {"document_ids": ["\n!?。;!?\"'"]},
+                102,
+                """You don\'t own the document \n!?。;!?"\'.""",
+            ),
+            pytest.param(
+                "not json",
+                102,
+                "AttributeError(\"'str' object has no attribute 'get'\")",
+                marks=pytest.mark.xfail,
+            ),
+            (lambda r: {"document_ids": r[:1]}, 0, ""),
+            (lambda r: {"document_ids": r}, 0, ""),
+        ],
+    )
+    def test_basic_scenarios(
+        self, get_http_api_auth, tmp_path, payload, expected_code, expected_message
+    ):
+        @wait_for(10, 1, "Document parsing timeout")
+        def condition(_auth, _dataset_id, _document_ids):
+            for _document_id in _document_ids:
+                res = list_documnet(_auth, _dataset_id, {"id": _document_id})
+                if res["data"]["docs"][0]["run"] != "DONE":
+                    return False
+            return True
+
+        ids = create_datasets(get_http_api_auth, 1)
+        dataset_id = ids[0]
+        document_ids = batch_upload_documents(
+            get_http_api_auth, dataset_id, 3, tmp_path
+        )
+        if callable(payload):
+            payload = payload(document_ids)
+        res = parse_documnet(get_http_api_auth, dataset_id, payload)
+        assert res["code"] == expected_code
+        if expected_code != 0:
+            assert res["message"] == expected_message
+        else:
+            condition(get_http_api_auth, dataset_id, payload["document_ids"])
+            validate_document_details(
+                get_http_api_auth, dataset_id, payload["document_ids"]
+            )
+
+    @pytest.mark.parametrize(
+        "dataset_id, expected_code, expected_message",
+        [
+            ("", 100, ""),
+            (
+                "invalid_dataset_id",
+                102,
+                "You don't own the dataset invalid_dataset_id.",
+            ),
+        ],
+    )
+    def test_invalid_dataset_id(
+        self,
+        get_http_api_auth,
+        tmp_path,
+        dataset_id,
+        expected_code,
+        expected_message,
+    ):
+        ids = create_datasets(get_http_api_auth, 1)
+        document_ids = batch_upload_documents(get_http_api_auth, ids[0], 1, tmp_path)
+        res = parse_documnet(
+            get_http_api_auth, dataset_id, {"document_ids": document_ids}
+        )
+        assert res["code"] == expected_code
+        assert res["message"] == expected_message
+
+    @pytest.mark.skip(reason="issues/6229")
+    @pytest.mark.parametrize(
+        "payload",
+        [
+            lambda r: {"document_ids": ["invalid_id"] + r},
+            lambda r: {"document_ids": r[:1] + ["invalid_id"] + r[1:3]},
+            lambda r: {"document_ids": r + ["invalid_id"]},
+        ],
+    )
+    def test_parse_partial_invalid_document_id(
+        self, get_http_api_auth, tmp_path, payload
+    ):
+        @wait_for(10, 1, "Document parsing timeout")
+        def condition(_auth, _dataset_id):
+            res = list_documnet(_auth, _dataset_id)
+            for doc in res["data"]["docs"]:
+                if doc["run"] != "DONE":
+                    return False
+            return True
+
+        ids = create_datasets(get_http_api_auth, 1)
+        dataset_id = ids[0]
+        document_ids = batch_upload_documents(
+            get_http_api_auth, dataset_id, 3, tmp_path
+        )
+        if callable(payload):
+            payload = payload(document_ids)
+        res = parse_documnet(get_http_api_auth, dataset_id, payload)
+        assert res["code"] == 102
+        assert res["message"] == "You don't own the document invalid_id."
+
+        condition(get_http_api_auth, dataset_id)
+
+        validate_document_details(get_http_api_auth, dataset_id, document_ids)
+
+    def test_repeated_parse(self, get_http_api_auth, tmp_path):
+        @wait_for(10, 1, "Document parsing timeout")
+        def condition(_auth, _dataset_id):
+            res = list_documnet(_auth, _dataset_id)
+            for doc in res["data"]["docs"]:
+                if doc["run"] != "DONE":
+                    return False
+            return True
+
+        ids = create_datasets(get_http_api_auth, 1)
+        dataset_id = ids[0]
+        document_ids = batch_upload_documents(
+            get_http_api_auth, dataset_id, 1, tmp_path
+        )
+        res = parse_documnet(
+            get_http_api_auth, dataset_id, {"document_ids": document_ids}
+        )
+        assert res["code"] == 0
+
+        condition(get_http_api_auth, dataset_id)
+
+        res = parse_documnet(
+            get_http_api_auth, dataset_id, {"document_ids": document_ids}
+        )
+        assert res["code"] == 102
+        assert res["message"] == "Can't stop parsing document with progress at 0 or 100"
+
+    @pytest.mark.skip(reason="issues/6234")
+    def test_duplicate_parse(self, get_http_api_auth, tmp_path):
+        @wait_for(10, 1, "Document parsing timeout")
+        def condition(_auth, _dataset_id):
+            res = list_documnet(_auth, _dataset_id)
+            for doc in res["data"]["docs"]:
+                if doc["run"] != "DONE":
+                    return False
+            return True
+
+        ids = create_datasets(get_http_api_auth, 1)
+        dataset_id = ids[0]
+        document_ids = batch_upload_documents(
+            get_http_api_auth, dataset_id, 1, tmp_path
+        )
+        res = parse_documnet(
+            get_http_api_auth, dataset_id, {"document_ids": document_ids + document_ids}
+        )
+        assert res["code"] == 0
+
+        condition(get_http_api_auth, dataset_id)
+
+        validate_document_details(get_http_api_auth, dataset_id, document_ids)
+
+    @pytest.mark.slow
+    def test_parse_100_files(self, get_http_api_auth, tmp_path):
+        @wait_for(100, 1, "Document parsing timeout")
+        def condition(_auth, _dataset_id, _document_num):
+            res = list_documnet(_auth, _dataset_id, {"page_size": _document_num})
+            for doc in res["data"]["docs"]:
+                if doc["run"] != "DONE":
+                    return False
+            return True
+
+        document_num = 100
+        ids = create_datasets(get_http_api_auth, 1)
+        dataset_id = ids[0]
+        document_ids = batch_upload_documents(
+            get_http_api_auth, dataset_id, document_num, tmp_path
+        )
+        res = parse_documnet(
+            get_http_api_auth, dataset_id, {"document_ids": document_ids}
+        )
+        assert res["code"] == 0
+
+        condition(get_http_api_auth, dataset_id, document_num)
+
+        validate_document_details(get_http_api_auth, dataset_id, document_ids)
+
+    @pytest.mark.slow
+    def test_concurrent_parse(self, get_http_api_auth, tmp_path):
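+        # fires one parse request per document concurrently, then waits until every document is DONE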
+        @wait_for(120, 1, "Document parsing timeout")
+        def condition(_auth, _dataset_id, _document_num):
+            res = list_documnet(_auth, _dataset_id, {"page_size": _document_num})
+            for doc in res["data"]["docs"]:
+                if doc["run"] != "DONE":
+                    return False
+            return True
+
+        document_num = 100
+        ids = create_datasets(get_http_api_auth, 1)
+        dataset_id = ids[0]
+        document_ids = batch_upload_documents(
+            get_http_api_auth, dataset_id, document_num, tmp_path
+        )
+
+        with ThreadPoolExecutor(max_workers=5) as executor:
+            futures = [
+                executor.submit(
+                    parse_documnet,
+                    get_http_api_auth,
+                    dataset_id,
+                    {"document_ids": document_ids[i : i + 1]},
+                )
+                for i in range(document_num)
+            ]
+        responses = [f.result() for f in futures]
+        assert all(r["code"] == 0 for r in responses)
+
+        condition(get_http_api_auth, dataset_id, document_num)
+
+        validate_document_details(get_http_api_auth, dataset_id, document_ids)