mirror of
https://git.mirrors.martin98.com/https://github.com/infiniflow/ragflow.git
synced 2025-04-21 05:29:57 +08:00
Fix: failed to acquire lock exception with retry mechanism for postgres and mysql (#6483)
Added the with_retry decorator in db_models.py to add a retry mechanism for database operations. Applied the retry mechanism to the lock and unlock methods of the PostgresDatabaseLock and MysqlDatabaseLock classes to enhance the reliability of lock operations. ### What problem does this PR solve? resolve failed to acquire lock exception with retry mechanism ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --------- Co-authored-by: wenju.li <wenju.li@deepctr.cn>
This commit is contained in:
parent
60c3a253ad
commit
814a210f5d
@ -19,8 +19,10 @@ import operator
|
||||
import os
|
||||
import sys
|
||||
import typing
|
||||
import time
|
||||
from enum import Enum
|
||||
from functools import wraps
|
||||
import hashlib
|
||||
|
||||
from flask_login import UserMixin
|
||||
from itsdangerous.url_safe import URLSafeTimedSerializer as Serializer
|
||||
@ -260,14 +262,54 @@ class BaseDataBase:
|
||||
logging.info("init database on cluster mode successfully")
|
||||
|
||||
|
||||
def with_retry(max_retries=3, retry_delay=1.0):
|
||||
"""Decorator: Add retry mechanism to database operations
|
||||
|
||||
Args:
|
||||
max_retries (int): maximum number of retries
|
||||
retry_delay (float): initial retry delay (seconds), will increase exponentially
|
||||
|
||||
Returns:
|
||||
decorated function
|
||||
"""
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
last_exception = None
|
||||
for retry in range(max_retries):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except Exception as e:
|
||||
last_exception = e
|
||||
# get self and method name for logging
|
||||
self_obj = args[0] if args else None
|
||||
func_name = func.__name__
|
||||
lock_name = getattr(self_obj, 'lock_name', 'unknown') if self_obj else 'unknown'
|
||||
|
||||
if retry < max_retries - 1:
|
||||
current_delay = retry_delay * (2 ** retry)
|
||||
logging.warning(f"{func_name} {lock_name} failed: {str(e)}, retrying ({retry+1}/{max_retries})")
|
||||
time.sleep(current_delay)
|
||||
else:
|
||||
logging.error(f"{func_name} {lock_name} failed after all attempts: {str(e)}")
|
||||
|
||||
if last_exception:
|
||||
raise last_exception
|
||||
return False
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
class PostgresDatabaseLock:
|
||||
def __init__(self, lock_name, timeout=10, db=None):
|
||||
self.lock_name = lock_name
|
||||
self.lock_id = int(hashlib.md5(lock_name.encode()).hexdigest(), 16) % (2**31-1)
|
||||
self.timeout = int(timeout)
|
||||
self.db = db if db else DB
|
||||
|
||||
@with_retry(max_retries=3, retry_delay=1.0)
|
||||
def lock(self):
|
||||
cursor = self.db.execute_sql("SELECT pg_try_advisory_lock(%s)", self.timeout)
|
||||
cursor = self.db.execute_sql("SELECT pg_try_advisory_lock(%s)", (self.lock_id,))
|
||||
ret = cursor.fetchone()
|
||||
if ret[0] == 0:
|
||||
raise Exception(f"acquire postgres lock {self.lock_name} timeout")
|
||||
@ -276,8 +318,9 @@ class PostgresDatabaseLock:
|
||||
else:
|
||||
raise Exception(f"failed to acquire lock {self.lock_name}")
|
||||
|
||||
@with_retry(max_retries=3, retry_delay=1.0)
|
||||
def unlock(self):
|
||||
cursor = self.db.execute_sql("SELECT pg_advisory_unlock(%s)", self.timeout)
|
||||
cursor = self.db.execute_sql("SELECT pg_advisory_unlock(%s)", (self.lock_id,))
|
||||
ret = cursor.fetchone()
|
||||
if ret[0] == 0:
|
||||
raise Exception(f"postgres lock {self.lock_name} was not established by this thread")
|
||||
@ -287,12 +330,12 @@ class PostgresDatabaseLock:
|
||||
raise Exception(f"postgres lock {self.lock_name} does not exist")
|
||||
|
||||
def __enter__(self):
|
||||
if isinstance(self.db, PostgresDatabaseLock):
|
||||
if isinstance(self.db, PooledPostgresqlDatabase):
|
||||
self.lock()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
if isinstance(self.db, PostgresDatabaseLock):
|
||||
if isinstance(self.db, PooledPostgresqlDatabase):
|
||||
self.unlock()
|
||||
|
||||
def __call__(self, func):
|
||||
@ -310,6 +353,7 @@ class MysqlDatabaseLock:
|
||||
self.timeout = int(timeout)
|
||||
self.db = db if db else DB
|
||||
|
||||
@with_retry(max_retries=3, retry_delay=1.0)
|
||||
def lock(self):
|
||||
# SQL parameters only support %s format placeholders
|
||||
cursor = self.db.execute_sql("SELECT GET_LOCK(%s, %s)", (self.lock_name, self.timeout))
|
||||
@ -321,6 +365,7 @@ class MysqlDatabaseLock:
|
||||
else:
|
||||
raise Exception(f"failed to acquire lock {self.lock_name}")
|
||||
|
||||
@with_retry(max_retries=3, retry_delay=1.0)
|
||||
def unlock(self):
|
||||
cursor = self.db.execute_sql("SELECT RELEASE_LOCK(%s)", (self.lock_name,))
|
||||
ret = cursor.fetchone()
|
||||
|
Loading…
x
Reference in New Issue
Block a user