feat(scrapeURL/pdf): switch to MinerU

Gergő Móricz 2024-12-27 16:37:32 +01:00
parent c543f4f76c
commit 0b55fb836b

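For context: the new scrapePDFWithMinerU engine below submits the PDF (base64-encoded) to a MinerU worker hosted on RunPod serverless, then polls the job until it completes. Here is a minimal standalone sketch of that submit/poll flow, assuming Node 18+ (global fetch) and the MINERU_POD_ID / MINERU_API_KEY environment variables from the diff; it uses plain fetch instead of the repo's robustFetch/zod wrapper, the function name is illustrative, and the response field shapes are assumptions based on RunPod's serverless API as reflected in the diff:

import { readFile } from "node:fs/promises";
import path from "node:path";

// Sketch only: submit a PDF to a MinerU RunPod endpoint and poll for markdown.
async function mineruParsePdf(tempFilePath: string, timeoutMs = 300_000): Promise<string> {
  const base = `https://api.runpod.ai/v2/${process.env.MINERU_POD_ID}`;
  const headers = {
    Authorization: `Bearer ${process.env.MINERU_API_KEY}`,
    "Content-Type": "application/json",
  };

  // 1. Submit the job: the worker receives the file as base64 plus a filename.
  const submitRes = await fetch(`${base}/run`, {
    method: "POST",
    headers,
    body: JSON.stringify({
      input: {
        file_content: (await readFile(tempFilePath)).toString("base64"),
        filename: path.basename(tempFilePath) + ".pdf",
      },
    }),
  });
  const { id } = (await submitRes.json()) as { id: string };

  // 2. Poll the status endpoint until COMPLETED or FAILED, or give up after timeoutMs.
  const startedAt = Date.now();
  while (Date.now() <= startedAt + timeoutMs) {
    const statusRes = await fetch(`${base}/status/${id}`, { headers });
    const job = (await statusRes.json()) as {
      status: string;
      output?: { markdown: string };
      error?: unknown;
    };
    if (job.status === "COMPLETED") return job.output!.markdown;
    if (job.status === "FAILED") {
      throw new Error("MinerU failed to parse PDF: " + String(job.error), { cause: job.error });
    }
    await new Promise((resolve) => setTimeout(resolve, 250)); // not done yet; wait and retry
  }
  throw new Error("MinerU timed out");
}

In the actual engine this is wrapped in robustFetch with a zod schema and a child logger, and a 10 MiB size check runs before submission.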
@@ -1,4 +1,3 @@
-import { createReadStream, promises as fs } from "node:fs";
 import { Meta } from "../..";
 import { EngineScrapeResult } from "..";
 import * as marked from "marked";
@@ -8,55 +7,42 @@ import * as Sentry from "@sentry/node";
 import escapeHtml from "escape-html";
 import PdfParse from "pdf-parse";
 import { downloadFile, fetchFileToBuffer } from "../utils/downloadFile";
-import { RemoveFeatureError } from "../../error";
+import { RemoveFeatureError, UnsupportedFileError } from "../../error";
+import { stat, readFile, unlink } from "node:fs/promises";
+import path from "node:path";
 
 type PDFProcessorResult = { html: string; markdown?: string };
 
-async function scrapePDFWithLlamaParse(
+async function scrapePDFWithMinerU(
   meta: Meta,
   tempFilePath: string,
   timeToRun: number | undefined,
 ): Promise<PDFProcessorResult> {
-  meta.logger.debug("Processing PDF document with LlamaIndex", {
+  meta.logger.debug("Processing PDF document with MinerU", {
     tempFilePath,
   });
 
-  const uploadForm = new FormData();
-  // This is utterly stupid but it works! - mogery
-  uploadForm.append("file", {
-    [Symbol.toStringTag]: "Blob",
-    name: tempFilePath,
-    stream() {
-      return createReadStream(
-        tempFilePath,
-      ) as unknown as ReadableStream<Uint8Array>;
-    },
-    bytes() {
-      throw Error("Unimplemented in mock Blob: bytes");
-    },
-    arrayBuffer() {
-      throw Error("Unimplemented in mock Blob: arrayBuffer");
-    },
-    size: (await fs.stat(tempFilePath)).size,
-    text() {
-      throw Error("Unimplemented in mock Blob: text");
-    },
-    slice(start, end, contentType) {
-      throw Error("Unimplemented in mock Blob: slice");
-    },
-    type: "application/pdf",
-  } as Blob);
+  const fileStat = await stat(tempFilePath);
+  if (fileStat.size > ((2**10)**2)*10) {
+    throw new UnsupportedFileError("File is larger than PDF parser limit (10MiB)");
+  }
+  console.log(tempFilePath);
 
   const upload = await robustFetch({
-    url: "https://api.cloud.llamaindex.ai/api/parsing/upload",
+    url: "https://api.runpod.ai/v2/" + process.env.MINERU_POD_ID + "/run",
     method: "POST",
     headers: {
-      Authorization: `Bearer ${process.env.LLAMAPARSE_API_KEY}`,
+      Authorization: `Bearer ${process.env.MINERU_API_KEY}`,
+    },
+    body: {
+      input: {
+        file_content: (await readFile(tempFilePath)).toString("base64"),
+        filename: path.basename(tempFilePath) + ".pdf",
+      },
     },
-    body: uploadForm,
     logger: meta.logger.child({
-      method: "scrapePDFWithLlamaParse/upload/robustFetch",
+      method: "scrapePDFWithMinerU/upload/robustFetch",
    }),
     schema: z.object({
       id: z.string(),
@@ -72,35 +58,48 @@ async function scrapePDFWithLlamaParse(
   while (Date.now() <= startedAt + timeout) {
     try {
       const result = await robustFetch({
-        url: `https://api.cloud.llamaindex.ai/api/parsing/job/${jobId}/result/markdown`,
+        url: `https://api.runpod.ai/v2/${process.env.MINERU_POD_ID}/status/${jobId}`,
         method: "GET",
         headers: {
-          Authorization: `Bearer ${process.env.LLAMAPARSE_API_KEY}`,
+          Authorization: `Bearer ${process.env.MINERU_API_KEY}`,
         },
         logger: meta.logger.child({
-          method: "scrapePDFWithLlamaParse/result/robustFetch",
+          method: "scrapePDFWithMinerU/result/robustFetch",
         }),
         schema: z.object({
-          markdown: z.string(),
+          status: z.string(),
+          error: z.any().optional(),
+          output: z.object({
+            markdown: z.string(),
+          }).optional(),
         }),
       });
-      return {
-        markdown: result.markdown,
-        html: await marked.parse(result.markdown, { async: true }),
-      };
+
+      if (result.status === "COMPLETED") {
+        return {
+          markdown: result.output!.markdown,
+          html: await marked.parse(result.output!.markdown, { async: true }),
+        };
+      }
+
+      if (result.status === "FAILED") {
+        throw new Error("MinerU failed to parse PDF: " + result.error!, { cause: result.error });
+      }
+
+      // result not up yet
     } catch (e) {
       if (e instanceof Error && e.message === "Request sent failure status") {
-        if ((e.cause as any).response.status === 404) {
-          // no-op, result not up yet
-        } else if ((e.cause as any).response.body.includes("PDF_IS_BROKEN")) {
-          // URL is not a PDF, actually!
-          meta.logger.debug("URL is not actually a PDF, signalling...");
-          throw new RemoveFeatureError(["pdf"]);
-        } else {
-          throw new Error("LlamaParse threw an error", {
+        // if ((e.cause as any).response.status === 404) {
+        //   // no-op, result not up yet
+        // } else if ((e.cause as any).response.body.includes("PDF_IS_BROKEN")) {
+        //   // URL is not a PDF, actually!
+        //   meta.logger.debug("URL is not actually a PDF, signalling...");
+        //   throw new RemoveFeatureError(["pdf"]);
+        // } else {
+          throw new Error("MinerU threw an error", {
             cause: e.cause,
           });
-        }
+        // }
       } else {
         throw e;
       }
@@ -109,7 +108,7 @@ async function scrapePDFWithLlamaParse(
     await new Promise<void>((resolve) => setTimeout(() => resolve(), 250));
   }
 
-  throw new Error("LlamaParse timed out");
+  throw new Error("MinerU timed out");
 }
 
 async function scrapePDFWithParsePDF(
@@ -118,7 +117,7 @@ async function scrapePDFWithParsePDF(
 ): Promise<PDFProcessorResult> {
   meta.logger.debug("Processing PDF document with parse-pdf", { tempFilePath });
 
-  const result = await PdfParse(await fs.readFile(tempFilePath));
+  const result = await PdfParse(await readFile(tempFilePath));
   const escaped = escapeHtml(result.text);
 
   return {
@@ -158,34 +157,33 @@ export async function scrapePDF(
     tempFilePath,
   );
 
-  // If the parsed text is under 500 characters and LLAMAPARSE_API_KEY exists, try LlamaParse
+  // Then, if output is too short, pass to MinerU
   if (
-    result.markdown &&
-    result.markdown.length < 500 &&
-    process.env.LLAMAPARSE_API_KEY
+    result.markdown && result.markdown.length < 500 &&
+    process.env.MINERU_API_KEY && process.env.MINERU_POD_ID
   ) {
     try {
-      const llamaResult = await scrapePDFWithLlamaParse(
+      const mineruResult = await scrapePDFWithMinerU(
         {
           ...meta,
           logger: meta.logger.child({
-            method: "scrapePDF/scrapePDFWithLlamaParse",
+            method: "scrapePDF/scrapePDFWithMinerU",
          }),
        },
         tempFilePath,
         timeToRun,
       );
-      result = llamaResult; // Use LlamaParse result if successful
+      result = mineruResult; // Use LlamaParse result if successful
     } catch (error) {
-      if (error instanceof Error && error.message === "LlamaParse timed out") {
-        meta.logger.warn("LlamaParse timed out -- using parse-pdf result", {
+      if (error instanceof Error && error.message === "MinerU timed out") {
+        meta.logger.warn("MinerU timed out -- using parse-pdf result", {
           error,
         });
       } else if (error instanceof RemoveFeatureError) {
         throw error;
       } else {
         meta.logger.warn(
-          "LlamaParse failed to parse PDF -- using parse-pdf result",
+          "MinerU failed to parse PDF -- using parse-pdf result",
           { error },
         );
         Sentry.captureException(error);
@@ -193,7 +191,7 @@ export async function scrapePDF(
     }
   }
 
-  await fs.unlink(tempFilePath);
+  await unlink(tempFilePath);
 
   return {
     url: response.url,
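A note on the size guard added in scrapePDFWithMinerU above: ((2**10)**2)*10 works out to 10 * 1024 * 1024 = 10,485,760 bytes, i.e. the 10 MiB limit named in the UnsupportedFileError message. A quick standalone check (the constant name is illustrative, not from the commit):

// (2**10) = 1024 bytes per KiB; squared = 1,048,576 bytes per MiB; times 10 = 10 MiB.
const PDF_PARSER_LIMIT_BYTES = (2 ** 10) ** 2 * 10;
console.log(PDF_PARSER_LIMIT_BYTES);                      // 10485760
console.log(PDF_PARSER_LIMIT_BYTES === 10 * 1024 * 1024); // true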