
### What problem does this PR solve?

Cover the [create chat assistant](https://ragflow.io/docs/v0.17.2/http_api_reference#create-chat-assistant) endpoint.

### Type of change

- [x] Add test cases

#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pytest

from common import add_chunk, batch_create_datasets, bulk_upload_documents, delete_chat_assistants, delete_datasets, list_documnets, parse_documnets
from libs.utils import wait_for
from libs.utils.file_utils import (
    create_docx_file,
    create_eml_file,
    create_excel_file,
    create_html_file,
    create_image_file,
    create_json_file,
    create_md_file,
    create_pdf_file,
    create_ppt_file,
    create_txt_file,
)
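

# `condition` polls the dataset's document list until every document reports a
# "DONE" run status. The `wait_for(30, 1, ...)` decorator from libs.utils is
# assumed to retry the wrapped predicate every 1 s for up to 30 s, failing with
# the given message on timeout.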
@wait_for(30, 1, "Document parsing timeout")
def condition(_auth, _dataset_id):
    res = list_documnets(_auth, _dataset_id)
    for doc in res["data"]["docs"]:
        if doc["run"] != "DONE":
            return False
    return True


@pytest.fixture(scope="function")
def clear_datasets(get_http_api_auth):
    yield
    delete_datasets(get_http_api_auth)


@pytest.fixture(scope="function")
def clear_chat_assistants(get_http_api_auth):
    yield
    delete_chat_assistants(get_http_api_auth)
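
# Tests opt into cleanup by requesting one of the fixtures above, e.g. a
# hypothetical `def test_create_assistant(get_http_api_auth, clear_chat_assistants)`;
# everything after `yield` runs as teardown once the test finishes.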


@pytest.fixture
def generate_test_files(request, tmp_path):
    file_creators = {
        "docx": (tmp_path / "ragflow_test.docx", create_docx_file),
        "excel": (tmp_path / "ragflow_test.xlsx", create_excel_file),
        "ppt": (tmp_path / "ragflow_test.pptx", create_ppt_file),
        "image": (tmp_path / "ragflow_test.png", create_image_file),
        "pdf": (tmp_path / "ragflow_test.pdf", create_pdf_file),
        "txt": (tmp_path / "ragflow_test.txt", create_txt_file),
        "md": (tmp_path / "ragflow_test.md", create_md_file),
        "json": (tmp_path / "ragflow_test.json", create_json_file),
        "eml": (tmp_path / "ragflow_test.eml", create_eml_file),
        "html": (tmp_path / "ragflow_test.html", create_html_file),
    }

    files = {}
    for file_type, (file_path, creator_func) in file_creators.items():
        if request.param in ["", file_type]:
            creator_func(file_path)
            files[file_type] = file_path
    return files
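
# Because the fixture reads `request.param`, tests select file types through
# pytest's indirect parametrization; an empty string generates every type.
# A hypothetical example:
#
#     @pytest.mark.parametrize("generate_test_files", ["docx", "pdf"], indirect=True)
#     def test_upload_documents(get_http_api_auth, generate_test_files):
#         ...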


@pytest.fixture(scope="class")
def ragflow_tmp_dir(request, tmp_path_factory):
    class_name = request.cls.__name__
    return tmp_path_factory.mktemp(class_name)
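
# Each test class gets its own temp directory, named after the class, so
# documents uploaded by the class-scoped fixtures below do not collide across
# classes.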


@pytest.fixture(scope="class")
def add_dataset(request, get_http_api_auth):
    def cleanup():
        delete_datasets(get_http_api_auth)

    request.addfinalizer(cleanup)

    dataset_ids = batch_create_datasets(get_http_api_auth, 1)
    return dataset_ids[0]
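

# The function-scoped variant below mirrors `add_dataset`: tests that mutate or
# delete their dataset request it instead, so each test gets a fresh dataset.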
@pytest.fixture(scope="function")
def add_dataset_func(request, get_http_api_auth):
    def cleanup():
        delete_datasets(get_http_api_auth)

    request.addfinalizer(cleanup)

    dataset_ids = batch_create_datasets(get_http_api_auth, 1)
    return dataset_ids[0]


@pytest.fixture(scope="class")
def add_document(get_http_api_auth, add_dataset, ragflow_tmp_dir):
    dataset_id = add_dataset
    document_ids = bulk_upload_documents(get_http_api_auth, dataset_id, 1, ragflow_tmp_dir)
    return dataset_id, document_ids[0]


@pytest.fixture(scope="class")
def add_chunks(get_http_api_auth, add_document):
    dataset_id, document_id = add_document
    parse_documnets(get_http_api_auth, dataset_id, {"document_ids": [document_id]})
    condition(get_http_api_auth, dataset_id)

    chunk_ids = []
    for i in range(4):
        res = add_chunk(get_http_api_auth, dataset_id, document_id, {"content": f"chunk test {i}"})
        chunk_ids.append(res["data"]["chunk"]["id"])

    # issues/6487
    from time import sleep

    sleep(1)
    return dataset_id, document_id, chunk_ids
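
# The class-scoped fixtures above are shared by every test in a class; a
# hypothetical consumer:
#
#     class TestCreateChatAssistant:
#         def test_with_parsed_dataset(self, get_http_api_auth, add_chunks):
#             dataset_id, document_id, chunk_ids = add_chunks
#             ...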