mirror of
https://git.mirrors.martin98.com/https://github.com/langgenius/dify.git
synced 2025-04-22 05:39:42 +08:00
82 lines
3.1 KiB
Python
82 lines
3.1 KiB
Python
import datetime
|
|
import time
|
|
|
|
import click
|
|
from werkzeug.exceptions import NotFound
|
|
|
|
import app
|
|
from configs import dify_config
|
|
from extensions.ext_database import db
|
|
from extensions.ext_redis import redis_client
|
|
from models.model import (
|
|
App,
|
|
Message,
|
|
MessageAgentThought,
|
|
MessageAnnotation,
|
|
MessageChain,
|
|
MessageFeedback,
|
|
MessageFile,
|
|
)
|
|
from models.web import SavedMessage
|
|
from services.feature_service import FeatureService
|
|
|
|
|
|
@app.celery.task(queue="dataset")
|
|
def clean_messages():
|
|
click.echo(click.style("Start clean messages.", fg="green"))
|
|
start_at = time.perf_counter()
|
|
plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
|
|
days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
|
|
)
|
|
while True:
|
|
try:
|
|
# Main query with join and filter
|
|
# FIXME:for mypy no paginate method error
|
|
messages = (
|
|
db.session.query(Message) # type: ignore
|
|
.filter(Message.created_at < plan_sandbox_clean_message_day)
|
|
.order_by(Message.created_at.desc())
|
|
.limit(100)
|
|
.all()
|
|
)
|
|
|
|
except NotFound:
|
|
break
|
|
if not messages:
|
|
break
|
|
for message in messages:
|
|
plan_sandbox_clean_message_day = message.created_at
|
|
app = App.query.filter_by(id=message.app_id).first()
|
|
features_cache_key = f"features:{app.tenant_id}"
|
|
plan_cache = redis_client.get(features_cache_key)
|
|
if plan_cache is None:
|
|
features = FeatureService.get_features(app.tenant_id)
|
|
redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
|
|
plan = features.billing.subscription.plan
|
|
else:
|
|
plan = plan_cache.decode()
|
|
if plan == "sandbox":
|
|
# clean related message
|
|
db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message.id).delete(
|
|
synchronize_session=False
|
|
)
|
|
db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete(
|
|
synchronize_session=False
|
|
)
|
|
db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(
|
|
synchronize_session=False
|
|
)
|
|
db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete(
|
|
synchronize_session=False
|
|
)
|
|
db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(
|
|
synchronize_session=False
|
|
)
|
|
db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(
|
|
synchronize_session=False
|
|
)
|
|
db.session.query(Message).filter(Message.id == message.id).delete()
|
|
db.session.commit()
|
|
end_at = time.perf_counter()
|
|
click.echo(click.style("Cleaned messages from db success latency: {}".format(end_at - start_at), fg="green"))
|