From f1fe45fbfeb94e7a7ce45c74c1835161be2dfd29 Mon Sep 17 00:00:00 2001 From: Yanlong Wang Date: Tue, 22 Apr 2025 19:39:23 +0800 Subject: [PATCH] wip: moving pdf-extract offthread --- src/api/crawler.ts | 146 ++++++++++++++++++++++++----- src/services/pdf-extract.ts | 84 ++--------------- src/services/puppeteer.ts | 1 + src/services/serp/internal.ts | 1 - src/services/snapshot-formatter.ts | 26 +---- 5 files changed, 135 insertions(+), 123 deletions(-) diff --git a/src/api/crawler.ts b/src/api/crawler.ts index 6b6b8cf..608ef2a 100644 --- a/src/api/crawler.ts +++ b/src/api/crawler.ts @@ -49,6 +49,8 @@ import { TempFileManager } from '../services/temp-file'; import { MiscService } from '../services/misc'; import { HTTPServiceError } from 'civkit/http'; import { GeoIPService } from '../services/geoip'; +import { PDFContent, PDFContentCollection } from '../db/pdf'; +import { ExtractedPDF, PDFExtractor } from '../services/pdf-extract'; export interface ExtraScrappingOptions extends ScrappingOptions { withIframe?: boolean | 'quoted'; @@ -98,6 +100,8 @@ export class CrawlerHost extends RPCHost { protected tempFileManager: TempFileManager, protected geoIpService: GeoIPService, protected miscService: MiscService, + protected pdfContentCollection: PDFContentCollection, + protected pdfExtractor: PDFExtractor, ) { super(...arguments); @@ -1112,35 +1116,135 @@ export class CrawlerHost extends RPCHost { return this.formatSnapshotWithPDFSideLoad(respondWith, snapshot, presumedURL, urlValidMs, scrappingOptions); } + async cachedPDFExtract(url: string, cacheTolerance: number = 1000 * 3600 * 24, alternativeUrl?: string): Promise { + if (!url) { + return undefined; + } + let nameUrl = alternativeUrl || url; + const digest = md5Hasher.hash(nameUrl); + + if (url.startsWith('data:')) { + nameUrl = `blob://pdf:${digest}`; + } + + const cache: PDFContent | undefined = nameUrl.startsWith('blob:') ? undefined : + (await this.pdfContentCollection.findOne({ urlDigest: digest }, { sort: { createdAt: -1 } })); + + if (cache) { + const age = Date.now() - cache?.createdAt.valueOf(); + const stale = cache.createdAt.valueOf() < (Date.now() - cacheTolerance); + this.logger.info(`${stale ? 'Stale cache exists' : 'Cache hit'} for PDF ${nameUrl}, normalized digest: ${digest}, ${age}ms old, tolerance ${cacheTolerance}ms`, { + data: url, url: nameUrl, digest, age, stale, cacheTolerance + }); + + if (!stale) { + if (cache.content && cache.text) { + return { + meta: cache.meta, + content: cache.content, + text: cache.text + }; + } + + try { + const r = await this.firebaseObjectStorage.downloadFile(`pdfs/${cache._id}`); + let cached = JSON.parse(r.toString('utf-8')); + + return { + meta: cached.meta, + content: cached.content, + text: cached.text + }; + } catch (err) { + this.logger.warn(`Unable to load cached content for ${nameUrl}`, { err }); + + return undefined; + } + } + } + + let extracted; + + try { + extracted = await this.pdfExtractor.extract(url); + } catch (err: any) { + this.logger.warn(`Unable to extract from pdf ${nameUrl}`, { err, url, nameUrl }); + throw new AssertionFailureError(`Unable to process ${nameUrl} as pdf: ${err?.message}`); + } + + if (!this.threadLocal.ctx.DNT && !nameUrl.startsWith('blob:')) { + const doc = PDFContent.from({ + src: nameUrl, + meta: extracted?.meta || {}, + urlDigest: digest, + createdAt: new Date(), + expireAt: new Date(Date.now() + this.cacheRetentionMs) + }); + await this.firebaseObjectStorage.saveFile(`pdfs/${doc._id}`, + Buffer.from(JSON.stringify(extracted), 'utf-8'), { contentType: 'application/json' }); + this.pdfContentCollection.save( + doc + ).catch((r) => { + this.logger.warn(`Unable to cache PDF content for ${nameUrl}`, { err: r }); + }); + } + + return extracted; + } + async formatSnapshotWithPDFSideLoad(mode: string, snapshot: PageSnapshot, nominalUrl?: URL, urlValidMs?: number, scrappingOptions?: ScrappingOptions) { const snapshotCopy = _.cloneDeep(snapshot); if (snapshotCopy.pdfs?.length) { + // in case of Google Web Cache content const pdfUrl = snapshotCopy.pdfs[0]; + let filePdfUrl = pdfUrl; if (pdfUrl.startsWith('http')) { - const sideLoaded = scrappingOptions?.sideLoad?.impersonate[pdfUrl]; - if (sideLoaded?.status === 200 && sideLoaded.body) { - snapshotCopy.pdfs[0] = pathToFileURL(await sideLoaded?.body.filePath).href; - return this.snapshotFormatter.formatSnapshot(mode, snapshotCopy, nominalUrl, urlValidMs); - } - - const r = await this.curlControl.sideLoad(new URL(pdfUrl), scrappingOptions).catch((err) => { - if (err instanceof ServiceBadAttemptError) { - return Promise.reject(new AssertionFailureError(`Failed to load PDF(${pdfUrl}): ${err.message}`)); + do { + const sideLoaded = scrappingOptions?.sideLoad?.impersonate[pdfUrl]; + if (sideLoaded?.status === 200 && sideLoaded.body) { + filePdfUrl = pathToFileURL(await sideLoaded?.body.filePath).href; + break; } - return Promise.reject(err); - }); - if (r.status !== 200) { - throw new AssertionFailureError(`Failed to load PDF(${pdfUrl}): Server responded status ${r.status}`); - } - if (!r.contentType.includes('application/pdf')) { - throw new AssertionFailureError(`Failed to load PDF(${pdfUrl}): Server responded with wrong content type ${r.contentType}`); - } - if (!r.file) { - throw new AssertionFailureError(`Failed to load PDF(${pdfUrl}): Server did not return a body`); - } - snapshotCopy.pdfs[0] = pathToFileURL(await r.file.filePath).href; + const r = await this.curlControl.sideLoad(new URL(pdfUrl), scrappingOptions).catch((err) => { + if (err instanceof ServiceBadAttemptError) { + return Promise.reject(new AssertionFailureError(`Failed to load PDF(${pdfUrl}): ${err.message}`)); + } + + return Promise.reject(err); + }); + if (r.status !== 200) { + throw new AssertionFailureError(`Failed to load PDF(${pdfUrl}): Server responded status ${r.status}`); + } + if (!r.contentType.includes('application/pdf')) { + throw new AssertionFailureError(`Failed to load PDF(${pdfUrl}): Server responded with wrong content type ${r.contentType}`); + } + if (!r.file) { + throw new AssertionFailureError(`Failed to load PDF(${pdfUrl}): Server did not return a body`); + } + filePdfUrl = pathToFileURL(await r.file.filePath).href; + } while (false); + } + const pdf = await this.cachedPDFExtract(filePdfUrl, + this.threadLocal.get('cacheTolerance'), + pdfUrl + ); + if (pdf) { + snapshot.title = pdf.meta?.Title; + snapshot.text = pdf.text || snapshot.text; + snapshot.parsed = { + content: pdf.content, + textContent: pdf.content, + length: pdf.content?.length, + byline: pdf.meta?.Author, + lang: pdf.meta?.Language || undefined, + title: pdf.meta?.Title, + publishedTime: this.pdfExtractor.parsePdfDate(pdf.meta?.ModDate || pdf.meta?.CreationDate)?.toISOString(), + }; + + snapshotCopy.traits ??= []; + snapshotCopy.traits.push('pdf'); } } diff --git a/src/services/pdf-extract.ts b/src/services/pdf-extract.ts index e1bbfaa..a818971 100644 --- a/src/services/pdf-extract.ts +++ b/src/services/pdf-extract.ts @@ -45,6 +45,13 @@ function isRotatedByAtLeast35Degrees(transform?: [number, number, number, number return rotationAngle1 >= 35 || rotationAngle2 >= 35; } + +export interface ExtractedPDF { + meta?: Record; + content: string; + text: string; +} + @singleton() export class PDFExtractor extends AsyncService { @@ -270,85 +277,10 @@ export class PDFExtractor extends AsyncService { mdChunks[0] = mdChunks[0].trimStart(); } - return { meta: meta.info as Record, content: mdChunks.join(''), text: rawChunks.join('') }; + return { meta: meta.info as Record, content: mdChunks.join(''), text: rawChunks.join('') } as ExtractedPDF; } - async cachedExtract(url: string, cacheTolerance: number = 1000 * 3600 * 24, alternativeUrl?: string) { - if (!url) { - return undefined; - } - let nameUrl = alternativeUrl || url; - const digest = md5Hasher.hash(nameUrl); - if (this.isDataUrl(url)) { - nameUrl = `blob://pdf:${digest}`; - } - - const cache: PDFContent | undefined = nameUrl.startsWith('blob:') ? undefined : - (await PDFContent.fromFirestoreQuery(PDFContent.COLLECTION.where('urlDigest', '==', digest).orderBy('createdAt', 'desc').limit(1)))?.[0]; - - if (cache) { - const age = Date.now() - cache?.createdAt.valueOf(); - const stale = cache.createdAt.valueOf() < (Date.now() - cacheTolerance); - this.logger.info(`${stale ? 'Stale cache exists' : 'Cache hit'} for PDF ${nameUrl}, normalized digest: ${digest}, ${age}ms old, tolerance ${cacheTolerance}ms`, { - data: url, url: nameUrl, digest, age, stale, cacheTolerance - }); - - if (!stale) { - if (cache.content && cache.text) { - return { - meta: cache.meta, - content: cache.content, - text: cache.text - }; - } - - try { - const r = await this.firebaseObjectStorage.downloadFile(`pdfs/${cache._id}`); - let cached = JSON.parse(r.toString('utf-8')); - - return { - meta: cached.meta, - content: cached.content, - text: cached.text - }; - } catch (err) { - this.logger.warn(`Unable to load cached content for ${nameUrl}`, { err }); - - return undefined; - } - } - } - - let extracted; - - try { - extracted = await this.extract(url); - } catch (err: any) { - this.logger.warn(`Unable to extract from pdf ${nameUrl}`, { err, url, nameUrl }); - throw new AssertionFailureError(`Unable to process ${nameUrl} as pdf: ${err?.message}`); - } - - if (!this.asyncLocalContext.ctx.DNT && !nameUrl.startsWith('blob:')) { - const theID = randomUUID(); - await this.firebaseObjectStorage.saveFile(`pdfs/${theID}`, - Buffer.from(JSON.stringify(extracted), 'utf-8'), { contentType: 'application/json' }); - PDFContent.save( - PDFContent.from({ - _id: theID, - src: nameUrl, - meta: extracted?.meta || {}, - urlDigest: digest, - createdAt: new Date(), - expireAt: new Date(Date.now() + this.cacheRetentionMs) - }).degradeForFireStore() - ).catch((r) => { - this.logger.warn(`Unable to cache PDF content for ${nameUrl}`, { err: r }); - }); - } - - return extracted; - } parsePdfDate(pdfDate: string | undefined) { if (!pdfDate) { diff --git a/src/services/puppeteer.ts b/src/services/puppeteer.ts index 0e8d8a0..a0958e2 100644 --- a/src/services/puppeteer.ts +++ b/src/services/puppeteer.ts @@ -72,6 +72,7 @@ export interface PageSnapshot { lastMutationIdle?: number; lastContentResourceLoaded?: number; lastMediaResourceLoaded?: number; + traits?: string[]; } export interface ExtendedSnapshot extends PageSnapshot { diff --git a/src/services/serp/internal.ts b/src/services/serp/internal.ts index 1303369..03c1ad4 100644 --- a/src/services/serp/internal.ts +++ b/src/services/serp/internal.ts @@ -1,4 +1,3 @@ - import { singleton } from 'tsyringe'; import { GlobalLogger } from '../logger'; import { SecretExposer } from '../../shared/services/secrets'; diff --git a/src/services/snapshot-formatter.ts b/src/services/snapshot-formatter.ts index 3064a88..98b5cac 100644 --- a/src/services/snapshot-formatter.ts +++ b/src/services/snapshot-formatter.ts @@ -84,7 +84,6 @@ export class SnapshotFormatter extends AsyncService { protected globalLogger: GlobalLogger, protected jsdomControl: JSDomControl, protected altTextService: AltTextService, - protected pdfExtractor: PDFExtractor, protected threadLocal: AsyncContext, protected firebaseObjectStorage: FirebaseStorageBucketControl, ) { @@ -151,29 +150,6 @@ export class SnapshotFormatter extends AsyncService { Object.defineProperty(f, 'textRepresentation', { value: snapshot.html, enumerable: false, configurable: true }); } - let pdfMode = false; - // in case of Google Web Cache content - if (snapshot.pdfs?.length && (!snapshot.title || snapshot.title.startsWith('cache:'))) { - const pdf = await this.pdfExtractor.cachedExtract(snapshot.pdfs[0], - this.threadLocal.get('cacheTolerance'), - snapshot.pdfs[0].startsWith('http') ? undefined : snapshot.href, - ); - if (pdf) { - pdfMode = true; - snapshot.title = pdf.meta?.Title; - snapshot.text = pdf.text || snapshot.text; - snapshot.parsed = { - content: pdf.content, - textContent: pdf.content, - length: pdf.content?.length, - byline: pdf.meta?.Author, - lang: pdf.meta?.Language || undefined, - title: pdf.meta?.Title, - publishedTime: this.pdfExtractor.parsePdfDate(pdf.meta?.ModDate || pdf.meta?.CreationDate)?.toISOString(), - }; - } - } - if (mode.includes('text')) { modeOK = true; Object.assign(f, { @@ -213,7 +189,7 @@ export class SnapshotFormatter extends AsyncService { const uid = this.threadLocal.get('uid'); do { - if (pdfMode) { + if (snapshot.traits?.includes('pdf')) { contentText = (snapshot.parsed?.content || snapshot.text || '').trim(); break; }