import random
import re
from io import BytesIO

from nltk import word_tokenize
from openpyxl import load_workbook

from rag.parser import is_english, random_choices
from rag.nlp import huqie, stemmer


def _noop_callback(prog, msg):
    """Progress sink used when the caller does not supply a callback."""


def _progress_msg(n_ok, fails):
    """Build the progress message shared by the Excel and text parsers.

    Args:
        n_ok: Number of Q&A pairs extracted so far.
        fails: Line/row numbers (as strings) that could not be parsed.

    Returns:
        A message such as ``"Extract Q&A: 12. 3 failure, line: 1,4,9..."``.
        The ". " separator keeps the extracted count and the failure count
        from running together into a single number.
    """
    msg = "Extract Q&A: {}. ".format(n_ok)
    if fails:
        msg += "{} failure, line: {}...".format(
            len(fails), ",".join(fails[:3]))
    return msg


class Excel(object):
    """Extract (question, answer) pairs from the rows of an Excel workbook.

    After a call, ``self.is_english`` holds the result of ``is_english`` on
    a sample of the extracted questions.
    """

    def __call__(self, fnm, binary=None, callback=None):
        """Parse a workbook and return its Q&A pairs.

        Args:
            fnm: Path of the workbook; only used when ``binary`` is falsy.
            binary: Optional raw bytes of the workbook.
            callback: Optional ``callback(progress, message)`` progress
                reporter. When omitted, progress is discarded.

        Returns:
            A list of ``(question, answer)`` string tuples, taken from the
            first two non-empty cells of each row, across all sheets.
        """
        if callback is None:
            callback = _noop_callback

        if not binary:
            wb = load_workbook(fnm)
        else:
            wb = load_workbook(BytesIO(binary))

        # Total row count over all sheets; denominator for progress.
        total = 0
        for sheetname in wb.sheetnames:
            total += len(list(wb[sheetname].rows))

        res, fails = [], []
        for sheetname in wb.sheetnames:
            ws = wb[sheetname]
            for i, r in enumerate(ws.rows):
                q, a = "", ""
                for cell in r:
                    # Falsy cell values (None, "", 0) are treated as empty.
                    if not cell.value:
                        continue
                    if not q:
                        q = str(cell.value)
                    elif not a:
                        a = str(cell.value)
                    else:
                        break
                if q and a:
                    res.append((q, a))
                    # Report only after a successful append, so a run of
                    # failed rows does not re-trigger the callback while
                    # len(res) sits on 0 or a multiple of 999.
                    if len(res) % 999 == 0:
                        callback(len(res) * 0.6 / total,
                                 _progress_msg(len(res), fails))
                else:
                    # 1-based row number within its sheet.
                    fails.append(str(i + 1))

        callback(0.6, _progress_msg(len(res), fails))
        self.is_english = is_english(
            [rmPrefix(q) for q, _ in random_choices(res, k=30) if len(q) > 1])
        return res


def rmPrefix(txt):
    """Strip surrounding whitespace and a leading Q/A role label from *txt*.

    The label (e.g. ``Question``, ``Answer``, ``Q``, ``A``, ``user``,
    ``assistant`` or the Chinese equivalents listed in the pattern) is
    matched case-insensitively and must be followed by at least one tab,
    colon or space to be removed.
    """
    return re.sub(
        r"^(问题|答案|回答|user|assistant|Q|A|Question|Answer|问|答)[\t:: ]+",
        "", txt.strip(), flags=re.IGNORECASE)


def beAdoc(d, q, a, eng):
    """Fill dict *d* with the fields of one Q&A chunk and return it.

    Args:
        d: Dict to populate (mutated in place).
        q: Question text.
        a: Answer text.
        eng: Truthy to use English prefixes and NLTK tokenizing/stemming;
            otherwise Chinese prefixes and ``huqie.qie`` are used.

    Returns:
        The same dict *d* with ``content_with_weight``, ``content_ltks``
        and ``content_sm_ltks`` set.
    """
    qprefix = "Question: " if eng else "问题:"
    aprefix = "Answer: " if eng else "回答:"
    d["content_with_weight"] = "\t".join(
        [qprefix + rmPrefix(q), aprefix + rmPrefix(a)])
    # Only the question is tokenized for the token fields.
    if eng:
        d["content_ltks"] = " ".join(
            [stemmer.stem(w) for w in word_tokenize(q)])
    else:
        d["content_ltks"] = huqie.qie(q)
    d["content_sm_ltks"] = huqie.qieqie(d["content_ltks"])
    return d


def chunk(filename, binary=None, callback=None, **kwargs):
    """Split a Q&A file into one chunk dict per question/answer pair.

    Excel files (``.xls``/``.xlsx`` by name) are read through ``Excel``;
    ``.txt``/``.csv`` files are read as UTF-8 text with one tab-separated
    question/answer pair per line.

    Args:
        filename: File name or path; its extension selects the parser and
            it is opened from disk when ``binary`` is falsy.
        binary: Optional raw file content as bytes.
        callback: Optional ``callback(progress, message)`` progress
            reporter. When omitted, progress is discarded.
        **kwargs: Accepted and ignored.

    Returns:
        A list of dicts as produced by ``beAdoc``.

    Raises:
        NotImplementedError: If the file extension is not handled.
    """
    if callback is None:
        callback = _noop_callback

    res = []
    # NOTE(review): the pattern also accepts ".xls"; confirm that
    # load_workbook can actually read that legacy format.
    if re.search(r"\.xlsx?$", filename, re.IGNORECASE):
        callback(0.1, "Start to parse.")
        excel_parser = Excel()
        for q, a in excel_parser(filename, binary, callback):
            res.append(beAdoc({}, q, a, excel_parser.is_english))
        return res

    if re.search(r"\.(txt|csv)$", filename, re.IGNORECASE):
        callback(0.1, "Start to parse.")
        if binary:
            txt = binary.decode("utf-8")
        else:
            # Same encoding as the binary path above, rather than the
            # platform default.
            with open(filename, "r", encoding="utf-8") as f:
                txt = f.read()

        lines = txt.split("\n")
        eng = is_english([rmPrefix(l) for l in lines[:100]])
        fails = []
        for i, line in enumerate(lines):
            # Blank lines (e.g. after a trailing newline) are not Q&A
            # lines at all, so they are skipped rather than reported.
            if not line.strip():
                continue
            # Fields of length <= 1 are discarded before counting.
            arr = [l for l in line.split("\t") if len(l) > 1]
            if len(arr) != 2:
                # 1-based line number, matching the Excel parser.
                fails.append(str(i + 1))
                continue
            res.append(beAdoc({}, arr[0], arr[1], eng))
            if len(res) % 999 == 0:
                callback(len(res) * 0.6 / len(lines),
                         _progress_msg(len(res), fails))

        callback(0.6, _progress_msg(len(res), fails))
        return res

    raise NotImplementedError(
        "file type not supported yet(excel, txt, csv supported)")


if __name__ == "__main__":
    import sys

    def dummy(a, b):
        pass

    chunk(sys.argv[1], callback=dummy)