Merge pull request #1016 from mendableai/mog/mineru

feat(scrapeURL/pdf): switch to MU (FIR-356)
This commit is contained in:
Nicolas 2024-12-27 21:19:48 -03:00 committed by GitHub
commit c1fa5a44ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,4 +1,3 @@
import { createReadStream, promises as fs } from "node:fs";
import { Meta } from "../.."; import { Meta } from "../..";
import { EngineScrapeResult } from ".."; import { EngineScrapeResult } from "..";
import * as marked from "marked"; import * as marked from "marked";
@ -8,108 +7,51 @@ import * as Sentry from "@sentry/node";
import escapeHtml from "escape-html"; import escapeHtml from "escape-html";
import PdfParse from "pdf-parse"; import PdfParse from "pdf-parse";
import { downloadFile, fetchFileToBuffer } from "../utils/downloadFile"; import { downloadFile, fetchFileToBuffer } from "../utils/downloadFile";
import { RemoveFeatureError } from "../../error"; import { RemoveFeatureError, UnsupportedFileError } from "../../error";
import { readFile, unlink } from "node:fs/promises";
import path from "node:path";
type PDFProcessorResult = { html: string; markdown?: string }; type PDFProcessorResult = { html: string; markdown?: string };
async function scrapePDFWithLlamaParse( const MAX_FILE_SIZE = 19 * 1024 * 1024; // 19MB
async function scrapePDFWithRunPodMU(
meta: Meta, meta: Meta,
tempFilePath: string, tempFilePath: string,
timeToRun: number | undefined, timeToRun: number | undefined,
base64Content: string,
): Promise<PDFProcessorResult> { ): Promise<PDFProcessorResult> {
meta.logger.debug("Processing PDF document with LlamaIndex", { meta.logger.debug("Processing PDF document with RunPod MU", {
tempFilePath, tempFilePath,
}); });
const uploadForm = new FormData(); const result = await robustFetch({
url:
// This is utterly stupid but it works! - mogery "https://api.runpod.ai/v2/" + process.env.RUNPOD_MU_POD_ID + "/runsync",
uploadForm.append("file", {
[Symbol.toStringTag]: "Blob",
name: tempFilePath,
stream() {
return createReadStream(
tempFilePath,
) as unknown as ReadableStream<Uint8Array>;
},
bytes() {
throw Error("Unimplemented in mock Blob: bytes");
},
arrayBuffer() {
throw Error("Unimplemented in mock Blob: arrayBuffer");
},
size: (await fs.stat(tempFilePath)).size,
text() {
throw Error("Unimplemented in mock Blob: text");
},
slice(start, end, contentType) {
throw Error("Unimplemented in mock Blob: slice");
},
type: "application/pdf",
} as Blob);
const upload = await robustFetch({
url: "https://api.cloud.llamaindex.ai/api/parsing/upload",
method: "POST", method: "POST",
headers: { headers: {
Authorization: `Bearer ${process.env.LLAMAPARSE_API_KEY}`, Authorization: `Bearer ${process.env.RUNPOD_MU_API_KEY}`,
},
body: {
input: {
file_content: base64Content,
filename: path.basename(tempFilePath) + ".pdf",
},
}, },
body: uploadForm,
logger: meta.logger.child({ logger: meta.logger.child({
method: "scrapePDFWithLlamaParse/upload/robustFetch", method: "scrapePDFWithRunPodMU/robustFetch",
}), }),
schema: z.object({ schema: z.object({
id: z.string(), output: z.object({
markdown: z.string(),
}),
}), }),
}); });
const jobId = upload.id; return {
markdown: result.output.markdown,
// TODO: timeout, retries html: await marked.parse(result.output.markdown, { async: true }),
const startedAt = Date.now(); };
const timeout = timeToRun ?? 300000;
while (Date.now() <= startedAt + timeout) {
try {
const result = await robustFetch({
url: `https://api.cloud.llamaindex.ai/api/parsing/job/${jobId}/result/markdown`,
method: "GET",
headers: {
Authorization: `Bearer ${process.env.LLAMAPARSE_API_KEY}`,
},
logger: meta.logger.child({
method: "scrapePDFWithLlamaParse/result/robustFetch",
}),
schema: z.object({
markdown: z.string(),
}),
});
return {
markdown: result.markdown,
html: await marked.parse(result.markdown, { async: true }),
};
} catch (e) {
if (e instanceof Error && e.message === "Request sent failure status") {
if ((e.cause as any).response.status === 404) {
// no-op, result not up yet
} else if ((e.cause as any).response.body.includes("PDF_IS_BROKEN")) {
// URL is not a PDF, actually!
meta.logger.debug("URL is not actually a PDF, signalling...");
throw new RemoveFeatureError(["pdf"]);
} else {
throw new Error("LlamaParse threw an error", {
cause: e.cause,
});
}
} else {
throw e;
}
}
await new Promise<void>((resolve) => setTimeout(() => resolve(), 250));
}
throw new Error("LlamaParse timed out");
} }
async function scrapePDFWithParsePDF( async function scrapePDFWithParsePDF(
@ -118,7 +60,7 @@ async function scrapePDFWithParsePDF(
): Promise<PDFProcessorResult> { ): Promise<PDFProcessorResult> {
meta.logger.debug("Processing PDF document with parse-pdf", { tempFilePath }); meta.logger.debug("Processing PDF document with parse-pdf", { tempFilePath });
const result = await PdfParse(await fs.readFile(tempFilePath)); const result = await PdfParse(await readFile(tempFilePath));
const escaped = escapeHtml(result.text); const escaped = escapeHtml(result.text);
return { return {
@ -147,59 +89,57 @@ export async function scrapePDF(
let result: PDFProcessorResult | null = null; let result: PDFProcessorResult | null = null;
// First, try parsing with PdfParse const base64Content = (await readFile(tempFilePath)).toString("base64");
result = await scrapePDFWithParsePDF(
{
...meta,
logger: meta.logger.child({
method: "scrapePDF/scrapePDFWithParsePDF",
}),
},
tempFilePath,
);
// If the parsed text is under 500 characters and LLAMAPARSE_API_KEY exists, try LlamaParse // First try RunPod MU if conditions are met
if ( if (
result.markdown && base64Content.length < MAX_FILE_SIZE &&
result.markdown.length < 500 && process.env.RUNPOD_MU_API_KEY &&
process.env.LLAMAPARSE_API_KEY process.env.RUNPOD_MU_POD_ID
) { ) {
try { try {
const llamaResult = await scrapePDFWithLlamaParse( result = await scrapePDFWithRunPodMU(
{ {
...meta, ...meta,
logger: meta.logger.child({ logger: meta.logger.child({
method: "scrapePDF/scrapePDFWithLlamaParse", method: "scrapePDF/scrapePDFWithRunPodMU",
}), }),
}, },
tempFilePath, tempFilePath,
timeToRun, timeToRun,
base64Content,
); );
result = llamaResult; // Use LlamaParse result if successful
} catch (error) { } catch (error) {
if (error instanceof Error && error.message === "LlamaParse timed out") { if (error instanceof RemoveFeatureError) {
meta.logger.warn("LlamaParse timed out -- using parse-pdf result", {
error,
});
} else if (error instanceof RemoveFeatureError) {
throw error; throw error;
} else {
meta.logger.warn(
"LlamaParse failed to parse PDF -- using parse-pdf result",
{ error },
);
Sentry.captureException(error);
} }
meta.logger.warn(
"RunPod MU failed to parse PDF -- falling back to parse-pdf",
{ error },
);
Sentry.captureException(error);
} }
} }
await fs.unlink(tempFilePath); // If RunPod MU failed or wasn't attempted, use PdfParse
if (!result) {
result = await scrapePDFWithParsePDF(
{
...meta,
logger: meta.logger.child({
method: "scrapePDF/scrapePDFWithParsePDF",
}),
},
tempFilePath,
);
}
await unlink(tempFilePath);
return { return {
url: response.url, url: response.url,
statusCode: response.status, statusCode: response.status,
html: result?.html ?? "",
html: result.html, markdown: result?.markdown ?? "",
markdown: result.markdown,
}; };
} }