From 0f95086813b09b5373ca600361faab90d05175fd Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Wed, 21 Aug 2024 17:48:00 +0800 Subject: [PATCH] add taskexecutor status check (#2038) ### What problem does this PR solve? ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- api/apps/canvas_app.py | 2 +- api/apps/system_app.py | 18 ++++++++++++++++++ rag/svr/task_executor.py | 18 ++++++++++++++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py index 8584df9cf..0e7470513 100644 --- a/api/apps/canvas_app.py +++ b/api/apps/canvas_app.py @@ -177,4 +177,4 @@ def test_db_connect(): db.close() return get_json_result(retmsg="Database Connection Successful!") except Exception as e: - return server_error_response(str(e)) + return server_error_response(e) diff --git a/api/apps/system_app.py b/api/apps/system_app.py index fddbe1427..937f83ec9 100644 --- a/api/apps/system_app.py +++ b/api/apps/system_app.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License # +import json + from flask_login import login_required from api.db.services.knowledgebase_service import KnowledgebaseService @@ -65,4 +67,20 @@ def status(): except Exception as e: res["redis"] = {"status": "red", "elapsed": "{:.1f}".format((timer() - st)*1000.), "error": str(e)} + try: + obj = json.loads(REDIS_CONN.get("TASKEXE")) + color = "green" + for id in obj.keys(): + arr = obj[id] + if len(arr) == 1: + obj[id] = [0] + else: + obj[id] = [arr[i+1]-arr[i] for i in range(len(arr)-1)] + elapsed = max(obj[id]) + if elapsed > 50: color = "yellow" + if elapsed > 120: color = "red" + res["task_executor"] = {"status": color, "elapsed": obj} + except Exception as e: + res["task_executor"] = {"status": "red", "error": str(e)} + return get_json_result(data=res) diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 2cd04b0f2..f772882d6 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -23,6 +23,7 @@ import re import sys import time import traceback +from concurrent.futures import ThreadPoolExecutor from functools import partial from api.db.services.file2document_service import File2DocumentService @@ -373,11 +374,28 @@ def main(): r["id"], tk_count, len(cks), timer() - st)) +def report_status(): + id = "0" if len(sys.argv) < 2 else sys.argv[1] + while True: + try: + obj = REDIS_CONN.get("TASKEXE") + obj = json.load(obj) + if id not in obj: obj[id] = [] + obj[id].append(timer()*1000) + obj[id] = obj[id][:-60] + REDIS_CONN.set_obj("TASKEXE", obj) + except Exception as e: + print("[Exception]:", str(e)) + time.sleep(60) + if __name__ == "__main__": peewee_logger = logging.getLogger('peewee') peewee_logger.propagate = False peewee_logger.addHandler(database_logger.handlers[0]) peewee_logger.setLevel(database_logger.level) + exe = ThreadPoolExecutor(max_workers=1) + exe.submit(report_status) + while True: main()