mirror of
https://git.mirrors.martin98.com/https://github.com/infiniflow/ragflow.git
synced 2025-04-22 14:10:01 +08:00
Fix update_progress (#6340)
### What problem does this PR solve? Fix update_progress ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
parent
1d9ca172e3
commit
dba0caa00b
@ -29,6 +29,7 @@ import time
|
|||||||
import traceback
|
import traceback
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
import threading
|
import threading
|
||||||
|
import uuid
|
||||||
|
|
||||||
from werkzeug.serving import run_simple
|
from werkzeug.serving import run_simple
|
||||||
from api import settings
|
from api import settings
|
||||||
@ -47,17 +48,17 @@ from rag.utils.redis_conn import RedisDistributedLock
|
|||||||
stop_event = threading.Event()
|
stop_event = threading.Event()
|
||||||
|
|
||||||
def update_progress():
|
def update_progress():
|
||||||
redis_lock = RedisDistributedLock("update_progress", timeout=60)
|
lock_value = str(uuid.uuid4())
|
||||||
|
redis_lock = RedisDistributedLock("update_progress", lock_value=lock_value, timeout=60)
|
||||||
|
logging.info(f"update_progress lock_value: {lock_value}")
|
||||||
while not stop_event.is_set():
|
while not stop_event.is_set():
|
||||||
try:
|
try:
|
||||||
if not redis_lock.acquire():
|
if redis_lock.acquire():
|
||||||
continue
|
DocumentService.update_progress()
|
||||||
DocumentService.update_progress()
|
redis_lock.release()
|
||||||
stop_event.wait(6)
|
stop_event.wait(6)
|
||||||
except Exception:
|
except Exception:
|
||||||
logging.exception("update_progress exception")
|
logging.exception("update_progress exception")
|
||||||
finally:
|
|
||||||
redis_lock.release()
|
|
||||||
|
|
||||||
def signal_handler(sig, frame):
|
def signal_handler(sig, frame):
|
||||||
logging.info("Received interrupt signal, shutting down...")
|
logging.info("Received interrupt signal, shutting down...")
|
||||||
|
@ -48,10 +48,27 @@ class RedisMsg:
|
|||||||
|
|
||||||
@singleton
|
@singleton
|
||||||
class RedisDB:
|
class RedisDB:
|
||||||
|
lua_delete_if_equal = None
|
||||||
|
LUA_DELETE_IF_EQUAL_SCRIPT = """
|
||||||
|
local current_value = redis.call('get', KEYS[1])
|
||||||
|
if current_value and current_value == ARGV[1] then
|
||||||
|
redis.call('del', KEYS[1])
|
||||||
|
return 1
|
||||||
|
end
|
||||||
|
return 0
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.REDIS = None
|
self.REDIS = None
|
||||||
self.config = settings.REDIS
|
self.config = settings.REDIS
|
||||||
self.__open__()
|
self.__open__()
|
||||||
|
self.register_scripts()
|
||||||
|
|
||||||
|
def register_scripts(self) -> None:
|
||||||
|
cls = self.__class__
|
||||||
|
client = self.REDIS
|
||||||
|
if cls.lua_delete_if_equal is None:
|
||||||
|
cls.lua_delete_if_equal = client.register_script(cls.LUA_DELETE_IF_EQUAL_SCRIPT)
|
||||||
|
|
||||||
def __open__(self):
|
def __open__(self):
|
||||||
try:
|
try:
|
||||||
@ -277,6 +294,12 @@ class RedisDB:
|
|||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def delete_if_equal(self, key: str, expected_value: str) -> bool:
|
||||||
|
"""
|
||||||
|
Do follwing atomically:
|
||||||
|
Delete a key if its value is equals to the given one, do nothing otherwise.
|
||||||
|
"""
|
||||||
|
return bool(self.lua_delete_if_equal(keys=[key], args=[expected_value], client=self.REDIS))
|
||||||
|
|
||||||
REDIS_CONN = RedisDB()
|
REDIS_CONN = RedisDB()
|
||||||
|
|
||||||
@ -292,7 +315,8 @@ class RedisDistributedLock:
|
|||||||
self.lock = Lock(REDIS_CONN.REDIS, lock_key, timeout=timeout, blocking_timeout=blocking_timeout)
|
self.lock = Lock(REDIS_CONN.REDIS, lock_key, timeout=timeout, blocking_timeout=blocking_timeout)
|
||||||
|
|
||||||
def acquire(self):
|
def acquire(self):
|
||||||
return self.lock.acquire()
|
REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value)
|
||||||
|
return self.lock.acquire(token=self.lock_value)
|
||||||
|
|
||||||
def release(self):
|
def release(self):
|
||||||
return self.lock.release()
|
return self.lock.release()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user