diff --git a/deepdoc/parser/pdf_parser.py b/deepdoc/parser/pdf_parser.py index c65c17525..2205d0e61 100644 --- a/deepdoc/parser/pdf_parser.py +++ b/deepdoc/parser/pdf_parser.py @@ -37,13 +37,15 @@ from rag.nlp import rag_tokenizer from copy import deepcopy from huggingface_hub import snapshot_download +from rag.settings import PARALLEL_DEVICES + LOCK_KEY_pdfplumber = "global_shared_lock_pdfplumber" if LOCK_KEY_pdfplumber not in sys.modules: sys.modules[LOCK_KEY_pdfplumber] = threading.Lock() class RAGFlowPdfParser: - def __init__(self, parallel_devices: int | None = None): + def __init__(self): """ If you have trouble downloading HuggingFace models, -_^ this might help!! @@ -56,11 +58,10 @@ class RAGFlowPdfParser: """ - self.ocr = OCR(parallel_devices = parallel_devices) - self.parallel_devices = parallel_devices + self.ocr = OCR() self.parallel_limiter = None - if parallel_devices is not None and parallel_devices > 1: - self.parallel_limiter = [trio.CapacityLimiter(1) for _ in range(parallel_devices)] + if PARALLEL_DEVICES is not None and PARALLEL_DEVICES > 1: + self.parallel_limiter = [trio.CapacityLimiter(1) for _ in range(PARALLEL_DEVICES)] if hasattr(self, "model_speciess"): self.layouter = LayoutRecognizer("layout." + self.model_speciess) @@ -1018,7 +1019,6 @@ class RAGFlowPdfParser: self.pdf.close() if not self.outlines: logging.warning("Miss outlines") - logging.debug("Images converted.") self.is_english = [re.search(r"[a-zA-Z0-9,/ΒΈ;:'\[\]\(\)!@#$%^&*\"?<>._-]{30,}", "".join( @@ -1066,8 +1066,8 @@ class RAGFlowPdfParser: for i, img in enumerate(self.page_images): chars = __ocr_preprocess() - nursery.start_soon(__img_ocr, i, i % self.parallel_devices, img, chars, - self.parallel_limiter[i % self.parallel_devices]) + nursery.start_soon(__img_ocr, i, i % PARALLEL_DEVICES, img, chars, + self.parallel_limiter[i % PARALLEL_DEVICES]) await trio.sleep(0.1) else: for i, img in enumerate(self.page_images): diff --git a/deepdoc/vision/ocr.py b/deepdoc/vision/ocr.py index 6025f5603..4dedb7c67 100644 --- a/deepdoc/vision/ocr.py +++ b/deepdoc/vision/ocr.py @@ -22,6 +22,7 @@ import os from huggingface_hub import snapshot_download from api.utils.file_utils import get_project_base_directory +from rag.settings import PARALLEL_DEVICES from .operators import * # noqa: F403 from . import operators import math @@ -509,7 +510,7 @@ class TextDetector: class OCR: - def __init__(self, model_dir=None, parallel_devices: int | None = None): + def __init__(self, model_dir=None): """ If you have trouble downloading HuggingFace models, -_^ this might help!! @@ -528,10 +529,10 @@ class OCR: "rag/res/deepdoc") # Append muti-gpus task to the list - if parallel_devices is not None and parallel_devices > 0: + if PARALLEL_DEVICES is not None and PARALLEL_DEVICES > 0: self.text_detector = [] self.text_recognizer = [] - for device_id in range(parallel_devices): + for device_id in range(PARALLEL_DEVICES): self.text_detector.append(TextDetector(model_dir, device_id)) self.text_recognizer.append(TextRecognizer(model_dir, device_id)) else: @@ -543,11 +544,11 @@ class OCR: local_dir=os.path.join(get_project_base_directory(), "rag/res/deepdoc"), local_dir_use_symlinks=False) - if parallel_devices is not None: - assert parallel_devices > 0 , "Number of devices must be >= 1" + if PARALLEL_DEVICES is not None: + assert PARALLEL_DEVICES > 0, "Number of devices must be >= 1" self.text_detector = [] self.text_recognizer = [] - for device_id in range(parallel_devices): + for device_id in range(PARALLEL_DEVICES): self.text_detector.append(TextDetector(model_dir, device_id)) self.text_recognizer.append(TextRecognizer(model_dir, device_id)) else: diff --git a/deepdoc/vision/t_ocr.py b/deepdoc/vision/t_ocr.py index 09652741a..ccc96538b 100644 --- a/deepdoc/vision/t_ocr.py +++ b/deepdoc/vision/t_ocr.py @@ -34,15 +34,15 @@ import trio os.environ['CUDA_VISIBLE_DEVICES'] = '0' #1 gpu # os.environ['CUDA_VISIBLE_DEVICES'] = '' #cpu + def main(args): import torch.cuda cuda_devices = torch.cuda.device_count() limiter = [trio.CapacityLimiter(1) for _ in range(cuda_devices)] if cuda_devices > 1 else None - ocr = OCR(parallel_devices = cuda_devices) + ocr = OCR() images, outputs = init_in_out(args) - def __ocr(i, id, img): print("Task {} start".format(i)) bxs = ocr(np.array(img), id) diff --git a/rag/app/naive.py b/rag/app/naive.py index a01cf5eea..133632e50 100644 --- a/rag/app/naive.py +++ b/rag/app/naive.py @@ -128,8 +128,8 @@ class Docx(DocxParser): class Pdf(PdfParser): - def __init__(self, parallel_devices = None): - super().__init__(parallel_devices) + def __init__(self): + super().__init__() def __call__(self, filename, binary=None, from_page=0, to_page=100000, zoomin=3, callback=None): @@ -197,7 +197,7 @@ class Markdown(MarkdownParser): def chunk(filename, binary=None, from_page=0, to_page=100000, - lang="Chinese", callback=None, parallel_devices=None, **kwargs): + lang="Chinese", callback=None, **kwargs): """ Supported file formats are docx, pdf, excel, txt. This method apply the naive ways to chunk files. @@ -237,7 +237,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, return res elif re.search(r"\.pdf$", filename, re.IGNORECASE): - pdf_parser = Pdf(parallel_devices) + pdf_parser = Pdf() if parser_config.get("layout_recognize", "DeepDOC") == "Plain Text": pdf_parser = PlainParser() sections, tables = pdf_parser(filename if not binary else binary, from_page=from_page, to_page=to_page, diff --git a/rag/settings.py b/rag/settings.py index a54d65826..d1c32e6bf 100644 --- a/rag/settings.py +++ b/rag/settings.py @@ -39,6 +39,13 @@ SVR_CONSUMER_GROUP_NAME = "rag_flow_svr_task_broker" PAGERANK_FLD = "pagerank_fea" TAG_FLD = "tag_feas" +PARALLEL_DEVICES = None +try: + import torch.cuda + PARALLEL_DEVICES = torch.cuda.device_count() + logging.info(f"found {PARALLEL_DEVICES} gpus") +except Exception: + logging.info("can't import package 'torch'") def print_rag_settings(): logging.info(f"MAX_CONTENT_LENGTH: {DOC_MAXIMUM_SIZE}") diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 359f9a9f3..f1e16c094 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -100,13 +100,6 @@ MAX_CONCURRENT_CHUNK_BUILDERS = int(os.environ.get('MAX_CONCURRENT_CHUNK_BUILDER task_limiter = trio.CapacityLimiter(MAX_CONCURRENT_TASKS) chunk_limiter = trio.CapacityLimiter(MAX_CONCURRENT_CHUNK_BUILDERS) -PARALLEL_DEVICES = None -try: - import torch.cuda - PARALLEL_DEVICES = torch.cuda.device_count() - logging.info(f"found {PARALLEL_DEVICES} gpus") -except Exception: - logging.info("can't import package 'torch'") # SIGUSR1 handler: start tracemalloc and take snapshot def start_tracemalloc_and_snapshot(signum, frame): @@ -249,7 +242,7 @@ async def build_chunks(task, progress_callback): try: async with chunk_limiter: cks = await trio.to_thread.run_sync(lambda: chunker.chunk(task["name"], binary=binary, from_page=task["from_page"], - to_page=task["to_page"], lang=task["language"], parallel_devices = PARALLEL_DEVICES, callback=progress_callback, + to_page=task["to_page"], lang=task["language"], callback=progress_callback, kb_id=task["kb_id"], parser_config=task["parser_config"], tenant_id=task["tenant_id"])) logging.info("Chunking({}) {}/{} done".format(timer() - st, task["location"], task["name"])) except TaskCanceledException: