enlarge docker memory usage (#501)

### What problem does this PR solve?

Gives the Elasticsearch container more memory (MEM_LIMIT raised from 8073741824 to 12073741824 bytes), disables the Kibana service in docker-compose, reduces the task-executor workers started by the Docker entrypoint from 2 to 1, initializes the database tables at task-broker startup, and adds elapsed-time logging around OCR, layout analysis, MinIO uploads, chunk building, embedding, and indexing.

### Type of change

- [x] Refactoring
KevinHuSh 2024-04-23 14:41:10 +08:00 committed by GitHub
parent 78402d9a57
commit 0dfc8ddc0f
12 changed files with 50 additions and 36 deletions

View File

@@ -11,7 +11,7 @@ import pdfplumber
 import logging
 from PIL import Image, ImageDraw
 import numpy as np
+from timeit import default_timer as timer
 from PyPDF2 import PdfReader as pdf2_read
 from api.utils.file_utils import get_project_base_directory

@@ -936,6 +936,7 @@ class HuParser:
         self.page_cum_height = [0]
         self.page_layout = []
         self.page_from = page_from
+        st = timer()
         try:
             self.pdf = pdfplumber.open(fnm) if isinstance(
                 fnm, str) else pdfplumber.open(BytesIO(fnm))

@@ -989,7 +990,9 @@ class HuParser:
             self.is_english = True
         else:
             self.is_english = False
+        self.is_english = False
+        st = timer()
         for i, img in enumerate(self.page_images):
             chars = self.page_chars[i] if not self.is_english else []
             self.mean_height.append(

@@ -1007,15 +1010,11 @@ class HuParser:
                         chars[j]["width"]) / 2:
                     chars[j]["text"] += " "
                 j += 1
-            # if i > 0:
-            #     if not chars:
-            #         self.page_cum_height.append(img.size[1] / zoomin)
-            #     else:
-            #         self.page_cum_height.append(
-            #             np.max([c["bottom"] for c in chars]))
             self.__ocr(i + 1, img, chars, zoomin)
-            if callback:
-                callback(prog=(i + 1) * 0.6 / len(self.page_images), msg="")
+            #if callback:
+            #    callback(prog=(i + 1) * 0.6 / len(self.page_images), msg="")
+        #print("OCR:", timer()-st)
         if not self.is_english and not any(
                 [c for c in self.page_chars]) and self.boxes:
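
Nearly every timing change in this commit follows the same pattern: capture a start time with `timeit.default_timer` before an expensive step (OCR, layout analysis, upload, indexing) and log the elapsed wall-clock seconds afterwards. A minimal, self-contained sketch of that pattern, assuming a placeholder `do_ocr` step and a generic logger rather than this repo's `cron_logger`:

```python
from timeit import default_timer as timer
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("timing-demo")  # placeholder; the repo logs via cron_logger

def do_ocr(pages):
    # stand-in for an expensive step such as OCR or layout analysis
    return [p.upper() for p in pages]

st = timer()                              # default_timer is a high-resolution wall clock
result = do_ocr(["page one", "page two"])
logger.info("OCR: %.3f s", timer() - st)  # elapsed time in seconds as a float
```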

View File

@@ -11,7 +11,9 @@ ES_PORT=1200
 KIBANA_PORT=6601
 # Increase or decrease based on the available host memory (in bytes)
-MEM_LIMIT=8073741824
+MEM_LIMIT=12073741824
 MYSQL_PASSWORD=infini_rag_flow
 MYSQL_PORT=5455
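
`MEM_LIMIT` is given in bytes, so the diff raises the Elasticsearch container limit from roughly 7.5 GiB (8073741824) to roughly 11.2 GiB (12073741824). A small illustrative check, not part of the PR, for confirming the configured limit actually fits into host memory (uses Linux-only `os.sysconf` keys):

```python
import os

MEM_LIMIT = 12073741824                             # bytes, the new value in docker/.env
print(f"MEM_LIMIT = {MEM_LIMIT / 2**30:.1f} GiB")   # ~11.2 GiB

# Total physical memory on the host (Linux).
host_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
print(f"host memory = {host_bytes / 2**30:.1f} GiB")

if MEM_LIMIT > host_bytes:
    print("MEM_LIMIT exceeds host memory; lower it in docker/.env")
```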

View File

@@ -29,23 +29,23 @@ services:
       - ragflow
     restart: always

-  kibana:
-    depends_on:
-      es01:
-        condition: service_healthy
-    image: docker.elastic.co/kibana/kibana:${STACK_VERSION}
-    container_name: ragflow-kibana
-    volumes:
-      - kibanadata:/usr/share/kibana/data
-    ports:
-      - ${KIBANA_PORT}:5601
-    environment:
-      - SERVERNAME=kibana
-      - ELASTICSEARCH_HOSTS=http://es01:9200
-      - TZ=${TIMEZONE}
-    mem_limit: ${MEM_LIMIT}
-    networks:
-      - ragflow
+  #kibana:
+  #  depends_on:
+  #    es01:
+  #      condition: service_healthy
+  #  image: docker.elastic.co/kibana/kibana:${STACK_VERSION}
+  #  container_name: ragflow-kibana
+  #  volumes:
+  #    - kibanadata:/usr/share/kibana/data
+  #  ports:
+  #    - ${KIBANA_PORT}:5601
+  #  environment:
+  #    - SERVERNAME=kibana
+  #    - ELASTICSEARCH_HOSTS=http://es01:9200
+  #    - TZ=${TIMEZONE}
+  #  mem_limit: ${MEM_LIMIT}
+  #  networks:
+  #    - ragflow

   mysql:
     image: mysql:5.7.18

View File

@@ -29,7 +29,7 @@ function task_bro(){

 task_bro &

-WS=2
+WS=1
 for ((i=0;i<WS;i++))
 do
   task_exe $i $WS &
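
The entrypoint now launches a single task executor (`WS=1`) instead of two, so fewer worker processes hold memory at once. Each worker still receives its own index and the worker count (`task_exe $i $WS`), the usual recipe for modulo-based work sharding. The sketch below only illustrates that idea; the task records and field names are invented and not taken from the repo's task executor:

```python
import sys

def my_share(tasks, mod, comm):
    """Keep only the tasks assigned to worker `mod` out of `comm` workers."""
    return [t for t in tasks if t["id"] % comm == mod]

if __name__ == "__main__":
    # mirrors `task_exe $i $WS`: worker index, then total worker count
    mod, comm = int(sys.argv[1]), int(sys.argv[2])
    tasks = [{"id": i, "doc": f"doc_{i}.pdf"} for i in range(10)]  # fake queue
    for t in my_share(tasks, mod, comm):
        print(f"worker {mod}/{comm} handles {t['doc']}")
```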

View File

@@ -37,7 +37,7 @@ class Pdf(PdfParser):
         start = timer()
         self._layouts_rec(zoomin)
         callback(0.67, "Layout analysis finished")
-        print("paddle layouts:", timer() - start)
+        print("layouts:", timer() - start)
         self._table_transformer_job(zoomin)
         callback(0.68, "Table analysis finished")
         self._text_merge()

View File

@@ -71,7 +71,7 @@ class Pdf(PdfParser):
         start = timer()
         self._layouts_rec(zoomin)
         callback(0.67, "Layout analysis finished")
-        cron_logger.info("paddle layouts:".format(
+        cron_logger.info("layouts:".format(
             (timer() - start) / (self.total_page + 0.1)))
         self._naive_vertical_merge()

View File

@@ -32,7 +32,7 @@ class Pdf(PdfParser):
         self._layouts_rec(zoomin)
         callback(0.65, "Layout analysis finished.")
-        print("paddle layouts:", timer() - start)
+        print("layouts:", timer() - start)
         self._table_transformer_job(zoomin)
         callback(0.67, "Table analysis finished.")
         self._text_merge()

View File

@@ -77,12 +77,12 @@ class Pdf(PdfParser):
             callback
         )
         callback(msg="OCR finished")
-        cron_logger.info("OCR: {}".format(timer() - start))
+        cron_logger.info("OCR({}~{}): {}".format(from_page, to_page, timer() - start))
         start = timer()
         self._layouts_rec(zoomin)
         callback(0.63, "Layout analysis finished.")
-        print("paddle layouts:", timer() - start)
+        print("layouts:", timer() - start)
         self._table_transformer_job(zoomin)
         callback(0.65, "Table analysis finished.")
         self._text_merge()

@@ -92,7 +92,7 @@ class Pdf(PdfParser):
         self._concat_downward()
         #self._filter_forpages()
-        cron_logger.info("paddle layouts: {}".format(
+        cron_logger.info("layouts: {}".format(
             (timer() - start) / (self.total_page + 0.1)))
         return [(b["text"], self._line_tag(b, zoomin))
                 for b in self.boxes], tbls

View File

@@ -33,7 +33,7 @@ class Pdf(PdfParser):
         start = timer()
         self._layouts_rec(zoomin, drop=False)
         callback(0.63, "Layout analysis finished.")
-        print("paddle layouts:", timer() - start)
+        print("layouts:", timer() - start)
         self._table_transformer_job(zoomin)
         callback(0.65, "Table analysis finished.")
         self._text_merge()
self._text_merge() self._text_merge()

View File

@@ -42,7 +42,7 @@ class Pdf(PdfParser):
         start = timer()
         self._layouts_rec(zoomin)
         callback(0.63, "Layout analysis finished")
-        print("paddle layouts:", timer() - start)
+        print("layouts:", timer() - start)
         self._table_transformer_job(zoomin)
         callback(0.68, "Table analysis finished")
         self._text_merge()

View File

@@ -33,6 +33,8 @@ from api.settings import database_logger
 from api.utils import get_format_time, get_uuid
 from api.utils.file_utils import get_project_base_directory
 from rag.utils.redis_conn import REDIS_CONN
+from api.db.db_models import init_database_tables as init_web_db
+from api.db.init_data import init_web_data


 def collect(tm):

@@ -181,6 +183,9 @@ if __name__ == "__main__":
     peewee_logger.propagate = False
     peewee_logger.addHandler(database_logger.handlers[0])
     peewee_logger.setLevel(database_logger.level)
+    # init db
+    init_web_db()
+    init_web_data()

     while True:
         dispatch()
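
The task broker now calls `init_web_db()` and `init_web_data()` before entering its dispatch loop, presumably so it can start even when the tables have not been created yet. The real implementations live in `api/db/db_models.py` and `api/db/init_data.py`; the sketch below only shows the idempotent create-if-missing idea with peewee, using a placeholder model and database:

```python
from peewee import SqliteDatabase, Model, CharField

db = SqliteDatabase("demo.db")        # placeholder; the project itself uses MySQL

class Document(Model):                # placeholder model, not one of the repo's models
    name = CharField()

    class Meta:
        database = db

def init_web_db():
    # safe=True issues CREATE TABLE IF NOT EXISTS, so repeated calls are no-ops
    db.create_tables([Document], safe=True)

if __name__ == "__main__":
    init_web_db()
    init_web_db()                     # calling it again at every startup is harmless
```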

View File

@@ -163,6 +163,7 @@ def build(row):
         "doc_id": row["doc_id"],
         "kb_id": [str(row["kb_id"])]
     }
+    el = 0
     for ck in cks:
         d = copy.deepcopy(doc)
         d.update(ck)

@@ -182,10 +183,13 @@ def build(row):
         else:
             d["image"].save(output_buffer, format='JPEG')

+        st = timer()
         MINIO.put(row["kb_id"], d["_id"], output_buffer.getvalue())
+        el += timer() - st
         d["img_id"] = "{}-{}".format(row["kb_id"], d["_id"])
         del d["image"]
         docs.append(d)
+    cron_logger.info("MINIO PUT({}):{}".format(row["name"], el))
     return docs
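
Rather than logging every object upload, `build()` now accumulates the time spent inside `MINIO.put` over all chunks of a document (`el += timer() - st`) and emits a single total per file. A minimal sketch of that accumulate-then-log pattern, with a dummy `put` standing in for the MinIO client:

```python
from timeit import default_timer as timer
import time

def put(bucket, key, data):
    time.sleep(0.01)                  # dummy stand-in for MINIO.put

chunks = [b"chunk-1", b"chunk-2", b"chunk-3"]
el = 0.0
for i, blob in enumerate(chunks):
    st = timer()
    put("kb_id", f"chunk-{i}", blob)
    el += timer() - st                # accumulate only the upload time
print("MINIO PUT(example.pdf): {:.3f}s".format(el))
```
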
@@ -258,7 +262,9 @@ def main(comm, mod):
             callback(prog=-1, msg=str(e))
             continue

+        st = timer()
         cks = build(r)
+        cron_logger.info("Build chunks({}): {}".format(r["name"], timer()-st))
         if cks is None:
             continue
         if not cks:

@@ -277,12 +283,14 @@ def main(comm, mod):
             callback(-1, "Embedding error:{}".format(str(e)))
             cron_logger.error(str(e))
             tk_count = 0
+        cron_logger.info("Embedding elapsed({}): {}".format(r["name"], timer()-st))

         callback(msg="Finished embedding({})! Start to build index!".format(timer()-st))
         init_kb(r)
         chunk_count = len(set([c["_id"] for c in cks]))
         st = timer()
         es_r = ELASTICSEARCH.bulk(cks, search.index_name(r["tenant_id"]))
+        cron_logger.info("Indexing elapsed({}): {}".format(r["name"], timer()-st))
         if es_r:
             callback(-1, "Index failure!")
             ELASTICSEARCH.deleteByQuery(