From 3e19044dee6041332677a3f46817d7a19471de9d Mon Sep 17 00:00:00 2001
From: Debug Doctor <109025558+debugdoctor@users.noreply.github.com>
Date: Mon, 17 Mar 2025 11:58:40 +0800
Subject: [PATCH] Feat: add OCR multi-GPU and parallel processing support
 (#5972)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### What problem does this PR solve?

Add multi-GPU and parallel processing support for OCR.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

@yuzhichang I've tried to resolve the comments in #5697. OCR jobs can now run on both CPU and GPU. (By the way, I've encountered a "Generate embedding error" issue (#5954) that might be due to my outdated GPUs; I'm not sure.) Please review it and give me suggestions.

GPU:
![gpu_ocr](https://github.com/user-attachments/assets/0ee2ecfb-a665-4e50-8bc7-15941b9cd80e)
![smi](https://github.com/user-attachments/assets/a2312f8c-cf24-443d-bf89-bec50503546d)

CPU:
![cpu_ocr](https://github.com/user-attachments/assets/1ba6bb0b-94df-41ea-be79-790096da4bf1)

---
 deepdoc/parser/pdf_parser.py | 69 +++++++++++++++++++++++--------
 deepdoc/vision/ocr.py        | 79 +++++++++++++++++++++++++-----------
 deepdoc/vision/t_ocr.py      | 40 ++++++++++++++++--
 rag/app/naive.py             |  7 +++-
 rag/svr/task_executor.py     | 10 ++++-
 5 files changed, 157 insertions(+), 48 deletions(-)

diff --git a/deepdoc/parser/pdf_parser.py b/deepdoc/parser/pdf_parser.py
index 30a30f580..b9e2bee65 100644
--- a/deepdoc/parser/pdf_parser.py
+++ b/deepdoc/parser/pdf_parser.py
@@ -20,6 +20,7 @@ import random
 from timeit import default_timer as timer
 import sys
 import threading
+import trio
 
 import xgboost as xgb
 from io import BytesIO
@@ -41,7 +42,7 @@ if LOCK_KEY_pdfplumber not in sys.modules:
     sys.modules[LOCK_KEY_pdfplumber] = threading.Lock()
 
 class RAGFlowPdfParser:
-    def __init__(self):
+    def __init__(self, parallel_devices: int | None = None):
         """
         If you have trouble downloading HuggingFace models, -_^ this might help!!
@@ -53,7 +54,13 @@ class RAGFlowPdfParser:
         ^_-
         """
-        self.ocr = OCR()
+
+        self.ocr = OCR(parallel_devices=parallel_devices)
+        self.parallel_devices = parallel_devices
+        self.parallel_limiter = None
+        if parallel_devices is not None and parallel_devices > 1:
+            self.parallel_limiter = [trio.CapacityLimiter(1) for _ in range(parallel_devices)]
+
         if hasattr(self, "model_speciess"):
             self.layouter = LayoutRecognizer("layout."
                                              + self.model_speciess)
         else:
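For reviewers, a minimal usage sketch of the new `parallel_devices` parameter (not part of the patch; the device count here comes from `torch.cuda.device_count()`, and passing `None` keeps the old single-device path):

```python
# Hypothetical usage sketch: one OCR worker per visible CUDA device,
# or the legacy single-device path on a CPU-only box.
import torch

from deepdoc.parser.pdf_parser import RAGFlowPdfParser

device_count = torch.cuda.device_count() if torch.cuda.is_available() else 0
parser = RAGFlowPdfParser(parallel_devices=device_count if device_count > 0 else None)
```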
@@ -63,7 +70,7 @@ class RAGFlowPdfParser:
         self.updown_cnt_mdl = xgb.Booster()
         if not settings.LIGHTEN:
             try:
-                import torch
+                import torch.cuda
                 if torch.cuda.is_available():
                     self.updown_cnt_mdl.set_param({"device": "cuda"})
             except Exception:
@@ -283,9 +290,9 @@ class RAGFlowPdfParser:
                 b["H_right"] = spans[ii]["x1"]
                 b["SP"] = ii
 
-    def __ocr(self, pagenum, img, chars, ZM=3):
+    def __ocr(self, pagenum, img, chars, ZM=3, device_id: int | None = None):
         start = timer()
-        bxs = self.ocr.detect(np.array(img))
+        bxs = self.ocr.detect(np.array(img), device_id)
         logging.info(f"__ocr detecting boxes of a image cost ({timer() - start}s)")
 
         start = timer()
@@ -330,7 +337,7 @@ class RAGFlowPdfParser:
                 b["box_image"] = self.ocr.get_rotate_crop_image(img_np, np.array([[left, top], [right, top], [right, bott], [left, bott]], dtype=np.float32))
                 boxes_to_reg.append(b)
             del b["txt"]
-        texts = self.ocr.recognize_batch([b["box_image"] for b in boxes_to_reg])
+        texts = self.ocr.recognize_batch([b["box_image"] for b in boxes_to_reg], device_id)
         for i in range(len(boxes_to_reg)):
             boxes_to_reg[i]["text"] = texts[i]
             del boxes_to_reg[i]["box_image"]
@@ -1022,28 +1029,54 @@ class RAGFlowPdfParser:
         else:
             self.is_english = False
 
-        start = timer()
-        for i, img in enumerate(self.page_images):
-            chars = self.page_chars[i] if not self.is_english else []
-            self.mean_height.append(
-                np.median(sorted([c["height"] for c in chars])) if chars else 0
-            )
-            self.mean_width.append(
-                np.median(sorted([c["width"] for c in chars])) if chars else 8
-            )
-            self.page_cum_height.append(img.size[1] / zoomin)
+        async def __img_ocr(i, id, img, chars, limiter):
             j = 0
             while j + 1 < len(chars):
                 if chars[j]["text"] and chars[j + 1]["text"] \
                         and re.match(r"[0-9a-zA-Z,.:;!%]+", chars[j]["text"] + chars[j + 1]["text"]) \
                         and chars[j + 1]["x0"] - chars[j]["x1"] >= min(chars[j + 1]["width"],
-                                                                       chars[j]["width"]) / 2:
+                                                                      chars[j]["width"]) / 2:
                     chars[j]["text"] += " "
                 j += 1
 
-            self.__ocr(i + 1, img, chars, zoomin)
+            if limiter:
+                async with limiter:
+                    await trio.to_thread.run_sync(lambda: self.__ocr(i + 1, img, chars, zoomin, id))
+            else:
+                self.__ocr(i + 1, img, chars, zoomin, id)
+
             if callback and i % 6 == 5:
                 callback(prog=(i + 1) * 0.6 / len(self.page_images), msg="")
+
+        async def __img_ocr_launcher():
+            def __ocr_preprocess():
+                chars = self.page_chars[i] if not self.is_english else []
+                self.mean_height.append(
+                    np.median(sorted([c["height"] for c in chars])) if chars else 0
+                )
+                self.mean_width.append(
+                    np.median(sorted([c["width"] for c in chars])) if chars else 8
+                )
+                self.page_cum_height.append(img.size[1] / zoomin)
+                return chars
+
+            if self.parallel_limiter:
+                async with trio.open_nursery() as nursery:
+                    for i, img in enumerate(self.page_images):
+                        chars = __ocr_preprocess()
+
+                        nursery.start_soon(__img_ocr, i, i % self.parallel_devices, img, chars,
+                                           self.parallel_limiter[i % self.parallel_devices])
+                        await trio.sleep(0.1)
+            else:
+                for i, img in enumerate(self.page_images):
+                    chars = __ocr_preprocess()
+                    await __img_ocr(i, 0, img, chars, None)
+
+        start = timer()
+
+        trio.run(__img_ocr_launcher)
+
         logging.info(f"__images__ {len(self.page_images)} pages cost {timer() - start}s")
 
         if not self.is_english and not any(
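The dispatch pattern above, shown standalone: one `trio.CapacityLimiter(1)` per device serializes pages on that device while different devices run in parallel, and the blocking OCR call is pushed onto a worker thread. In this sketch `run_page` is a hypothetical stand-in for the blocking `self.__ocr(...)` call:

```python
# Standalone sketch of the per-device fan-out used in __images__ above.
import trio

def run_page(page_no: int, device_id: int) -> None:
    # The blocking OCR work would go here.
    print(f"page {page_no} on device {device_id}")

async def ocr_all_pages(num_pages: int, num_devices: int) -> None:
    limiters = [trio.CapacityLimiter(1) for _ in range(num_devices)]
    async with trio.open_nursery() as nursery:
        for i in range(num_pages):
            dev = i % num_devices  # round-robin page -> device assignment
            async def worker(i=i, dev=dev):
                async with limiters[dev]:  # at most one page per device at a time
                    await trio.to_thread.run_sync(run_page, i, dev)
            nursery.start_soon(worker)

trio.run(ocr_all_pages, 8, 2)
```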
diff --git a/deepdoc/vision/ocr.py b/deepdoc/vision/ocr.py
index 87ba2b669..6025f5603 100644
--- a/deepdoc/vision/ocr.py
+++ b/deepdoc/vision/ocr.py
@@ -66,10 +66,12 @@ def create_operators(op_param_list, global_config=None):
     return ops
 
-def load_model(model_dir, nm):
+def load_model(model_dir, nm, device_id: int | None = None):
     model_file_path = os.path.join(model_dir, nm + ".onnx")
+    model_cached_tag = model_file_path + str(device_id) if device_id is not None else model_file_path
+
     global loaded_models
-    loaded_model = loaded_models.get(model_file_path)
+    loaded_model = loaded_models.get(model_cached_tag)
     if loaded_model:
         logging.info(f"load_model {model_file_path} reuses cached model")
         return loaded_model
@@ -81,7 +83,7 @@ def load_model(model_dir, nm):
     def cuda_is_available():
         try:
             import torch
-            if torch.cuda.is_available():
+            if torch.cuda.is_available() and torch.cuda.device_count() > device_id:
                 return True
         except Exception:
             return False
@@ -98,7 +100,7 @@ def load_model(model_dir, nm):
     run_options = ort.RunOptions()
     if cuda_is_available():
         cuda_provider_options = {
-            "device_id": 0, # Use specific GPU
+            "device_id": device_id, # Use specific GPU
             "gpu_mem_limit": 512 * 1024 * 1024, # Limit gpu memory
             "arena_extend_strategy": "kNextPowerOfTwo",  # gpu memory allocation strategy
         }
@@ -108,7 +110,7 @@ def load_model(model_dir, nm):
             providers=['CUDAExecutionProvider'],
             provider_options=[cuda_provider_options]
         )
-        run_options.add_run_config_entry("memory.enable_memory_arena_shrinkage", "gpu:0")
+        run_options.add_run_config_entry("memory.enable_memory_arena_shrinkage", "gpu:" + str(device_id))
         logging.info(f"load_model {model_file_path} uses GPU")
     else:
         sess = ort.InferenceSession(
@@ -118,12 +120,12 @@ def load_model(model_dir, nm):
         run_options.add_run_config_entry("memory.enable_memory_arena_shrinkage", "cpu")
         logging.info(f"load_model {model_file_path} uses CPU")
     loaded_model = (sess, run_options)
-    loaded_models[model_file_path] = loaded_model
+    loaded_models[model_cached_tag] = loaded_model
     return loaded_model
 
 class TextRecognizer:
-    def __init__(self, model_dir):
+    def __init__(self, model_dir, device_id: int | None = None):
        self.rec_image_shape = [int(v) for v in "3, 48, 320".split(",")]
        self.rec_batch_num = 16
        postprocess_params = {
@@ -132,7 +134,7 @@ class TextRecognizer:
            "use_space_char": True
        }
        self.postprocess_op = build_post_process(postprocess_params)
-       self.predictor, self.run_options = load_model(model_dir, 'rec')
+       self.predictor, self.run_options = load_model(model_dir, 'rec', device_id)
        self.input_tensor = self.predictor.get_inputs()[0]
 
     def resize_norm_img(self, img, max_wh_ratio):
@@ -394,7 +396,7 @@ class TextRecognizer:
 
 class TextDetector:
-    def __init__(self, model_dir):
+    def __init__(self, model_dir, device_id: int | None = None):
         pre_process_list = [{
             'DetResizeForTest': {
                 'limit_side_len': 960,
@@ -418,7 +420,7 @@ class TextDetector:
             "unclip_ratio": 1.5, "use_dilation": False,
             "score_mode": "fast", "box_type": "quad"}
         self.postprocess_op = build_post_process(postprocess_params)
-        self.predictor, self.run_options = load_model(model_dir, 'det')
+        self.predictor, self.run_options = load_model(model_dir, 'det', device_id)
         self.input_tensor = self.predictor.get_inputs()[0]
 
         img_h, img_w = self.input_tensor.shape[2:]
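The caching change above keys ONNX Runtime sessions by model path plus device id, so each GPU gets its own session pinned via `CUDAExecutionProvider`'s `device_id` option. A condensed sketch of that idea (the `session_for` helper and model path are hypothetical, not part of the patch):

```python
# Sketch of a per-(model, device) session cache, assuming onnxruntime-gpu.
import onnxruntime as ort

_sessions: dict[str, ort.InferenceSession] = {}

def session_for(model_path: str, device_id: int | None) -> ort.InferenceSession:
    key = model_path + str(device_id) if device_id is not None else model_path
    if key in _sessions:
        return _sessions[key]  # one cached session per (model, device) pair
    if device_id is not None and "CUDAExecutionProvider" in ort.get_available_providers():
        sess = ort.InferenceSession(
            model_path,
            providers=["CUDAExecutionProvider"],
            provider_options=[{"device_id": device_id}],  # pin this session to one GPU
        )
    else:
        sess = ort.InferenceSession(model_path, providers=["CPUExecutionProvider"])
    _sessions[key] = sess
    return sess
```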
@@ -507,7 +509,7 @@ class TextDetector:
 
 class OCR:
-    def __init__(self, model_dir=None):
+    def __init__(self, model_dir=None, parallel_devices: int | None = None):
         """
         If you have trouble downloading HuggingFace models, -_^ this might help!!
@@ -524,14 +526,33 @@ class OCR:
             model_dir = os.path.join(
                 get_project_base_directory(),
                 "rag/res/deepdoc")
-            self.text_detector = TextDetector(model_dir)
-            self.text_recognizer = TextRecognizer(model_dir)
+
+            # Append one detector/recognizer pair per device for multi-GPU use
+            if parallel_devices is not None and parallel_devices > 0:
+                self.text_detector = []
+                self.text_recognizer = []
+                for device_id in range(parallel_devices):
+                    self.text_detector.append(TextDetector(model_dir, device_id))
+                    self.text_recognizer.append(TextRecognizer(model_dir, device_id))
+            else:
+                self.text_detector = [TextDetector(model_dir, 0)]
+                self.text_recognizer = [TextRecognizer(model_dir, 0)]
+
         except Exception:
             model_dir = snapshot_download(repo_id="InfiniFlow/deepdoc",
                                           local_dir=os.path.join(get_project_base_directory(),
                                                                  "rag/res/deepdoc"),
                                           local_dir_use_symlinks=False)
-            self.text_detector = TextDetector(model_dir)
-            self.text_recognizer = TextRecognizer(model_dir)
+
+            if parallel_devices is not None:
+                assert parallel_devices > 0, "Number of devices must be >= 1"
+                self.text_detector = []
+                self.text_recognizer = []
+                for device_id in range(parallel_devices):
+                    self.text_detector.append(TextDetector(model_dir, device_id))
+                    self.text_recognizer.append(TextRecognizer(model_dir, device_id))
+            else:
+                self.text_detector = [TextDetector(model_dir, 0)]
+                self.text_recognizer = [TextRecognizer(model_dir, 0)]
 
         self.drop_score = 0.5
         self.crop_image_res_index = 0
@@ -593,14 +614,17 @@ class OCR:
                 break
         return _boxes
 
-    def detect(self, img):
+    def detect(self, img, device_id: int | None = None):
+        if device_id is None:
+            device_id = 0
+
         time_dict = {'det': 0, 'rec': 0, 'cls': 0, 'all': 0}
 
         if img is None:
             return None, None, time_dict
 
         start = time.time()
-        dt_boxes, elapse = self.text_detector(img)
+        dt_boxes, elapse = self.text_detector[device_id](img)
         time_dict['det'] = elapse
 
         if dt_boxes is None:
@@ -611,17 +635,22 @@ class OCR:
         return zip(self.sorted_boxes(dt_boxes), [
                    ("", 0) for _ in range(len(dt_boxes))])
 
-    def recognize(self, ori_im, box):
+    def recognize(self, ori_im, box, device_id: int | None = None):
+        if device_id is None:
+            device_id = 0
+
         img_crop = self.get_rotate_crop_image(ori_im, box)
-        rec_res, elapse = self.text_recognizer([img_crop])
+        rec_res, elapse = self.text_recognizer[device_id]([img_crop])
         text, score = rec_res[0]
         if score < self.drop_score:
             return ""
         return text
 
-    def recognize_batch(self, img_list):
-        rec_res, elapse = self.text_recognizer(img_list)
+    def recognize_batch(self, img_list, device_id: int | None = None):
+        if device_id is None:
+            device_id = 0
+        rec_res, elapse = self.text_recognizer[device_id](img_list)
         texts = []
         for i in range(len(rec_res)):
             text, score = rec_res[i]
@@ -630,15 +659,17 @@ class OCR:
             texts.append(text)
         return texts
 
-    def __call__(self, img, cls=True):
+    def __call__(self, img, device_id=0, cls=True):
         time_dict = {'det': 0, 'rec': 0, 'cls': 0, 'all': 0}
+        if device_id is None:
+            device_id = 0
 
         if img is None:
             return None, None, time_dict
 
         start = time.time()
         ori_im = img.copy()
-        dt_boxes, elapse = self.text_detector(img)
+        dt_boxes, elapse = self.text_detector[device_id](img)
         time_dict['det'] = elapse
 
         if dt_boxes is None:
@@ -655,7 +686,7 @@ class OCR:
             img_crop = self.get_rotate_crop_image(ori_im, tmp_box)
             img_crop_list.append(img_crop)
 
-        rec_res, elapse = self.text_recognizer(img_crop_list)
+        rec_res, elapse = self.text_recognizer[device_id](img_crop_list)
         time_dict['rec'] = elapse
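With the detector and recognizer now held as per-device lists, a caller picks a device per call. A hypothetical driver (assumes two visible GPUs; the image paths are placeholders):

```python
# Hypothetical driver: run each image on a round-robin device.
import numpy as np
from PIL import Image

from deepdoc.vision import OCR

ocr = OCR(parallel_devices=2)  # builds text_detector[0..1] and text_recognizer[0..1]
images = [Image.open(p) for p in ["page0.png", "page1.png"]]  # placeholder paths
for i, img in enumerate(images):
    boxes = ocr(np.array(img), device_id=i % 2)  # alternate between GPU 0 and GPU 1
```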
diff --git a/deepdoc/vision/t_ocr.py b/deepdoc/vision/t_ocr.py
index 2165449de..09652741a 100644
--- a/deepdoc/vision/t_ocr.py
+++ b/deepdoc/vision/t_ocr.py
@@ -28,14 +28,24 @@ from deepdoc.vision.seeit import draw_box
 from deepdoc.vision import OCR, init_in_out
 import argparse
 import numpy as np
+import trio
+
+# os.environ['CUDA_VISIBLE_DEVICES'] = '0,2'  # 2 GPUs, non-contiguous ids
+os.environ['CUDA_VISIBLE_DEVICES'] = '0'      # 1 GPU
+# os.environ['CUDA_VISIBLE_DEVICES'] = ''     # CPU only
 
 def main(args):
-    ocr = OCR()
+    import torch.cuda
+
+    cuda_devices = torch.cuda.device_count()
+    limiter = [trio.CapacityLimiter(1) for _ in range(cuda_devices)] if cuda_devices > 1 else None
+    ocr = OCR(parallel_devices=cuda_devices)
     images, outputs = init_in_out(args)
-    for i, img in enumerate(images):
-        bxs = ocr(np.array(img))
+
+    def __ocr(i, id, img):
+        print("Task {} start".format(i))
+        bxs = ocr(np.array(img), id)
         bxs = [(line[0], line[1][0]) for line in bxs]
         bxs = [{
             "text": t,
@@ -47,6 +57,30 @@ def main(args):
         with open(outputs[i] + ".txt", "w+", encoding='utf-8') as f:
             f.write("\n".join([o["text"] for o in bxs]))
+        print("Task {} done".format(i))
+
+    async def __ocr_thread(i, id, img, limiter=None):
+        if limiter:
+            async with limiter:
+                print("Task {} use device {}".format(i, id))
+                await trio.to_thread.run_sync(lambda: __ocr(i, id, img))
+        else:
+            __ocr(i, id, img)
+
+    async def __ocr_launcher():
+        if cuda_devices > 1:
+            async with trio.open_nursery() as nursery:
+                for i, img in enumerate(images):
+                    nursery.start_soon(__ocr_thread, i, i % cuda_devices, img, limiter[i % cuda_devices])
+                    await trio.sleep(0.1)
+        else:
+            for i, img in enumerate(images):
+                await __ocr_thread(i, 0, img)
+
+    trio.run(__ocr_launcher)
+
+    print("OCR tasks are all done")
+
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
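A quick sanity check mirroring the test script's device selection (assumes a CUDA build of torch; the visibility mask must be set before torch initializes CUDA):

```python
# Not part of the patch: confirm how many per-device OCR sessions get built.
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"  # expose two GPUs, as in the commented examples

import torch
from deepdoc.vision import OCR

cuda_devices = torch.cuda.device_count()
ocr = OCR(parallel_devices=cuda_devices)
print(f"visible GPUs: {cuda_devices}, detector sessions: {len(ocr.text_detector)}")
```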
diff --git a/rag/app/naive.py b/rag/app/naive.py
index ff4f63f8c..670a1f701 100644
--- a/rag/app/naive.py
+++ b/rag/app/naive.py
@@ -128,6 +128,9 @@ class Docx(DocxParser):
 
 class Pdf(PdfParser):
+    def __init__(self, parallel_devices=None):
+        super().__init__(parallel_devices)
+
     def __call__(self, filename, binary=None, from_page=0,
                  to_page=100000, zoomin=3, callback=None):
         start = timer()
@@ -194,7 +197,7 @@ class Markdown(MarkdownParser):
 
 def chunk(filename, binary=None, from_page=0, to_page=100000,
-          lang="Chinese", callback=None, **kwargs):
+          lang="Chinese", parallel_devices=None, callback=None, **kwargs):
     """
         Supported file formats are docx, pdf, excel, txt.
         This method apply the naive ways to chunk files.
@@ -234,7 +237,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000,
         return res
 
     elif re.search(r"\.pdf$", filename, re.IGNORECASE):
-        pdf_parser = Pdf()
+        pdf_parser = Pdf(parallel_devices)
         if parser_config.get("layout_recognize", "DeepDOC") == "Plain Text":
             pdf_parser = PlainParser()
         sections, tables = pdf_parser(filename if not binary else binary, from_page=from_page, to_page=to_page,

diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py
index bc5157ee4..359f9a9f3 100644
--- a/rag/svr/task_executor.py
+++ b/rag/svr/task_executor.py
@@ -100,6 +100,14 @@ MAX_CONCURRENT_CHUNK_BUILDERS = int(os.environ.get('MAX_CONCURRENT_CHUNK_BUILDER
 task_limiter = trio.CapacityLimiter(MAX_CONCURRENT_TASKS)
 chunk_limiter = trio.CapacityLimiter(MAX_CONCURRENT_CHUNK_BUILDERS)
 
+PARALLEL_DEVICES = None
+try:
+    import torch.cuda
+    PARALLEL_DEVICES = torch.cuda.device_count()
+    logging.info(f"found {PARALLEL_DEVICES} gpus")
+except Exception:
+    logging.info("can't import package 'torch'")
+
 # SIGUSR1 handler: start tracemalloc and take snapshot
 def start_tracemalloc_and_snapshot(signum, frame):
     if not tracemalloc.is_tracing():
@@ -241,7 +249,7 @@ async def build_chunks(task, progress_callback):
     try:
         async with chunk_limiter:
             cks = await trio.to_thread.run_sync(lambda: chunker.chunk(task["name"], binary=binary, from_page=task["from_page"],
-                                                to_page=task["to_page"], lang=task["language"], callback=progress_callback,
+                                                to_page=task["to_page"], lang=task["language"], parallel_devices=PARALLEL_DEVICES, callback=progress_callback,
                                                 kb_id=task["kb_id"], parser_config=task["parser_config"], tenant_id=task["tenant_id"]))
         logging.info("Chunking({}) {}/{} done".format(timer() - st, task["location"], task["name"]))
     except TaskCanceledException:
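End to end, the executor detects the GPU count once at startup and threads it through `chunk()` → `Pdf` → `RAGFlowPdfParser` → `OCR`. A rough sketch of that flow outside the executor (not part of the patch; "sample.pdf" is a placeholder and the default `parser_config` is assumed):

```python
# Rough end-to-end sketch of the new parameter's path through the patch.
import torch

from rag.app import naive

PARALLEL_DEVICES = None
try:
    PARALLEL_DEVICES = torch.cuda.device_count()
except Exception:
    pass

def progress(prog=None, msg=""):
    print(prog, msg)  # stub for the executor's progress callback

# chunk() now forwards parallel_devices to Pdf -> RAGFlowPdfParser -> OCR.
chunks = naive.chunk("sample.pdf", parallel_devices=PARALLEL_DEVICES,
                     callback=progress, lang="English")
```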