Refactor parse progress (#3781)

### What problem does this PR solve?

Refactor file-parsing progress reporting: `build(row)` becomes `build_chunks(task, progress_callback)`, the progress callback is now supplied by the caller instead of being built inside with `functools.partial`, and variables such as `eng`/`tbls`/`tk_count`/`es_r` are renamed to `is_english`/`tables`/`token_count`/`doc_store_result`.

### Type of change

- [x] Refactoring
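
In short, the chunk builder no longer assembles its own progress reporter; the caller builds the callback once and injects it, and every chunking step reports through it. Below is a minimal sketch of the new calling convention; `set_progress` and the builder body are hypothetical stand-ins inferred from how `functools.partial` was applied in the old code, not the project's actual implementation.

```python
from functools import partial


def set_progress(task_id, from_page, to_page, prog=None, msg=""):
    # Hypothetical stand-in: the real function persists progress for the task;
    # prog is 0.0-1.0 for normal updates and -1 to flag a failure.
    print(f"[task {task_id}] pages {from_page}-{to_page}: prog={prog} msg={msg}")


def build_chunks(task, progress_callback):
    # The builder only ever calls the callback it was given.
    progress_callback(0.1, msg="Start to parse.")
    chunks = []  # ... parse the file and accumulate chunks ...
    progress_callback(0.8, msg="Finish parsing.")
    return chunks


task = {"id": "42", "from_page": 0, "to_page": 100000}
# The caller owns the callback (previously built inside build() itself).
progress_callback = partial(set_progress, task["id"], task["from_page"], task["to_page"])
build_chunks(task, progress_callback)
```

Because the callback is injected, any reporter with this `(prog, msg)` shape can drive the builder, which is what lets `do_handle_task` own the error reporting in the second file.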

Signed-off-by: jinhai <haijin.chn@gmail.com>
Jin Hai 2024-12-01 22:28:00 +08:00 committed by GitHub
parent ea84cc2e33
commit 08c1a5e1e8
2 changed files with 73 additions and 74 deletions

View File

@@ -193,7 +193,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
     Next, these successive pieces are merge into chunks whose token number is no more than 'Max token number'.
     """
-    eng = lang.lower() == "english"  # is_english(cks)
+    is_english = lang.lower() == "english"  # is_english(cks)
     parser_config = kwargs.get(
         "parser_config", {
             "chunk_token_num": 128, "delimiter": "\n!?。;!?", "layout_recognize": True})
@@ -206,8 +206,8 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
     pdf_parser = None
     if re.search(r"\.docx$", filename, re.IGNORECASE):
         callback(0.1, "Start to parse.")
-        sections, tbls = Docx()(filename, binary)
-        res = tokenize_table(tbls, doc, eng)  # just for table
+        sections, tables = Docx()(filename, binary)
+        res = tokenize_table(tables, doc, is_english)  # just for table
         callback(0.8, "Finish parsing.")
         st = timer()
@@ -220,16 +220,14 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
         if kwargs.get("section_only", False):
             return chunks
-        res.extend(tokenize_chunks_docx(chunks, doc, eng, images))
+        res.extend(tokenize_chunks_docx(chunks, doc, is_english, images))
         logging.info("naive_merge({}): {}".format(filename, timer() - st))
         return res
     elif re.search(r"\.pdf$", filename, re.IGNORECASE):
-        pdf_parser = Pdf(
-        ) if parser_config.get("layout_recognize", True) else PlainParser()
-        sections, tbls = pdf_parser(filename if not binary else binary,
-                                    from_page=from_page, to_page=to_page, callback=callback)
-        res = tokenize_table(tbls, doc, eng)
+        pdf_parser = Pdf() if parser_config.get("layout_recognize", True) else PlainParser()
+        sections, tables = pdf_parser(filename if not binary else binary, from_page=from_page, to_page=to_page, callback=callback)
+        res = tokenize_table(tables, doc, is_english)
     elif re.search(r"\.xlsx?$", filename, re.IGNORECASE):
         callback(0.1, "Start to parse.")
@@ -248,8 +246,8 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
     elif re.search(r"\.(md|markdown)$", filename, re.IGNORECASE):
         callback(0.1, "Start to parse.")
-        sections, tbls = Markdown(int(parser_config.get("chunk_token_num", 128)))(filename, binary)
-        res = tokenize_table(tbls, doc, eng)
+        sections, tables = Markdown(int(parser_config.get("chunk_token_num", 128)))(filename, binary)
+        res = tokenize_table(tables, doc, is_english)
         callback(0.8, "Finish parsing.")
     elif re.search(r"\.(htm|html)$", filename, re.IGNORECASE):
@@ -289,7 +287,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
     if kwargs.get("section_only", False):
        return chunks
-    res.extend(tokenize_chunks(chunks, doc, eng, pdf_parser))
+    res.extend(tokenize_chunks(chunks, doc, is_english, pdf_parser))
     logging.info("naive_merge({}): {}".format(filename, timer() - st))
     return res
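
The `chunk()` entry point above drives its progress purely through the `callback` argument, calling it both positionally (`callback(0.1, "Start to parse.")`) and with a bare message (`callback(msg=...)`). A hedged sketch of a callback that satisfies that contract; the console reporter is illustrative only and not part of the project:

```python
def console_progress(prog=None, msg=""):
    # Accepts both callback(0.1, "Start to parse.") and callback(msg="...") styles used above.
    if prog is not None and prog < 0:
        print(f"FAILED: {msg}")
    else:
        print(f"{prog if prog is not None else '-'}: {msg}")


# Illustrative usage, assuming chunk() is importable from the parser module shown above:
# res = chunk("example.docx", binary=None, from_page=0, to_page=100000,
#             lang="English", callback=console_progress)
```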

View File

@@ -19,6 +19,7 @@
 import logging
 import sys
 from api.utils.log_utils import initRootLogger
 CONSUMER_NO = "0" if len(sys.argv) < 2 else sys.argv[1]
@@ -166,52 +167,44 @@ def get_storage_binary(bucket, name):
     return STORAGE_IMPL.get(bucket, name)

-def build(row):
-    if row["size"] > DOC_MAXIMUM_SIZE:
-        set_progress(row["id"], prog=-1, msg="File size exceeds( <= %dMb )" %
+def build_chunks(task, progress_callback):
+    if task["size"] > DOC_MAXIMUM_SIZE:
+        set_progress(task["id"], prog=-1, msg="File size exceeds( <= %dMb )" %
                      (int(DOC_MAXIMUM_SIZE / 1024 / 1024)))
         return []

-    callback = partial(
-        set_progress,
-        row["id"],
-        row["from_page"],
-        row["to_page"])
-    chunker = FACTORY[row["parser_id"].lower()]
+    chunker = FACTORY[task["parser_id"].lower()]
     try:
         st = timer()
-        bucket, name = File2DocumentService.get_storage_address(doc_id=row["doc_id"])
+        bucket, name = File2DocumentService.get_storage_address(doc_id=task["doc_id"])
         binary = get_storage_binary(bucket, name)
-        logging.info(
-            "From minio({}) {}/{}".format(timer() - st, row["location"], row["name"]))
+        logging.info("From minio({}) {}/{}".format(timer() - st, task["location"], task["name"]))
     except TimeoutError:
-        callback(-1, "Internal server error: Fetch file from minio timeout. Could you try it again.")
-        logging.exception(
-            "Minio {}/{} got timeout: Fetch file from minio timeout.".format(row["location"], row["name"]))
+        progress_callback(-1, "Internal server error: Fetch file from minio timeout. Could you try it again.")
+        logging.exception("Minio {}/{} got timeout: Fetch file from minio timeout.".format(task["location"], task["name"]))
         raise
     except Exception as e:
         if re.search("(No such file|not found)", str(e)):
-            callback(-1, "Can not find file <%s> from minio. Could you try it again?" % row["name"])
+            progress_callback(-1, "Can not find file <%s> from minio. Could you try it again?" % task["name"])
         else:
-            callback(-1, "Get file from minio: %s" % str(e).replace("'", ""))
-        logging.exception("Chunking {}/{} got exception".format(row["location"], row["name"]))
+            progress_callback(-1, "Get file from minio: %s" % str(e).replace("'", ""))
+        logging.exception("Chunking {}/{} got exception".format(task["location"], task["name"]))
         raise

     try:
-        cks = chunker.chunk(row["name"], binary=binary, from_page=row["from_page"],
-                            to_page=row["to_page"], lang=row["language"], callback=callback,
-                            kb_id=row["kb_id"], parser_config=row["parser_config"], tenant_id=row["tenant_id"])
-        logging.info("Chunking({}) {}/{} done".format(timer() - st, row["location"], row["name"]))
+        cks = chunker.chunk(task["name"], binary=binary, from_page=task["from_page"],
+                            to_page=task["to_page"], lang=task["language"], callback=progress_callback,
+                            kb_id=task["kb_id"], parser_config=task["parser_config"], tenant_id=task["tenant_id"])
+        logging.info("Chunking({}) {}/{} done".format(timer() - st, task["location"], task["name"]))
     except Exception as e:
-        callback(-1, "Internal server error while chunking: %s" %
-                 str(e).replace("'", ""))
-        logging.exception("Chunking {}/{} got exception".format(row["location"], row["name"]))
+        progress_callback(-1, "Internal server error while chunking: %s" % str(e).replace("'", ""))
+        logging.exception("Chunking {}/{} got exception".format(task["location"], task["name"]))
         raise

     docs = []
     doc = {
-        "doc_id": row["doc_id"],
-        "kb_id": str(row["kb_id"])
+        "doc_id": task["doc_id"],
+        "kb_id": str(task["kb_id"])
     }
     el = 0
     for ck in cks:
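
One detail worth noting in the hunk above: the parser is picked with `FACTORY[task["parser_id"].lower()]`, so each registered parser only needs to expose a `chunk(...)` that accepts the injected callback. A minimal sketch of that dispatch, with made-up registry entries and a toy parser (none of these names come from the codebase):

```python
class _EchoParser:
    # Toy parser: exposes the same chunk(filename, binary=..., callback=..., **kwargs) shape.
    @staticmethod
    def chunk(filename, binary=None, callback=lambda prog=None, msg="": None, **kwargs):
        callback(0.1, msg=f"Start to parse {filename}.")
        callback(0.8, msg="Finish parsing.")
        return []


FACTORY = {"naive": _EchoParser}  # illustrative entry only

task = {"parser_id": "Naive", "name": "example.docx"}
chunker = FACTORY[task["parser_id"].lower()]
chunks = chunker.chunk(task["name"], binary=None,
                       callback=lambda prog=None, msg="": print(prog, msg))
```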
@@ -240,41 +233,40 @@ def build(row):
                 d["image"].save(output_buffer, format='JPEG')
             st = timer()
-            STORAGE_IMPL.put(row["kb_id"], d["id"], output_buffer.getvalue())
+            STORAGE_IMPL.put(task["kb_id"], d["id"], output_buffer.getvalue())
             el += timer() - st
         except Exception:
-            logging.exception(
-                "Saving image of chunk {}/{}/{} got exception".format(row["location"], row["name"], d["_id"]))
+            logging.exception("Saving image of chunk {}/{}/{} got exception".format(task["location"], task["name"], d["_id"]))
             raise

-        d["img_id"] = "{}-{}".format(row["kb_id"], d["id"])
+        d["img_id"] = "{}-{}".format(task["kb_id"], d["id"])
         del d["image"]
         docs.append(d)
-    logging.info("MINIO PUT({}):{}".format(row["name"], el))
+    logging.info("MINIO PUT({}):{}".format(task["name"], el))

-    if row["parser_config"].get("auto_keywords", 0):
+    if task["parser_config"].get("auto_keywords", 0):
         st = timer()
-        callback(msg="Start to generate keywords for every chunk ...")
-        chat_mdl = LLMBundle(row["tenant_id"], LLMType.CHAT, llm_name=row["llm_id"], lang=row["language"])
+        progress_callback(msg="Start to generate keywords for every chunk ...")
+        chat_mdl = LLMBundle(task["tenant_id"], LLMType.CHAT, llm_name=task["llm_id"], lang=task["language"])
         for d in docs:
             d["important_kwd"] = keyword_extraction(chat_mdl, d["content_with_weight"],
-                                                    row["parser_config"]["auto_keywords"]).split(",")
+                                                    task["parser_config"]["auto_keywords"]).split(",")
             d["important_tks"] = rag_tokenizer.tokenize(" ".join(d["important_kwd"]))
-        callback(msg="Keywords generation completed in {:.2f}s".format(timer() - st))
+        progress_callback(msg="Keywords generation completed in {:.2f}s".format(timer() - st))

-    if row["parser_config"].get("auto_questions", 0):
+    if task["parser_config"].get("auto_questions", 0):
         st = timer()
-        callback(msg="Start to generate questions for every chunk ...")
-        chat_mdl = LLMBundle(row["tenant_id"], LLMType.CHAT, llm_name=row["llm_id"], lang=row["language"])
+        progress_callback(msg="Start to generate questions for every chunk ...")
+        chat_mdl = LLMBundle(task["tenant_id"], LLMType.CHAT, llm_name=task["llm_id"], lang=task["language"])
         for d in docs:
-            qst = question_proposal(chat_mdl, d["content_with_weight"], row["parser_config"]["auto_questions"])
+            qst = question_proposal(chat_mdl, d["content_with_weight"], task["parser_config"]["auto_questions"])
             d["content_with_weight"] = f"Question: \n{qst}\n\nAnswer:\n" + d["content_with_weight"]
             qst = rag_tokenizer.tokenize(qst)
             if "content_ltks" in d:
                 d["content_ltks"] += " " + qst
             if "content_sm_ltks" in d:
                 d["content_sm_ltks"] += " " + rag_tokenizer.fine_grained_tokenize(qst)
-        callback(msg="Question generation completed in {:.2f}s".format(timer() - st))
+        progress_callback(msg="Question generation completed in {:.2f}s".format(timer() - st))

     return docs
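
After the refactor, `build_chunks` handles every failure the same way: push `prog=-1` with a message through the injected callback, log the exception, and re-raise so the task is marked failed. A compact sketch of that pattern; the helper and the `storage_get` parameter are invented for illustration and do not exist in the codebase:

```python
import logging


def report_failure(progress_callback, message):
    # Illustrative helper: mirrors the repeated progress_callback(-1, ...) +
    # logging.exception(...) sequence used throughout build_chunks.
    progress_callback(-1, msg=message)
    logging.exception(message)


def fetch_binary(task, progress_callback, storage_get):
    try:
        return storage_get(task["location"])
    except TimeoutError:
        report_failure(progress_callback, "Fetch file from storage timed out. Could you try it again?")
        raise
    except Exception as e:
        report_failure(progress_callback, "Get file from storage: %s" % str(e).replace("'", ""))
        raise
```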
@@ -389,7 +381,9 @@ def do_handle_task(task):
         # bind embedding model
         embedding_model = LLMBundle(task_tenant_id, LLMType.EMBEDDING, llm_name=task_embedding_id, lang=task_language)
     except Exception as e:
-        progress_callback(-1, msg=f'Fail to bind embedding model: {str(e)}')
+        error_message = f'Fail to bind embedding model: {str(e)}'
+        progress_callback(-1, msg=error_message)
+        logging.exception(error_message)
         raise

     # Either using RAPTOR or Standard chunking methods
@@ -399,14 +393,16 @@ def do_handle_task(task):
             chat_model = LLMBundle(task_tenant_id, LLMType.CHAT, llm_name=task_llm_id, lang=task_language)
             # run RAPTOR
-            chunks, tk_count, vector_size = run_raptor(task, chat_model, embedding_model, progress_callback)
+            chunks, token_count, vector_size = run_raptor(task, chat_model, embedding_model, progress_callback)
         except Exception as e:
-            progress_callback(-1, msg=f'Fail to bind LLM used by RAPTOR: {str(e)}')
+            error_message = f'Fail to bind LLM used by RAPTOR: {str(e)}'
+            progress_callback(-1, msg=error_message)
+            logging.exception(error_message)
             raise
     else:
         # Standard chunking methods
         start_ts = timer()
-        chunks = build(task)
+        chunks = build_chunks(task, progress_callback)
         logging.info("Build document {}: {:.2f}s".format(task_document_name, timer() - start_ts))
         if chunks is None:
             return
@@ -418,38 +414,43 @@ def do_handle_task(task):
     progress_callback(msg="Generate {} chunks".format(len(chunks)))
     start_ts = timer()
     try:
-        tk_count, vector_size = embedding(chunks, embedding_model, task_parser_config, progress_callback)
+        token_count, vector_size = embedding(chunks, embedding_model, task_parser_config, progress_callback)
     except Exception as e:
-        progress_callback(-1, "Generate embedding error:{}".format(str(e)))
-        logging.exception("run_embedding got exception")
-        tk_count = 0
+        error_message = "Generate embedding error:{}".format(str(e))
+        progress_callback(-1, error_message)
+        logging.exception(error_message)
+        token_count = 0
         raise
-    logging.info("Embedding {} elapsed: {:.2f}".format(task_document_name, timer() - start_ts))
-    progress_callback(msg="Embedding chunks ({:.2f}s)".format(timer() - start_ts))
+    progress_message = "Embedding chunks ({:.2f}s)".format(timer() - start_ts)
+    logging.info(progress_message)
+    progress_callback(msg=progress_message)
     # logging.info(f"task_executor init_kb index {search.index_name(task_tenant_id)} embedding_model {embedding_model.llm_name} vector length {vector_size}")
     init_kb(task, vector_size)
     chunk_count = len(set([chunk["id"] for chunk in chunks]))
     start_ts = timer()
-    es_r = ""
+    doc_store_result = ""
     es_bulk_size = 4
     for b in range(0, len(chunks), es_bulk_size):
-        es_r = settings.docStoreConn.insert(chunks[b:b + es_bulk_size], search.index_name(task_tenant_id), task_dataset_id)
+        doc_store_result = settings.docStoreConn.insert(chunks[b:b + es_bulk_size], search.index_name(task_tenant_id), task_dataset_id)
         if b % 128 == 0:
             progress_callback(prog=0.8 + 0.1 * (b + 1) / len(chunks), msg="")
     logging.info("Indexing {} elapsed: {:.2f}".format(task_document_name, timer() - start_ts))
-    if es_r:
-        progress_callback(-1, "Insert chunk error, detail info please check log file. Please also check Elasticsearch/Infinity status!")
+    if doc_store_result:
+        error_message = f"Insert chunk error: {doc_store_result}, please check log file and Elasticsearch/Infinity status!"
+        progress_callback(-1, msg=error_message)
         settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id)
-        logging.error('Insert chunk error: ' + str(es_r))
-        raise Exception('Insert chunk error: ' + str(es_r))
+        logging.error(error_message)
+        raise Exception(error_message)
     if TaskService.do_cancel(task_id):
         settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id)
         return
-    progress_callback(1., msg="Finish Index ({:.2f}s)".format(timer() - start_ts))
-    DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, tk_count, chunk_count, 0)
-    logging.info("Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format(task_id, tk_count, len(chunks), timer() - start_ts))
+    DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0)
+    time_cost = timer() - start_ts
+    progress_callback(prog=1.0, msg="Done ({:.2f}s)".format(time_cost))
+    logging.info("Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format(task_id, token_count, len(chunks), time_cost))
def handle_task():
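
After chunks are embedded, the final hunk inserts them into the document store in batches of `es_bulk_size = 4` and maps the indexing phase onto the 0.8–0.9 slice of the progress bar with `0.8 + 0.1 * (b + 1) / len(chunks)`. A small sketch of that progress math under the same assumptions (the dummy payload and the helper name are illustrative):

```python
def indexing_progress(batch_start, total_chunks):
    # Indexing occupies the 0.8 -> 0.9 slice of the overall progress bar,
    # matching progress_callback(prog=0.8 + 0.1 * (b + 1) / len(chunks), msg="").
    return 0.8 + 0.1 * (batch_start + 1) / total_chunks


es_bulk_size = 4
chunks = list(range(10))  # dummy payload; the real chunks are dicts destined for the doc store
for b in range(0, len(chunks), es_bulk_size):
    batch = chunks[b:b + es_bulk_size]
    # settings.docStoreConn.insert(batch, index_name, dataset_id) would run here.
    if b % 128 == 0:
        print(f"progress={indexing_progress(b, len(chunks)):.3f} after batch starting at {b}")
```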