Perf: Increase database connection pool size (#7559)

### What problem does this PR solve?

1. The MySQL instance is configured with max_connections=1000,
but our connection pool was limited to max_connections: 100.
This mismatch caused connection pool exhaustion during performance
testing.

2.  Increase stale_timeout to resolve #6548

### Type of change

- [x] Performance Improvement
This commit is contained in:
liu an 2025-05-09 17:52:03 +08:00 committed by GitHub
parent 5b626870d0
commit 6bd7d572ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 45 additions and 71 deletions

View File

@ -18,57 +18,57 @@ from datetime import datetime
import peewee
from api.db.db_models import DB
from api.utils import datetime_format, current_timestamp, get_uuid
from api.utils import current_timestamp, datetime_format, get_uuid
class CommonService:
"""Base service class that provides common database operations.
This class serves as a foundation for all service classes in the application,
implementing standard CRUD operations and common database query patterns.
It uses the Peewee ORM for database interactions and provides a consistent
interface for database operations across all derived service classes.
Attributes:
model: The Peewee model class that this service operates on. Must be set by subclasses.
"""
model = None
@classmethod
@DB.connection_context()
def query(cls, cols=None, reverse=None, order_by=None, **kwargs):
"""Execute a database query with optional column selection and ordering.
This method provides a flexible way to query the database with various filters
and sorting options. It supports column selection, sort order control, and
additional filter conditions.
Args:
cols (list, optional): List of column names to select. If None, selects all columns.
reverse (bool, optional): If True, sorts in descending order. If False, sorts in ascending order.
order_by (str, optional): Column name to sort results by.
**kwargs: Additional filter conditions passed as keyword arguments.
Returns:
peewee.ModelSelect: A query result containing matching records.
"""
return cls.model.query(cols=cols, reverse=reverse,
order_by=order_by, **kwargs)
return cls.model.query(cols=cols, reverse=reverse, order_by=order_by, **kwargs)
@classmethod
@DB.connection_context()
def get_all(cls, cols=None, reverse=None, order_by=None):
"""Retrieve all records from the database with optional column selection and ordering.
This method fetches all records from the model's table with support for
column selection and result ordering. If no order_by is specified and reverse
is True, it defaults to ordering by create_time.
Args:
cols (list, optional): List of column names to select. If None, selects all columns.
reverse (bool, optional): If True, sorts in descending order. If False, sorts in ascending order.
order_by (str, optional): Column name to sort results by. Defaults to 'create_time' if reverse is specified.
Returns:
peewee.ModelSelect: A query containing all matching records.
"""
@ -80,27 +80,25 @@ class CommonService:
if not order_by or not hasattr(cls, order_by):
order_by = "create_time"
if reverse is True:
query_records = query_records.order_by(
cls.model.getter_by(order_by).desc())
query_records = query_records.order_by(cls.model.getter_by(order_by).desc())
elif reverse is False:
query_records = query_records.order_by(
cls.model.getter_by(order_by).asc())
query_records = query_records.order_by(cls.model.getter_by(order_by).asc())
return query_records
@classmethod
@DB.connection_context()
def get(cls, **kwargs):
"""Get a single record matching the given criteria.
This method retrieves a single record from the database that matches
the specified filter conditions.
Args:
**kwargs: Filter conditions as keyword arguments.
Returns:
Model instance: Single matching record.
Raises:
peewee.DoesNotExist: If no matching record is found.
"""
@ -110,13 +108,13 @@ class CommonService:
@DB.connection_context()
def get_or_none(cls, **kwargs):
"""Get a single record or None if not found.
This method attempts to retrieve a single record matching the given criteria,
returning None if no match is found instead of raising an exception.
Args:
**kwargs: Filter conditions as keyword arguments.
Returns:
Model instance or None: Matching record if found, None otherwise.
"""
@ -129,13 +127,13 @@ class CommonService:
@DB.connection_context()
def save(cls, **kwargs):
"""Save a new record to database.
This method creates a new record in the database with the provided field values,
forcing an insert operation rather than an update.
Args:
**kwargs: Record field values as keyword arguments.
Returns:
Model instance: The created record object.
"""
@ -146,13 +144,13 @@ class CommonService:
@DB.connection_context()
def insert(cls, **kwargs):
"""Insert a new record with automatic ID and timestamps.
This method creates a new record with automatically generated ID and timestamp fields.
It handles the creation of create_time, create_date, update_time, and update_date fields.
Args:
**kwargs: Record field values as keyword arguments.
Returns:
Model instance: The newly created record object.
"""
@ -169,10 +167,10 @@ class CommonService:
@DB.connection_context()
def insert_many(cls, data_list, batch_size=100):
"""Insert multiple records in batches.
This method efficiently inserts multiple records into the database using batch processing.
It automatically sets creation timestamps for all records.
Args:
data_list (list): List of dictionaries containing record data to insert.
batch_size (int, optional): Number of records to insert in each batch. Defaults to 100.
@ -182,16 +180,16 @@ class CommonService:
d["create_time"] = current_timestamp()
d["create_date"] = datetime_format(datetime.now())
for i in range(0, len(data_list), batch_size):
cls.model.insert_many(data_list[i:i + batch_size]).execute()
cls.model.insert_many(data_list[i : i + batch_size]).execute()
@classmethod
@DB.connection_context()
def update_many_by_id(cls, data_list):
"""Update multiple records by their IDs.
This method updates multiple records in the database, identified by their IDs.
It automatically updates the update_time and update_date fields for each record.
Args:
data_list (list): List of dictionaries containing record data to update.
Each dictionary must include an 'id' field.
@ -200,8 +198,7 @@ class CommonService:
for data in data_list:
data["update_time"] = current_timestamp()
data["update_date"] = datetime_format(datetime.now())
cls.model.update(data).where(
cls.model.id == data["id"]).execute()
cls.model.update(data).where(cls.model.id == data["id"]).execute()
@classmethod
@DB.connection_context()
@ -212,12 +209,6 @@ class CommonService:
# data: Updated field values
# Returns:
# Number of records updated
try:
if not DB.is_connection_usable():
DB.connect()
except Exception:
DB.close()
DB.connect()
data["update_time"] = current_timestamp()
data["update_date"] = datetime_format(datetime.now())
num = cls.model.update(data).where(cls.model.id == pid).execute()
@ -263,7 +254,7 @@ class CommonService:
# Returns:
# Number of records deleted
return cls.model.delete().where(cls.model.id == pid).execute()
@classmethod
@DB.connection_context()
def delete_by_ids(cls, pids):
@ -310,13 +301,12 @@ class CommonService:
# List of tuples containing chunks
length = len(tar_list)
arr = range(length)
result = [tuple(tar_list[x:(x + n)]) for x in arr[::n]]
result = [tuple(tar_list[x : (x + n)]) for x in arr[::n]]
return result
@classmethod
@DB.connection_context()
def filter_scope_list(cls, in_key, in_filters_list,
filters=None, cols=None):
def filter_scope_list(cls, in_key, in_filters_list, filters=None, cols=None):
# Get records matching IN clause filters with optional column selection
# Args:
# in_key: Field name for IN clause
@ -331,22 +321,12 @@ class CommonService:
res_list = []
if cols:
for i in in_filters_tuple_list:
query_records = cls.model.select(
*
cols).where(
getattr(
cls.model,
in_key).in_(i),
*
filters)
query_records = cls.model.select(*cols).where(getattr(cls.model, in_key).in_(i), *filters)
if query_records:
res_list.extend(
[query_record for query_record in query_records])
res_list.extend([query_record for query_record in query_records])
else:
for i in in_filters_tuple_list:
query_records = cls.model.select().where(
getattr(cls.model, in_key).in_(i), *filters)
query_records = cls.model.select().where(getattr(cls.model, in_key).in_(i), *filters)
if query_records:
res_list.extend(
[query_record for query_record in query_records])
res_list.extend([query_record for query_record in query_records])
return res_list

View File

@ -105,7 +105,7 @@ class TenantLLMService(CommonService):
if model_config:
model_config = model_config.to_dict()
llm = LLMService.query(llm_name=mdlnm) if not fid else LLMService.query(llm_name=mdlnm, fid=fid)
if not llm and fid: # for some cases seems fid mismatch
if not llm and fid: # for some cases seems fid mismatch
llm = LLMService.query(llm_name=mdlnm)
if llm:
model_config["is_tools"] = llm[0].is_tools
@ -163,12 +163,6 @@ class TenantLLMService(CommonService):
@classmethod
@DB.connection_context()
def increase_usage(cls, tenant_id, llm_type, used_tokens, llm_name=None):
try:
if not DB.is_connection_usable():
DB.connect()
except Exception:
DB.close()
DB.connect()
e, tenant = TenantService.get_by_id(tenant_id)
if not e:
logging.error(f"Tenant not found: {tenant_id}")
@ -366,7 +360,7 @@ class LLMBundle:
ans = ""
chat_streamly = self.mdl.chat_streamly
total_tokens = 0
total_tokens = 0
if self.is_tools and self.mdl.is_tools:
chat_streamly = self.mdl.chat_streamly_with_tools

View File

@ -7,8 +7,8 @@ mysql:
password: 'infini_rag_flow'
host: 'localhost'
port: 5455
max_connections: 100
stale_timeout: 30
max_connections: 900
stale_timeout: 300
minio:
user: 'rag_flow'
password: 'infini_rag_flow'

View File

@ -7,8 +7,8 @@ mysql:
password: '${MYSQL_PASSWORD:-infini_rag_flow}'
host: '${MYSQL_HOST:-mysql}'
port: 3306
max_connections: 100
stale_timeout: 30
max_connections: 900
stale_timeout: 300
minio:
user: '${MINIO_USER:-rag_flow}'
password: '${MINIO_PASSWORD:-infini_rag_flow}'