add storage factory (#3922)

This commit is contained in:
Jyong 2024-04-29 18:22:03 +08:00 committed by GitHub
parent c5e2659771
commit 338e4669e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 384 additions and 211 deletions

View File

@ -1,90 +1,42 @@
import os
import shutil
from collections.abc import Generator
from typing import Union

from flask import Flask

from extensions.storage.aliyun_storage import AliyunStorage
from extensions.storage.azure_storage import AzureStorage
from extensions.storage.google_storage import GoogleStorage
from extensions.storage.local_storage import LocalStorage
from extensions.storage.s3_storage import S3Storage


class Storage:
    """Facade that delegates all file operations to a storage backend.

    The concrete backend (``storage_runner``) is selected in
    :meth:`init_app` from the Flask ``STORAGE_TYPE`` config value.
    """

    def __init__(self):
        # Set by init_app(); None until the Flask app is bound.
        self.storage_runner = None

    def init_app(self, app: Flask):
        """Instantiate the backend selected by ``STORAGE_TYPE``.

        Any unrecognized (or missing) value falls back to local
        filesystem storage.
        """
        storage_type = app.config.get('STORAGE_TYPE')
        if storage_type == 's3':
            self.storage_runner = S3Storage(
                app=app
            )
        elif storage_type == 'azure-blob':
            self.storage_runner = AzureStorage(
                app=app
            )
        elif storage_type == 'aliyun-oss':
            self.storage_runner = AliyunStorage(
                app=app
            )
        elif storage_type == 'google-storage':
            self.storage_runner = GoogleStorage(
                app=app
            )
        else:
            self.storage_runner = LocalStorage(app=app)

    def save(self, filename, data):
        self.storage_runner.save(filename, data)

    def load(self, filename: str, stream: bool = False) -> Union[bytes, Generator]:
        # Dispatch to streaming or one-shot load depending on the caller's needs.
        if stream:
            return self.load_stream(filename)
        return self.load_once(filename)

    def load_once(self, filename: str) -> bytes:
        return self.storage_runner.load_once(filename)

    def load_stream(self, filename: str) -> Generator:
        return self.storage_runner.load_stream(filename)

    def download(self, filename, target_filepath):
        self.storage_runner.download(filename, target_filepath)

    def exists(self, filename):
        return self.storage_runner.exists(filename)

    def delete(self, filename):
        return self.storage_runner.delete(filename)


storage = Storage()

View File

@ -0,0 +1,48 @@
from collections.abc import Generator
from contextlib import closing
import oss2 as aliyun_s3
from flask import Flask
from extensions.storage.base_storage import BaseStorage
class AliyunStorage(BaseStorage):
    """Aliyun OSS-backed implementation of the storage interface."""

    def __init__(self, app: Flask):
        super().__init__(app)
        config = self.app.config
        self.bucket_name = config.get('ALIYUN_OSS_BUCKET_NAME')
        credentials = aliyun_s3.Auth(
            config.get('ALIYUN_OSS_ACCESS_KEY'),
            config.get('ALIYUN_OSS_SECRET_KEY'),
        )
        self.client = aliyun_s3.Bucket(
            credentials,
            config.get('ALIYUN_OSS_ENDPOINT'),
            self.bucket_name,
            connect_timeout=30
        )

    def save(self, filename, data):
        self.client.put_object(filename, data)

    def load_once(self, filename: str) -> bytes:
        # Read the whole object in one go, making sure the OSS stream is closed.
        with closing(self.client.get_object(filename)) as remote_file:
            return remote_file.read()

    def load_stream(self, filename: str) -> Generator:
        def generate(filename: str = filename) -> Generator:
            # Stream the object in 4 KB pieces to keep memory usage flat.
            with closing(self.client.get_object(filename)) as remote_file:
                while piece := remote_file.read(4096):
                    yield piece

        return generate()

    def download(self, filename, target_filepath):
        self.client.get_object_to_file(filename, target_filepath)

    def exists(self, filename):
        return self.client.object_exists(filename)

    def delete(self, filename):
        self.client.delete_object(filename)

View File

@ -0,0 +1,58 @@
from collections.abc import Generator
from contextlib import closing
from datetime import datetime, timedelta, timezone
from azure.storage.blob import AccountSasPermissions, BlobServiceClient, ResourceTypes, generate_account_sas
from flask import Flask
from extensions.storage.base_storage import BaseStorage
class AzureStorage(BaseStorage):
    """Azure Blob Storage implementation of the storage interface."""

    def __init__(self, app: Flask):
        super().__init__(app)
        app_config = self.app.config
        # Use the AZURE_BLOB_ prefix like every other Azure key below (the
        # pre-refactor code also read AZURE_BLOB_CONTAINER_NAME).
        self.bucket_name = app_config.get('AZURE_BLOB_CONTAINER_NAME')
        # NOTE(review): this SAS token is generated once at construction and
        # expires after one hour; a long-lived process will start failing
        # afterwards — confirm and consider regenerating per operation.
        sas_token = generate_account_sas(
            account_name=app_config.get('AZURE_BLOB_ACCOUNT_NAME'),
            account_key=app_config.get('AZURE_BLOB_ACCOUNT_KEY'),
            resource_types=ResourceTypes(service=True, container=True, object=True),
            permission=AccountSasPermissions(read=True, write=True, delete=True, list=True, add=True, create=True),
            expiry=datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours=1)
        )
        self.client = BlobServiceClient(account_url=app_config.get('AZURE_BLOB_ACCOUNT_URL'),
                                        credential=sas_token)

    def save(self, filename, data):
        blob_container = self.client.get_container_client(container=self.bucket_name)
        blob_container.upload_blob(filename, data)

    def load_once(self, filename: str) -> bytes:
        blob = self.client.get_container_client(container=self.bucket_name)
        blob = blob.get_blob_client(blob=filename)
        data = blob.download_blob().readall()
        return data

    def load_stream(self, filename: str) -> Generator:
        def generate(filename: str = filename) -> Generator:
            blob = self.client.get_blob_client(container=self.bucket_name, blob=filename)
            # StorageStreamDownloader.readall() takes no size argument (the old
            # `readall(4096)` loop raised TypeError) and has no close() for
            # closing(); iterate the downloader's chunks() instead.
            blob_data = blob.download_blob()
            yield from blob_data.chunks()

        return generate()

    def download(self, filename, target_filepath):
        blob = self.client.get_blob_client(container=self.bucket_name, blob=filename)
        with open(target_filepath, "wb") as my_blob:
            blob_data = blob.download_blob()
            blob_data.readinto(my_blob)

    def exists(self, filename):
        blob = self.client.get_blob_client(container=self.bucket_name, blob=filename)
        return blob.exists()

    def delete(self, filename):
        blob_container = self.client.get_container_client(container=self.bucket_name)
        blob_container.delete_blob(filename)

View File

@ -0,0 +1,38 @@
"""Abstract interface for file storage implementations."""
from abc import ABC, abstractmethod
from collections.abc import Generator
from flask import Flask
class BaseStorage(ABC):
    """Interface for file storage.

    Concrete backends (local disk, S3, Azure Blob, Aliyun OSS, Google Cloud
    Storage) implement these operations against their respective services.
    """
    # Flask application providing configuration; rebound per instance in __init__.
    app = None

    def __init__(self, app: Flask):
        self.app = app

    @abstractmethod
    def save(self, filename, data):
        """Persist ``data`` under ``filename``."""
        raise NotImplementedError

    @abstractmethod
    def load_once(self, filename: str) -> bytes:
        """Return the full contents of ``filename`` as bytes."""
        raise NotImplementedError

    @abstractmethod
    def load_stream(self, filename: str) -> Generator:
        """Yield the contents of ``filename`` in chunks."""
        raise NotImplementedError

    @abstractmethod
    def download(self, filename, target_filepath):
        """Copy ``filename`` from storage to the local path ``target_filepath``."""
        raise NotImplementedError

    @abstractmethod
    def exists(self, filename):
        """Return True if ``filename`` exists in storage."""
        raise NotImplementedError

    @abstractmethod
    def delete(self, filename):
        """Remove ``filename`` from storage."""
        raise NotImplementedError

View File

@ -0,0 +1,56 @@
import base64
from collections.abc import Generator
from contextlib import closing
from flask import Flask
from google.cloud import storage as GoogleCloudStorage
from extensions.storage.base_storage import BaseStorage
class GoogleStorage(BaseStorage):
    """Google Cloud Storage implementation of the storage interface."""

    def __init__(self, app: Flask):
        super().__init__(app)
        app_config = self.app.config
        self.bucket_name = app_config.get('GOOGLE_STORAGE_BUCKET_NAME')
        service_account_json = base64.b64decode(app_config.get('GOOGLE_STORAGE_SERVICE_ACCOUNT_JSON_BASE64')).decode(
            'utf-8')
        # from_service_account_json() expects a *path* to a key file, but we
        # hold the decoded JSON text itself — build the client from the parsed
        # credentials dict instead.
        import json  # local import: only needed when this backend is configured
        self.client = GoogleCloudStorage.Client.from_service_account_info(json.loads(service_account_json))

    def save(self, filename, data):
        bucket = self.client.get_bucket(self.bucket_name)
        blob = bucket.blob(filename)
        blob.upload_from_file(data)

    def load_once(self, filename: str) -> bytes:
        bucket = self.client.get_bucket(self.bucket_name)
        blob = bucket.get_blob(filename)
        data = blob.download_as_bytes()
        return data

    def load_stream(self, filename: str) -> Generator:
        def generate(filename: str = filename) -> Generator:
            bucket = self.client.get_bucket(self.bucket_name)
            blob = bucket.get_blob(filename)
            with closing(blob.open(mode='rb')) as blob_stream:
                while chunk := blob_stream.read(4096):
                    yield chunk

        return generate()

    def download(self, filename, target_filepath):
        bucket = self.client.get_bucket(self.bucket_name)
        blob = bucket.get_blob(filename)
        # GCS blobs have no download_blob()/readinto() (that is the Azure API);
        # stream the object straight to the target file instead.
        blob.download_to_filename(target_filepath)

    def exists(self, filename):
        bucket = self.client.get_bucket(self.bucket_name)
        blob = bucket.blob(filename)
        return blob.exists()

    def delete(self, filename):
        bucket = self.client.get_bucket(self.bucket_name)
        bucket.delete_blob(filename)

View File

@ -0,0 +1,88 @@
import os
import shutil
from collections.abc import Generator
from flask import Flask
from extensions.storage.base_storage import BaseStorage
class LocalStorage(BaseStorage):
    """Local-filesystem implementation of the storage interface."""

    def __init__(self, app: Flask):
        super().__init__(app)
        folder = self.app.config.get('STORAGE_LOCAL_PATH')
        if not os.path.isabs(folder):
            # Resolve relative storage paths against the Flask application root.
            folder = os.path.join(app.root_path, folder)
        self.folder = folder

    def _build_filepath(self, filename: str) -> str:
        """Return the on-disk path for ``filename``.

        ``self.folder`` is made absolute in ``__init__``, so the result is an
        absolute path. (Extracted: this prefixing logic was duplicated in every
        method.)
        """
        if not self.folder or self.folder.endswith('/'):
            return self.folder + filename
        return self.folder + '/' + filename

    def save(self, filename, data):
        filepath = self._build_filepath(filename)
        # Create intermediate directories as needed before writing.
        folder = os.path.dirname(filepath)
        os.makedirs(folder, exist_ok=True)
        # filepath is absolute, so no cwd-join is needed (the old
        # os.path.join(os.getcwd(), ...) was a no-op on absolute paths).
        with open(filepath, "wb") as f:
            f.write(data)

    def load_once(self, filename: str) -> bytes:
        filepath = self._build_filepath(filename)
        if not os.path.exists(filepath):
            raise FileNotFoundError("File not found")
        with open(filepath, "rb") as f:
            return f.read()

    def load_stream(self, filename: str) -> Generator:
        def generate(filename: str = filename) -> Generator:
            filepath = self._build_filepath(filename)
            if not os.path.exists(filepath):
                raise FileNotFoundError("File not found")
            with open(filepath, "rb") as f:
                while chunk := f.read(4096):  # Read in chunks of 4KB
                    yield chunk

        return generate()

    def download(self, filename, target_filepath):
        filepath = self._build_filepath(filename)
        if not os.path.exists(filepath):
            raise FileNotFoundError("File not found")
        shutil.copyfile(filepath, target_filepath)

    def exists(self, filename):
        return os.path.exists(self._build_filepath(filename))

    def delete(self, filename):
        filepath = self._build_filepath(filename)
        if os.path.exists(filepath):
            os.remove(filepath)

View File

@ -0,0 +1,68 @@
from collections.abc import Generator
from contextlib import closing
import boto3
from botocore.client import Config
from botocore.exceptions import ClientError
from flask import Flask
from extensions.storage.base_storage import BaseStorage
class S3Storage(BaseStorage):
    """Amazon S3 (and S3-compatible) implementation of the storage interface."""

    def __init__(self, app: Flask):
        super().__init__(app)
        app_config = self.app.config
        self.bucket_name = app_config.get('S3_BUCKET_NAME')
        self.client = boto3.client(
            's3',
            aws_secret_access_key=app_config.get('S3_SECRET_KEY'),
            aws_access_key_id=app_config.get('S3_ACCESS_KEY'),
            endpoint_url=app_config.get('S3_ENDPOINT'),
            region_name=app_config.get('S3_REGION'),
            config=Config(s3={'addressing_style': app_config.get('S3_ADDRESS_STYLE')})
        )

    def save(self, filename, data):
        self.client.put_object(Bucket=self.bucket_name, Key=filename, Body=data)

    def load_once(self, filename: str) -> bytes:
        # The client is long-lived and shared by every method, so it must NOT
        # be closed after a single request (the previous `closing(self.client)`
        # wrapper left later calls with a closed client).
        try:
            data = self.client.get_object(Bucket=self.bucket_name, Key=filename)['Body'].read()
        except ClientError as ex:
            if ex.response['Error']['Code'] == 'NoSuchKey':
                raise FileNotFoundError("File not found")
            raise
        return data

    def load_stream(self, filename: str) -> Generator:
        def generate(filename: str = filename) -> Generator:
            try:
                response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
                yield from response['Body'].iter_chunks()
            except ClientError as ex:
                if ex.response['Error']['Code'] == 'NoSuchKey':
                    raise FileNotFoundError("File not found")
                raise

        return generate()

    def download(self, filename, target_filepath):
        self.client.download_file(self.bucket_name, filename, target_filepath)

    def exists(self, filename):
        try:
            self.client.head_object(Bucket=self.bucket_name, Key=filename)
            return True
        except ClientError:
            # head_object raises ClientError (e.g. 404) when the key is absent;
            # the old bare `except:` also swallowed unrelated errors.
            return False

    def delete(self, filename):
        self.client.delete_object(Bucket=self.bucket_name, Key=filename)