mirror of
https://git.mirrors.martin98.com/https://github.com/infiniflow/ragflow.git
synced 2025-06-04 11:24:00 +08:00
Let ThreadPool exit gracefully. (#3653)
### What problem does this PR solve?

#3646

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
parent
5c59651bda
commit
0891a393d7
@ -65,7 +65,3 @@ class Crawler(ComponentBase, ABC):
|
|||||||
elif self._param.extract_type == 'content':
|
elif self._param.extract_type == 'content':
|
||||||
result.extracted_content
|
result.extracted_content
|
||||||
return result.markdown
|
return result.markdown
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -64,27 +64,27 @@ def build_knowledge_graph_chunks(tenant_id: str, chunks: list[str], callback, en
|
|||||||
BATCH_SIZE=4
|
BATCH_SIZE=4
|
||||||
texts, graphs = [], []
|
texts, graphs = [], []
|
||||||
cnt = 0
|
cnt = 0
|
||||||
threads = []
|
|
||||||
max_workers = int(os.environ.get('GRAPH_EXTRACTOR_MAX_WORKERS', 50))
|
max_workers = int(os.environ.get('GRAPH_EXTRACTOR_MAX_WORKERS', 50))
|
||||||
exe = ThreadPoolExecutor(max_workers=max_workers)
|
with ThreadPoolExecutor(max_workers=max_workers) as exe:
|
||||||
for i in range(len(chunks)):
|
threads = []
|
||||||
tkn_cnt = num_tokens_from_string(chunks[i])
|
for i in range(len(chunks)):
|
||||||
if cnt+tkn_cnt >= left_token_count and texts:
|
tkn_cnt = num_tokens_from_string(chunks[i])
|
||||||
|
if cnt+tkn_cnt >= left_token_count and texts:
|
||||||
|
for b in range(0, len(texts), BATCH_SIZE):
|
||||||
|
threads.append(exe.submit(ext, ["\n".join(texts[b:b+BATCH_SIZE])], {"entity_types": entity_types}, callback))
|
||||||
|
texts = []
|
||||||
|
cnt = 0
|
||||||
|
texts.append(chunks[i])
|
||||||
|
cnt += tkn_cnt
|
||||||
|
if texts:
|
||||||
for b in range(0, len(texts), BATCH_SIZE):
|
for b in range(0, len(texts), BATCH_SIZE):
|
||||||
threads.append(exe.submit(ext, ["\n".join(texts[b:b+BATCH_SIZE])], {"entity_types": entity_types}, callback))
|
threads.append(exe.submit(ext, ["\n".join(texts[b:b+BATCH_SIZE])], {"entity_types": entity_types}, callback))
|
||||||
texts = []
|
|
||||||
cnt = 0
|
|
||||||
texts.append(chunks[i])
|
|
||||||
cnt += tkn_cnt
|
|
||||||
if texts:
|
|
||||||
for b in range(0, len(texts), BATCH_SIZE):
|
|
||||||
threads.append(exe.submit(ext, ["\n".join(texts[b:b+BATCH_SIZE])], {"entity_types": entity_types}, callback))
|
|
||||||
|
|
||||||
callback(0.5, "Extracting entities.")
|
callback(0.5, "Extracting entities.")
|
||||||
graphs = []
|
graphs = []
|
||||||
for i, _ in enumerate(threads):
|
for i, _ in enumerate(threads):
|
||||||
graphs.append(_.result().output)
|
graphs.append(_.result().output)
|
||||||
callback(0.5 + 0.1*i/len(threads), f"Entities extraction progress ... {i+1}/{len(threads)}")
|
callback(0.5 + 0.1*i/len(threads), f"Entities extraction progress ... {i+1}/{len(threads)}")
|
||||||
|
|
||||||
graph = reduce(graph_merge, graphs) if graphs else nx.Graph()
|
graph = reduce(graph_merge, graphs) if graphs else nx.Graph()
|
||||||
er = EntityResolution(llm_bdl)
|
er = EntityResolution(llm_bdl)
|
||||||
|
@ -88,26 +88,26 @@ class MindMapExtractor:
|
|||||||
prompt_variables = {}
|
prompt_variables = {}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
max_workers = int(os.environ.get('MINDMAP_EXTRACTOR_MAX_WORKERS', 12))
|
|
||||||
exe = ThreadPoolExecutor(max_workers=max_workers)
|
|
||||||
threads = []
|
|
||||||
token_count = max(self._llm.max_length * 0.8, self._llm.max_length - 512)
|
|
||||||
texts = []
|
|
||||||
res = []
|
res = []
|
||||||
cnt = 0
|
max_workers = int(os.environ.get('MINDMAP_EXTRACTOR_MAX_WORKERS', 12))
|
||||||
for i in range(len(sections)):
|
with ThreadPoolExecutor(max_workers=max_workers) as exe:
|
||||||
section_cnt = num_tokens_from_string(sections[i])
|
threads = []
|
||||||
if cnt + section_cnt >= token_count and texts:
|
token_count = max(self._llm.max_length * 0.8, self._llm.max_length - 512)
|
||||||
|
texts = []
|
||||||
|
cnt = 0
|
||||||
|
for i in range(len(sections)):
|
||||||
|
section_cnt = num_tokens_from_string(sections[i])
|
||||||
|
if cnt + section_cnt >= token_count and texts:
|
||||||
|
threads.append(exe.submit(self._process_document, "".join(texts), prompt_variables))
|
||||||
|
texts = []
|
||||||
|
cnt = 0
|
||||||
|
texts.append(sections[i])
|
||||||
|
cnt += section_cnt
|
||||||
|
if texts:
|
||||||
threads.append(exe.submit(self._process_document, "".join(texts), prompt_variables))
|
threads.append(exe.submit(self._process_document, "".join(texts), prompt_variables))
|
||||||
texts = []
|
|
||||||
cnt = 0
|
|
||||||
texts.append(sections[i])
|
|
||||||
cnt += section_cnt
|
|
||||||
if texts:
|
|
||||||
threads.append(exe.submit(self._process_document, "".join(texts), prompt_variables))
|
|
||||||
|
|
||||||
for i, _ in enumerate(threads):
|
for i, _ in enumerate(threads):
|
||||||
res.append(_.result())
|
res.append(_.result())
|
||||||
|
|
||||||
if not res:
|
if not res:
|
||||||
return MindMapResult(output={"id": "root", "children": []})
|
return MindMapResult(output={"id": "root", "children": []})
|
||||||
|
@ -366,7 +366,7 @@ class OllamaChat(Base):
|
|||||||
keep_alive=-1
|
keep_alive=-1
|
||||||
)
|
)
|
||||||
ans = response["message"]["content"].strip()
|
ans = response["message"]["content"].strip()
|
||||||
return ans, response["eval_count"] + response.get("prompt_eval_count", 0)
|
return ans, response.get("eval_count", 0) + response.get("prompt_eval_count", 0)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return "**ERROR**: " + str(e), 0
|
return "**ERROR**: " + str(e), 0
|
||||||
|
|
||||||
|
@ -492,6 +492,7 @@ def report_status():
|
|||||||
logging.exception("report_status got exception")
|
logging.exception("report_status got exception")
|
||||||
time.sleep(30)
|
time.sleep(30)
|
||||||
|
|
||||||
|
|
||||||
def analyze_heap(snapshot1: tracemalloc.Snapshot, snapshot2: tracemalloc.Snapshot, snapshot_id: int, dump_full: bool):
|
def analyze_heap(snapshot1: tracemalloc.Snapshot, snapshot2: tracemalloc.Snapshot, snapshot_id: int, dump_full: bool):
|
||||||
msg = ""
|
msg = ""
|
||||||
if dump_full:
|
if dump_full:
|
||||||
@ -508,6 +509,7 @@ def analyze_heap(snapshot1: tracemalloc.Snapshot, snapshot2: tracemalloc.Snapsho
|
|||||||
msg += '\n'.join(stat.traceback.format())
|
msg += '\n'.join(stat.traceback.format())
|
||||||
logging.info(msg)
|
logging.info(msg)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
settings.init_settings()
|
settings.init_settings()
|
||||||
background_thread = threading.Thread(target=report_status)
|
background_thread = threading.Thread(target=report_status)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user