mirror of
https://git.mirrors.martin98.com/https://github.com/infiniflow/ragflow.git
synced 2025-08-12 06:49:00 +08:00
add taskexecutor status check (#2038)
### What problem does this PR solve? ### Type of change - [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
parent
9b3f5fd38b
commit
0f95086813
@ -177,4 +177,4 @@ def test_db_connect():
|
||||
db.close()
|
||||
return get_json_result(retmsg="Database Connection Successful!")
|
||||
except Exception as e:
|
||||
return server_error_response(str(e))
|
||||
return server_error_response(e)
|
||||
|
@ -13,6 +13,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License
|
||||
#
|
||||
import json
|
||||
|
||||
from flask_login import login_required
|
||||
|
||||
from api.db.services.knowledgebase_service import KnowledgebaseService
|
||||
@ -65,4 +67,20 @@ def status():
|
||||
except Exception as e:
|
||||
res["redis"] = {"status": "red", "elapsed": "{:.1f}".format((timer() - st)*1000.), "error": str(e)}
|
||||
|
||||
try:
|
||||
obj = json.loads(REDIS_CONN.get("TASKEXE"))
|
||||
color = "green"
|
||||
for id in obj.keys():
|
||||
arr = obj[id]
|
||||
if len(arr) == 1:
|
||||
obj[id] = [0]
|
||||
else:
|
||||
obj[id] = [arr[i+1]-arr[i] for i in range(len(arr)-1)]
|
||||
elapsed = max(obj[id])
|
||||
if elapsed > 50: color = "yellow"
|
||||
if elapsed > 120: color = "red"
|
||||
res["task_executor"] = {"status": color, "elapsed": obj}
|
||||
except Exception as e:
|
||||
res["task_executor"] = {"status": "red", "error": str(e)}
|
||||
|
||||
return get_json_result(data=res)
|
||||
|
@ -23,6 +23,7 @@ import re
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from functools import partial
|
||||
|
||||
from api.db.services.file2document_service import File2DocumentService
|
||||
@ -373,11 +374,28 @@ def main():
|
||||
r["id"], tk_count, len(cks), timer() - st))
|
||||
|
||||
|
||||
def report_status():
|
||||
id = "0" if len(sys.argv) < 2 else sys.argv[1]
|
||||
while True:
|
||||
try:
|
||||
obj = REDIS_CONN.get("TASKEXE")
|
||||
obj = json.load(obj)
|
||||
if id not in obj: obj[id] = []
|
||||
obj[id].append(timer()*1000)
|
||||
obj[id] = obj[id][:-60]
|
||||
REDIS_CONN.set_obj("TASKEXE", obj)
|
||||
except Exception as e:
|
||||
print("[Exception]:", str(e))
|
||||
time.sleep(60)
|
||||
|
||||
if __name__ == "__main__":
|
||||
peewee_logger = logging.getLogger('peewee')
|
||||
peewee_logger.propagate = False
|
||||
peewee_logger.addHandler(database_logger.handlers[0])
|
||||
peewee_logger.setLevel(database_logger.level)
|
||||
|
||||
exe = ThreadPoolExecutor(max_workers=1)
|
||||
exe.submit(report_status)
|
||||
|
||||
while True:
|
||||
main()
|
||||
|
Loading…
x
Reference in New Issue
Block a user