Storage: support S3 and Azure Blob Storage as object storage backends for RAGFlow (#2278)

### What problem does this PR solve?

Issue: https://github.com/infiniflow/ragflow/issues/2277

This PR adds AWS S3 and Azure Blob Storage (SAS-token and service-principal authentication) as alternative object storage backends to MinIO. A new `StorageFactory` instantiates the backend selected by the `STORAGE_IMPL` environment variable, and all call sites now go through the shared `STORAGE_IMPL` object instead of the MinIO client directly.
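For reviewers: every backend exposes the same small set of methods, so call sites only depend on `STORAGE_IMPL`. The sketch below is illustrative only — there is no literal base class in this PR, and the signatures are simplified from the new connector classes:

```python
# Illustrative interface shared by the MinIO, Azure (SAS/SPN) and S3 connectors.
# Not part of the diff; method names mirror the new classes, signatures are simplified.
class ObjectStorageBackend:
    def health(self): ...                                              # connectivity probe used by the status endpoint
    def put(self, bucket: str, name: str, binary: bytes): ...          # upload an object
    def get(self, bucket: str, name: str) -> bytes: ...                # download an object as bytes
    def rm(self, bucket: str, name: str): ...                          # delete an object
    def obj_exist(self, bucket: str, name: str) -> bool: ...           # existence check
    def get_presigned_url(self, bucket: str, name: str, expires): ...  # temporary download URL
```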

### Type of change

- [ ] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):

Co-authored-by: Kevin Hu <kevinhu.sh@gmail.com>
Author: Fachuan Bai, 2024-09-09 09:41:14 +08:00, committed by GitHub
Commit: 8dd3adc443 (parent: e85fea31a8)
17 changed files with 395 additions and 38 deletions

View File

@@ -39,7 +39,7 @@ from itsdangerous import URLSafeTimedSerializer
 from api.utils.file_utils import filename_type, thumbnail
 from rag.nlp import keyword_extraction
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from api.db.services.canvas_service import CanvasTemplateService, UserCanvasService
 from agent.canvas import Canvas
@@ -427,10 +427,10 @@ def upload():
 retmsg="This type of file has not been supported yet!")
 location = filename
-while MINIO.obj_exist(kb_id, location):
+while STORAGE_IMPL.obj_exist(kb_id, location):
 location += "_"
 blob = request.files['file'].read()
-MINIO.put(kb_id, location, blob)
+STORAGE_IMPL.put(kb_id, location, blob)
 doc = {
 "id": get_uuid(),
 "kb_id": kb.id,
@@ -650,7 +650,7 @@ def document_rm():
 FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.id == f2d[0].file_id])
 File2DocumentService.delete_by_document_id(doc_id)
-MINIO.rm(b, n)
+STORAGE_IMPL.rm(b, n)
 except Exception as e:
 errors += str(e)
@@ -723,7 +723,7 @@ def completion_faq():
 if ans["reference"]["chunks"][chunk_idx]["img_id"]:
 try:
 bkt, nm = ans["reference"]["chunks"][chunk_idx]["img_id"].split("-")
-response = MINIO.get(bkt, nm)
+response = STORAGE_IMPL.get(bkt, nm)
 data_type_picture["url"] = base64.b64encode(response).decode('utf-8')
 data.append(data_type_picture)
 break

View File

@@ -42,7 +42,7 @@ from api.utils.file_utils import filename_type, thumbnail
 from rag.app import book, laws, manual, naive, one, paper, presentation, qa, resume, table, picture, audio, email
 from rag.nlp import search
 from rag.utils.es_conn import ELASTICSEARCH
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 MAXIMUM_OF_UPLOADING_FILES = 256
@@ -352,7 +352,7 @@ def upload_documents(dataset_id):
 # upload to the minio
 location = filename
-while MINIO.obj_exist(dataset_id, location):
+while STORAGE_IMPL.obj_exist(dataset_id, location):
 location += "_"
 blob = file.read()
@@ -361,7 +361,7 @@ def upload_documents(dataset_id):
 if blob == b'':
 warnings.warn(f"[WARNING]: The content of the file {filename} is empty.")
-MINIO.put(dataset_id, location, blob)
+STORAGE_IMPL.put(dataset_id, location, blob)
 doc = {
 "id": get_uuid(),
@@ -441,7 +441,7 @@ def delete_document(document_id, dataset_id):  # string
 File2DocumentService.delete_by_document_id(document_id)
 # delete it from minio
-MINIO.rm(dataset_id, location)
+STORAGE_IMPL.rm(dataset_id, location)
 except Exception as e:
 errors += str(e)
 if errors:
@@ -596,7 +596,7 @@ def download_document(dataset_id, document_id):
 # The process of downloading
 doc_id, doc_location = File2DocumentService.get_minio_address(doc_id=document_id)  # minio address
-file_stream = MINIO.get(doc_id, doc_location)
+file_stream = STORAGE_IMPL.get(doc_id, doc_location)
 if not file_stream:
 return construct_json_result(message="This file is empty.", code=RetCode.DATA_ERROR)
@@ -737,7 +737,7 @@ def parsing_document_internal(id):
 doc_id = doc_attributes["id"]
 bucket, doc_name = File2DocumentService.get_minio_address(doc_id=doc_id)
-binary = MINIO.get(bucket, doc_name)
+binary = STORAGE_IMPL.get(bucket, doc_name)
 parser_name = doc_attributes["parser_id"]
 if binary:
 res = doc_parse(binary, doc_name, parser_name, tenant_id, doc_id)

View File

@@ -48,7 +48,7 @@ from api.db import FileType, TaskStatus, ParserType, FileSource, LLMType
 from api.db.services.document_service import DocumentService, doc_upload_and_parse
 from api.settings import RetCode, stat_logger
 from api.utils.api_utils import get_json_result
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from api.utils.file_utils import filename_type, thumbnail, get_project_base_directory
 from api.utils.web_utils import html2pdf, is_valid_url
@@ -118,9 +118,9 @@ def web_crawl():
 raise RuntimeError("This type of file has not been supported yet!")
 location = filename
-while MINIO.obj_exist(kb_id, location):
+while STORAGE_IMPL.obj_exist(kb_id, location):
 location += "_"
-MINIO.put(kb_id, location, blob)
+STORAGE_IMPL.put(kb_id, location, blob)
 doc = {
 "id": get_uuid(),
 "kb_id": kb.id,
@@ -307,7 +307,7 @@ def rm():
 FileService.filter_delete([File.source_type == FileSource.KNOWLEDGEBASE, File.id == f2d[0].file_id])
 File2DocumentService.delete_by_document_id(doc_id)
-MINIO.rm(b, n)
+STORAGE_IMPL.rm(b, n)
 except Exception as e:
 errors += str(e)
@@ -394,7 +394,7 @@ def get(doc_id):
 return get_data_error_result(retmsg="Document not found!")
 b, n = File2DocumentService.get_minio_address(doc_id=doc_id)
-response = flask.make_response(MINIO.get(b, n))
+response = flask.make_response(STORAGE_IMPL.get(b, n))
 ext = re.search(r"\.([^.]+)$", doc.name)
 if ext:
@@ -458,7 +458,7 @@ def change_parser():
 def get_image(image_id):
 try:
 bkt, nm = image_id.split("-")
-response = flask.make_response(MINIO.get(bkt, nm))
+response = flask.make_response(STORAGE_IMPL.get(bkt, nm))
 response.headers.set('Content-Type', 'image/JPEG')
 return response
 except Exception as e:

View File

@@ -34,7 +34,7 @@ from api.utils.api_utils import get_json_result
 from api.utils.file_utils import filename_type
 from rag.nlp import search
 from rag.utils.es_conn import ELASTICSEARCH
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 @manager.route('/upload', methods=['POST'])
@@ -98,7 +98,7 @@ def upload():
 # file type
 filetype = filename_type(file_obj_names[file_len - 1])
 location = file_obj_names[file_len - 1]
-while MINIO.obj_exist(last_folder.id, location):
+while STORAGE_IMPL.obj_exist(last_folder.id, location):
 location += "_"
 blob = file_obj.read()
 filename = duplicate_name(
@@ -260,7 +260,7 @@ def rm():
 e, file = FileService.get_by_id(inner_file_id)
 if not e:
 return get_data_error_result(retmsg="File not found!")
-MINIO.rm(file.parent_id, file.location)
+STORAGE_IMPL.rm(file.parent_id, file.location)
 FileService.delete_folder_by_pf_id(current_user.id, file_id)
 else:
 if not FileService.delete(file):
@@ -333,7 +333,7 @@ def get(file_id):
 if not e:
 return get_data_error_result(retmsg="Document not found!")
 b, n = File2DocumentService.get_minio_address(file_id=file_id)
-response = flask.make_response(MINIO.get(b, n))
+response = flask.make_response(STORAGE_IMPL.get(b, n))
 ext = re.search(r"\.([^.]+)$", file.name)
 if ext:
 if file.type == FileType.VISUAL.value:

View File

@@ -22,7 +22,7 @@ from api.utils.api_utils import get_json_result
 from api.versions import get_rag_version
 from rag.settings import SVR_QUEUE_NAME
 from rag.utils.es_conn import ELASTICSEARCH
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from timeit import default_timer as timer
 from rag.utils.redis_conn import REDIS_CONN
@@ -47,7 +47,7 @@ def status():
 st = timer()
 try:
-MINIO.health()
+STORAGE_IMPL.health()
 res["minio"] = {"status": "green", "elapsed": "{:.1f}".format((timer() - st)*1000.)}
 except Exception as e:
 res["minio"] = {"status": "red", "elapsed": "{:.1f}".format((timer() - st)*1000.), "error": str(e)}

View File

@@ -34,7 +34,7 @@ from api.utils.file_utils import get_project_base_directory
 from graphrag.mind_map_extractor import MindMapExtractor
 from rag.settings import SVR_QUEUE_NAME
 from rag.utils.es_conn import ELASTICSEARCH
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from rag.nlp import search, rag_tokenizer
 from api.db import FileType, TaskStatus, ParserType, LLMType
@@ -473,7 +473,7 @@ def doc_upload_and_parse(conversation_id, file_objs, user_id):
 else:
 d["image"].save(output_buffer, format='JPEG')
-MINIO.put(kb.id, d["_id"], output_buffer.getvalue())
+STORAGE_IMPL.put(kb.id, d["_id"], output_buffer.getvalue())
 d["img_id"] = "{}-{}".format(kb.id, d["_id"])
 del d["image"]
 docs.append(d)

View File

@@ -27,7 +27,7 @@ from api.db.services.document_service import DocumentService
 from api.db.services.file2document_service import File2DocumentService
 from api.utils import get_uuid
 from api.utils.file_utils import filename_type, thumbnail
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 class FileService(CommonService):
@@ -350,10 +350,10 @@ class FileService(CommonService):
 raise RuntimeError("This type of file has not been supported yet!")
 location = filename
-while MINIO.obj_exist(kb.id, location):
+while STORAGE_IMPL.obj_exist(kb.id, location):
 location += "_"
 blob = file.read()
-MINIO.put(kb.id, location, blob)
+STORAGE_IMPL.put(kb.id, location, blob)
 doc = {
 "id": get_uuid(),
 "kb_id": kb.id,

View File

@@ -27,7 +27,7 @@ from api.db.services.document_service import DocumentService
 from api.utils import current_timestamp, get_uuid
 from deepdoc.parser.excel_parser import RAGFlowExcelParser
 from rag.settings import SVR_QUEUE_NAME
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from rag.utils.redis_conn import REDIS_CONN
@@ -143,7 +143,7 @@ def queue_tasks(doc, bucket, name):
 tsks = []
 if doc["type"] == FileType.PDF.value:
-file_bin = MINIO.get(bucket, name)
+file_bin = STORAGE_IMPL.get(bucket, name)
 do_layout = doc["parser_config"].get("layout_recognize", True)
 pages = PdfParser.total_page_number(doc["name"], file_bin)
 page_size = doc["parser_config"].get("task_page_size", 12)
@@ -169,7 +169,7 @@ def queue_tasks(doc, bucket, name):
 tsks.append(task)
 elif doc["parser_id"] == "table":
-file_bin = MINIO.get(bucket, name)
+file_bin = STORAGE_IMPL.get(bucket, name)
 rn = RAGFlowExcelParser.row_number(
 doc["name"], file_bin)
 for i in range(0, rn, 3000):

View File

@@ -13,6 +13,22 @@ minio:
   user: 'rag_flow'
   password: 'infini_rag_flow'
   host: 'minio:9000'
+azure:
+  auth_type: 'sas'
+  container_url: 'container_url'
+  sas_token: 'sas_token'
+#azure:
+#  auth_type: 'spn'
+#  account_url: 'account_url'
+#  client_id: 'client_id'
+#  secret: 'secret'
+#  tenant_id: 'tenant_id'
+#  container_name: 'container_name'
+s3:
+  endpoint: 'endpoint'
+  access_key: 'access_key'
+  secret_key: 'secret_key'
+  region: 'region'
 es:
   hosts: 'http://es01:9200'
   username: 'elastic'
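Reviewer note (not part of the diff): the `azure` section covers two mutually exclusive auth styles — `sas` for `RAGFlowAzureSasBlob` and `spn` for `RAGFlowAzureSpnBlob` — while the active backend is chosen separately via the `STORAGE_IMPL` environment variable (see `rag/utils/storage_factory.py` below). A minimal sketch of how the connectors resolve this configuration, assuming the YAML above has been loaded into `rag.settings`:

```python
# Sketch only: the Azure connectors read rag.settings.AZURE with environment-variable
# overrides; the S3 connector in this PR reads ENDPOINT/ACCESS_KEY/SECRET_KEY/REGION
# from the environment only (see rag/utils/s3_conn.py below).
import os
from rag import settings

container_url = os.getenv('CONTAINER_URL', settings.AZURE["container_url"])  # 'sas' auth type
sas_token = os.getenv('SAS_TOKEN', settings.AZURE["sas_token"])               # 'sas' auth type
# The 'spn' auth type reads account_url, client_id, secret, tenant_id and container_name analogously.
```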

rag/settings.py

@@ -24,6 +24,8 @@ RAG_CONF_PATH = os.path.join(get_project_base_directory(), "conf")
 SUBPROCESS_STD_LOG_NAME = "std.log"
 ES = get_base_config("es", {})
+AZURE = get_base_config("azure", {})
+S3 = get_base_config("s3", {})
 MINIO = decrypt_database_config(name="minio")
 try:
 REDIS = decrypt_database_config(name="redis")
@@ -43,6 +45,8 @@ LoggerFactory.LEVEL = 30
 es_logger = getLogger("es")
 minio_logger = getLogger("minio")
+s3_logger = getLogger("s3")
+azure_logger = getLogger("azure")
 cron_logger = getLogger("cron_logger")
 cron_logger.setLevel(20)
 chunk_logger = getLogger("chunk_logger")

View File

@@ -20,7 +20,7 @@ import traceback
 from api.db.db_models import close_connection
 from api.db.services.task_service import TaskService
 from rag.settings import cron_logger
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from rag.utils.redis_conn import REDIS_CONN
@@ -42,7 +42,7 @@ def main():
 try:
 key = "{}/{}".format(kb_id, loc)
 if REDIS_CONN.exist(key):continue
-file_bin = MINIO.get(kb_id, loc)
+file_bin = STORAGE_IMPL.get(kb_id, loc)
 REDIS_CONN.transaction(key, file_bin, 12 * 60)
 cron_logger.info("CACHE: {}".format(loc))
 except Exception as e:

View File

@@ -29,7 +29,7 @@ from functools import partial
 from api.db.services.file2document_service import File2DocumentService
 from api.settings import retrievaler
 from rag.raptor import RecursiveAbstractiveProcessing4TreeOrganizedRetrieval as Raptor
-from rag.utils.minio_conn import MINIO
+from rag.utils.storage_factory import STORAGE_IMPL
 from api.db.db_models import close_connection
 from rag.settings import database_logger, SVR_QUEUE_NAME
 from rag.settings import cron_logger, DOC_MAXIMUM_SIZE
@@ -138,7 +138,7 @@ def collect():
 def get_minio_binary(bucket, name):
-return MINIO.get(bucket, name)
+return STORAGE_IMPL.get(bucket, name)
 def build(row):
@@ -214,7 +214,7 @@ def build(row):
 d["image"].save(output_buffer, format='JPEG')
 st = timer()
-MINIO.put(row["kb_id"], d["_id"], output_buffer.getvalue())
+STORAGE_IMPL.put(row["kb_id"], d["_id"], output_buffer.getvalue())
 el += timer() - st
 except Exception as e:
 cron_logger.error(str(e))

rag/utils/azure_sas_conn.py (new file)

@@ -0,0 +1,80 @@
import os
import time
from io import BytesIO
from rag import settings
from rag.settings import azure_logger
from rag.utils import singleton
from azure.storage.blob import ContainerClient


@singleton
class RAGFlowAzureSasBlob(object):
    def __init__(self):
        self.conn = None
        self.container_url = os.getenv('CONTAINER_URL', settings.AZURE["container_url"])
        self.sas_token = os.getenv('SAS_TOKEN', settings.AZURE["sas_token"])
        self.__open__()

    def __open__(self):
        try:
            if self.conn:
                self.__close__()
        except Exception as e:
            pass
        try:
            self.conn = ContainerClient.from_container_url(self.container_url + "?" + self.sas_token)
        except Exception as e:
            azure_logger.error(
                "Fail to connect %s " % self.container_url + str(e))

    def __close__(self):
        del self.conn
        self.conn = None

    def health(self):
        bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
        return self.conn.upload_blob(name=fnm, data=BytesIO(binary), length=len(binary))

    def put(self, bucket, fnm, binary):
        for _ in range(3):
            try:
                return self.conn.upload_blob(name=fnm, data=BytesIO(binary), length=len(binary))
            except Exception as e:
                azure_logger.error(f"Fail put {bucket}/{fnm}: " + str(e))
                self.__open__()
                time.sleep(1)

    def rm(self, bucket, fnm):
        try:
            self.conn.delete_blob(fnm)
        except Exception as e:
            azure_logger.error(f"Fail rm {bucket}/{fnm}: " + str(e))

    def get(self, bucket, fnm):
        for _ in range(1):
            try:
                r = self.conn.download_blob(fnm)
                return r.read()
            except Exception as e:
                azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
                self.__open__()
                time.sleep(1)
        return

    def obj_exist(self, bucket, fnm):
        try:
            return self.conn.get_blob_client(fnm).exists()
        except Exception as e:
            azure_logger.error(f"Fail to check existence of {bucket}/{fnm}: " + str(e))
            return False

    def get_presigned_url(self, bucket, fnm, expires):
        for _ in range(10):
            try:
                return self.conn.get_presigned_url("GET", bucket, fnm, expires)
            except Exception as e:
                azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
                self.__open__()
                time.sleep(1)
        return

rag/utils/azure_spn_conn.py (new file)

@@ -0,0 +1,90 @@
import os
import time
from rag import settings
from rag.settings import azure_logger
from rag.utils import singleton
from azure.identity import ClientSecretCredential, AzureAuthorityHosts
from azure.storage.filedatalake import FileSystemClient


@singleton
class RAGFlowAzureSpnBlob(object):
    def __init__(self):
        self.conn = None
        self.account_url = os.getenv('ACCOUNT_URL', settings.AZURE["account_url"])
        self.client_id = os.getenv('CLIENT_ID', settings.AZURE["client_id"])
        self.secret = os.getenv('SECRET', settings.AZURE["secret"])
        self.tenant_id = os.getenv('TENANT_ID', settings.AZURE["tenant_id"])
        self.container_name = os.getenv('CONTAINER_NAME', settings.AZURE["container_name"])
        self.__open__()

    def __open__(self):
        try:
            if self.conn:
                self.__close__()
        except Exception as e:
            pass
        try:
            credentials = ClientSecretCredential(
                tenant_id=self.tenant_id, client_id=self.client_id,
                client_secret=self.secret, authority=AzureAuthorityHosts.AZURE_CHINA)
            self.conn = FileSystemClient(
                account_url=self.account_url,
                file_system_name=self.container_name,
                credential=credentials)
        except Exception as e:
            azure_logger.error(
                "Fail to connect %s " % self.account_url + str(e))

    def __close__(self):
        del self.conn
        self.conn = None

    def health(self):
        bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
        f = self.conn.create_file(fnm)
        f.append_data(binary, offset=0, length=len(binary))
        return f.flush_data(len(binary))

    def put(self, bucket, fnm, binary):
        for _ in range(3):
            try:
                f = self.conn.create_file(fnm)
                f.append_data(binary, offset=0, length=len(binary))
                return f.flush_data(len(binary))
            except Exception as e:
                azure_logger.error(f"Fail put {bucket}/{fnm}: " + str(e))
                self.__open__()
                time.sleep(1)

    def rm(self, bucket, fnm):
        try:
            self.conn.delete_file(fnm)
        except Exception as e:
            azure_logger.error(f"Fail rm {bucket}/{fnm}: " + str(e))

    def get(self, bucket, fnm):
        for _ in range(1):
            try:
                client = self.conn.get_file_client(fnm)
                r = client.download_file()
                return r.read()
            except Exception as e:
                azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
                self.__open__()
                time.sleep(1)
        return

    def obj_exist(self, bucket, fnm):
        try:
            client = self.conn.get_file_client(fnm)
            return client.exists()
        except Exception as e:
            azure_logger.error(f"Fail to check existence of {bucket}/{fnm}: " + str(e))
            return False

    def get_presigned_url(self, bucket, fnm, expires):
        for _ in range(10):
            try:
                return self.conn.get_presigned_url("GET", bucket, fnm, expires)
            except Exception as e:
                azure_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
                self.__open__()
                time.sleep(1)
        return

rag/utils/s3_conn.py (new file, 135 lines)

@@ -0,0 +1,135 @@
import boto3
import os
from botocore.exceptions import ClientError
from botocore.client import Config
import time
from io import BytesIO
from rag.settings import s3_logger
from rag.utils import singleton


@singleton
class RAGFlowS3(object):
    def __init__(self):
        self.conn = None
        self.endpoint = os.getenv('ENDPOINT', None)
        self.access_key = os.getenv('ACCESS_KEY', None)
        self.secret_key = os.getenv('SECRET_KEY', None)
        self.region = os.getenv('REGION', None)
        self.__open__()

    def __open__(self):
        try:
            if self.conn:
                self.__close__()
        except Exception as e:
            pass
        try:
            config = Config(
                s3={
                    'addressing_style': 'virtual'
                }
            )
            self.conn = boto3.client(
                's3',
                endpoint_url=self.endpoint,
                region_name=self.region,
                aws_access_key_id=self.access_key,
                aws_secret_access_key=self.secret_key,
                config=config
            )
        except Exception as e:
            s3_logger.error(
                "Fail to connect %s " % self.endpoint + str(e))

    def __close__(self):
        del self.conn
        self.conn = None

    def bucket_exists(self, bucket):
        try:
            s3_logger.error(f"head_bucket bucketname {bucket}")
            self.conn.head_bucket(Bucket=bucket)
            exists = True
        except ClientError as e:
            s3_logger.error(f"head_bucket error {bucket}: " + str(e))
            exists = False
        return exists

    def health(self):
        bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
        if not self.bucket_exists(bucket):
            self.conn.create_bucket(Bucket=bucket)
            s3_logger.error(f"create bucket {bucket} ********")
        r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
        return r

    def get_properties(self, bucket, key):
        return {}

    def list(self, bucket, dir, recursive=True):
        return []

    def put(self, bucket, fnm, binary):
        s3_logger.error(f"bucket name {bucket}; filename :{fnm}:")
        for _ in range(1):
            try:
                if not self.bucket_exists(bucket):
                    self.conn.create_bucket(Bucket=bucket)
                    s3_logger.error(f"create bucket {bucket} ********")
                r = self.conn.upload_fileobj(BytesIO(binary), bucket, fnm)
                return r
            except Exception as e:
                s3_logger.error(f"Fail put {bucket}/{fnm}: " + str(e))
                self.__open__()
                time.sleep(1)

    def rm(self, bucket, fnm):
        try:
            self.conn.delete_object(Bucket=bucket, Key=fnm)
        except Exception as e:
            s3_logger.error(f"Fail rm {bucket}/{fnm}: " + str(e))

    def get(self, bucket, fnm):
        for _ in range(1):
            try:
                r = self.conn.get_object(Bucket=bucket, Key=fnm)
                object_data = r['Body'].read()
                return object_data
            except Exception as e:
                s3_logger.error(f"fail get {bucket}/{fnm}: " + str(e))
                self.__open__()
                time.sleep(1)
        return

    def obj_exist(self, bucket, fnm):
        try:
            if self.conn.head_object(Bucket=bucket, Key=fnm):
                return True
        except ClientError as e:
            if e.response['Error']['Code'] == '404':
                return False
            else:
                raise

    def get_presigned_url(self, bucket, fnm, expires):
        for _ in range(10):
            try:
                r = self.conn.generate_presigned_url('get_object',
                                                     Params={'Bucket': bucket,
                                                             'Key': fnm},
                                                     ExpiresIn=expires)
                return r
            except Exception as e:
                s3_logger.error(f"fail get url {bucket}/{fnm}: " + str(e))
                self.__open__()
                time.sleep(1)
        return

rag/utils/storage_factory.py (new file)

@@ -0,0 +1,30 @@
import os
from enum import Enum

from rag.utils.azure_sas_conn import RAGFlowAzureSasBlob
from rag.utils.azure_spn_conn import RAGFlowAzureSpnBlob
from rag.utils.minio_conn import RAGFlowMinio
from rag.utils.s3_conn import RAGFlowS3


class Storage(Enum):
    MINIO = 1
    AZURE_SPN = 2
    AZURE_SAS = 3
    AWS_S3 = 4


class StorageFactory:
    storage_mapping = {
        Storage.MINIO: RAGFlowMinio,
        Storage.AZURE_SPN: RAGFlowAzureSpnBlob,
        Storage.AZURE_SAS: RAGFlowAzureSasBlob,
        Storage.AWS_S3: RAGFlowS3,
    }

    @classmethod
    def create(cls, storage: Storage):
        return cls.storage_mapping[storage]()


STORAGE_IMPL = StorageFactory.create(Storage[os.getenv('STORAGE_IMPL', 'MINIO')])
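For illustration, a minimal usage sketch (bucket and object names here are hypothetical; `STORAGE_IMPL` must be set before `rag.utils.storage_factory` is first imported, since the backend instance is created at import time):

```python
# Minimal usage sketch; bucket/object names are hypothetical.
import os
os.environ["STORAGE_IMPL"] = "AWS_S3"   # one of: MINIO, AZURE_SPN, AZURE_SAS, AWS_S3

from rag.utils.storage_factory import STORAGE_IMPL

STORAGE_IMPL.put("kb-bucket", "example.txt", b"hello")     # upload
assert STORAGE_IMPL.obj_exist("kb-bucket", "example.txt")  # existence check
data = STORAGE_IMPL.get("kb-bucket", "example.txt")        # download as bytes
STORAGE_IMPL.rm("kb-bucket", "example.txt")                # delete
```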

View File

@@ -1,5 +1,7 @@
-akshare==1.14.72
-anthropic==0.34.1
+azure-storage-blob==12.22.0
+azure-identity==1.17.1
+azure-storage-file-datalake==12.16.0
+anthropic===0.34.1
 arxiv==2.1.3
 Aspose.Slides==24.2.0
 BCEmbedding==0.1.3