From e84bf35e2acd1bb24036e8a7b617e0efc344be67 Mon Sep 17 00:00:00 2001
From: huangzhuo1949 <167434202+huangzhuo1949@users.noreply.github.com>
Date: Thu, 9 Jan 2025 15:16:41 +0800
Subject: [PATCH] fix: same chunk insert deadlock (#12502)

Co-authored-by: huangzhuo
---
 api/core/indexing_runner.py | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/api/core/indexing_runner.py b/api/core/indexing_runner.py
index 05000c5400..1bc4baf9c0 100644
--- a/api/core/indexing_runner.py
+++ b/api/core/indexing_runner.py
@@ -530,7 +530,6 @@ class IndexingRunner:
         # chunk nodes by chunk size
         indexing_start_at = time.perf_counter()
         tokens = 0
-        chunk_size = 10
         if dataset_document.doc_form != IndexType.PARENT_CHILD_INDEX:
             # create keyword index
             create_keyword_thread = threading.Thread(
@@ -539,11 +538,22 @@ class IndexingRunner:
             )
             create_keyword_thread.start()
 
+        max_workers = 10
         if dataset.indexing_technique == "high_quality":
-            with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
+            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                 futures = []
-                for i in range(0, len(documents), chunk_size):
-                    chunk_documents = documents[i : i + chunk_size]
+
+                # Distribute documents into multiple groups based on the hash values of page_content
+                # This is done to prevent multiple threads from processing the same document,
+                # Thereby avoiding potential database insertion deadlocks
+                document_groups: list[list[Document]] = [[] for _ in range(max_workers)]
+                for document in documents:
+                    hash = helper.generate_text_hash(document.page_content)
+                    group_index = int(hash, 16) % max_workers
+                    document_groups[group_index].append(document)
+                for chunk_documents in document_groups:
+                    if len(chunk_documents) == 0:
+                        continue
                     futures.append(
                         executor.submit(
                             self._process_chunk,
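
A minimal, self-contained sketch of the grouping strategy this patch introduces: documents are bucketed by a hash of their page_content, so identical chunks always land in the same bucket and only one worker thread ever inserts a given chunk. This uses hashlib.sha256 as a stand-in for helper.generate_text_hash and a bare Document dataclass in place of the project's model; group_by_content_hash and index_chunk are hypothetical names, not part of the Dify codebase.

import hashlib
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass


@dataclass
class Document:
    """Minimal stand-in for the Document object used by the indexing runner."""
    page_content: str


def group_by_content_hash(documents: list[Document], max_workers: int) -> list[list[Document]]:
    """Bucket documents by the hex digest of their page_content.

    Documents with identical content always fall into the same bucket, so no two
    threads can try to insert the same chunk concurrently, which is the deadlock
    the patch is guarding against.
    """
    groups: list[list[Document]] = [[] for _ in range(max_workers)]
    for document in documents:
        # Stand-in for helper.generate_text_hash(); any stable hex digest works here.
        digest = hashlib.sha256(document.page_content.encode("utf-8")).hexdigest()
        groups[int(digest, 16) % max_workers].append(document)
    return groups


def index_chunk(chunk: list[Document]) -> int:
    # Placeholder for IndexingRunner._process_chunk(); here it just counts documents.
    return len(chunk)


if __name__ == "__main__":
    max_workers = 10
    docs = [Document(page_content=f"chunk {i % 4}") for i in range(20)]  # duplicates on purpose

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(index_chunk, group)
            for group in group_by_content_hash(docs, max_workers)
            if group  # skip empty buckets, mirroring the `continue` in the patch
        ]
        print(sum(f.result() for f in futures))  # prints 20

The trade-off compared with the old fixed-size slicing is that buckets are no longer evenly sized, but duplicate chunks are serialized within a single worker instead of racing across threads.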