mirror of
https://git.mirrors.martin98.com/https://github.com/infiniflow/ragflow.git
synced 2025-04-22 06:00:00 +08:00

### What problem does this PR solve?

Cover the [retrieval chunk](https://ragflow.io/docs/v0.17.2/http_api_reference#retrieve-chunks) endpoints.

### Type of change

- [x] Add test cases
187 lines
6.2 KiB
Python
187 lines
6.2 KiB
Python
#
|
|
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
import os
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
from libs.utils.file_utils import create_txt_file
|
|
from requests_toolbelt import MultipartEncoder
|
|
|
|
# Default headers for the JSON request helpers in this module.
HEADERS = {"Content-Type": "application/json"}
# Base URL of the server under test; the HOST_ADDRESS environment variable
# overrides the local default.
HOST_ADDRESS = os.environ.get("HOST_ADDRESS", "http://127.0.0.1:9380")

# Endpoint paths. The "{...}" placeholders are filled in with str.format().
DATASETS_API_URL = "/api/v1/datasets"
FILE_API_URL = "/api/v1/datasets/{dataset_id}/documents"
FILE_CHUNK_API_URL = "/api/v1/datasets/{dataset_id}/chunks"
CHUNK_API_URL = "/api/v1/datasets/{dataset_id}/documents/{document_id}/chunks"

# Fixture values shared by the tests.
INVALID_API_TOKEN = "invalid_key_123"
DATASET_NAME_LIMIT = 128
DOCUMENT_NAME_LIMIT = 128
|
|
|
|
|
|
# DATASET MANAGEMENT
|
|
def create_dataset(auth, payload=None):
    """POST ``payload`` as JSON to the datasets endpoint.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        payload: Object sent as the JSON request body (may be ``None``).

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = f"{HOST_ADDRESS}{DATASETS_API_URL}"
    response = requests.post(url=endpoint, json=payload, auth=auth, headers=HEADERS)
    return response.json()
|
|
|
|
|
|
def list_dataset(auth, params=None):
    """GET the datasets endpoint, passing ``params`` as the query string.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        params: Query parameters for the request (may be ``None``).

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = f"{HOST_ADDRESS}{DATASETS_API_URL}"
    response = requests.get(url=endpoint, params=params, auth=auth, headers=HEADERS)
    return response.json()
|
|
|
|
|
|
def update_dataset(auth, dataset_id, payload=None):
    """PUT ``payload`` as JSON to the endpoint of a single dataset.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        dataset_id: Identifier appended to the datasets endpoint path.
        payload: Object sent as the JSON request body (may be ``None``).

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = f"{HOST_ADDRESS}{DATASETS_API_URL}/{dataset_id}"
    response = requests.put(url=endpoint, json=payload, auth=auth, headers=HEADERS)
    return response.json()
|
|
|
|
|
|
def delete_dataset(auth, payload=None):
    """Send a DELETE with a JSON body to the datasets endpoint.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        payload: Object sent as the JSON request body (may be ``None``).

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = f"{HOST_ADDRESS}{DATASETS_API_URL}"
    response = requests.delete(url=endpoint, json=payload, auth=auth, headers=HEADERS)
    return response.json()
|
|
|
|
|
|
def batch_create_datasets(auth, num):
    """Create ``num`` datasets named ``dataset_0`` .. ``dataset_{num-1}``.

    Datasets are created one at a time, in index order, through
    ``create_dataset``. Each response is expected to expose ``data.id``; a
    response without it raises on the lookup.

    Args:
        auth: Passed through to ``create_dataset``.
        num: Number of datasets to create.

    Returns:
        The ``data.id`` value of every create response, in creation order.
    """
    return [
        create_dataset(auth, {"name": f"dataset_{index}"})["data"]["id"]
        for index in range(num)
    ]
|
|
|
|
|
|
# FILE MANAGEMENT WITHIN DATASET
|
|
def upload_documnets(auth, dataset_id, files_path=None):
    """Upload local files to a dataset's documents endpoint in one request.

    Every path is opened in binary mode and added as a ``file`` part of a
    single multipart body. All opened handles are closed before returning,
    whether or not the request succeeds.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        dataset_id: Substituted into the documents endpoint path.
        files_path: Iterable of file paths to upload; ``None`` means no files.

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = (HOST_ADDRESS + FILE_API_URL).format(dataset_id=dataset_id)
    paths = [] if files_path is None else files_path

    parts = []
    opened_handles = []
    try:
        for raw_path in paths:
            path = Path(raw_path)
            handle = path.open("rb")
            parts.append(("file", (path.name, handle)))
            # Track the handle so the finally-clause can close it.
            opened_handles.append(handle)

        encoder = MultipartEncoder(fields=parts)
        # The encoder supplies the Content-Type (including the boundary).
        request_headers = {"Content-Type": encoder.content_type}
        response = requests.post(url=endpoint, headers=request_headers, auth=auth, data=encoder)
        return response.json()
    finally:
        for handle in opened_handles:
            handle.close()
|
|
|
|
|
|
def download_document(auth, dataset_id, document_id, save_path):
    """Stream a document from the server into ``save_path``.

    The file is written only when the response status is 200; for any other
    status nothing is written and the caller can inspect the returned
    response. The response is always closed before returning.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        dataset_id: Substituted into the documents endpoint path.
        document_id: Appended to the documents endpoint path.
        save_path: Destination file path, opened in binary write mode.

    Returns:
        The (already closed) ``requests`` response object.
    """
    # Fill in the template first and append the id afterwards, so braces in
    # ``document_id`` are never parsed by str.format() as replacement fields.
    url = f"{HOST_ADDRESS}{FILE_API_URL}".format(dataset_id=dataset_id) + f"/{document_id}"
    res = requests.get(url=url, auth=auth, stream=True)
    try:
        if res.status_code == 200:
            with open(save_path, "wb") as f:
                for chunk in res.iter_content(chunk_size=8192):
                    f.write(chunk)
    finally:
        res.close()

    return res
|
|
|
|
|
|
def list_documnet(auth, dataset_id, params=None):
    """GET a dataset's documents endpoint, passing ``params`` as the query.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        dataset_id: Substituted into the documents endpoint path.
        params: Query parameters for the request (may be ``None``).

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = (HOST_ADDRESS + FILE_API_URL).format(dataset_id=dataset_id)
    response = requests.get(url=endpoint, params=params, auth=auth, headers=HEADERS)
    return response.json()
|
|
|
|
|
|
def update_documnet(auth, dataset_id, document_id, payload=None):
    """PUT ``payload`` as JSON to the endpoint of a single document.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        dataset_id: Substituted into the documents endpoint path.
        document_id: Appended to the documents endpoint path.
        payload: Object sent as the JSON request body (may be ``None``).

    Returns:
        The decoded JSON body of the response.
    """
    # Fill in the template first and append the id afterwards, so braces in
    # ``document_id`` are never parsed by str.format() as replacement fields.
    url = f"{HOST_ADDRESS}{FILE_API_URL}".format(dataset_id=dataset_id) + f"/{document_id}"
    res = requests.put(url=url, headers=HEADERS, auth=auth, json=payload)
    return res.json()
|
|
|
|
|
|
def delete_documnet(auth, dataset_id, payload=None):
    """Send a DELETE with a JSON body to a dataset's documents endpoint.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        dataset_id: Substituted into the documents endpoint path.
        payload: Object sent as the JSON request body (may be ``None``).

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = (HOST_ADDRESS + FILE_API_URL).format(dataset_id=dataset_id)
    response = requests.delete(url=endpoint, json=payload, auth=auth, headers=HEADERS)
    return response.json()
|
|
|
|
|
|
def parse_documnet(auth, dataset_id, payload=None):
    """POST ``payload`` as JSON to a dataset's chunks endpoint.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        dataset_id: Substituted into the dataset chunks endpoint path.
        payload: Object sent as the JSON request body (may be ``None``).

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = (HOST_ADDRESS + FILE_CHUNK_API_URL).format(dataset_id=dataset_id)
    response = requests.post(url=endpoint, json=payload, auth=auth, headers=HEADERS)
    return response.json()
|
|
|
|
|
|
def stop_parse_documnet(auth, dataset_id, payload=None):
    """Send a DELETE with a JSON body to a dataset's chunks endpoint.

    This targets the same URL as ``parse_documnet`` but uses the DELETE verb.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        dataset_id: Substituted into the dataset chunks endpoint path.
        payload: Object sent as the JSON request body (may be ``None``).

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = (HOST_ADDRESS + FILE_CHUNK_API_URL).format(dataset_id=dataset_id)
    response = requests.delete(url=endpoint, json=payload, auth=auth, headers=HEADERS)
    return response.json()
|
|
|
|
|
|
def bulk_upload_documents(auth, dataset_id, num, tmp_path):
    """Create ``num`` text files under ``tmp_path`` and upload them together.

    Files are named ``ragflow_test_upload_{i}.txt`` and produced by
    ``create_txt_file``; all of them go out in a single ``upload_documnets``
    call.

    Args:
        auth: Passed through to ``upload_documnets``.
        dataset_id: Dataset that receives the uploads.
        num: Number of files to create and upload.
        tmp_path: Directory for the generated files; must support the ``/``
            path operator.

    Returns:
        The ``id`` of every entry in the upload response's ``data`` list.
    """
    file_paths = [
        create_txt_file(tmp_path / f"ragflow_test_upload_{index}.txt")
        for index in range(num)
    ]
    upload_result = upload_documnets(auth, dataset_id, file_paths)
    return [entry["id"] for entry in upload_result["data"]]
|
|
|
|
|
|
# CHUNK MANAGEMENT WITHIN DATASET
|
|
def add_chunk(auth, dataset_id, document_id, payload=None):
    """POST ``payload`` as JSON to a document's chunks endpoint.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        dataset_id: Substituted into the chunks endpoint path.
        document_id: Substituted into the chunks endpoint path.
        payload: Object sent as the JSON request body (may be ``None``).

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = (HOST_ADDRESS + CHUNK_API_URL).format(
        dataset_id=dataset_id,
        document_id=document_id,
    )
    response = requests.post(url=endpoint, json=payload, auth=auth, headers=HEADERS)
    return response.json()
|
|
|
|
|
|
def list_chunks(auth, dataset_id, document_id, params=None):
    """GET a document's chunks endpoint, passing ``params`` as the query.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        dataset_id: Substituted into the chunks endpoint path.
        document_id: Substituted into the chunks endpoint path.
        params: Query parameters for the request (may be ``None``).

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = (HOST_ADDRESS + CHUNK_API_URL).format(
        dataset_id=dataset_id,
        document_id=document_id,
    )
    response = requests.get(url=endpoint, params=params, auth=auth, headers=HEADERS)
    return response.json()
|
|
|
|
|
|
def update_chunk(auth, dataset_id, document_id, chunk_id, payload=None):
    """PUT ``payload`` as JSON to the endpoint of a single chunk.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        dataset_id: Substituted into the chunks endpoint path.
        document_id: Substituted into the chunks endpoint path.
        chunk_id: Appended to the chunks endpoint path.
        payload: Object sent as the JSON request body (may be ``None``).

    Returns:
        The decoded JSON body of the response.
    """
    # Fill in the template first and append the id afterwards, so braces in
    # ``chunk_id`` are never parsed by str.format() as replacement fields.
    url = f"{HOST_ADDRESS}{CHUNK_API_URL}".format(dataset_id=dataset_id, document_id=document_id) + f"/{chunk_id}"
    res = requests.put(url=url, headers=HEADERS, auth=auth, json=payload)
    return res.json()
|
|
|
|
|
|
def delete_chunks(auth, dataset_id, document_id, payload=None):
    """Send a DELETE with a JSON body to a document's chunks endpoint.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        dataset_id: Substituted into the chunks endpoint path.
        document_id: Substituted into the chunks endpoint path.
        payload: Object sent as the JSON request body (may be ``None``).

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = (HOST_ADDRESS + CHUNK_API_URL).format(
        dataset_id=dataset_id,
        document_id=document_id,
    )
    response = requests.delete(url=endpoint, json=payload, auth=auth, headers=HEADERS)
    return response.json()
|
|
|
|
|
|
def retrieval_chunks(auth, payload=None):
    """POST ``payload`` as JSON to the ``/api/v1/retrieval`` endpoint.

    Args:
        auth: Passed through to ``requests`` as the ``auth`` argument.
        payload: Object sent as the JSON request body (may be ``None``).

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = f"{HOST_ADDRESS}/api/v1/retrieval"
    response = requests.post(url=endpoint, json=payload, auth=auth, headers=HEADERS)
    return response.json()
|
|
|
|
|
|
def batch_add_chunks(auth, dataset_id, document_id, num):
    """Add ``num`` chunks with content ``chunk test {i}`` to a document.

    Chunks are added one at a time, in index order, through ``add_chunk``.
    Each response is expected to expose ``data.chunk.id``; a response without
    it raises on the lookup.

    Args:
        auth: Passed through to ``add_chunk``.
        dataset_id: Dataset that owns the document.
        document_id: Document that receives the chunks.
        num: Number of chunks to add.

    Returns:
        The ``data.chunk.id`` value of every add response, in creation order.
    """
    return [
        add_chunk(auth, dataset_id, document_id, {"content": f"chunk test {index}"})["data"]["chunk"]["id"]
        for index in range(num)
    ]
|