ragflow/api/ragflow_server.py
benni82 b70abe52b2
Fix: Ensure lock is released in update_progress using context manager (#6975)
ragflow: v0.17 also encountered this problem. #1453 The task table shows
that the actual task has been completed. Since the process_msg of the
task is not synchronized to the document table, there is no progress
update on the page.
This may be caused by the lock not being released when the exception
occurs.

ragflow:v0.17同样碰到这个问题, 看task表实际任务已经完成,由于没有把task的process_msg同步给document表,
所以在页面看没有进度更新。
可能是这里异常时没有释放锁导致的。

```/api/ragflow_server.py
def update_progress():
    lock_value = str(uuid.uuid4())
    redis_lock = RedisDistributedLock("update_progress", lock_value=lock_value, timeout=60)
    logging.info(f"update_progress lock_value: {lock_value}")
    while not stop_event.is_set():
        try:
            if redis_lock.acquire():
                DocumentService.update_progress()
                redis_lock.release()
            stop_event.wait(6)
        except Exception:
            logging.exception("update_progress exception")
++       if redis_lock.acquired:
++               redis_lock.release()
```
2025-04-11 20:46:19 +08:00

144 lines
4.5 KiB
Python

#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# from beartype import BeartypeConf
# from beartype.claw import beartype_all # <-- you didn't sign up for this
# beartype_all(conf=BeartypeConf(violation_type=UserWarning)) # <-- emit warnings from all code
from api.utils.log_utils import initRootLogger
initRootLogger("ragflow_server")
import logging
import os
import signal
import sys
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
import threading
import uuid
from werkzeug.serving import run_simple
from api import settings
from api.apps import app
from api.db.runtime_config import RuntimeConfig
from api.db.services.document_service import DocumentService
from api import utils
from api.db.db_models import init_database_tables as init_web_db
from api.db.init_data import init_web_data
from api.versions import get_ragflow_version
from api.utils import show_configs
from rag.settings import print_rag_settings
from rag.utils.redis_conn import RedisDistributedLock
stop_event = threading.Event()
RAGFLOW_DEBUGPY_LISTEN = int(os.environ.get('RAGFLOW_DEBUGPY_LISTEN', "0"))
def update_progress():
lock_value = str(uuid.uuid4())
redis_lock = RedisDistributedLock("update_progress", lock_value=lock_value, timeout=60)
logging.info(f"update_progress lock_value: {lock_value}")
while not stop_event.is_set():
try:
if redis_lock.acquire():
DocumentService.update_progress()
redis_lock.release()
stop_event.wait(6)
except Exception:
logging.exception("update_progress exception")
if redis_lock.acquired:
redis_lock.release()
def signal_handler(sig, frame):
logging.info("Received interrupt signal, shutting down...")
stop_event.set()
time.sleep(1)
sys.exit(0)
if __name__ == '__main__':
logging.info(r"""
____ ___ ______ ______ __
/ __ \ / | / ____// ____// /____ _ __
/ /_/ // /| | / / __ / /_ / // __ \| | /| / /
/ _, _// ___ |/ /_/ // __/ / // /_/ /| |/ |/ /
/_/ |_|/_/ |_|\____//_/ /_/ \____/ |__/|__/
""")
logging.info(
f'RAGFlow version: {get_ragflow_version()}'
)
logging.info(
f'project base: {utils.file_utils.get_project_base_directory()}'
)
show_configs()
settings.init_settings()
print_rag_settings()
if RAGFLOW_DEBUGPY_LISTEN > 0:
logging.info(f"debugpy listen on {RAGFLOW_DEBUGPY_LISTEN}")
import debugpy
debugpy.listen(("0.0.0.0", RAGFLOW_DEBUGPY_LISTEN))
# init db
init_web_db()
init_web_data()
# init runtime config
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"--version", default=False, help="RAGFlow version", action="store_true"
)
parser.add_argument(
"--debug", default=False, help="debug mode", action="store_true"
)
args = parser.parse_args()
if args.version:
print(get_ragflow_version())
sys.exit(0)
RuntimeConfig.DEBUG = args.debug
if RuntimeConfig.DEBUG:
logging.info("run on debug mode")
RuntimeConfig.init_env()
RuntimeConfig.init_config(JOB_SERVER_HOST=settings.HOST_IP, HTTP_PORT=settings.HOST_PORT)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
thread = ThreadPoolExecutor(max_workers=1)
thread.submit(update_progress)
# start http server
try:
logging.info("RAGFlow HTTP server start...")
run_simple(
hostname=settings.HOST_IP,
port=settings.HOST_PORT,
application=app,
threaded=True,
use_reloader=RuntimeConfig.DEBUG,
use_debugger=RuntimeConfig.DEBUG,
)
except Exception:
traceback.print_exc()
stop_event.set()
time.sleep(1)
os.kill(os.getpid(), signal.SIGKILL)