From 338e4669e57640092a31b823fd248e7d4f6c2af1 Mon Sep 17 00:00:00 2001 From: Jyong <76649700+JohnJyong@users.noreply.github.com> Date: Mon, 29 Apr 2024 18:22:03 +0800 Subject: [PATCH] add storage factory (#3922) --- api/extensions/ext_storage.py | 239 +++-------------------- api/extensions/storage/aliyun_storage.py | 48 +++++ api/extensions/storage/azure_storage.py | 58 ++++++ api/extensions/storage/base_storage.py | 38 ++++ api/extensions/storage/google_storage.py | 56 ++++++ api/extensions/storage/local_storage.py | 88 +++++++++ api/extensions/storage/s3_storage.py | 68 +++++++ 7 files changed, 384 insertions(+), 211 deletions(-) create mode 100644 api/extensions/storage/aliyun_storage.py create mode 100644 api/extensions/storage/azure_storage.py create mode 100644 api/extensions/storage/base_storage.py create mode 100644 api/extensions/storage/google_storage.py create mode 100644 api/extensions/storage/local_storage.py create mode 100644 api/extensions/storage/s3_storage.py diff --git a/api/extensions/ext_storage.py b/api/extensions/ext_storage.py index db66df88d8..b59725d2d5 100644 --- a/api/extensions/ext_storage.py +++ b/api/extensions/ext_storage.py @@ -1,90 +1,42 @@ -import base64 -import os -import shutil from collections.abc import Generator -from contextlib import closing -from datetime import datetime, timedelta, timezone from typing import Union -import boto3 -import oss2 as aliyun_s3 -from azure.storage.blob import AccountSasPermissions, BlobServiceClient, ResourceTypes, generate_account_sas -from botocore.client import Config -from botocore.exceptions import ClientError from flask import Flask -from google.cloud import storage as GoogleStorage + +from extensions.storage.aliyun_storage import AliyunStorage +from extensions.storage.azure_storage import AzureStorage +from extensions.storage.google_storage import GoogleStorage +from extensions.storage.local_storage import LocalStorage +from extensions.storage.s3_storage import S3Storage class Storage: 
class Storage:
    """Facade that routes every file operation to the configured backend.

    The concrete backend (a ``BaseStorage`` subclass) is chosen once in
    :meth:`init_app` from the Flask config key ``STORAGE_TYPE``.
    """

    def __init__(self):
        # Concrete BaseStorage subclass instance; assigned by init_app().
        self.storage_runner = None

    def init_app(self, app: Flask):
        """Instantiate the backend matching ``STORAGE_TYPE`` (local by default)."""
        runners = {
            's3': S3Storage,
            'azure-blob': AzureStorage,
            'aliyun-oss': AliyunStorage,
            'google-storage': GoogleStorage,
        }
        runner_cls = runners.get(app.config.get('STORAGE_TYPE'), LocalStorage)
        self.storage_runner = runner_cls(app=app)

    def save(self, filename, data):
        self.storage_runner.save(filename, data)

    def load(self, filename: str, stream: bool = False) -> Union[bytes, Generator]:
        """Load a file either eagerly (bytes) or lazily (generator of chunks)."""
        return self.load_stream(filename) if stream else self.load_once(filename)

    def load_once(self, filename: str) -> bytes:
        return self.storage_runner.load_once(filename)

    def load_stream(self, filename: str) -> Generator:
        return self.storage_runner.load_stream(filename)

    def download(self, filename, target_filepath):
        self.storage_runner.download(filename, target_filepath)

    def exists(self, filename):
        return self.storage_runner.exists(filename)

    def delete(self, filename):
        return self.storage_runner.delete(filename)


storage = Storage()
+ """ + + def __init__(self, app: Flask): + super().__init__(app) + app_config = self.app.config + self.bucket_name = app_config.get('ALIYUN_OSS_BUCKET_NAME') + self.client = aliyun_s3.Bucket( + aliyun_s3.Auth(app_config.get('ALIYUN_OSS_ACCESS_KEY'), app_config.get('ALIYUN_OSS_SECRET_KEY')), + app_config.get('ALIYUN_OSS_ENDPOINT'), + self.bucket_name, + connect_timeout=30 + ) + + def save(self, filename, data): + self.client.put_object(filename, data) + + def load_once(self, filename: str) -> bytes: + with closing(self.client.get_object(filename)) as obj: + data = obj.read() + return data + + def load_stream(self, filename: str) -> Generator: + def generate(filename: str = filename) -> Generator: + with closing(self.client.get_object(filename)) as obj: + while chunk := obj.read(4096): + yield chunk + + return generate() + + def download(self, filename, target_filepath): + self.client.get_object_to_file(filename, target_filepath) + + def exists(self, filename): + return self.client.object_exists(filename) + + def delete(self, filename): + self.client.delete_object(filename) diff --git a/api/extensions/storage/azure_storage.py b/api/extensions/storage/azure_storage.py new file mode 100644 index 0000000000..01de8bab94 --- /dev/null +++ b/api/extensions/storage/azure_storage.py @@ -0,0 +1,58 @@ +from collections.abc import Generator +from contextlib import closing +from datetime import datetime, timedelta, timezone + +from azure.storage.blob import AccountSasPermissions, BlobServiceClient, ResourceTypes, generate_account_sas +from flask import Flask + +from extensions.storage.base_storage import BaseStorage + + +class AzureStorage(BaseStorage): + """Implementation for azure storage. 
+ """ + def __init__(self, app: Flask): + super().__init__(app) + app_config = self.app.config + self.bucket_name = app_config.get('AZURE_STORAGE_CONTAINER_NAME') + sas_token = generate_account_sas( + account_name=app_config.get('AZURE_BLOB_ACCOUNT_NAME'), + account_key=app_config.get('AZURE_BLOB_ACCOUNT_KEY'), + resource_types=ResourceTypes(service=True, container=True, object=True), + permission=AccountSasPermissions(read=True, write=True, delete=True, list=True, add=True, create=True), + expiry=datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours=1) + ) + self.client = BlobServiceClient(account_url=app_config.get('AZURE_BLOB_ACCOUNT_URL'), + credential=sas_token) + def save(self, filename, data): + blob_container = self.client.get_container_client(container=self.bucket_name) + blob_container.upload_blob(filename, data) + + def load_once(self, filename: str) -> bytes: + blob = self.client.get_container_client(container=self.bucket_name) + blob = blob.get_blob_client(blob=filename) + data = blob.download_blob().readall() + return data + + def load_stream(self, filename: str) -> Generator: + def generate(filename: str = filename) -> Generator: + blob = self.client.get_blob_client(container=self.bucket_name, blob=filename) + with closing(blob.download_blob()) as blob_stream: + while chunk := blob_stream.readall(4096): + yield chunk + + return generate() + + def download(self, filename, target_filepath): + blob = self.client.get_blob_client(container=self.bucket_name, blob=filename) + with open(target_filepath, "wb") as my_blob: + blob_data = blob.download_blob() + blob_data.readinto(my_blob) + + def exists(self, filename): + blob = self.client.get_blob_client(container=self.bucket_name, blob=filename) + return blob.exists() + + def delete(self, filename): + blob_container = self.client.get_container_client(container=self.bucket_name) + blob_container.delete_blob(filename) \ No newline at end of file diff --git a/api/extensions/storage/base_storage.py 
class BaseStorage(ABC):
    """Interface for file storage.

    Every backend (local disk, S3, Azure Blob, Aliyun OSS, Google Cloud
    Storage) implements this contract; the ``Storage`` facade delegates to
    one concrete instance chosen from app config.
    """
    # Flask application whose config supplies the backend credentials/settings.
    app = None

    def __init__(self, app: Flask):
        self.app = app

    @abstractmethod
    def save(self, filename, data):
        """Persist *data* (bytes or file-like, backend-dependent) under key *filename*."""
        raise NotImplementedError

    @abstractmethod
    def load_once(self, filename: str) -> bytes:
        """Return the entire content of *filename* as bytes."""
        raise NotImplementedError

    @abstractmethod
    def load_stream(self, filename: str) -> Generator:
        """Return a generator yielding the content of *filename* in chunks."""
        raise NotImplementedError

    @abstractmethod
    def download(self, filename, target_filepath):
        """Copy the stored file *filename* to the local path *target_filepath*."""
        raise NotImplementedError

    @abstractmethod
    def exists(self, filename):
        """Return True if *filename* exists in the backend."""
        raise NotImplementedError

    @abstractmethod
    def delete(self, filename):
        """Remove *filename* from the backend."""
        raise NotImplementedError
+ """ + def __init__(self, app: Flask): + super().__init__(app) + app_config = self.app.config + self.bucket_name = app_config.get('GOOGLE_STORAGE_BUCKET_NAME') + service_account_json = base64.b64decode(app_config.get('GOOGLE_STORAGE_SERVICE_ACCOUNT_JSON_BASE64')).decode( + 'utf-8') + self.client = GoogleCloudStorage.Client().from_service_account_json(service_account_json) + + def save(self, filename, data): + bucket = self.client.get_bucket(self.bucket_name) + blob = bucket.blob(filename) + blob.upload_from_file(data) + + def load_once(self, filename: str) -> bytes: + bucket = self.client.get_bucket(self.bucket_name) + blob = bucket.get_blob(filename) + data = blob.download_as_bytes() + return data + + def load_stream(self, filename: str) -> Generator: + def generate(filename: str = filename) -> Generator: + bucket = self.client.get_bucket(self.bucket_name) + blob = bucket.get_blob(filename) + with closing(blob.open(mode='rb')) as blob_stream: + while chunk := blob_stream.read(4096): + yield chunk + return generate() + + def download(self, filename, target_filepath): + bucket = self.client.get_bucket(self.bucket_name) + blob = bucket.get_blob(filename) + with open(target_filepath, "wb") as my_blob: + blob_data = blob.download_blob() + blob_data.readinto(my_blob) + + def exists(self, filename): + bucket = self.client.get_bucket(self.bucket_name) + blob = bucket.blob(filename) + return blob.exists() + + def delete(self, filename): + bucket = self.client.get_bucket(self.bucket_name) + bucket.delete_blob(filename) \ No newline at end of file diff --git a/api/extensions/storage/local_storage.py b/api/extensions/storage/local_storage.py new file mode 100644 index 0000000000..389ef12f82 --- /dev/null +++ b/api/extensions/storage/local_storage.py @@ -0,0 +1,88 @@ +import os +import shutil +from collections.abc import Generator + +from flask import Flask + +from extensions.storage.base_storage import BaseStorage + + +class LocalStorage(BaseStorage): + """Implementation for 
class LocalStorage(BaseStorage):
    """Local-filesystem implementation of :class:`BaseStorage`.

    Files live under ``STORAGE_LOCAL_PATH`` (made absolute against the Flask
    app's root path when relative).
    """

    def __init__(self, app: Flask):
        super().__init__(app)
        folder = self.app.config.get('STORAGE_LOCAL_PATH')
        if not os.path.isabs(folder):
            folder = os.path.join(app.root_path, folder)
        self.folder = folder

    def _full_path(self, filename: str) -> str:
        """Join the configured folder with *filename*.

        Refactor: this if/else was duplicated verbatim in all six public
        methods; semantics are unchanged (a falsy or '/'-terminated folder is
        concatenated directly, otherwise a '/' separator is inserted).
        """
        if not self.folder or self.folder.endswith('/'):
            return self.folder + filename
        return self.folder + '/' + filename

    def save(self, filename, data):
        """Write *data* (bytes) to *filename*, creating parent directories."""
        filename = self._full_path(filename)
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(os.path.join(os.getcwd(), filename), "wb") as f:
            f.write(data)

    def load_once(self, filename: str) -> bytes:
        """Return the file content; raise FileNotFoundError if absent."""
        filename = self._full_path(filename)
        if not os.path.exists(filename):
            raise FileNotFoundError("File not found")
        with open(filename, "rb") as f:
            return f.read()

    def load_stream(self, filename: str) -> Generator:
        """Return a generator yielding the file content in 4 KB chunks."""
        def generate(filename: str = filename) -> Generator:
            filename = self._full_path(filename)
            if not os.path.exists(filename):
                raise FileNotFoundError("File not found")
            with open(filename, "rb") as f:
                while chunk := f.read(4096):  # read in 4 KB chunks
                    yield chunk

        return generate()

    def download(self, filename, target_filepath):
        """Copy the stored file to *target_filepath*; raise if absent."""
        filename = self._full_path(filename)
        if not os.path.exists(filename):
            raise FileNotFoundError("File not found")
        shutil.copyfile(filename, target_filepath)

    def exists(self, filename):
        return os.path.exists(self._full_path(filename))

    def delete(self, filename):
        # Silently ignore missing files, matching the original behavior.
        filename = self._full_path(filename)
        if os.path.exists(filename):
            os.remove(filename)
class S3Storage(BaseStorage):
    """Amazon S3 (and S3-compatible endpoint) implementation of :class:`BaseStorage`."""

    def __init__(self, app: Flask):
        super().__init__(app)
        app_config = self.app.config
        self.bucket_name = app_config.get('S3_BUCKET_NAME')
        self.client = boto3.client(
            's3',
            aws_secret_access_key=app_config.get('S3_SECRET_KEY'),
            aws_access_key_id=app_config.get('S3_ACCESS_KEY'),
            endpoint_url=app_config.get('S3_ENDPOINT'),
            region_name=app_config.get('S3_REGION'),
            config=Config(s3={'addressing_style': app_config.get('S3_ADDRESS_STYLE')})
        )

    def save(self, filename, data):
        """Upload *data* under key *filename*."""
        self.client.put_object(Bucket=self.bucket_name, Key=filename, Body=data)

    def load_once(self, filename: str) -> bytes:
        """Return the object content; raise FileNotFoundError for missing keys.

        Fix: the old code wrapped the long-lived boto3 client in
        ``closing(...)``, which closed the shared client after the first call
        and broke every subsequent operation on this instance.
        """
        try:
            data = self.client.get_object(Bucket=self.bucket_name, Key=filename)['Body'].read()
        except ClientError as ex:
            if ex.response['Error']['Code'] == 'NoSuchKey':
                raise FileNotFoundError("File not found")
            raise
        return data

    def load_stream(self, filename: str) -> Generator:
        """Return a generator yielding the object content chunk by chunk."""
        def generate(filename: str = filename) -> Generator:
            # Same fix as load_once: do not close the shared client.
            try:
                response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
            except ClientError as ex:
                if ex.response['Error']['Code'] == 'NoSuchKey':
                    raise FileNotFoundError("File not found")
                raise
            yield from response['Body'].iter_chunks()

        return generate()

    def download(self, filename, target_filepath):
        """Save object *filename* to the local path *target_filepath*."""
        self.client.download_file(self.bucket_name, filename, target_filepath)

    def exists(self, filename):
        """Return True if a HEAD request on *filename* succeeds."""
        try:
            self.client.head_object(Bucket=self.bucket_name, Key=filename)
            return True
        except ClientError:
            # Fix: the bare `except:` also swallowed KeyboardInterrupt and
            # SystemExit; only request errors mean "does not exist".
            return False

    def delete(self, filename):
        self.client.delete_object(Bucket=self.bucket_name, Key=filename)