From e3cf14a3c9c1150a8613f86a1eafcef24d34413f Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Fri, 9 Aug 2024 16:20:02 +0800 Subject: [PATCH] add function: upload and parse (#1889) ### What problem does this PR solve? #1880 ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- api/apps/conversation_app.py | 2 + api/apps/document_app.py | 204 ++++++++++++++++++++-------- api/db/services/dialog_service.py | 10 +- api/db/services/document_service.py | 2 +- api/db/services/file_service.py | 65 ++++++++- graphrag/index.py | 28 +--- graphrag/mind_map_extractor.py | 32 ++++- graphrag/mind_map_prompt.py | 1 + 8 files changed, 255 insertions(+), 89 deletions(-) diff --git a/api/apps/conversation_app.py b/api/apps/conversation_app.py index c3e07c6bb..b3088f76c 100644 --- a/api/apps/conversation_app.py +++ b/api/apps/conversation_app.py @@ -118,6 +118,8 @@ def completion(): if m["role"] == "assistant" and not msg: continue msg.append({"role": m["role"], "content": m["content"]}) + if "doc_ids" in m: + msg[-1]["doc_ids"] = m["doc_ids"] try: e, conv = ConversationService.get_by_id(req["conversation_id"]) if not e: diff --git a/api/apps/document_app.py b/api/apps/document_app.py index 31611dd94..94db0c22e 100644 --- a/api/apps/document_app.py +++ b/api/apps/document_app.py @@ -13,10 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License # - +import datetime +import hashlib +import json import os import pathlib import re +import traceback +from concurrent.futures import ThreadPoolExecutor +from copy import deepcopy +from io import BytesIO import flask from elasticsearch_dsl import Q @@ -24,22 +30,26 @@ from flask import request from flask_login import login_required, current_user from api.db.db_models import Task, File +from api.db.services.dialog_service import DialogService, ConversationService from api.db.services.file2document_service import File2DocumentService from api.db.services.file_service import FileService +from api.db.services.llm_service import LLMBundle from api.db.services.task_service import TaskService, queue_tasks +from api.db.services.user_service import TenantService +from graphrag.mind_map_extractor import MindMapExtractor +from rag.app import naive from rag.nlp import search from rag.utils.es_conn import ELASTICSEARCH from api.db.services import duplicate_name from api.db.services.knowledgebase_service import KnowledgebaseService from api.utils.api_utils import server_error_response, get_data_error_result, validate_request from api.utils import get_uuid -from api.db import FileType, TaskStatus, ParserType, FileSource +from api.db import FileType, TaskStatus, ParserType, FileSource, LLMType from api.db.services.document_service import DocumentService -from api.settings import RetCode +from api.settings import RetCode, stat_logger from api.utils.api_utils import get_json_result from rag.utils.minio_conn import MINIO -from api.utils.file_utils import filename_type, thumbnail -from api.utils.web_utils import html2pdf, is_valid_url +from api.utils.file_utils import filename_type, thumbnail, get_project_base_directory from api.utils.web_utils import html2pdf, is_valid_url @@ -65,55 +75,7 @@ def upload(): if not e: raise LookupError("Can't find this knowledgebase!") - root_folder = FileService.get_root_folder(current_user.id) - pf_id = root_folder["id"] - FileService.init_knowledgebase_docs(pf_id, current_user.id) - kb_root_folder = FileService.get_kb_folder(current_user.id) - kb_folder = FileService.new_a_file_from_kb(kb.tenant_id, kb.name, kb_root_folder["id"]) - - err = [] - for file in file_objs: - try: - MAX_FILE_NUM_PER_USER = int(os.environ.get('MAX_FILE_NUM_PER_USER', 0)) - if MAX_FILE_NUM_PER_USER > 0 and DocumentService.get_doc_count(kb.tenant_id) >= MAX_FILE_NUM_PER_USER: - raise RuntimeError("Exceed the maximum file number of a free user!") - - filename = duplicate_name( - DocumentService.query, - name=file.filename, - kb_id=kb.id) - filetype = filename_type(filename) - if filetype == FileType.OTHER.value: - raise RuntimeError("This type of file has not been supported yet!") - - location = filename - while MINIO.obj_exist(kb_id, location): - location += "_" - blob = file.read() - MINIO.put(kb_id, location, blob) - doc = { - "id": get_uuid(), - "kb_id": kb.id, - "parser_id": kb.parser_id, - "parser_config": kb.parser_config, - "created_by": current_user.id, - "type": filetype, - "name": filename, - "location": location, - "size": len(blob), - "thumbnail": thumbnail(filename, blob) - } - if doc["type"] == FileType.VISUAL: - doc["parser_id"] = ParserType.PICTURE.value - if doc["type"] == FileType.AURAL: - doc["parser_id"] = ParserType.AUDIO.value - if re.search(r"\.(ppt|pptx|pages)$", filename): - doc["parser_id"] = ParserType.PRESENTATION.value - DocumentService.insert(doc) - - FileService.add_file_from_kb(doc, kb_folder["id"], kb.tenant_id) - except Exception as e: - err.append(file.filename + ": " + str(e)) + err, _ = FileService.upload_document(kb, file_objs) if err: return get_json_result( data=False, retmsg="\n".join(err), retcode=RetCode.SERVER_ERROR) @@ -149,7 +111,7 @@ def web_crawl(): try: filename = duplicate_name( DocumentService.query, - name=name+".pdf", + name=name + ".pdf", kb_id=kb.id) filetype = filename_type(filename) if filetype == FileType.OTHER.value: @@ -414,7 +376,7 @@ def get(doc_id): if not e: return get_data_error_result(retmsg="Document not found!") - b,n = File2DocumentService.get_minio_address(doc_id=doc_id) + b, n = File2DocumentService.get_minio_address(doc_id=doc_id) response = flask.make_response(MINIO.get(b, n)) ext = re.search(r"\.([^.]+)$", doc.name) @@ -484,3 +446,133 @@ def get_image(image_id): return response except Exception as e: return server_error_response(e) + + +@manager.route('/upload_and_parse', methods=['POST']) +@login_required +@validate_request("conversation_id") +def upload_and_parse(): + req = request.json + if 'file' not in request.files: + return get_json_result( + data=False, retmsg='No file part!', retcode=RetCode.ARGUMENT_ERROR) + + file_objs = request.files.getlist('file') + for file_obj in file_objs: + if file_obj.filename == '': + return get_json_result( + data=False, retmsg='No file selected!', retcode=RetCode.ARGUMENT_ERROR) + + e, conv = ConversationService.get_by_id(req["conversation_id"]) + if not e: + return get_data_error_result(retmsg="Conversation not found!") + e, dia = DialogService.get_by_id(conv.dialog_id) + kb_id = dia.kb_ids[0] + e, kb = KnowledgebaseService.get_by_id(kb_id) + if not e: + raise LookupError("Can't find this knowledgebase!") + + idxnm = search.index_name(kb.tenant_id) + if not ELASTICSEARCH.indexExist(idxnm): + ELASTICSEARCH.createIdx(idxnm, json.load( + open(os.path.join(get_project_base_directory(), "conf", "mapping.json"), "r"))) + + embd_mdl = LLMBundle(kb.tenant_id, LLMType.EMBEDDING, llm_name=kb.embd_id, lang=kb.language) + + err, files = FileService.upload_document(kb, file_objs) + if err: + return get_json_result( + data=False, retmsg="\n".join(err), retcode=RetCode.SERVER_ERROR) + + def dummy(prog=None, msg=""): + pass + + parser_config = {"chunk_token_num": 4096, "delimiter": "\n!?。;!?", "layout_recognize": False} + exe = ThreadPoolExecutor(max_workers=12) + threads = [] + for d, blob in files: + kwargs = { + "callback": dummy, + "parser_config": parser_config, + "from_page": 0, + "to_page": 100000 + } + threads.append(exe.submit(naive.chunk, d["name"], blob, **kwargs)) + + for (docinfo,_), th in zip(files, threads): + docs = [] + doc = { + "doc_id": docinfo["id"], + "kb_id": [kb.id] + } + for ck in th.result(): + d = deepcopy(doc) + d.update(ck) + md5 = hashlib.md5() + md5.update((ck["content_with_weight"] + + str(d["doc_id"])).encode("utf-8")) + d["_id"] = md5.hexdigest() + d["create_time"] = str(datetime.datetime.now()).replace("T", " ")[:19] + d["create_timestamp_flt"] = datetime.datetime.now().timestamp() + if not d.get("image"): + docs.append(d) + continue + + output_buffer = BytesIO() + if isinstance(d["image"], bytes): + output_buffer = BytesIO(d["image"]) + else: + d["image"].save(output_buffer, format='JPEG') + + MINIO.put(kb.id, d["_id"], output_buffer.getvalue()) + d["img_id"] = "{}-{}".format(kb.id, d["_id"]) + del d["image"] + docs.append(d) + + parser_ids = {d["id"]: d["parser_id"] for d, _ in files} + docids = [d["id"] for d, _ in files] + chunk_counts = {id: 0 for id in docids} + token_counts = {id: 0 for id in docids} + es_bulk_size = 64 + + def embedding(doc_id, cnts, batch_size=16): + nonlocal embd_mdl, chunk_counts, token_counts + vects = [] + for i in range(0, len(cnts), batch_size): + vts, c = embd_mdl.encode(cnts[i: i + batch_size]) + vects.extend(vts.tolist()) + chunk_counts[doc_id] += len(cnts[i:i + batch_size]) + token_counts[doc_id] += c + return vects + + _, tenant = TenantService.get_by_id(kb.tenant_id) + llm_bdl = LLMBundle(kb.tenant_id, LLMType.CHAT, tenant.llm_id) + for doc_id in docids: + cks = [c for c in docs if c["doc_id"] == doc_id] + + if parser_ids[doc_id] != ParserType.PICTURE.value: + mindmap = MindMapExtractor(llm_bdl) + try: + mind_map = json.dumps(mindmap([c["content_with_weight"] for c in docs if c["doc_id"] == doc_id]).output, ensure_ascii=False, indent=2) + if len(mind_map) < 32: raise Exception("Few content: "+mind_map) + cks.append({ + "doc_id": doc_id, + "kb_id": [kb.id], + "content_with_weight": mind_map, + "knowledge_graph_kwd": "mind_map" + }) + except Exception as e: + stat_logger.error("Mind map generation error:", traceback.format_exc()) + + vects = embedding(doc_id, cks) + assert len(cks) == len(vects) + for i, d in enumerate(cks): + v = vects[i] + d["q_%d_vec" % len(v)] = v + for b in range(0, len(cks), es_bulk_size): + ELASTICSEARCH.bulk(cks[b:b + es_bulk_size], idxnm) + + DocumentService.increment_chunk_num( + doc_id, kb.id, token_counts[doc_id], chunk_counts[doc_id], 0) + + return get_json_result(data=[d["id"] for d in files]) diff --git a/api/db/services/dialog_service.py b/api/db/services/dialog_service.py index 71429198a..2cfa8b8f8 100644 --- a/api/db/services/dialog_service.py +++ b/api/db/services/dialog_service.py @@ -104,7 +104,11 @@ def chat(dialog, messages, stream=True, **kwargs): is_kg = all([kb.parser_id == ParserType.KG for kb in kbs]) retr = retrievaler if not is_kg else kg_retrievaler - questions = [m["content"] for m in messages if m["role"] == "user"] + questions = [m["content"] for m in messages if m["role"] == "user"][-3:] + attachments = kwargs["doc_ids"].split(",") if "doc_ids" in kwargs else None + if "doc_ids" in messages[-1]: + attachments = messages[-1]["doc_ids"] + embd_mdl = LLMBundle(dialog.tenant_id, LLMType.EMBEDDING, embd_nms[0]) if llm_id2llm_type(dialog.llm_id) == "image2text": chat_mdl = LLMBundle(dialog.tenant_id, LLMType.IMAGE2TEXT, dialog.llm_id) @@ -144,7 +148,7 @@ def chat(dialog, messages, stream=True, **kwargs): kbinfos = retr.retrieval(" ".join(questions), embd_mdl, dialog.tenant_id, dialog.kb_ids, 1, dialog.top_n, dialog.similarity_threshold, dialog.vector_similarity_weight, - doc_ids=kwargs["doc_ids"].split(",") if "doc_ids" in kwargs else None, + doc_ids=attachments, top=dialog.top_k, aggs=False, rerank_mdl=rerank_mdl) knowledges = [ck["content_with_weight"] for ck in kbinfos["chunks"]] #self-rag @@ -153,7 +157,7 @@ def chat(dialog, messages, stream=True, **kwargs): kbinfos = retr.retrieval(" ".join(questions), embd_mdl, dialog.tenant_id, dialog.kb_ids, 1, dialog.top_n, dialog.similarity_threshold, dialog.vector_similarity_weight, - doc_ids=kwargs["doc_ids"].split(",") if "doc_ids" in kwargs else None, + doc_ids=attachments, top=dialog.top_k, aggs=False, rerank_mdl=rerank_mdl) knowledges = [ck["content_with_weight"] for ck in kbinfos["chunks"]] diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index 019cea255..f87d8f10d 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -26,7 +26,7 @@ from rag.utils.es_conn import ELASTICSEARCH from rag.utils.minio_conn import MINIO from rag.nlp import search -from api.db import FileType, TaskStatus +from api.db import FileType, TaskStatus, ParserType from api.db.db_models import DB, Knowledgebase, Tenant, Task from api.db.db_models import Document from api.db.services.common_service import CommonService diff --git a/api/db/services/file_service.py b/api/db/services/file_service.py index 809690820..9ddc4841b 100644 --- a/api/db/services/file_service.py +++ b/api/db/services/file_service.py @@ -13,16 +13,21 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import re + from flask_login import current_user from peewee import fn -from api.db import FileType, KNOWLEDGEBASE_FOLDER_NAME, FileSource +from api.db import FileType, KNOWLEDGEBASE_FOLDER_NAME, FileSource, ParserType from api.db.db_models import DB, File2Document, Knowledgebase from api.db.db_models import File, Document +from api.db.services import duplicate_name from api.db.services.common_service import CommonService from api.db.services.document_service import DocumentService from api.db.services.file2document_service import File2DocumentService from api.utils import get_uuid +from api.utils.file_utils import filename_type, thumbnail +from rag.utils.minio_conn import MINIO class FileService(CommonService): @@ -318,4 +323,60 @@ class FileService(CommonService): cls.filter_update((cls.model.id << file_ids, ), { 'parent_id': folder_id }) except Exception as e: print(e) - raise RuntimeError("Database error (File move)!") \ No newline at end of file + raise RuntimeError("Database error (File move)!") + + @classmethod + @DB.connection_context() + def upload_document(self, kb, file_objs): + root_folder = self.get_root_folder(current_user.id) + pf_id = root_folder["id"] + self.init_knowledgebase_docs(pf_id, current_user.id) + kb_root_folder = self.get_kb_folder(current_user.id) + kb_folder = self.new_a_file_from_kb(kb.tenant_id, kb.name, kb_root_folder["id"]) + + err, files = [], [] + for file in file_objs: + try: + MAX_FILE_NUM_PER_USER = int(os.environ.get('MAX_FILE_NUM_PER_USER', 0)) + if MAX_FILE_NUM_PER_USER > 0 and DocumentService.get_doc_count(kb.tenant_id) >= MAX_FILE_NUM_PER_USER: + raise RuntimeError("Exceed the maximum file number of a free user!") + + filename = duplicate_name( + DocumentService.query, + name=file.filename, + kb_id=kb.id) + filetype = filename_type(filename) + if filetype == FileType.OTHER.value: + raise RuntimeError("This type of file has not been supported yet!") + + location = filename + while MINIO.obj_exist(kb.id, location): + location += "_" + blob = file.read() + MINIO.put(kb.id, location, blob) + doc = { + "id": get_uuid(), + "kb_id": kb.id, + "parser_id": kb.parser_id, + "parser_config": kb.parser_config, + "created_by": current_user.id, + "type": filetype, + "name": filename, + "location": location, + "size": len(blob), + "thumbnail": thumbnail(filename, blob) + } + if doc["type"] == FileType.VISUAL: + doc["parser_id"] = ParserType.PICTURE.value + if doc["type"] == FileType.AURAL: + doc["parser_id"] = ParserType.AUDIO.value + if re.search(r"\.(ppt|pptx|pages)$", filename): + doc["parser_id"] = ParserType.PRESENTATION.value + DocumentService.insert(doc) + + FileService.add_file_from_kb(doc, kb_folder["id"], kb.tenant_id) + files.append((doc, blob)) + except Exception as e: + err.append(file.filename + ": " + str(e)) + + return err, files \ No newline at end of file diff --git a/graphrag/index.py b/graphrag/index.py index a8e43eea7..ea9c354ad 100644 --- a/graphrag/index.py +++ b/graphrag/index.py @@ -30,24 +30,6 @@ from rag.nlp import rag_tokenizer from rag.utils import num_tokens_from_string -def be_children(obj: dict, keyset:set): - if isinstance(obj, str): - obj = [obj] - if isinstance(obj, list): - for i in obj: keyset.add(i) - return [{"id": re.sub(r"\*+", "", i), "children":[]} for i in obj] - arr = [] - for k,v in obj.items(): - k = re.sub(r"\*+", "", k) - if not k or k in keyset:continue - keyset.add(k) - arr.append({ - "id": k, - "children": be_children(v, keyset) - }) - return arr - - def graph_merge(g1, g2): g = g2.copy() for n, attr in g1.nodes(data=True): @@ -153,16 +135,10 @@ def build_knowlege_graph_chunks(tenant_id: str, chunks: List[str], callback, ent mg = mindmap(_chunks).output if not len(mg.keys()): return chunks - if len(mg.keys()) > 1: - keyset = set([re.sub(r"\*+", "", k) for k,v in mg.items() if isinstance(v, dict) and re.sub(r"\*+", "", k)]) - md_map = {"id": "root", "children": [{"id": re.sub(r"\*+", "", k), "children": be_children(v, keyset)} for k,v in mg.items() if isinstance(v, dict) and re.sub(r"\*+", "", k)]} - else: - k = re.sub(r"\*+", "", list(mg.keys())[0]) - md_map = {"id": k, "children": be_children(list(mg.items())[0][1], set([k]))} - print(json.dumps(md_map, ensure_ascii=False, indent=2)) + print(json.dumps(mg, ensure_ascii=False, indent=2)) chunks.append( { - "content_with_weight": json.dumps(md_map, ensure_ascii=False, indent=2), + "content_with_weight": json.dumps(mg, ensure_ascii=False, indent=2), "knowledge_graph_kwd": "mind_map" }) diff --git a/graphrag/mind_map_extractor.py b/graphrag/mind_map_extractor.py index f5e80d7bf..5eca9a204 100644 --- a/graphrag/mind_map_extractor.py +++ b/graphrag/mind_map_extractor.py @@ -57,6 +57,26 @@ class MindMapExtractor: self._mind_map_prompt = prompt or MIND_MAP_EXTRACTION_PROMPT self._on_error = on_error or (lambda _e, _s, _d: None) + def _key(self, k): + return re.sub(r"\*+", "", k) + + def _be_children(self, obj: dict, keyset: set): + if isinstance(obj, str): + obj = [obj] + if isinstance(obj, list): + for i in obj: keyset.add(i) + return [{"id": re.sub(r"\*+", "", i), "children": []} for i in obj] + arr = [] + for k, v in obj.items(): + k = self._key(k) + if not k or k in keyset: continue + keyset.add(k) + arr.append({ + "id": k, + "children": self._be_children(v, keyset) + }) + return arr + def __call__( self, sections: list[str], prompt_variables: dict[str, Any] | None = None ) -> MindMapResult: @@ -86,13 +106,23 @@ class MindMapExtractor: res.append(_.result()) merge_json = reduce(self._merge, res) - merge_json = self._list_to_kv(merge_json) + if len(merge_json.keys()) > 1: + keyset = set( + [re.sub(r"\*+", "", k) for k, v in merge_json.items() if isinstance(v, dict) and re.sub(r"\*+", "", k)]) + merge_json = {"id": "root", + "children": [{"id": self._key(k), "children": self._be_children(v, keyset)} for k, v in + merge_json.items() if isinstance(v, dict) and self._key(k)]} + else: + k = self._key(list(self._be_children.keys())[0]) + merge_json = {"id": k, "children": self._be_children(list(merge_json.items())[0][1], set([k]))} + except Exception as e: logging.exception("error mind graph") self._on_error( e, traceback.format_exc(), None ) + merge_json = {"error": str(e)} return MindMapResult(output=merge_json) diff --git a/graphrag/mind_map_prompt.py b/graphrag/mind_map_prompt.py index d87a7e895..4613c12dd 100644 --- a/graphrag/mind_map_prompt.py +++ b/graphrag/mind_map_prompt.py @@ -23,6 +23,7 @@ MIND_MAP_EXTRACTION_PROMPT = """ 4. Add a shot content summary of the bottom level section. - Output requirement: + - Generate at least 4 levels. - Always try to maximize the number of sub-sections. - In language of 'Text' - MUST IN FORMAT OF MARKDOWN