diff --git a/deepdoc/parser/figure_parser.py b/deepdoc/parser/figure_parser.py index b86f16a29..24b030716 100644 --- a/deepdoc/parser/figure_parser.py +++ b/deepdoc/parser/figure_parser.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +from concurrent.futures import ThreadPoolExecutor, as_completed from PIL import Image @@ -28,6 +28,7 @@ def vision_figure_parser_figure_data_wraper(figures_data_without_positions): ) for figure_data in figures_data_without_positions if isinstance(figure_data[1], Image.Image)] +shared_executor = ThreadPoolExecutor(max_workers=10) class VisionFigureParser: def __init__(self, vision_model, figures_data, *args, **kwargs): self.vision_model = vision_model @@ -73,16 +74,21 @@ class VisionFigureParser: def __call__(self, **kwargs): callback = kwargs.get("callback", lambda prog, msg: None) - for idx, img_binary in enumerate(self.figures or []): - figure_num = idx # 0-based - - txt = picture_vision_llm_chunk( - binary=img_binary, + def process(figure_idx, figure_binary): + description_text = picture_vision_llm_chunk( + binary=figure_binary, vision_model=self.vision_model, prompt=vision_llm_figure_describe_prompt(), callback=callback, ) + return figure_idx, description_text + futures = [] + for idx, img_binary in enumerate(self.figures or []): + futures.append(shared_executor.submit(process, idx, img_binary)) + + for future in as_completed(futures): + figure_num, txt = future.result() if txt: self.descriptions[figure_num] = txt + "\n".join(self.descriptions[figure_num]) diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 379581aa3..cf4028f1b 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -99,8 +99,10 @@ CURRENT_TASKS = {} MAX_CONCURRENT_TASKS = int(os.environ.get('MAX_CONCURRENT_TASKS', "5")) MAX_CONCURRENT_CHUNK_BUILDERS = int(os.environ.get('MAX_CONCURRENT_CHUNK_BUILDERS', "1")) +MAX_CONCURRENT_MINIO = int(os.environ.get('MAX_CONCURRENT_MINIO', '10')) task_limiter = trio.CapacityLimiter(MAX_CONCURRENT_TASKS) chunk_limiter = trio.CapacityLimiter(MAX_CONCURRENT_CHUNK_BUILDERS) +minio_limiter = trio.CapacityLimiter(MAX_CONCURRENT_MINIO) WORKER_HEARTBEAT_TIMEOUT = int(os.environ.get('WORKER_HEARTBEAT_TIMEOUT', '120')) stop_event = threading.Event() @@ -270,38 +272,43 @@ async def build_chunks(task, progress_callback): } if task["pagerank"]: doc[PAGERANK_FLD] = int(task["pagerank"]) - el = 0 - for ck in cks: - d = copy.deepcopy(doc) - d.update(ck) - d["id"] = xxhash.xxh64((ck["content_with_weight"] + str(d["doc_id"])).encode("utf-8")).hexdigest() - d["create_time"] = str(datetime.now()).replace("T", " ")[:19] - d["create_timestamp_flt"] = datetime.now().timestamp() - if not d.get("image"): - _ = d.pop("image", None) - d["img_id"] = "" - docs.append(d) - continue + st = timer() + async def upload_to_minio(document, chunk): try: - output_buffer = BytesIO() - if isinstance(d["image"], bytes): - output_buffer = BytesIO(d["image"]) - else: - d["image"].save(output_buffer, format='JPEG') + async with minio_limiter: + d = copy.deepcopy(document) + d.update(chunk) + d["id"] = xxhash.xxh64((chunk["content_with_weight"] + str(d["doc_id"])).encode("utf-8")).hexdigest() + d["create_time"] = str(datetime.now()).replace("T", " ")[:19] + d["create_timestamp_flt"] = datetime.now().timestamp() + if not d.get("image"): + _ = d.pop("image", None) + d["img_id"] = "" + docs.append(d) + return - st = timer() - await trio.to_thread.run_sync(lambda: STORAGE_IMPL.put(task["kb_id"], d["id"], output_buffer.getvalue())) - el += timer() - st + output_buffer = BytesIO() + if isinstance(d["image"], bytes): + output_buffer = BytesIO(d["image"]) + else: + d["image"].save(output_buffer, format='JPEG') + await trio.to_thread.run_sync(lambda: STORAGE_IMPL.put(task["kb_id"], d["id"], output_buffer.getvalue())) + + d["img_id"] = "{}-{}".format(task["kb_id"], d["id"]) + del d["image"] + docs.append(d) except Exception: logging.exception( "Saving image of chunk {}/{}/{} got exception".format(task["location"], task["name"], d["id"])) raise - d["img_id"] = "{}-{}".format(task["kb_id"], d["id"]) - del d["image"] - docs.append(d) - logging.info("MINIO PUT({}):{}".format(task["name"], el)) + async with trio.open_nursery() as nursery: + for ck in cks: + nursery.start_soon(upload_to_minio, doc, ck) + + el = timer() - st + logging.info("MINIO PUT({}) cost {:.3f} s".format(task["name"], el)) if task["parser_config"].get("auto_keywords", 0): st = timer()