Log llm response on exception (#6750)

### What problem does this PR solve?

Log llm response on exception

### Type of change

- [x] Refactoring
This commit is contained in:
Zhichang Yu 2025-04-02 17:10:57 +08:00 committed by GitHub
parent 724a36fcdb
commit e7a2a4b7ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 45 additions and 57 deletions

View File

@ -56,7 +56,8 @@ class Extractor:
response = self._llm.chat(system_msg[0]["content"], hist, conf)
response = re.sub(r"<think>.*</think>", "", response, flags=re.DOTALL)
if response.find("**ERROR**") >= 0:
raise Exception(response)
logging.warning(f"Extractor._chat got error. response: {response}")
return ""
set_llm_cache(self._llm.llm_name, system, response, history, gen_conf)
return response

View File

@ -94,7 +94,7 @@ class GraphExtractor(Extractor):
self._tuple_delimiter_key: DEFAULT_TUPLE_DELIMITER,
self._record_delimiter_key: DEFAULT_RECORD_DELIMITER,
self._completion_delimiter_key: DEFAULT_COMPLETION_DELIMITER,
self._entity_types_key: entity_types,
self._entity_types_key: ",".join(entity_types),
}
async def _process_single_content(self, chunk_key_dp: tuple[str, str], chunk_seq: int, num_chunks: int, out_results):

View File

@ -72,41 +72,51 @@ async def run_graphrag(
if not subgraph:
return
subgraph_nodes = set(subgraph.nodes())
new_graph = await merge_subgraph(
tenant_id,
kb_id,
doc_id,
subgraph,
embedding_model,
callback,
)
assert new_graph is not None
graphrag_task_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=doc_id, timeout=3600)
while True:
if graphrag_task_lock.acquire():
break
callback(msg=f"merge_subgraph {doc_id} is waiting graphrag_task_lock")
await trio.sleep(20)
if not with_resolution or not with_community:
return
if with_resolution:
await resolve_entities(
new_graph,
subgraph_nodes,
try:
subgraph_nodes = set(subgraph.nodes())
new_graph = await merge_subgraph(
tenant_id,
kb_id,
doc_id,
chat_model,
embedding_model,
callback,
)
if with_community:
await extract_community(
new_graph,
tenant_id,
kb_id,
doc_id,
chat_model,
subgraph,
embedding_model,
callback,
)
assert new_graph is not None
if not with_resolution or not with_community:
return
if with_resolution:
await resolve_entities(
new_graph,
subgraph_nodes,
tenant_id,
kb_id,
doc_id,
chat_model,
embedding_model,
callback,
)
if with_community:
await extract_community(
new_graph,
tenant_id,
kb_id,
doc_id,
chat_model,
embedding_model,
callback,
)
finally:
graphrag_task_lock.release()
now = trio.current_time()
callback(msg=f"GraphRAG for doc {doc_id} done in {now - start:.2f} seconds.")
return
@ -191,13 +201,6 @@ async def merge_subgraph(
embedding_model,
callback,
):
graphrag_task_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=doc_id, timeout=600)
while True:
if graphrag_task_lock.acquire():
break
callback(msg=f"merge_subgraph {doc_id} is waiting graphrag_task_lock")
await trio.sleep(10)
start = trio.current_time()
change = GraphChange()
old_graph = await get_graph(tenant_id, kb_id)
@ -214,7 +217,6 @@ async def merge_subgraph(
new_graph.nodes[node_name]["pagerank"] = pagerank
await set_graph(tenant_id, kb_id, embedding_model, new_graph, change, callback)
graphrag_task_lock.release()
now = trio.current_time()
callback(
msg=f"merging subgraph for doc {doc_id} into the global graph done in {now - start:.2f} seconds."
@ -232,13 +234,6 @@ async def resolve_entities(
embed_bdl,
callback,
):
graphrag_task_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=doc_id, timeout=600)
while True:
if graphrag_task_lock.acquire():
break
callback(msg=f"resolve_entities {doc_id} is waiting graphrag_task_lock")
await trio.sleep(10)
start = trio.current_time()
er = EntityResolution(
llm_bdl,
@ -250,7 +245,6 @@ async def resolve_entities(
callback(msg="Graph resolution updated pagerank.")
await set_graph(tenant_id, kb_id, embed_bdl, graph, change, callback)
graphrag_task_lock.release()
now = trio.current_time()
callback(msg=f"Graph resolution done in {now - start:.2f}s.")
@ -264,13 +258,6 @@ async def extract_community(
embed_bdl,
callback,
):
graphrag_task_lock = RedisDistributedLock(f"graphrag_task_{kb_id}", lock_value=doc_id, timeout=600)
while True:
if graphrag_task_lock.acquire():
break
callback(msg=f"extract_community {doc_id} is waiting graphrag_task_lock")
await trio.sleep(10)
start = trio.current_time()
ext = CommunityReportsExtractor(
llm_bdl,
@ -326,7 +313,6 @@ async def extract_community(
error_message = f"Insert chunk error: {doc_store_result}, please check log file and Elasticsearch/Infinity status!"
raise Exception(error_message)
graphrag_task_lock.release()
now = trio.current_time()
callback(
msg=f"Graph indexed {len(cr.structured_output)} communities in {now - start:.2f}s."

View File

@ -82,7 +82,7 @@ class Base(ABC):
return ERROR_CONNECTION
elif "quota" in error_str or "capacity" in error_str or "credit" in error_str or "billing" in error_str or "limit" in error_str and "rate" not in error_str:
return ERROR_QUOTA
elif "filter" in error_str or "content" in error_str or "policy" in error_str or "blocked" in error_str or "safety" in error_str:
elif "filter" in error_str or "content" in error_str or "policy" in error_str or "blocked" in error_str or "safety" in error_str or "inappropriate" in error_str:
return ERROR_CONTENT_FILTER
elif "model" in error_str or "not found" in error_str or "does not exist" in error_str or "not available" in error_str:
return ERROR_MODEL
@ -110,6 +110,7 @@ class Base(ABC):
ans += LENGTH_NOTIFICATION_EN
return ans, self.total_token_count(response)
except Exception as e:
logging.exception("chat_model.Base.chat got exception")
# Classify the error
error_code = self._classify_error(e)
@ -124,7 +125,7 @@ class Base(ABC):
# For non-rate limit errors or the last attempt, return an error message
if attempt == self.max_retries - 1:
error_code = ERROR_MAX_RETRIES
return f"{ERROR_PREFIX}: {error_code} - {str(e)}", 0
return f"{ERROR_PREFIX}: {error_code} - {str(e)}. response: {response}", 0
def chat_streamly(self, system, history, gen_conf):
if system:

View File

@ -318,4 +318,4 @@ class RedisDistributedLock:
return self.lock.acquire(token=self.lock_value)
def release(self):
return self.lock.release()
REDIS_CONN.delete_if_equal(self.lock_key, self.lock_value)