Fix: pymysql.err.InterfaceError: (0, '') during long time streaming chat responses (#6548) (#7057)

### Related Issue:
https://github.com/infiniflow/ragflow/issues/6548

### Related PR:
https://github.com/infiniflow/ragflow/pull/6861


### Environment:
Commit version:
[[48730e0](48730e00a8)]

### Bug Description:
Unexpected `pymysql.err.InterfaceError: (0, '') `when using Peewee +
PyMySQL + PooledMySQLDatabase after a long-running `chat streamly`
operation.

This is a common issue with Peewee + PyMySQL + connection pooling: you
end up using a connection that was silently closed by the server, but
Peewee doesn't realize it's dead.

**I found that the error only occurs during longer streaming outputs**
and is unrelated to the database connection context, so it's likely
because:

- The prolonged streaming response caused the database connection to
time out

- The original database connection might have been disconnected by the
server during the streaming process

### Why This Happens
This error happens even when using `@DB.connection_context() `after the
stream is done. After investigation, I found this is caused by MySQL
connection pools that appear to be open but are actually dead (expired
due to` wait_timeout`).

1. `@DB.connection_context()` (as a decorator or context manager) pulls
a connection from the pool.

2. If this connection was idle and expired on the MySQL server (e.g.,
due to `wait_timeout`), but not closed in Python, it will still be
considered “open” (`DB.is_closed() == False`).

3. The real error will occur only when I execute a SQL command (such as
.`get_or_none()`), and PyMySQL tries to send it to the server via a
broken socket.


### Changes Made:

1. I implemented manual connection checks before executing SQL:
```
    try:
        DB.execute_sql("SELECT 1")
    except Exception:
        print("Connection dead, reconnecting...")
        DB.close()
        DB.connect()
```
2. Delayed the token count update until after the streaming response is
completed to ensure the streaming output isn't interrupted by database
operations.
```
        total_tokens = 0 
        for txt in chat_streamly(system, history, gen_conf):
            if isinstance(txt, int):
                total_tokens = txt
......
                break
......
        if total_tokens > 0:
            if not TenantLLMService.increase_usage(self.tenant_id, self.llm_type, txt, self.llm_name):
                logging.error("LLMBundle.chat_streamly can't update token usage for {}/CHAT llm_name: {}, content: {}".format(self.tenant_id, self.llm_name, txt))
```
This commit is contained in:
alulala 2025-04-16 19:15:35 +08:00 committed by GitHub
parent de5727f90a
commit 5d253e0a34
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -159,6 +159,12 @@ class TenantLLMService(CommonService):
@classmethod
@DB.connection_context()
def increase_usage(cls, tenant_id, llm_type, used_tokens, llm_name=None):
try:
if not DB.is_connection_usable():
DB.connect()
except Exception:
DB.close()
DB.connect()
e, tenant = TenantService.get_by_id(tenant_id)
if not e:
logging.error(f"Tenant not found: {tenant_id}")
@ -356,21 +362,22 @@ class LLMBundle:
ans = ""
chat_streamly = self.mdl.chat_streamly
total_tokens = 0
if self.is_tools and self.mdl.is_tools:
chat_streamly = self.mdl.chat_streamly_with_tools
for txt in chat_streamly(system, history, gen_conf):
if isinstance(txt, int):
total_tokens = txt
if self.langfuse:
generation.end(output={"output": ans})
if not TenantLLMService.increase_usage(self.tenant_id, self.llm_type, txt, self.llm_name):
logging.error("LLMBundle.chat_streamly can't update token usage for {}/CHAT llm_name: {}, content: {}".format(self.tenant_id, self.llm_name, txt))
return ans
break
if txt.endswith("</think>"):
ans = ans.rstrip("</think>")
ans += txt
yield ans
if total_tokens > 0:
if not TenantLLMService.increase_usage(self.tenant_id, self.llm_type, txt, self.llm_name):
logging.error("LLMBundle.chat_streamly can't update token usage for {}/CHAT llm_name: {}, content: {}".format(self.tenant_id, self.llm_name, txt))