From 6bd7d572ec696e00133bb03300c31a0982bfc646 Mon Sep 17 00:00:00 2001 From: liu an Date: Fri, 9 May 2025 17:52:03 +0800 Subject: [PATCH] Perf: Increase database connection pool size (#7559) ### What problem does this PR solve? 1. The MySQL instance is configured with max_connections=1000, but our connection pool was limited to max_connections: 100. This mismatch caused connection pool exhaustion during performance testing. 2. Increase stale_timeout to resolve #6548 ### Type of change - [x] Performance Improvement --- api/db/services/common_service.py | 98 ++++++++++++------------------- api/db/services/llm_service.py | 10 +--- conf/service_conf.yaml | 4 +- docker/service_conf.yaml.template | 4 +- 4 files changed, 45 insertions(+), 71 deletions(-) diff --git a/api/db/services/common_service.py b/api/db/services/common_service.py index 8f76bec8d..95f5d759f 100644 --- a/api/db/services/common_service.py +++ b/api/db/services/common_service.py @@ -18,57 +18,57 @@ from datetime import datetime import peewee from api.db.db_models import DB -from api.utils import datetime_format, current_timestamp, get_uuid +from api.utils import current_timestamp, datetime_format, get_uuid class CommonService: """Base service class that provides common database operations. - + This class serves as a foundation for all service classes in the application, implementing standard CRUD operations and common database query patterns. It uses the Peewee ORM for database interactions and provides a consistent interface for database operations across all derived service classes. - + Attributes: model: The Peewee model class that this service operates on. Must be set by subclasses. """ + model = None @classmethod @DB.connection_context() def query(cls, cols=None, reverse=None, order_by=None, **kwargs): """Execute a database query with optional column selection and ordering. - + This method provides a flexible way to query the database with various filters and sorting options. It supports column selection, sort order control, and additional filter conditions. - + Args: cols (list, optional): List of column names to select. If None, selects all columns. reverse (bool, optional): If True, sorts in descending order. If False, sorts in ascending order. order_by (str, optional): Column name to sort results by. **kwargs: Additional filter conditions passed as keyword arguments. - + Returns: peewee.ModelSelect: A query result containing matching records. """ - return cls.model.query(cols=cols, reverse=reverse, - order_by=order_by, **kwargs) + return cls.model.query(cols=cols, reverse=reverse, order_by=order_by, **kwargs) @classmethod @DB.connection_context() def get_all(cls, cols=None, reverse=None, order_by=None): """Retrieve all records from the database with optional column selection and ordering. - + This method fetches all records from the model's table with support for column selection and result ordering. If no order_by is specified and reverse is True, it defaults to ordering by create_time. - + Args: cols (list, optional): List of column names to select. If None, selects all columns. reverse (bool, optional): If True, sorts in descending order. If False, sorts in ascending order. order_by (str, optional): Column name to sort results by. Defaults to 'create_time' if reverse is specified. - + Returns: peewee.ModelSelect: A query containing all matching records. """ @@ -80,27 +80,25 @@ class CommonService: if not order_by or not hasattr(cls, order_by): order_by = "create_time" if reverse is True: - query_records = query_records.order_by( - cls.model.getter_by(order_by).desc()) + query_records = query_records.order_by(cls.model.getter_by(order_by).desc()) elif reverse is False: - query_records = query_records.order_by( - cls.model.getter_by(order_by).asc()) + query_records = query_records.order_by(cls.model.getter_by(order_by).asc()) return query_records @classmethod @DB.connection_context() def get(cls, **kwargs): """Get a single record matching the given criteria. - + This method retrieves a single record from the database that matches the specified filter conditions. - + Args: **kwargs: Filter conditions as keyword arguments. - + Returns: Model instance: Single matching record. - + Raises: peewee.DoesNotExist: If no matching record is found. """ @@ -110,13 +108,13 @@ class CommonService: @DB.connection_context() def get_or_none(cls, **kwargs): """Get a single record or None if not found. - + This method attempts to retrieve a single record matching the given criteria, returning None if no match is found instead of raising an exception. - + Args: **kwargs: Filter conditions as keyword arguments. - + Returns: Model instance or None: Matching record if found, None otherwise. """ @@ -129,13 +127,13 @@ class CommonService: @DB.connection_context() def save(cls, **kwargs): """Save a new record to database. - + This method creates a new record in the database with the provided field values, forcing an insert operation rather than an update. - + Args: **kwargs: Record field values as keyword arguments. - + Returns: Model instance: The created record object. """ @@ -146,13 +144,13 @@ class CommonService: @DB.connection_context() def insert(cls, **kwargs): """Insert a new record with automatic ID and timestamps. - + This method creates a new record with automatically generated ID and timestamp fields. It handles the creation of create_time, create_date, update_time, and update_date fields. - + Args: **kwargs: Record field values as keyword arguments. - + Returns: Model instance: The newly created record object. """ @@ -169,10 +167,10 @@ class CommonService: @DB.connection_context() def insert_many(cls, data_list, batch_size=100): """Insert multiple records in batches. - + This method efficiently inserts multiple records into the database using batch processing. It automatically sets creation timestamps for all records. - + Args: data_list (list): List of dictionaries containing record data to insert. batch_size (int, optional): Number of records to insert in each batch. Defaults to 100. @@ -182,16 +180,16 @@ class CommonService: d["create_time"] = current_timestamp() d["create_date"] = datetime_format(datetime.now()) for i in range(0, len(data_list), batch_size): - cls.model.insert_many(data_list[i:i + batch_size]).execute() + cls.model.insert_many(data_list[i : i + batch_size]).execute() @classmethod @DB.connection_context() def update_many_by_id(cls, data_list): """Update multiple records by their IDs. - + This method updates multiple records in the database, identified by their IDs. It automatically updates the update_time and update_date fields for each record. - + Args: data_list (list): List of dictionaries containing record data to update. Each dictionary must include an 'id' field. @@ -200,8 +198,7 @@ class CommonService: for data in data_list: data["update_time"] = current_timestamp() data["update_date"] = datetime_format(datetime.now()) - cls.model.update(data).where( - cls.model.id == data["id"]).execute() + cls.model.update(data).where(cls.model.id == data["id"]).execute() @classmethod @DB.connection_context() @@ -212,12 +209,6 @@ class CommonService: # data: Updated field values # Returns: # Number of records updated - try: - if not DB.is_connection_usable(): - DB.connect() - except Exception: - DB.close() - DB.connect() data["update_time"] = current_timestamp() data["update_date"] = datetime_format(datetime.now()) num = cls.model.update(data).where(cls.model.id == pid).execute() @@ -263,7 +254,7 @@ class CommonService: # Returns: # Number of records deleted return cls.model.delete().where(cls.model.id == pid).execute() - + @classmethod @DB.connection_context() def delete_by_ids(cls, pids): @@ -310,13 +301,12 @@ class CommonService: # List of tuples containing chunks length = len(tar_list) arr = range(length) - result = [tuple(tar_list[x:(x + n)]) for x in arr[::n]] + result = [tuple(tar_list[x : (x + n)]) for x in arr[::n]] return result @classmethod @DB.connection_context() - def filter_scope_list(cls, in_key, in_filters_list, - filters=None, cols=None): + def filter_scope_list(cls, in_key, in_filters_list, filters=None, cols=None): # Get records matching IN clause filters with optional column selection # Args: # in_key: Field name for IN clause @@ -331,22 +321,12 @@ class CommonService: res_list = [] if cols: for i in in_filters_tuple_list: - query_records = cls.model.select( - * - cols).where( - getattr( - cls.model, - in_key).in_(i), - * - filters) + query_records = cls.model.select(*cols).where(getattr(cls.model, in_key).in_(i), *filters) if query_records: - res_list.extend( - [query_record for query_record in query_records]) + res_list.extend([query_record for query_record in query_records]) else: for i in in_filters_tuple_list: - query_records = cls.model.select().where( - getattr(cls.model, in_key).in_(i), *filters) + query_records = cls.model.select().where(getattr(cls.model, in_key).in_(i), *filters) if query_records: - res_list.extend( - [query_record for query_record in query_records]) + res_list.extend([query_record for query_record in query_records]) return res_list diff --git a/api/db/services/llm_service.py b/api/db/services/llm_service.py index def50f9c1..8c20812db 100644 --- a/api/db/services/llm_service.py +++ b/api/db/services/llm_service.py @@ -105,7 +105,7 @@ class TenantLLMService(CommonService): if model_config: model_config = model_config.to_dict() llm = LLMService.query(llm_name=mdlnm) if not fid else LLMService.query(llm_name=mdlnm, fid=fid) - if not llm and fid: # for some cases seems fid mismatch + if not llm and fid: # for some cases seems fid mismatch llm = LLMService.query(llm_name=mdlnm) if llm: model_config["is_tools"] = llm[0].is_tools @@ -163,12 +163,6 @@ class TenantLLMService(CommonService): @classmethod @DB.connection_context() def increase_usage(cls, tenant_id, llm_type, used_tokens, llm_name=None): - try: - if not DB.is_connection_usable(): - DB.connect() - except Exception: - DB.close() - DB.connect() e, tenant = TenantService.get_by_id(tenant_id) if not e: logging.error(f"Tenant not found: {tenant_id}") @@ -366,7 +360,7 @@ class LLMBundle: ans = "" chat_streamly = self.mdl.chat_streamly - total_tokens = 0 + total_tokens = 0 if self.is_tools and self.mdl.is_tools: chat_streamly = self.mdl.chat_streamly_with_tools diff --git a/conf/service_conf.yaml b/conf/service_conf.yaml index dfb936a9b..45f4e9685 100644 --- a/conf/service_conf.yaml +++ b/conf/service_conf.yaml @@ -7,8 +7,8 @@ mysql: password: 'infini_rag_flow' host: 'localhost' port: 5455 - max_connections: 100 - stale_timeout: 30 + max_connections: 900 + stale_timeout: 300 minio: user: 'rag_flow' password: 'infini_rag_flow' diff --git a/docker/service_conf.yaml.template b/docker/service_conf.yaml.template index 3354925be..42a699833 100644 --- a/docker/service_conf.yaml.template +++ b/docker/service_conf.yaml.template @@ -7,8 +7,8 @@ mysql: password: '${MYSQL_PASSWORD:-infini_rag_flow}' host: '${MYSQL_HOST:-mysql}' port: 3306 - max_connections: 100 - stale_timeout: 30 + max_connections: 900 + stale_timeout: 300 minio: user: '${MINIO_USER:-rag_flow}' password: '${MINIO_PASSWORD:-infini_rag_flow}'