Fix chunk number error after re-parsing. (#4043)

### What problem does this PR solve?


### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
Kevin Hu 2024-12-16 15:23:49 +08:00 committed by GitHub
parent 44ac87aef4
commit 7fb67c4f67
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 9 additions and 5 deletions

View File

@ -356,12 +356,11 @@ def run():
try:
for id in req["doc_ids"]:
info = {"run": str(req["run"]), "progress": 0}
if str(req["run"]) == TaskStatus.RUNNING.value:
if str(req["run"]) == TaskStatus.RUNNING.value and req.get("delete", False):
info["progress_msg"] = ""
info["chunk_num"] = 0
info["token_num"] = 0
DocumentService.update_by_id(id, info)
# if str(req["run"]) == TaskStatus.CANCEL.value:
tenant_id = DocumentService.get_tenant_id(id)
if not tenant_id:
return get_data_error_result(message="Tenant not found!")

View File

@ -248,8 +248,9 @@ def queue_tasks(doc: dict, bucket: str, name: str):
prev_tasks = TaskService.get_tasks(doc["id"])
if prev_tasks:
ck_num = 0
for task in tsks:
reuse_prev_task_chunks(task, prev_tasks, chunking_config)
ck_num += reuse_prev_task_chunks(task, prev_tasks, chunking_config)
TaskService.filter_delete([Task.doc_id == doc["id"]])
chunk_ids = []
for task in prev_tasks:
@ -257,6 +258,7 @@ def queue_tasks(doc: dict, bucket: str, name: str):
chunk_ids.extend(task["chunk_ids"].split())
if chunk_ids:
settings.docStoreConn.delete({"id": chunk_ids}, search.index_name(chunking_config["tenant_id"]), chunking_config["kb_id"])
DocumentService.update_by_id(doc["id"], {"chunk_num": ck_num})
bulk_insert_into_db(Task, tsks, True)
DocumentService.begin2parse(doc["id"])
@ -267,14 +269,17 @@ def queue_tasks(doc: dict, bucket: str, name: str):
SVR_QUEUE_NAME, message=t
), "Can't access Redis. Please check the Redis' status."
def reuse_prev_task_chunks(task: dict, prev_tasks: list[dict], chunking_config: dict):
idx = bisect.bisect_left(prev_tasks, task["from_page"], key=lambda x: x["from_page"])
if idx >= len(prev_tasks):
return
return 0
prev_task = prev_tasks[idx]
if prev_task["progress"] < 1.0 or prev_task["digest"] != task["digest"] or not prev_task["chunk_ids"]:
return
return 0
task["chunk_ids"] = prev_task["chunk_ids"]
task["progress"] = 1.0
task["progress_msg"] = f"Page({task['from_page']}~{task['to_page']}): reused previous task's chunks"
prev_task["chunk_ids"] = ""
return len(task["chunk_ids"].split())