Fix: Opensearch chunk management (#7802)

### What problem does this PR solve?

This PR solves the problems mentioned in
PR https://github.com/infiniflow/ragflow/pull/7140, which was also
submitted by me.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [ ] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):


### Introduction
I fixed the problems that occur when using OpenSearch as the DOC_ENGINE:
the pytest failures and the incorrect API return values.
The fixes mainly cover delete chunk, list chunks, update chunk, and retrieval chunk.
The pytest command "cd sdk/python && uv sync --python 3.10 --group test
--frozen && source .venv/bin/activate && cd test/test_http_api &&
DOC_ENGINE=opensearch pytest test_chunk_management_within_dataset -s
--tb=short" now passes.

### Others
Because Elasticsearch and OpenSearch behave differently in some respects,
some pytest results for OpenSearch are actually correct and reasonable,
but some pytest parameters (skipif params) were incompatible. So I changed
some of the skipif pytest parameters accordingly.

As a search engine programmer, I will continue to focus on the usage of
vector databases (especially OpenSearch) for RAG workloads.
Thanks for your review.
This commit is contained in:
pyyuhao 2025-05-26 16:57:58 +08:00 committed by GitHub
parent c09bd9fe4a
commit 5d6bf2224a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 13 additions and 10 deletions

View File

@ -217,7 +217,7 @@ class OSConnection(DocStoreConnection):
if bqry: if bqry:
s = s.query(bqry) s = s.query(bqry)
for field in highlightFields: for field in highlightFields:
s = s.highlight(field) s = s.highlight(field,force_source=True,no_match_size=30,require_field_match=False)
if orderBy: if orderBy:
orders = list() orders = list()
@ -269,7 +269,7 @@ class OSConnection(DocStoreConnection):
for i in range(ATTEMPT_TIME): for i in range(ATTEMPT_TIME):
try: try:
res = self.os.get(index=(indexName), res = self.os.get(index=(indexName),
id=chunkId, source=True, ) id=chunkId, _source=True, )
if str(res.get("timed_out", "")).lower() == "true": if str(res.get("timed_out", "")).lower() == "true":
raise Exception("Es Timeout.") raise Exception("Es Timeout.")
chunk = res["_source"] chunk = res["_source"]
@ -329,7 +329,7 @@ class OSConnection(DocStoreConnection):
chunkId = condition["id"] chunkId = condition["id"]
for i in range(ATTEMPT_TIME): for i in range(ATTEMPT_TIME):
try: try:
self.os.update(index=indexName, id=chunkId, doc=doc) self.os.update(index=indexName, id=chunkId, body=doc)
return True return True
except Exception as e: except Exception as e:
logger.exception( logger.exception(
@ -411,7 +411,10 @@ class OSConnection(DocStoreConnection):
chunk_ids = condition["id"] chunk_ids = condition["id"]
if not isinstance(chunk_ids, list): if not isinstance(chunk_ids, list):
chunk_ids = [chunk_ids] chunk_ids = [chunk_ids]
qry = Q("ids", values=chunk_ids) if not chunk_ids: # when chunk_ids is empty, delete all
qry = Q("match_all")
else:
qry = Q("ids", values=chunk_ids)
else: else:
qry = Q("bool") qry = Q("bool")
for k, v in condition.items(): for k, v in condition.items():

View File

@ -69,7 +69,7 @@ class TestChunksList:
[ [
({"page_size": None}, 0, 5, ""), ({"page_size": None}, 0, 5, ""),
pytest.param({"page_size": 0}, 0, 5, "", marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") == "infinity", reason="Infinity does not support page_size=0")), pytest.param({"page_size": 0}, 0, 5, "", marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") == "infinity", reason="Infinity does not support page_size=0")),
pytest.param({"page_size": 0}, 100, 0, "3013", marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") in [None, "elasticsearch"], reason="Infinity does not support page_size=0")), pytest.param({"page_size": 0}, 100, 0, "3013", marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") in [None, "opensearch", "elasticsearch"], reason="Infinity does not support page_size=0")),
({"page_size": 1}, 0, 1, ""), ({"page_size": 1}, 0, 1, ""),
({"page_size": 6}, 0, 5, ""), ({"page_size": 6}, 0, 5, ""),
({"page_size": "1"}, 0, 1, ""), ({"page_size": "1"}, 0, 1, ""),

View File

@ -185,28 +185,28 @@ class TestChunksRetrieval:
0, 0,
4, 4,
"", "",
marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") == "infinity", reason="Infinity"), marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") in ["infinity", "opensearch"], reason="Infinity"),
), ),
pytest.param( pytest.param(
{"top_k": 1}, {"top_k": 1},
0, 0,
1, 1,
"", "",
marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") in [None, "elasticsearch"], reason="elasticsearch"), marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") in [None, "opensearch", "elasticsearch"], reason="elasticsearch"),
), ),
pytest.param( pytest.param(
{"top_k": -1}, {"top_k": -1},
100, 100,
4, 4,
"must be greater than 0", "must be greater than 0",
marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") == "infinity", reason="Infinity"), marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") in ["infinity", "opensearch"], reason="Infinity"),
), ),
pytest.param( pytest.param(
{"top_k": -1}, {"top_k": -1},
100, 100,
4, 4,
"3014", "3014",
marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") in [None, "elasticsearch"], reason="elasticsearch"), marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") in [None, "opensearch", "elasticsearch"], reason="elasticsearch"),
), ),
pytest.param( pytest.param(
{"top_k": "a"}, {"top_k": "a"},

View File

@ -146,7 +146,7 @@ class TestUpdatedChunk:
[ [
("", 100, "<NotFound '404: Not Found'>"), ("", 100, "<NotFound '404: Not Found'>"),
pytest.param("invalid_dataset_id", 102, "You don't own the dataset invalid_dataset_id.", marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") == "infinity", reason="infinity")), pytest.param("invalid_dataset_id", 102, "You don't own the dataset invalid_dataset_id.", marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") == "infinity", reason="infinity")),
pytest.param("invalid_dataset_id", 102, "Can't find this chunk", marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") in [None, "elasticsearch"], reason="elasticsearch")), pytest.param("invalid_dataset_id", 102, "Can't find this chunk", marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") in [None, "opensearch","elasticsearch"], reason="elasticsearch")),
], ],
) )
def test_invalid_dataset_id(self, get_http_api_auth, add_chunks, dataset_id, expected_code, expected_message): def test_invalid_dataset_id(self, get_http_api_auth, add_chunks, dataset_id, expected_code, expected_message):