From 08c1a5e1e84025988925a85ff54746c1f49cb2a7 Mon Sep 17 00:00:00 2001
From: Jin Hai
Date: Sun, 1 Dec 2024 22:28:00 +0800
Subject: [PATCH] Refactor parse progress (#3781)

### What problem does this PR solve?

Refactor file parsing progress reporting.

### Type of change

- [x] Refactoring

Signed-off-by: jinhai
---
 rag/app/naive.py         |  22 ++++---
 rag/svr/task_executor.py | 125 ++++++++++++++++++++-------------------
 2 files changed, 73 insertions(+), 74 deletions(-)

diff --git a/rag/app/naive.py b/rag/app/naive.py
index 4127e88f3..6983582cf 100644
--- a/rag/app/naive.py
+++ b/rag/app/naive.py
@@ -193,7 +193,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
         Next, these successive pieces are merge into chunks whose token number is no more than 'Max token number'.
     """
 
-    eng = lang.lower() == "english"  # is_english(cks)
+    is_english = lang.lower() == "english"  # is_english(cks)
     parser_config = kwargs.get(
         "parser_config", {
             "chunk_token_num": 128, "delimiter": "\n!?。;!?", "layout_recognize": True})
@@ -206,8 +206,8 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
     pdf_parser = None
     if re.search(r"\.docx$", filename, re.IGNORECASE):
         callback(0.1, "Start to parse.")
-        sections, tbls = Docx()(filename, binary)
-        res = tokenize_table(tbls, doc, eng)  # just for table
+        sections, tables = Docx()(filename, binary)
+        res = tokenize_table(tables, doc, is_english)  # just for table
         callback(0.8, "Finish parsing.")
 
         st = timer()
@@ -220,16 +220,14 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
         if kwargs.get("section_only", False):
             return chunks
 
-        res.extend(tokenize_chunks_docx(chunks, doc, eng, images))
+        res.extend(tokenize_chunks_docx(chunks, doc, is_english, images))
         logging.info("naive_merge({}): {}".format(filename, timer() - st))
         return res
 
     elif re.search(r"\.pdf$", filename, re.IGNORECASE):
-        pdf_parser = Pdf(
-        ) if parser_config.get("layout_recognize", True) else PlainParser()
-        sections, tbls = pdf_parser(filename if not binary else binary,
-                                    from_page=from_page, to_page=to_page, callback=callback)
-        res = tokenize_table(tbls, doc, eng)
+        pdf_parser = Pdf() if parser_config.get("layout_recognize", True) else PlainParser()
+        sections, tables = pdf_parser(filename if not binary else binary, from_page=from_page, to_page=to_page, callback=callback)
+        res = tokenize_table(tables, doc, is_english)
 
     elif re.search(r"\.xlsx?$", filename, re.IGNORECASE):
         callback(0.1, "Start to parse.")
@@ -248,8 +246,8 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
 
     elif re.search(r"\.(md|markdown)$", filename, re.IGNORECASE):
         callback(0.1, "Start to parse.")
-        sections, tbls = Markdown(int(parser_config.get("chunk_token_num", 128)))(filename, binary)
-        res = tokenize_table(tbls, doc, eng)
+        sections, tables = Markdown(int(parser_config.get("chunk_token_num", 128)))(filename, binary)
+        res = tokenize_table(tables, doc, is_english)
         callback(0.8, "Finish parsing.")
 
     elif re.search(r"\.(htm|html)$", filename, re.IGNORECASE):
@@ -289,7 +287,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
         if kwargs.get("section_only", False):
             return chunks
 
-        res.extend(tokenize_chunks(chunks, doc, eng, pdf_parser))
+        res.extend(tokenize_chunks(chunks, doc, is_english, pdf_parser))
         logging.info("naive_merge({}): {}".format(filename, timer() - st))
         return res
 
diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py
index a3a141ac0..b4d86f476 100644
--- a/rag/svr/task_executor.py
+++ b/rag/svr/task_executor.py
@@ -19,6 +19,7 @@
 
 import logging
 import sys
+
 from api.utils.log_utils import initRootLogger
 
 CONSUMER_NO = "0" if len(sys.argv) < 2 else sys.argv[1]
@@ -166,52 +167,44 @@ def get_storage_binary(bucket, name):
     return STORAGE_IMPL.get(bucket, name)
 
 
-def build(row):
-    if row["size"] > DOC_MAXIMUM_SIZE:
-        set_progress(row["id"], prog=-1, msg="File size exceeds( <= %dMb )" %
-                     (int(DOC_MAXIMUM_SIZE / 1024 / 1024)))
+def build_chunks(task, progress_callback):
+    if task["size"] > DOC_MAXIMUM_SIZE:
+        set_progress(task["id"], prog=-1, msg="File size exceeds( <= %dMb )" %
+                     (int(DOC_MAXIMUM_SIZE / 1024 / 1024)))
         return []
 
-    callback = partial(
-        set_progress,
-        row["id"],
-        row["from_page"],
-        row["to_page"])
-    chunker = FACTORY[row["parser_id"].lower()]
+    chunker = FACTORY[task["parser_id"].lower()]
     try:
         st = timer()
-        bucket, name = File2DocumentService.get_storage_address(doc_id=row["doc_id"])
+        bucket, name = File2DocumentService.get_storage_address(doc_id=task["doc_id"])
         binary = get_storage_binary(bucket, name)
-        logging.info(
-            "From minio({}) {}/{}".format(timer() - st, row["location"], row["name"]))
+        logging.info("From minio({}) {}/{}".format(timer() - st, task["location"], task["name"]))
     except TimeoutError:
-        callback(-1, "Internal server error: Fetch file from minio timeout. Could you try it again.")
-        logging.exception(
-            "Minio {}/{} got timeout: Fetch file from minio timeout.".format(row["location"], row["name"]))
+        progress_callback(-1, "Internal server error: Fetch file from minio timeout. Could you try it again.")
+        logging.exception("Minio {}/{} got timeout: Fetch file from minio timeout.".format(task["location"], task["name"]))
        raise
     except Exception as e:
         if re.search("(No such file|not found)", str(e)):
-            callback(-1, "Can not find file <%s> from minio. Could you try it again?" % row["name"])
+            progress_callback(-1, "Can not find file <%s> from minio. Could you try it again?" % task["name"])
         else:
-            callback(-1, "Get file from minio: %s" % str(e).replace("'", ""))
-        logging.exception("Chunking {}/{} got exception".format(row["location"], row["name"]))
+            progress_callback(-1, "Get file from minio: %s" % str(e).replace("'", ""))
+        logging.exception("Chunking {}/{} got exception".format(task["location"], task["name"]))
         raise
 
     try:
-        cks = chunker.chunk(row["name"], binary=binary, from_page=row["from_page"],
-                            to_page=row["to_page"], lang=row["language"], callback=callback,
-                            kb_id=row["kb_id"], parser_config=row["parser_config"], tenant_id=row["tenant_id"])
-        logging.info("Chunking({}) {}/{} done".format(timer() - st, row["location"], row["name"]))
+        cks = chunker.chunk(task["name"], binary=binary, from_page=task["from_page"],
+                            to_page=task["to_page"], lang=task["language"], callback=progress_callback,
+                            kb_id=task["kb_id"], parser_config=task["parser_config"], tenant_id=task["tenant_id"])
+        logging.info("Chunking({}) {}/{} done".format(timer() - st, task["location"], task["name"]))
     except Exception as e:
-        callback(-1, "Internal server error while chunking: %s" %
-                 str(e).replace("'", ""))
-        logging.exception("Chunking {}/{} got exception".format(row["location"], row["name"]))
+        progress_callback(-1, "Internal server error while chunking: %s" % str(e).replace("'", ""))
+        logging.exception("Chunking {}/{} got exception".format(task["location"], task["name"]))
         raise
 
     docs = []
     doc = {
-        "doc_id": row["doc_id"],
-        "kb_id": str(row["kb_id"])
+        "doc_id": task["doc_id"],
+        "kb_id": str(task["kb_id"])
     }
     el = 0
     for ck in cks:
@@ -240,41 +233,40 @@
                 d["image"].save(output_buffer, format='JPEG')
 
             st = timer()
-            STORAGE_IMPL.put(row["kb_id"], d["id"], output_buffer.getvalue())
+            STORAGE_IMPL.put(task["kb_id"], d["id"], output_buffer.getvalue())
             el += timer() - st
         except Exception:
-            logging.exception(
-                "Saving image of chunk {}/{}/{} got exception".format(row["location"], row["name"], d["_id"]))
+            logging.exception("Saving image of chunk {}/{}/{} got exception".format(task["location"], task["name"], d["_id"]))
             raise
 
-        d["img_id"] = "{}-{}".format(row["kb_id"], d["id"])
+        d["img_id"] = "{}-{}".format(task["kb_id"], d["id"])
         del d["image"]
         docs.append(d)
-    logging.info("MINIO PUT({}):{}".format(row["name"], el))
+    logging.info("MINIO PUT({}):{}".format(task["name"], el))
 
-    if row["parser_config"].get("auto_keywords", 0):
+    if task["parser_config"].get("auto_keywords", 0):
         st = timer()
-        callback(msg="Start to generate keywords for every chunk ...")
-        chat_mdl = LLMBundle(row["tenant_id"], LLMType.CHAT, llm_name=row["llm_id"], lang=row["language"])
+        progress_callback(msg="Start to generate keywords for every chunk ...")
+        chat_mdl = LLMBundle(task["tenant_id"], LLMType.CHAT, llm_name=task["llm_id"], lang=task["language"])
         for d in docs:
             d["important_kwd"] = keyword_extraction(chat_mdl, d["content_with_weight"],
-                                                    row["parser_config"]["auto_keywords"]).split(",")
+                                                    task["parser_config"]["auto_keywords"]).split(",")
             d["important_tks"] = rag_tokenizer.tokenize(" ".join(d["important_kwd"]))
-        callback(msg="Keywords generation completed in {:.2f}s".format(timer() - st))
+        progress_callback(msg="Keywords generation completed in {:.2f}s".format(timer() - st))
 
-    if row["parser_config"].get("auto_questions", 0):
+    if task["parser_config"].get("auto_questions", 0):
         st = timer()
-        callback(msg="Start to generate questions for every chunk ...")
-        chat_mdl = LLMBundle(row["tenant_id"], LLMType.CHAT, llm_name=row["llm_id"], lang=row["language"])
+        progress_callback(msg="Start to generate questions for every chunk ...")
+        chat_mdl = LLMBundle(task["tenant_id"], LLMType.CHAT, llm_name=task["llm_id"], lang=task["language"])
         for d in docs:
-            qst = question_proposal(chat_mdl, d["content_with_weight"], row["parser_config"]["auto_questions"])
+            qst = question_proposal(chat_mdl, d["content_with_weight"], task["parser_config"]["auto_questions"])
             d["content_with_weight"] = f"Question: \n{qst}\n\nAnswer:\n" + d["content_with_weight"]
             qst = rag_tokenizer.tokenize(qst)
             if "content_ltks" in d:
                 d["content_ltks"] += " " + qst
             if "content_sm_ltks" in d:
                 d["content_sm_ltks"] += " " + rag_tokenizer.fine_grained_tokenize(qst)
-        callback(msg="Question generation completed in {:.2f}s".format(timer() - st))
+        progress_callback(msg="Question generation completed in {:.2f}s".format(timer() - st))
 
     return docs
 
@@ -389,7 +381,9 @@ def do_handle_task(task):
         # bind embedding model
         embedding_model = LLMBundle(task_tenant_id, LLMType.EMBEDDING, llm_name=task_embedding_id, lang=task_language)
     except Exception as e:
-        progress_callback(-1, msg=f'Fail to bind embedding model: {str(e)}')
+        error_message = f'Fail to bind embedding model: {str(e)}'
+        progress_callback(-1, msg=error_message)
+        logging.exception(error_message)
         raise
 
     # Either using RAPTOR or Standard chunking methods
@@ -399,14 +393,16 @@
             chat_model = LLMBundle(task_tenant_id, LLMType.CHAT, llm_name=task_llm_id, lang=task_language)
 
             # run RAPTOR
-            chunks, tk_count, vector_size = run_raptor(task, chat_model, embedding_model, progress_callback)
+            chunks, token_count, vector_size = run_raptor(task, chat_model, embedding_model, progress_callback)
         except Exception as e:
-            progress_callback(-1, msg=f'Fail to bind LLM used by RAPTOR: {str(e)}')
+            error_message = f'Fail to bind LLM used by RAPTOR: {str(e)}'
+            progress_callback(-1, msg=error_message)
+            logging.exception(error_message)
             raise
     else:
         # Standard chunking methods
         start_ts = timer()
-        chunks = build(task)
+        chunks = build_chunks(task, progress_callback)
         logging.info("Build document {}: {:.2f}s".format(task_document_name, timer() - start_ts))
         if chunks is None:
             return
@@ -418,38 +414,43 @@
     progress_callback(msg="Generate {} chunks".format(len(chunks)))
     start_ts = timer()
     try:
-        tk_count, vector_size = embedding(chunks, embedding_model, task_parser_config, progress_callback)
+        token_count, vector_size = embedding(chunks, embedding_model, task_parser_config, progress_callback)
     except Exception as e:
-        progress_callback(-1, "Generate embedding error:{}".format(str(e)))
-        logging.exception("run_embedding got exception")
-        tk_count = 0
+        error_message = "Generate embedding error:{}".format(str(e))
+        progress_callback(-1, error_message)
+        logging.exception(error_message)
+        token_count = 0
         raise
-    logging.info("Embedding {} elapsed: {:.2f}".format(task_document_name, timer() - start_ts))
-    progress_callback(msg="Embedding chunks ({:.2f}s)".format(timer() - start_ts))
+    progress_message = "Embedding chunks ({:.2f}s)".format(timer() - start_ts)
+    logging.info(progress_message)
+    progress_callback(msg=progress_message)
     # logging.info(f"task_executor init_kb index {search.index_name(task_tenant_id)} embedding_model {embedding_model.llm_name} vector length {vector_size}")
     init_kb(task, vector_size)
     chunk_count = len(set([chunk["id"] for chunk in chunks]))
     start_ts = timer()
-    es_r = ""
+    doc_store_result = ""
     es_bulk_size = 4
     for b in range(0, len(chunks), es_bulk_size):
-        es_r = settings.docStoreConn.insert(chunks[b:b + es_bulk_size], search.index_name(task_tenant_id), task_dataset_id)
+        doc_store_result = settings.docStoreConn.insert(chunks[b:b + es_bulk_size], search.index_name(task_tenant_id), task_dataset_id)
         if b % 128 == 0:
             progress_callback(prog=0.8 + 0.1 * (b + 1) / len(chunks), msg="")
     logging.info("Indexing {} elapsed: {:.2f}".format(task_document_name, timer() - start_ts))
-    if es_r:
-        progress_callback(-1, "Insert chunk error, detail info please check log file. Please also check Elasticsearch/Infinity status!")
+    if doc_store_result:
+        error_message = f"Insert chunk error: {doc_store_result}, please check log file and Elasticsearch/Infinity status!"
+        progress_callback(-1, msg=error_message)
         settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id)
-        logging.error('Insert chunk error: ' + str(es_r))
-        raise Exception('Insert chunk error: ' + str(es_r))
+        logging.error(error_message)
+        raise Exception(error_message)
 
     if TaskService.do_cancel(task_id):
         settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id)
         return
 
-    progress_callback(1., msg="Finish Index ({:.2f}s)".format(timer() - start_ts))
-    DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, tk_count, chunk_count, 0)
-    logging.info("Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format(task_id, tk_count, len(chunks), timer() - start_ts))
+    DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0)
+
+    time_cost = timer() - start_ts
+    progress_callback(prog=1.0, msg="Done ({:.2f}s)".format(time_cost))
+    logging.info("Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format(task_id, token_count, len(chunks), time_cost))
 
 
 def handle_task():
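
A note on the new call contract (not part of the patch): `build_chunks()` no longer builds its own progress reporter. The old `build(row)` constructed `callback = partial(set_progress, row["id"], row["from_page"], row["to_page"])` internally; after this refactor the caller passes a ready-made `progress_callback` in. Below is a minimal sketch of the expected caller-side wiring, assuming it lives in `rag/svr/task_executor.py` next to `set_progress` and `build_chunks`; the wrapper name `run_chunking` is hypothetical and the exact code in `do_handle_task` is not shown in this diff.

```python
# Sketch only: caller-side wiring for the new build_chunks(task, progress_callback)
# signature. The partial(set_progress, ...) arguments mirror what the removed
# build() used to construct internally.
from functools import partial


def run_chunking(task):
    # Bind the task identity and page range once; build_chunks (and the parsers
    # it invokes) then only report progress through progress_callback(prog, msg).
    progress_callback = partial(
        set_progress,          # existing progress writer in task_executor.py
        task["id"],
        task["from_page"],
        task["to_page"])
    return build_chunks(task, progress_callback)
```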