add redis to accelerate access of minio (#482)

### What problem does this PR solve?

Adds a Redis layer in front of MinIO: the task broker caches each document's binary in Redis when it dispatches parsing tasks, and the task executor reads the binary back from Redis, falling back to MinIO on a miss, so files are no longer fetched from object storage repeatedly. The PR also casts `MAX_CONTENT_LENGTH` to `int`, drops the legacy `.doc` extension from the plain-document regex, lowers the default Elasticsearch shard count from 4 to 2, and adds elapsed-time logging around OCR, embedding, and indexing.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
KevinHuSh 2024-04-22 14:11:09 +08:00 committed by GitHub
parent fc87c20bd8
commit b8e58fe27a
10 changed files with 101 additions and 16 deletions

View File

```diff
@@ -54,7 +54,7 @@ app.errorhandler(Exception)(server_error_response)
 #app.config["LOGIN_DISABLED"] = True
 app.config["SESSION_PERMANENT"] = False
 app.config["SESSION_TYPE"] = "filesystem"
-app.config['MAX_CONTENT_LENGTH'] = os.environ.get("MAX_CONTENT_LENGTH", 128 * 1024 * 1024)
+app.config['MAX_CONTENT_LENGTH'] = int(os.environ.get("MAX_CONTENT_LENGTH", 128 * 1024 * 1024))
 Session(app)
 login_manager = LoginManager()
```
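
A quick note on the `int()` cast introduced above: environment variables arrive as strings, so without the cast a deployment that sets `MAX_CONTENT_LENGTH` would put a `str` into the Flask config and any numeric comparison against it would fail. A minimal standalone illustration (not project code):

```python
# os.environ.get() returns a str when the variable is set, so the int() cast
# keeps app.config['MAX_CONTENT_LENGTH'] numeric either way.
import os

os.environ["MAX_CONTENT_LENGTH"] = "268435456"               # hypothetical override (256 MB)
raw = os.environ.get("MAX_CONTENT_LENGTH", 128 * 1024 * 1024)
print(type(raw))             # <class 'str'> when the env var is set
print(int(raw) > 10 * 1024)  # works; `raw > 10 * 1024` would raise TypeError
```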

View File

```diff
@@ -147,7 +147,7 @@ def filename_type(filename):
         return FileType.PDF.value
     if re.match(
-            r".*\.(docx|doc|ppt|pptx|yml|xml|htm|json|csv|txt|ini|xls|xlsx|wps|rtf|hlp|pages|numbers|key|md)$", filename):
+            r".*\.(docx|ppt|pptx|yml|xml|htm|json|csv|txt|ini|xls|xlsx|wps|rtf|hlp|pages|numbers|key|md)$", filename):
         return FileType.DOC.value
     if re.match(
```

View File

```diff
@@ -1,7 +1,7 @@
 {
   "settings": {
     "index": {
-      "number_of_shards": 4,
+      "number_of_shards": 2,
       "number_of_replicas": 0,
       "refresh_interval" : "1000ms"
     },
```

View File

```diff
@@ -13,11 +13,16 @@ minio:
   user: 'rag_flow'
   password: 'infini_rag_flow'
   host: 'minio:9000'
+redis:
+  db: 1
+  password: 'infini_rag_flow'
+  host: 'redis:6379'
 es:
   hosts: 'http://es01:9200'
 user_default_llm:
   factory: 'Tongyi-Qianwen'
   api_key: 'sk-xxxxxxxxxxxxx'
+  base_url: ''
 oauth:
   github:
     client_id: xxxxxxxxxxxxxxxxxxxxxxxxx
```
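
To sanity-check the new `redis` block locally, a one-off ping with redis-py works; this is a minimal sketch that assumes the same host, database index, and password as the sample configuration above, so adjust the values for your deployment:

```python
# Connectivity check against the Redis instance described in service_conf.yaml.
# Host/port/db/password mirror the sample config and are assumptions, not
# values this PR requires.
import redis

host, port = "redis:6379".split(":")
client = redis.Redis(host=host, port=int(port), db=1, password="infini_rag_flow")
print(client.ping())  # True if the cache is reachable
```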

View File

```diff
@@ -13,6 +13,10 @@ minio:
   user: 'rag_flow'
   password: 'infini_rag_flow'
   host: 'minio:9000'
+redis:
+  db: 1
+  password: 'infini_rag_flow'
+  host: 'redis:6379'
 es:
   hosts: 'http://es01:9200'
 user_default_llm:
```

View File

```diff
@@ -66,6 +66,8 @@ class Docx(DocxParser):
 class Pdf(PdfParser):
     def __call__(self, filename, binary=None, from_page=0,
                  to_page=100000, zoomin=3, callback=None):
+        from timeit import default_timer as timer
+        start = timer()
         callback(msg="OCR is running...")
         self.__images__(
             filename if not binary else binary,
@@ -75,8 +77,8 @@ class Pdf(PdfParser):
             callback
         )
         callback(msg="OCR finished")
+        cron_logger.info("OCR: {}".format(timer() - start))
-        from timeit import default_timer as timer
         start = timer()
         self._layouts_rec(zoomin)
         callback(0.63, "Layout analysis finished.")
@@ -90,7 +92,7 @@ class Pdf(PdfParser):
         self._concat_downward()
         #self._filter_forpages()
-        cron_logger.info("paddle layouts:".format(
+        cron_logger.info("paddle layouts: {}".format(
             (timer() - start) / (self.total_page + 0.1)))
         return [(b["text"], self._line_tag(b, zoomin))
                 for b in self.boxes], tbls
```

View File

```diff
@@ -25,6 +25,11 @@ SUBPROCESS_STD_LOG_NAME = "std.log"
 ES = get_base_config("es", {})
 MINIO = decrypt_database_config(name="minio")
+try:
+    REDIS = decrypt_database_config(name="redis")
+except Exception as e:
+    REDIS = {}
+    pass
 DOC_MAXIMUM_SIZE = 128 * 1024 * 1024

 # Logger
@@ -39,5 +44,6 @@ LoggerFactory.LEVEL = 30
 es_logger = getLogger("es")
 minio_logger = getLogger("minio")
 cron_logger = getLogger("cron_logger")
+cron_logger.setLevel(20)
 chunk_logger = getLogger("chunk_logger")
 database_logger = getLogger("database")
```

View File

```diff
@@ -32,6 +32,7 @@ from api.db.services.document_service import DocumentService
 from api.settings import database_logger
 from api.utils import get_format_time, get_uuid
 from api.utils.file_utils import get_project_base_directory
+from rag.utils.redis_conn import REDIS_CONN


 def collect(tm):
@@ -84,10 +85,16 @@ def dispatch():
         tsks = []
         try:
+            file_bin = MINIO.get(r["kb_id"], r["location"])
+            if REDIS_CONN.is_alive():
+                try:
+                    REDIS_CONN.set("{}/{}".format(r["kb_id"], r["location"]), file_bin, 12*60)
+                except Exception as e:
+                    cron_logger.warning("Put into redis[EXCEPTION]:" + str(e))
             if r["type"] == FileType.PDF.value:
                 do_layout = r["parser_config"].get("layout_recognize", True)
-                pages = PdfParser.total_page_number(
-                    r["name"], MINIO.get(r["kb_id"], r["location"]))
+                pages = PdfParser.total_page_number(r["name"], file_bin)
                 page_size = r["parser_config"].get("task_page_size", 12)
                 if r["parser_id"] == "paper":
                     page_size = r["parser_config"].get("task_page_size", 22)
@@ -110,8 +117,7 @@ def dispatch():
             elif r["parser_id"] == "table":
                 rn = HuExcelParser.row_number(
-                    r["name"], MINIO.get(
-                        r["kb_id"], r["location"]))
+                    r["name"], file_bin)
                 for i in range(0, rn, 3000):
                     task = new_task()
                     task["from_page"] = i
```

View File

```diff
@@ -19,13 +19,12 @@ import logging
 import os
 import hashlib
 import copy
-import random
 import re
 import sys
 import time
 import traceback
 from functools import partial
+from rag.utils import MINIO
 from api.db.db_models import close_connection
 from rag.settings import database_logger
 from rag.settings import cron_logger, DOC_MAXIMUM_SIZE
@@ -35,7 +34,7 @@ from elasticsearch_dsl import Q
 from multiprocessing.context import TimeoutError
 from api.db.services.task_service import TaskService
 from rag.utils import ELASTICSEARCH
-from rag.utils import MINIO
+from timeit import default_timer as timer
 from rag.utils import rmSpace, findMaxTm
 from rag.nlp import search
@@ -48,6 +47,7 @@ from api.db import LLMType, ParserType
 from api.db.services.document_service import DocumentService
 from api.db.services.llm_service import LLMBundle
 from api.utils.file_utils import get_project_base_directory
+from rag.utils.redis_conn import REDIS_CONN

 BATCH_SIZE = 64

@@ -105,11 +105,16 @@ def collect(comm, mod, tm):
 def get_minio_binary(bucket, name):
     global MINIO
+    if REDIS_CONN.is_alive():
+        try:
+            r = REDIS_CONN.get("{}/{}".format(bucket, name))
+            if r: return r
+        except Exception as e:
+            cron_logger.warning("Get redis[EXCEPTION]:" + str(e))
     return MINIO.get(bucket, name)


 def build(row):
-    from timeit import default_timer as timer
     if row["size"] > DOC_MAXIMUM_SIZE:
         set_progress(row["id"], prog=-1, msg="File size exceeds( <= %dMb )" %
                      (int(DOC_MAXIMUM_SIZE / 1024 / 1024)))
@@ -265,6 +270,7 @@ def main(comm, mod):
         callback(
             msg="Finished slicing files(%d). Start to embedding the content." %
             len(cks))
+        st = timer()
         try:
             tk_count = embedding(cks, embd_mdl, r["parser_config"], callback)
         except Exception as e:
@@ -272,9 +278,10 @@ def main(comm, mod):
             cron_logger.error(str(e))
             tk_count = 0

-        callback(msg="Finished embedding! Start to build index!")
+        callback(msg="Finished embedding({})! Start to build index!".format(timer()-st))
         init_kb(r)
         chunk_count = len(set([c["_id"] for c in cks]))
+        st = timer()
         es_r = ELASTICSEARCH.bulk(cks, search.index_name(r["tenant_id"]))
         if es_r:
             callback(-1, "Index failure!")
@@ -290,8 +297,8 @@ def main(comm, mod):
         DocumentService.increment_chunk_num(
             r["doc_id"], r["kb_id"], tk_count, chunk_count, 0)
         cron_logger.info(
-            "Chunk doc({}), token({}), chunks({})".format(
-                r["id"], tk_count, len(cks)))
+            "Chunk doc({}), token({}), chunks({}), elapsed:{}".format(
+                r["id"], tk_count, len(cks), timer()-st))

         tmf.write(str(r["update_time"]) + "\n")
     tmf.close()
```
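
Taken together, `get_minio_binary` now follows a plain cache-aside read: try Redis using the same `kb_id/location` key the broker wrote, and fall back to MinIO on a miss or on any Redis error. The sketch below restates that flow in isolation; the function name and the `fetch_from_minio` callable are hypothetical stand-ins, not RAGFlow APIs:

```python
# Cache-aside read pattern used by the executor (illustration only).
import redis

def get_binary(bucket: str, name: str, cache: redis.Redis, fetch_from_minio):
    key = "{}/{}".format(bucket, name)        # same key shape the broker writes
    try:
        cached = cache.get(key)
        if cached:
            return cached                     # cache hit: skip object storage
    except Exception:
        pass                                  # cache trouble: fall through to MinIO
    return fetch_from_minio(bucket, name)     # cache miss or Redis unavailable
```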

rag/utils/redis_conn.py (new file, 55 lines)
View File

```python
import json
import redis
import logging
from rag import settings
from rag.utils import singleton


@singleton
class RedisDB:
    def __init__(self):
        self.REDIS = None
        self.config = settings.REDIS
        self.__open__()

    def __open__(self):
        try:
            self.REDIS = redis.Redis(host=self.config.get("host", "redis").split(":")[0],
                                     port=int(self.config.get("host", ":6379").split(":")[1]),
                                     db=int(self.config.get("db", 1)),
                                     password=self.config.get("password"))
        except Exception as e:
            logging.warning("Redis can't be connected.")
        return self.REDIS

    def is_alive(self):
        return self.REDIS is not None

    def get(self, k):
        if not self.REDIS: return
        try:
            return self.REDIS.get(k)
        except Exception as e:
            logging.warning("[EXCEPTION]get" + str(k) + "||" + str(e))
            self.__open__()

    def set_obj(self, k, obj, exp=3600):
        try:
            self.REDIS.set(k, json.dumps(obj, ensure_ascii=False), exp)
            return True
        except Exception as e:
            logging.warning("[EXCEPTION]set_obj" + str(k) + "||" + str(e))
            self.__open__()
        return False

    def set(self, k, v, exp=3600):
        try:
            self.REDIS.set(k, v, exp)
            return True
        except Exception as e:
            logging.warning("[EXCEPTION]set" + str(k) + "||" + str(e))
            self.__open__()
        return False


REDIS_CONN = RedisDB()
```
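
For completeness, a short usage sketch of the new singleton, mirroring how the broker and executor call it in the diffs above (the key and payload below are placeholders):

```python
# The broker caches a file binary for 12 minutes; the executor reads it back
# by the same "kb_id/location" key before falling back to MinIO.
from rag.utils.redis_conn import REDIS_CONN

key = "kb-123/docs/sample.pdf"        # hypothetical kb_id/location pair
if REDIS_CONN.is_alive():
    REDIS_CONN.set(key, b"...binary...", 12 * 60)   # third argument is the TTL in seconds

blob = REDIS_CONN.get(key)            # None on a miss or when Redis is unreachable
```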