Perf: reduce upload to minio limiter scope (#7878)

### What problem does this PR solve?
reduce upload_to_minio limter scope

### Type of change
- [x] Performance Improvement
This commit is contained in:
Stephen Hu 2025-05-27 17:49:37 +08:00 committed by GitHub
parent 28cb4df127
commit 273f36cc54
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -277,28 +277,27 @@ async def build_chunks(task, progress_callback):
async def upload_to_minio(document, chunk): async def upload_to_minio(document, chunk):
try: try:
async with minio_limiter: d = copy.deepcopy(document)
d = copy.deepcopy(document) d.update(chunk)
d.update(chunk) d["id"] = xxhash.xxh64((chunk["content_with_weight"] + str(d["doc_id"])).encode("utf-8")).hexdigest()
d["id"] = xxhash.xxh64((chunk["content_with_weight"] + str(d["doc_id"])).encode("utf-8")).hexdigest() d["create_time"] = str(datetime.now()).replace("T", " ")[:19]
d["create_time"] = str(datetime.now()).replace("T", " ")[:19] d["create_timestamp_flt"] = datetime.now().timestamp()
d["create_timestamp_flt"] = datetime.now().timestamp() if not d.get("image"):
if not d.get("image"): _ = d.pop("image", None)
_ = d.pop("image", None) d["img_id"] = ""
d["img_id"] = ""
docs.append(d)
return
output_buffer = BytesIO()
if isinstance(d["image"], bytes):
output_buffer = BytesIO(d["image"])
else:
d["image"].save(output_buffer, format='JPEG')
await trio.to_thread.run_sync(lambda: STORAGE_IMPL.put(task["kb_id"], d["id"], output_buffer.getvalue()))
d["img_id"] = "{}-{}".format(task["kb_id"], d["id"])
del d["image"]
docs.append(d) docs.append(d)
return
output_buffer = BytesIO()
if isinstance(d["image"], bytes):
output_buffer = BytesIO(d["image"])
else:
d["image"].save(output_buffer, format='JPEG')
async with minio_limiter:
await trio.to_thread.run_sync(lambda: STORAGE_IMPL.put(task["kb_id"], d["id"], output_buffer.getvalue()))
d["img_id"] = "{}-{}".format(task["kb_id"], d["id"])
del d["image"]
docs.append(d)
except Exception: except Exception:
logging.exception( logging.exception(
"Saving image of chunk {}/{}/{} got exception".format(task["location"], task["name"], d["id"])) "Saving image of chunk {}/{}/{} got exception".format(task["location"], task["name"], d["id"]))