refactor: update load_stream method to directly yield file chunks (#9806)

This commit is contained in:
zhuhao 2024-10-25 10:11:25 +08:00 committed by GitHub
parent dd17506078
commit 5bf31e7a86
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 48 additions and 83 deletions

View File

@ -36,12 +36,9 @@ class AliyunOssStorage(BaseStorage):
return data return data
def load_stream(self, filename: str) -> Generator: def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator: obj = self.client.get_object(self.__wrapper_folder_filename(filename))
obj = self.client.get_object(self.__wrapper_folder_filename(filename)) while chunk := obj.read(4096):
while chunk := obj.read(4096): yield chunk
yield chunk
return generate()
def download(self, filename, target_filepath): def download(self, filename, target_filepath):
self.client.get_object_to_file(self.__wrapper_folder_filename(filename), target_filepath) self.client.get_object_to_file(self.__wrapper_folder_filename(filename), target_filepath)

View File

@ -62,17 +62,14 @@ class AwsS3Storage(BaseStorage):
return data return data
def load_stream(self, filename: str) -> Generator: def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator: try:
try: response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
response = self.client.get_object(Bucket=self.bucket_name, Key=filename) yield from response["Body"].iter_chunks()
yield from response["Body"].iter_chunks() except ClientError as ex:
except ClientError as ex: if ex.response["Error"]["Code"] == "NoSuchKey":
if ex.response["Error"]["Code"] == "NoSuchKey": raise FileNotFoundError("File not found")
raise FileNotFoundError("File not found") else:
else: raise
raise
return generate()
def download(self, filename, target_filepath): def download(self, filename, target_filepath):
self.client.download_file(self.bucket_name, filename, target_filepath) self.client.download_file(self.bucket_name, filename, target_filepath)

View File

@ -32,13 +32,9 @@ class AzureBlobStorage(BaseStorage):
def load_stream(self, filename: str) -> Generator: def load_stream(self, filename: str) -> Generator:
client = self._sync_client() client = self._sync_client()
blob = client.get_blob_client(container=self.bucket_name, blob=filename)
def generate(filename: str = filename) -> Generator: blob_data = blob.download_blob()
blob = client.get_blob_client(container=self.bucket_name, blob=filename) yield from blob_data.chunks()
blob_data = blob.download_blob()
yield from blob_data.chunks()
return generate(filename)
def download(self, filename, target_filepath): def download(self, filename, target_filepath):
client = self._sync_client() client = self._sync_client()

View File

@ -39,12 +39,9 @@ class BaiduObsStorage(BaseStorage):
return response.data.read() return response.data.read()
def load_stream(self, filename: str) -> Generator: def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator: response = self.client.get_object(bucket_name=self.bucket_name, key=filename).data
response = self.client.get_object(bucket_name=self.bucket_name, key=filename).data while chunk := response.read(4096):
while chunk := response.read(4096): yield chunk
yield chunk
return generate()
def download(self, filename, target_filepath): def download(self, filename, target_filepath):
self.client.get_object_to_file(bucket_name=self.bucket_name, key=filename, file_name=target_filepath) self.client.get_object_to_file(bucket_name=self.bucket_name, key=filename, file_name=target_filepath)

View File

@ -39,14 +39,11 @@ class GoogleCloudStorage(BaseStorage):
return data return data
def load_stream(self, filename: str) -> Generator: def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator: bucket = self.client.get_bucket(self.bucket_name)
bucket = self.client.get_bucket(self.bucket_name) blob = bucket.get_blob(filename)
blob = bucket.get_blob(filename) with blob.open(mode="rb") as blob_stream:
with blob.open(mode="rb") as blob_stream: while chunk := blob_stream.read(4096):
while chunk := blob_stream.read(4096): yield chunk
yield chunk
return generate()
def download(self, filename, target_filepath): def download(self, filename, target_filepath):
bucket = self.client.get_bucket(self.bucket_name) bucket = self.client.get_bucket(self.bucket_name)

View File

@ -27,12 +27,9 @@ class HuaweiObsStorage(BaseStorage):
return data return data
def load_stream(self, filename: str) -> Generator: def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator: response = self.client.getObject(bucketName=self.bucket_name, objectKey=filename)["body"].response
response = self.client.getObject(bucketName=self.bucket_name, objectKey=filename)["body"].response while chunk := response.read(4096):
while chunk := response.read(4096): yield chunk
yield chunk
return generate()
def download(self, filename, target_filepath): def download(self, filename, target_filepath):
self.client.getObject(bucketName=self.bucket_name, objectKey=filename, downloadPath=target_filepath) self.client.getObject(bucketName=self.bucket_name, objectKey=filename, downloadPath=target_filepath)

View File

@ -40,15 +40,11 @@ class LocalFsStorage(BaseStorage):
def load_stream(self, filename: str) -> Generator: def load_stream(self, filename: str) -> Generator:
filepath = self._build_filepath(filename) filepath = self._build_filepath(filename)
if not os.path.exists(filepath):
def generate() -> Generator: raise FileNotFoundError("File not found")
if not os.path.exists(filepath): with open(filepath, "rb") as f:
raise FileNotFoundError("File not found") while chunk := f.read(4096): # Read in chunks of 4KB
with open(filepath, "rb") as f: yield chunk
while chunk := f.read(4096): # Read in chunks of 4KB
yield chunk
return generate()
def download(self, filename, target_filepath): def download(self, filename, target_filepath):
filepath = self._build_filepath(filename) filepath = self._build_filepath(filename)

View File

@ -36,17 +36,14 @@ class OracleOCIStorage(BaseStorage):
return data return data
def load_stream(self, filename: str) -> Generator: def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator: try:
try: response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
response = self.client.get_object(Bucket=self.bucket_name, Key=filename) yield from response["Body"].iter_chunks()
yield from response["Body"].iter_chunks() except ClientError as ex:
except ClientError as ex: if ex.response["Error"]["Code"] == "NoSuchKey":
if ex.response["Error"]["Code"] == "NoSuchKey": raise FileNotFoundError("File not found")
raise FileNotFoundError("File not found") else:
else: raise
raise
return generate()
def download(self, filename, target_filepath): def download(self, filename, target_filepath):
self.client.download_file(self.bucket_name, filename, target_filepath) self.client.download_file(self.bucket_name, filename, target_filepath)

View File

@ -36,17 +36,14 @@ class SupabaseStorage(BaseStorage):
return content return content
def load_stream(self, filename: str) -> Generator: def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator: result = self.client.storage.from_(self.bucket_name).download(filename)
result = self.client.storage.from_(self.bucket_name).download(filename) byte_stream = io.BytesIO(result)
byte_stream = io.BytesIO(result) while chunk := byte_stream.read(4096): # Read in chunks of 4KB
while chunk := byte_stream.read(4096): # Read in chunks of 4KB yield chunk
yield chunk
return generate()
def download(self, filename, target_filepath): def download(self, filename, target_filepath):
result = self.client.storage.from_(self.bucket_name).download(filename) result = self.client.storage.from_(self.bucket_name).download(filename)
Path(result).write_bytes(result) Path(target_filepath).write_bytes(result)
def exists(self, filename): def exists(self, filename):
result = self.client.storage.from_(self.bucket_name).list(filename) result = self.client.storage.from_(self.bucket_name).list(filename)

View File

@ -29,11 +29,8 @@ class TencentCosStorage(BaseStorage):
return data return data
def load_stream(self, filename: str) -> Generator: def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator: response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
response = self.client.get_object(Bucket=self.bucket_name, Key=filename) yield from response["Body"].get_stream(chunk_size=4096)
yield from response["Body"].get_stream(chunk_size=4096)
return generate()
def download(self, filename, target_filepath): def download(self, filename, target_filepath):
response = self.client.get_object(Bucket=self.bucket_name, Key=filename) response = self.client.get_object(Bucket=self.bucket_name, Key=filename)

View File

@ -27,12 +27,9 @@ class VolcengineTosStorage(BaseStorage):
return data return data
def load_stream(self, filename: str) -> Generator: def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator: response = self.client.get_object(bucket=self.bucket_name, key=filename)
response = self.client.get_object(bucket=self.bucket_name, key=filename) while chunk := response.read(4096):
while chunk := response.read(4096): yield chunk
yield chunk
return generate()
def download(self, filename, target_filepath): def download(self, filename, target_filepath):
self.client.get_object_to_file(bucket=self.bucket_name, key=filename, file_path=target_filepath) self.client.get_object_to_file(bucket=self.bucket_name, key=filename, file_path=target_filepath)