diff --git a/api/db/services/__init__.py b/api/db/services/__init__.py index 964a7a17b..4b3af3ecf 100644 --- a/api/db/services/__init__.py +++ b/api/db/services/__init__.py @@ -13,27 +13,87 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import pathlib import re +from pathlib import PurePath + from .user_service import UserService as UserService -def duplicate_name(query_func, **kwargs): - fnm = kwargs["name"] - objs = query_func(**kwargs) - if not objs: - return fnm - ext = pathlib.Path(fnm).suffix #.jpg - nm = re.sub(r"%s$"%ext, "", fnm) - r = re.search(r"\(([0-9]+)\)$", nm) - c = 0 - if r: - c = int(r.group(1)) - nm = re.sub(r"\([0-9]+\)$", "", nm) - c += 1 - nm = f"{nm}({c})" - if ext: - nm += f"{ext}" +def split_name_counter(filename: str) -> tuple[str, int | None]: + """ + Splits a filename into main part and counter (if present in parentheses). - kwargs["name"] = nm - return duplicate_name(query_func, **kwargs) + Args: + filename: Input filename string to be parsed + + Returns: + A tuple containing: + - The main filename part (string) + - The counter from parentheses (integer) or None if no counter exists + """ + pattern = re.compile(r"^(.*?)\((\d+)\)$") + + match = pattern.search(filename) + if match: + main_part = match.group(1).rstrip() + bracket_part = match.group(2) + return main_part, int(bracket_part) + + return filename, None + + +def duplicate_name(query_func, **kwargs) -> str: + """ + Generates a unique filename by appending/incrementing a counter when duplicates exist. + + Continuously checks for name availability using the provided query function, + automatically appending (1), (2), etc. until finding an available name or + reaching maximum retries. + + Args: + query_func: Callable that accepts keyword arguments and returns: + - True if name exists (should be modified) + - False if name is available + **kwargs: Must contain 'name' key with original filename to check + + Returns: + str: Available filename, either: + - Original name (if available) + - Modified name with counter (e.g., "file(1).txt") + + Raises: + KeyError: If 'name' key not provided in kwargs + RuntimeError: If unable to generate unique name after maximum retries + + Example: + >>> def name_exists(name): return name in existing_files + >>> duplicate_name(name_exists, name="document.pdf") + 'document(1).pdf' # If original exists + """ + MAX_RETRIES = 1000 + + if "name" not in kwargs: + raise KeyError("Arguments must contain 'name' key") + + original_name = kwargs["name"] + current_name = original_name + retries = 0 + + while retries < MAX_RETRIES: + if not query_func(**kwargs): + return current_name + + path = PurePath(current_name) + stem = path.stem + suffix = path.suffix + + main_part, counter = split_name_counter(stem) + counter = counter + 1 if counter else 1 + + new_name = f"{main_part}({counter}){suffix}" + + kwargs["name"] = new_name + current_name = new_name + retries += 1 + + raise RuntimeError(f"Failed to generate unique name within {MAX_RETRIES} attempts. Original: {original_name}")