import os import shutil from collections.abc import Generator from pathlib import Path from flask import current_app from configs import dify_config from extensions.storage.base_storage import BaseStorage class LocalFsStorage(BaseStorage): """Implementation for local filesystem storage.""" def __init__(self): super().__init__() folder = dify_config.STORAGE_LOCAL_PATH if not os.path.isabs(folder): folder = os.path.join(current_app.root_path, folder) self.folder = folder def save(self, filename, data): if not self.folder or self.folder.endswith("/"): filename = self.folder + filename else: filename = self.folder + "/" + filename folder = os.path.dirname(filename) os.makedirs(folder, exist_ok=True) Path(os.path.join(os.getcwd(), filename)).write_bytes(data) def load_once(self, filename: str) -> bytes: if not self.folder or self.folder.endswith("/"): filename = self.folder + filename else: filename = self.folder + "/" + filename if not os.path.exists(filename): raise FileNotFoundError("File not found") data = Path(filename).read_bytes() return data def load_stream(self, filename: str) -> Generator: def generate(filename: str = filename) -> Generator: if not self.folder or self.folder.endswith("/"): filename = self.folder + filename else: filename = self.folder + "/" + filename if not os.path.exists(filename): raise FileNotFoundError("File not found") with open(filename, "rb") as f: while chunk := f.read(4096): # Read in chunks of 4KB yield chunk return generate() def download(self, filename, target_filepath): if not self.folder or self.folder.endswith("/"): filename = self.folder + filename else: filename = self.folder + "/" + filename if not os.path.exists(filename): raise FileNotFoundError("File not found") shutil.copyfile(filename, target_filepath) def exists(self, filename): if not self.folder or self.folder.endswith("/"): filename = self.folder + filename else: filename = self.folder + "/" + filename return os.path.exists(filename) def delete(self, filename): if not self.folder or self.folder.endswith("/"): filename = self.folder + filename else: filename = self.folder + "/" + filename if os.path.exists(filename): os.remove(filename)