From 0b55fb836bd66336e2ba6cde2f0d5fca42bd3874 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Fri, 27 Dec 2024 16:37:32 +0100 Subject: [PATCH 1/4] feat(scrapeURL/pdf): switch to MinerU --- .../scraper/scrapeURL/engines/pdf/index.ts | 124 +++++++++--------- 1 file changed, 61 insertions(+), 63 deletions(-) diff --git a/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts b/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts index 6bac2ba4..dfc72df0 100644 --- a/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts +++ b/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts @@ -1,4 +1,3 @@ -import { createReadStream, promises as fs } from "node:fs"; import { Meta } from "../.."; import { EngineScrapeResult } from ".."; import * as marked from "marked"; @@ -8,55 +7,42 @@ import * as Sentry from "@sentry/node"; import escapeHtml from "escape-html"; import PdfParse from "pdf-parse"; import { downloadFile, fetchFileToBuffer } from "../utils/downloadFile"; -import { RemoveFeatureError } from "../../error"; +import { RemoveFeatureError, UnsupportedFileError } from "../../error"; +import { stat, readFile, unlink } from "node:fs/promises"; +import path from "node:path"; type PDFProcessorResult = { html: string; markdown?: string }; -async function scrapePDFWithLlamaParse( +async function scrapePDFWithMinerU( meta: Meta, tempFilePath: string, timeToRun: number | undefined, ): Promise { - meta.logger.debug("Processing PDF document with LlamaIndex", { + meta.logger.debug("Processing PDF document with MinerU", { tempFilePath, }); - const uploadForm = new FormData(); + const fileStat = await stat(tempFilePath); + if (fileStat.size > ((2**10)**2)*10) { + throw new UnsupportedFileError("File is larger than PDF parser limit (10MiB)"); + } - // This is utterly stupid but it works! - mogery - uploadForm.append("file", { - [Symbol.toStringTag]: "Blob", - name: tempFilePath, - stream() { - return createReadStream( - tempFilePath, - ) as unknown as ReadableStream; - }, - bytes() { - throw Error("Unimplemented in mock Blob: bytes"); - }, - arrayBuffer() { - throw Error("Unimplemented in mock Blob: arrayBuffer"); - }, - size: (await fs.stat(tempFilePath)).size, - text() { - throw Error("Unimplemented in mock Blob: text"); - }, - slice(start, end, contentType) { - throw Error("Unimplemented in mock Blob: slice"); - }, - type: "application/pdf", - } as Blob); + console.log(tempFilePath); const upload = await robustFetch({ - url: "https://api.cloud.llamaindex.ai/api/parsing/upload", + url: "https://api.runpod.ai/v2/" + process.env.MINERU_POD_ID + "/run", method: "POST", headers: { - Authorization: `Bearer ${process.env.LLAMAPARSE_API_KEY}`, + Authorization: `Bearer ${process.env.MINERU_API_KEY}`, + }, + body: { + input: { + file_content: (await readFile(tempFilePath)).toString("base64"), + filename: path.basename(tempFilePath) + ".pdf", + }, }, - body: uploadForm, logger: meta.logger.child({ - method: "scrapePDFWithLlamaParse/upload/robustFetch", + method: "scrapePDFWithMinerU/upload/robustFetch", }), schema: z.object({ id: z.string(), @@ -72,35 +58,48 @@ async function scrapePDFWithLlamaParse( while (Date.now() <= startedAt + timeout) { try { const result = await robustFetch({ - url: `https://api.cloud.llamaindex.ai/api/parsing/job/${jobId}/result/markdown`, + url: `https://api.runpod.ai/v2/${process.env.MINERU_POD_ID}/status/${jobId}`, method: "GET", headers: { - Authorization: `Bearer ${process.env.LLAMAPARSE_API_KEY}`, + Authorization: `Bearer ${process.env.MINERU_API_KEY}`, }, logger: meta.logger.child({ - method: "scrapePDFWithLlamaParse/result/robustFetch", + method: "scrapePDFWithMinerU/result/robustFetch", }), schema: z.object({ - markdown: z.string(), + status: z.string(), + error: z.any().optional(), + output: z.object({ + markdown: z.string(), + }).optional(), }), }); - return { - markdown: result.markdown, - html: await marked.parse(result.markdown, { async: true }), - }; + + if (result.status === "COMPLETED") { + return { + markdown: result.output!.markdown, + html: await marked.parse(result.output!.markdown, { async: true }), + }; + } + + if (result.status === "FAILED") { + throw new Error("MinerU failed to parse PDF: " + result.error!, { cause: result.error }); + } + + // result not up yet } catch (e) { if (e instanceof Error && e.message === "Request sent failure status") { - if ((e.cause as any).response.status === 404) { - // no-op, result not up yet - } else if ((e.cause as any).response.body.includes("PDF_IS_BROKEN")) { - // URL is not a PDF, actually! - meta.logger.debug("URL is not actually a PDF, signalling..."); - throw new RemoveFeatureError(["pdf"]); - } else { - throw new Error("LlamaParse threw an error", { + // if ((e.cause as any).response.status === 404) { + // // no-op, result not up yet + // } else if ((e.cause as any).response.body.includes("PDF_IS_BROKEN")) { + // // URL is not a PDF, actually! + // meta.logger.debug("URL is not actually a PDF, signalling..."); + // throw new RemoveFeatureError(["pdf"]); + // } else { + throw new Error("MinerU threw an error", { cause: e.cause, }); - } + // } } else { throw e; } @@ -109,7 +108,7 @@ async function scrapePDFWithLlamaParse( await new Promise((resolve) => setTimeout(() => resolve(), 250)); } - throw new Error("LlamaParse timed out"); + throw new Error("MinerU timed out"); } async function scrapePDFWithParsePDF( @@ -118,7 +117,7 @@ async function scrapePDFWithParsePDF( ): Promise { meta.logger.debug("Processing PDF document with parse-pdf", { tempFilePath }); - const result = await PdfParse(await fs.readFile(tempFilePath)); + const result = await PdfParse(await readFile(tempFilePath)); const escaped = escapeHtml(result.text); return { @@ -158,34 +157,33 @@ export async function scrapePDF( tempFilePath, ); - // If the parsed text is under 500 characters and LLAMAPARSE_API_KEY exists, try LlamaParse + // Then, if output is too short, pass to MinerU if ( - result.markdown && - result.markdown.length < 500 && - process.env.LLAMAPARSE_API_KEY + result.markdown && result.markdown.length < 500 && + process.env.MINERU_API_KEY && process.env.MINERU_POD_ID ) { try { - const llamaResult = await scrapePDFWithLlamaParse( + const mineruResult = await scrapePDFWithMinerU( { ...meta, logger: meta.logger.child({ - method: "scrapePDF/scrapePDFWithLlamaParse", + method: "scrapePDF/scrapePDFWithMinerU", }), }, tempFilePath, timeToRun, ); - result = llamaResult; // Use LlamaParse result if successful + result = mineruResult; // Use LlamaParse result if successful } catch (error) { - if (error instanceof Error && error.message === "LlamaParse timed out") { - meta.logger.warn("LlamaParse timed out -- using parse-pdf result", { + if (error instanceof Error && error.message === "MinerU timed out") { + meta.logger.warn("MinerU timed out -- using parse-pdf result", { error, }); } else if (error instanceof RemoveFeatureError) { throw error; } else { meta.logger.warn( - "LlamaParse failed to parse PDF -- using parse-pdf result", + "MinerU failed to parse PDF -- using parse-pdf result", { error }, ); Sentry.captureException(error); @@ -193,7 +191,7 @@ export async function scrapePDF( } } - await fs.unlink(tempFilePath); + await unlink(tempFilePath); return { url: response.url, From b8d7f9f2576b6318259b81003adeb3748bfe35cd Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 27 Dec 2024 19:59:05 -0300 Subject: [PATCH 2/4] Nick: we are using runpod --- .../scraper/scrapeURL/engines/pdf/index.ts | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts b/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts index dfc72df0..710228bf 100644 --- a/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts +++ b/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts @@ -13,12 +13,12 @@ import path from "node:path"; type PDFProcessorResult = { html: string; markdown?: string }; -async function scrapePDFWithMinerU( +async function scrapePDFWithRunPodMU( meta: Meta, tempFilePath: string, timeToRun: number | undefined, ): Promise { - meta.logger.debug("Processing PDF document with MinerU", { + meta.logger.debug("Processing PDF document with RunPod MU", { tempFilePath, }); @@ -30,10 +30,10 @@ async function scrapePDFWithMinerU( console.log(tempFilePath); const upload = await robustFetch({ - url: "https://api.runpod.ai/v2/" + process.env.MINERU_POD_ID + "/run", + url: "https://api.runpod.ai/v2/" + process.env.RUNPOD_MU_POD_ID + "/run", method: "POST", headers: { - Authorization: `Bearer ${process.env.MINERU_API_KEY}`, + Authorization: `Bearer ${process.env.RUNPOD_MU_API_KEY}`, }, body: { input: { @@ -42,7 +42,7 @@ async function scrapePDFWithMinerU( }, }, logger: meta.logger.child({ - method: "scrapePDFWithMinerU/upload/robustFetch", + method: "scrapePDFWithRunPodMU/upload/robustFetch", }), schema: z.object({ id: z.string(), @@ -58,13 +58,13 @@ async function scrapePDFWithMinerU( while (Date.now() <= startedAt + timeout) { try { const result = await robustFetch({ - url: `https://api.runpod.ai/v2/${process.env.MINERU_POD_ID}/status/${jobId}`, + url: `https://api.runpod.ai/v2/${process.env.RUNPOD_MU_POD_ID}/status/${jobId}`, method: "GET", headers: { - Authorization: `Bearer ${process.env.MINERU_API_KEY}`, + Authorization: `Bearer ${process.env.RUNPOD_MU_API_KEY}`, }, logger: meta.logger.child({ - method: "scrapePDFWithMinerU/result/robustFetch", + method: "scrapePDFWithRunPodMU/result/robustFetch", }), schema: z.object({ status: z.string(), @@ -83,7 +83,7 @@ async function scrapePDFWithMinerU( } if (result.status === "FAILED") { - throw new Error("MinerU failed to parse PDF: " + result.error!, { cause: result.error }); + throw new Error("RunPod MU failed to parse PDF: " + result.error!, { cause: result.error }); } // result not up yet @@ -96,7 +96,7 @@ async function scrapePDFWithMinerU( // meta.logger.debug("URL is not actually a PDF, signalling..."); // throw new RemoveFeatureError(["pdf"]); // } else { - throw new Error("MinerU threw an error", { + throw new Error("RunPod MU threw an error", { cause: e.cause, }); // } @@ -108,7 +108,7 @@ async function scrapePDFWithMinerU( await new Promise((resolve) => setTimeout(() => resolve(), 250)); } - throw new Error("MinerU timed out"); + throw new Error("RunPod MU timed out"); } async function scrapePDFWithParsePDF( @@ -157,33 +157,33 @@ export async function scrapePDF( tempFilePath, ); - // Then, if output is too short, pass to MinerU + // Then, if output is too short, pass to RunPod MU if ( result.markdown && result.markdown.length < 500 && - process.env.MINERU_API_KEY && process.env.MINERU_POD_ID + process.env.RUNPOD_MU_API_KEY && process.env.RUNPOD_MU_POD_ID ) { try { - const mineruResult = await scrapePDFWithMinerU( + const muResult = await scrapePDFWithRunPodMU( { ...meta, logger: meta.logger.child({ - method: "scrapePDF/scrapePDFWithMinerU", + method: "scrapePDF/scrapePDFWithRunPodMU", }), }, tempFilePath, timeToRun, ); - result = mineruResult; // Use LlamaParse result if successful + result = muResult; // Use LlamaParse result if successful } catch (error) { - if (error instanceof Error && error.message === "MinerU timed out") { - meta.logger.warn("MinerU timed out -- using parse-pdf result", { + if (error instanceof Error && error.message === "RunPod MU timed out") { + meta.logger.warn("RunPod MU timed out -- using parse-pdf result", { error, }); } else if (error instanceof RemoveFeatureError) { throw error; } else { meta.logger.warn( - "MinerU failed to parse PDF -- using parse-pdf result", + "RunPod MU failed to parse PDF -- using parse-pdf result", { error }, ); Sentry.captureException(error); From f9d55efba8dc9830487bc5f8a172c89f60d83824 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 27 Dec 2024 20:54:26 -0300 Subject: [PATCH 3/4] Update index.ts --- .../scraper/scrapeURL/engines/pdf/index.ts | 121 +++++------------- 1 file changed, 35 insertions(+), 86 deletions(-) diff --git a/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts b/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts index 710228bf..fbd836c4 100644 --- a/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts +++ b/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts @@ -13,102 +13,46 @@ import path from "node:path"; type PDFProcessorResult = { html: string; markdown?: string }; +const MAX_FILE_SIZE = 19 * 1024 * 1024; // 19MB + async function scrapePDFWithRunPodMU( meta: Meta, tempFilePath: string, timeToRun: number | undefined, + base64Content: string, ): Promise { meta.logger.debug("Processing PDF document with RunPod MU", { tempFilePath, }); - const fileStat = await stat(tempFilePath); - if (fileStat.size > ((2**10)**2)*10) { - throw new UnsupportedFileError("File is larger than PDF parser limit (10MiB)"); - } - - console.log(tempFilePath); - - const upload = await robustFetch({ - url: "https://api.runpod.ai/v2/" + process.env.RUNPOD_MU_POD_ID + "/run", + + const result = await robustFetch({ + url: + "https://api.runpod.ai/v2/" + process.env.RUNPOD_MU_POD_ID + "/runsync", method: "POST", headers: { Authorization: `Bearer ${process.env.RUNPOD_MU_API_KEY}`, }, body: { input: { - file_content: (await readFile(tempFilePath)).toString("base64"), + file_content: base64Content, filename: path.basename(tempFilePath) + ".pdf", }, }, logger: meta.logger.child({ - method: "scrapePDFWithRunPodMU/upload/robustFetch", + method: "scrapePDFWithRunPodMU/robustFetch", }), schema: z.object({ - id: z.string(), + output: z.object({ + markdown: z.string(), + }), }), }); - const jobId = upload.id; - - // TODO: timeout, retries - const startedAt = Date.now(); - const timeout = timeToRun ?? 300000; - - while (Date.now() <= startedAt + timeout) { - try { - const result = await robustFetch({ - url: `https://api.runpod.ai/v2/${process.env.RUNPOD_MU_POD_ID}/status/${jobId}`, - method: "GET", - headers: { - Authorization: `Bearer ${process.env.RUNPOD_MU_API_KEY}`, - }, - logger: meta.logger.child({ - method: "scrapePDFWithRunPodMU/result/robustFetch", - }), - schema: z.object({ - status: z.string(), - error: z.any().optional(), - output: z.object({ - markdown: z.string(), - }).optional(), - }), - }); - - if (result.status === "COMPLETED") { - return { - markdown: result.output!.markdown, - html: await marked.parse(result.output!.markdown, { async: true }), - }; - } - - if (result.status === "FAILED") { - throw new Error("RunPod MU failed to parse PDF: " + result.error!, { cause: result.error }); - } - - // result not up yet - } catch (e) { - if (e instanceof Error && e.message === "Request sent failure status") { - // if ((e.cause as any).response.status === 404) { - // // no-op, result not up yet - // } else if ((e.cause as any).response.body.includes("PDF_IS_BROKEN")) { - // // URL is not a PDF, actually! - // meta.logger.debug("URL is not actually a PDF, signalling..."); - // throw new RemoveFeatureError(["pdf"]); - // } else { - throw new Error("RunPod MU threw an error", { - cause: e.cause, - }); - // } - } else { - throw e; - } - } - - await new Promise((resolve) => setTimeout(() => resolve(), 250)); - } - - throw new Error("RunPod MU timed out"); + return { + markdown: result.output.markdown, + html: await marked.parse(result.output.markdown, { async: true }), + }; } async function scrapePDFWithParsePDF( @@ -146,21 +90,14 @@ export async function scrapePDF( let result: PDFProcessorResult | null = null; - // First, try parsing with PdfParse - result = await scrapePDFWithParsePDF( - { - ...meta, - logger: meta.logger.child({ - method: "scrapePDF/scrapePDFWithParsePDF", - }), - }, - tempFilePath, - ); + const base64Content = (await readFile(tempFilePath)).toString("base64"); // Then, if output is too short, pass to RunPod MU if ( - result.markdown && result.markdown.length < 500 && - process.env.RUNPOD_MU_API_KEY && process.env.RUNPOD_MU_POD_ID + // result.markdown && result.markdown.length < 500 && + base64Content.length < MAX_FILE_SIZE && + process.env.RUNPOD_MU_API_KEY && + process.env.RUNPOD_MU_POD_ID ) { try { const muResult = await scrapePDFWithRunPodMU( @@ -172,6 +109,7 @@ export async function scrapePDF( }, tempFilePath, timeToRun, + base64Content, ); result = muResult; // Use LlamaParse result if successful } catch (error) { @@ -189,6 +127,17 @@ export async function scrapePDF( Sentry.captureException(error); } } + } else { + // First, try parsing with PdfParse + result = await scrapePDFWithParsePDF( + { + ...meta, + logger: meta.logger.child({ + method: "scrapePDF/scrapePDFWithParsePDF", + }), + }, + tempFilePath, + ); } await unlink(tempFilePath); @@ -197,7 +146,7 @@ export async function scrapePDF( url: response.url, statusCode: response.status, - html: result.html, - markdown: result.markdown, + html: result?.html ?? "", + markdown: result?.markdown ?? "", }; } From 1eca61bffbcc43757c97b9ba4912bc87db07dae2 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 27 Dec 2024 20:59:18 -0300 Subject: [PATCH 4/4] Update index.ts --- .../scraper/scrapeURL/engines/pdf/index.ts | 33 ++++++++----------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts b/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts index fbd836c4..0ea1d579 100644 --- a/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts +++ b/apps/api/src/scraper/scrapeURL/engines/pdf/index.ts @@ -8,7 +8,7 @@ import escapeHtml from "escape-html"; import PdfParse from "pdf-parse"; import { downloadFile, fetchFileToBuffer } from "../utils/downloadFile"; import { RemoveFeatureError, UnsupportedFileError } from "../../error"; -import { stat, readFile, unlink } from "node:fs/promises"; +import { readFile, unlink } from "node:fs/promises"; import path from "node:path"; type PDFProcessorResult = { html: string; markdown?: string }; @@ -25,7 +25,6 @@ async function scrapePDFWithRunPodMU( tempFilePath, }); - const result = await robustFetch({ url: "https://api.runpod.ai/v2/" + process.env.RUNPOD_MU_POD_ID + "/runsync", @@ -92,15 +91,14 @@ export async function scrapePDF( const base64Content = (await readFile(tempFilePath)).toString("base64"); - // Then, if output is too short, pass to RunPod MU + // First try RunPod MU if conditions are met if ( - // result.markdown && result.markdown.length < 500 && base64Content.length < MAX_FILE_SIZE && process.env.RUNPOD_MU_API_KEY && process.env.RUNPOD_MU_POD_ID ) { try { - const muResult = await scrapePDFWithRunPodMU( + result = await scrapePDFWithRunPodMU( { ...meta, logger: meta.logger.child({ @@ -111,24 +109,20 @@ export async function scrapePDF( timeToRun, base64Content, ); - result = muResult; // Use LlamaParse result if successful } catch (error) { - if (error instanceof Error && error.message === "RunPod MU timed out") { - meta.logger.warn("RunPod MU timed out -- using parse-pdf result", { - error, - }); - } else if (error instanceof RemoveFeatureError) { + if (error instanceof RemoveFeatureError) { throw error; - } else { - meta.logger.warn( - "RunPod MU failed to parse PDF -- using parse-pdf result", - { error }, - ); - Sentry.captureException(error); } + meta.logger.warn( + "RunPod MU failed to parse PDF -- falling back to parse-pdf", + { error }, + ); + Sentry.captureException(error); } - } else { - // First, try parsing with PdfParse + } + + // If RunPod MU failed or wasn't attempted, use PdfParse + if (!result) { result = await scrapePDFWithParsePDF( { ...meta, @@ -145,7 +139,6 @@ export async function scrapePDF( return { url: response.url, statusCode: response.status, - html: result?.html ?? "", markdown: result?.markdown ?? "", };