mirror of
https://git.mirrors.martin98.com/https://github.com/infiniflow/ragflow.git
synced 2025-04-20 13:10:05 +08:00

ragflow: v0.17 also encountered this problem. #1453 The task table shows that the actual task has been completed. Since the process_msg of the task is not synchronized to the document table, there is no progress update on the page. This may be caused by the lock not being released when the exception occurs. ragflow:v0.17同样碰到这个问题, 看task表实际任务已经完成,由于没有把task的process_msg同步给document表, 所以在页面看没有进度更新。 可能是这里异常时没有释放锁导致的。 ```/api/ragflow_server.py def update_progress(): lock_value = str(uuid.uuid4()) redis_lock = RedisDistributedLock("update_progress", lock_value=lock_value, timeout=60) logging.info(f"update_progress lock_value: {lock_value}") while not stop_event.is_set(): try: if redis_lock.acquire(): DocumentService.update_progress() redis_lock.release() stop_event.wait(6) except Exception: logging.exception("update_progress exception") ++ if redis_lock.acquired: ++ redis_lock.release() ```
144 lines
4.5 KiB
Python
144 lines
4.5 KiB
Python
#
|
|
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
# from beartype import BeartypeConf
|
|
# from beartype.claw import beartype_all # <-- you didn't sign up for this
|
|
# beartype_all(conf=BeartypeConf(violation_type=UserWarning)) # <-- emit warnings from all code
|
|
|
|
from api.utils.log_utils import initRootLogger
|
|
initRootLogger("ragflow_server")
|
|
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sys
|
|
import time
|
|
import traceback
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import threading
|
|
import uuid
|
|
|
|
from werkzeug.serving import run_simple
|
|
from api import settings
|
|
from api.apps import app
|
|
from api.db.runtime_config import RuntimeConfig
|
|
from api.db.services.document_service import DocumentService
|
|
from api import utils
|
|
|
|
from api.db.db_models import init_database_tables as init_web_db
|
|
from api.db.init_data import init_web_data
|
|
from api.versions import get_ragflow_version
|
|
from api.utils import show_configs
|
|
from rag.settings import print_rag_settings
|
|
from rag.utils.redis_conn import RedisDistributedLock
|
|
|
|
stop_event = threading.Event()
|
|
|
|
RAGFLOW_DEBUGPY_LISTEN = int(os.environ.get('RAGFLOW_DEBUGPY_LISTEN', "0"))
|
|
|
|
def update_progress():
|
|
lock_value = str(uuid.uuid4())
|
|
redis_lock = RedisDistributedLock("update_progress", lock_value=lock_value, timeout=60)
|
|
logging.info(f"update_progress lock_value: {lock_value}")
|
|
while not stop_event.is_set():
|
|
try:
|
|
if redis_lock.acquire():
|
|
DocumentService.update_progress()
|
|
redis_lock.release()
|
|
stop_event.wait(6)
|
|
except Exception:
|
|
logging.exception("update_progress exception")
|
|
if redis_lock.acquired:
|
|
redis_lock.release()
|
|
|
|
def signal_handler(sig, frame):
|
|
logging.info("Received interrupt signal, shutting down...")
|
|
stop_event.set()
|
|
time.sleep(1)
|
|
sys.exit(0)
|
|
|
|
if __name__ == '__main__':
|
|
logging.info(r"""
|
|
____ ___ ______ ______ __
|
|
/ __ \ / | / ____// ____// /____ _ __
|
|
/ /_/ // /| | / / __ / /_ / // __ \| | /| / /
|
|
/ _, _// ___ |/ /_/ // __/ / // /_/ /| |/ |/ /
|
|
/_/ |_|/_/ |_|\____//_/ /_/ \____/ |__/|__/
|
|
|
|
""")
|
|
logging.info(
|
|
f'RAGFlow version: {get_ragflow_version()}'
|
|
)
|
|
logging.info(
|
|
f'project base: {utils.file_utils.get_project_base_directory()}'
|
|
)
|
|
show_configs()
|
|
settings.init_settings()
|
|
print_rag_settings()
|
|
|
|
if RAGFLOW_DEBUGPY_LISTEN > 0:
|
|
logging.info(f"debugpy listen on {RAGFLOW_DEBUGPY_LISTEN}")
|
|
import debugpy
|
|
debugpy.listen(("0.0.0.0", RAGFLOW_DEBUGPY_LISTEN))
|
|
|
|
# init db
|
|
init_web_db()
|
|
init_web_data()
|
|
# init runtime config
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument(
|
|
"--version", default=False, help="RAGFlow version", action="store_true"
|
|
)
|
|
parser.add_argument(
|
|
"--debug", default=False, help="debug mode", action="store_true"
|
|
)
|
|
args = parser.parse_args()
|
|
if args.version:
|
|
print(get_ragflow_version())
|
|
sys.exit(0)
|
|
|
|
RuntimeConfig.DEBUG = args.debug
|
|
if RuntimeConfig.DEBUG:
|
|
logging.info("run on debug mode")
|
|
|
|
RuntimeConfig.init_env()
|
|
RuntimeConfig.init_config(JOB_SERVER_HOST=settings.HOST_IP, HTTP_PORT=settings.HOST_PORT)
|
|
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
thread = ThreadPoolExecutor(max_workers=1)
|
|
thread.submit(update_progress)
|
|
|
|
# start http server
|
|
try:
|
|
logging.info("RAGFlow HTTP server start...")
|
|
run_simple(
|
|
hostname=settings.HOST_IP,
|
|
port=settings.HOST_PORT,
|
|
application=app,
|
|
threaded=True,
|
|
use_reloader=RuntimeConfig.DEBUG,
|
|
use_debugger=RuntimeConfig.DEBUG,
|
|
)
|
|
except Exception:
|
|
traceback.print_exc()
|
|
stop_event.set()
|
|
time.sleep(1)
|
|
os.kill(os.getpid(), signal.SIGKILL)
|