wip: moving pdf-extract offthread

This commit is contained in:
Yanlong Wang 2025-04-22 19:39:23 +08:00
parent 029eef6bfc
commit f1fe45fbfe
No known key found for this signature in database
GPG Key ID: C0A623C0BADF9F37
5 changed files with 135 additions and 123 deletions

View File

@ -49,6 +49,8 @@ import { TempFileManager } from '../services/temp-file';
import { MiscService } from '../services/misc'; import { MiscService } from '../services/misc';
import { HTTPServiceError } from 'civkit/http'; import { HTTPServiceError } from 'civkit/http';
import { GeoIPService } from '../services/geoip'; import { GeoIPService } from '../services/geoip';
import { PDFContent, PDFContentCollection } from '../db/pdf';
import { ExtractedPDF, PDFExtractor } from '../services/pdf-extract';
export interface ExtraScrappingOptions extends ScrappingOptions { export interface ExtraScrappingOptions extends ScrappingOptions {
withIframe?: boolean | 'quoted'; withIframe?: boolean | 'quoted';
@ -98,6 +100,8 @@ export class CrawlerHost extends RPCHost {
protected tempFileManager: TempFileManager, protected tempFileManager: TempFileManager,
protected geoIpService: GeoIPService, protected geoIpService: GeoIPService,
protected miscService: MiscService, protected miscService: MiscService,
protected pdfContentCollection: PDFContentCollection,
protected pdfExtractor: PDFExtractor,
) { ) {
super(...arguments); super(...arguments);
@ -1112,35 +1116,135 @@ export class CrawlerHost extends RPCHost {
return this.formatSnapshotWithPDFSideLoad(respondWith, snapshot, presumedURL, urlValidMs, scrappingOptions); return this.formatSnapshotWithPDFSideLoad(respondWith, snapshot, presumedURL, urlValidMs, scrappingOptions);
} }
async cachedPDFExtract(url: string, cacheTolerance: number = 1000 * 3600 * 24, alternativeUrl?: string): Promise<ExtractedPDF | undefined> {
if (!url) {
return undefined;
}
let nameUrl = alternativeUrl || url;
const digest = md5Hasher.hash(nameUrl);
if (url.startsWith('data:')) {
nameUrl = `blob://pdf:${digest}`;
}
const cache: PDFContent | undefined = nameUrl.startsWith('blob:') ? undefined :
(await this.pdfContentCollection.findOne({ urlDigest: digest }, { sort: { createdAt: -1 } }));
if (cache) {
const age = Date.now() - cache?.createdAt.valueOf();
const stale = cache.createdAt.valueOf() < (Date.now() - cacheTolerance);
this.logger.info(`${stale ? 'Stale cache exists' : 'Cache hit'} for PDF ${nameUrl}, normalized digest: ${digest}, ${age}ms old, tolerance ${cacheTolerance}ms`, {
data: url, url: nameUrl, digest, age, stale, cacheTolerance
});
if (!stale) {
if (cache.content && cache.text) {
return {
meta: cache.meta,
content: cache.content,
text: cache.text
};
}
try {
const r = await this.firebaseObjectStorage.downloadFile(`pdfs/${cache._id}`);
let cached = JSON.parse(r.toString('utf-8'));
return {
meta: cached.meta,
content: cached.content,
text: cached.text
};
} catch (err) {
this.logger.warn(`Unable to load cached content for ${nameUrl}`, { err });
return undefined;
}
}
}
let extracted;
try {
extracted = await this.pdfExtractor.extract(url);
} catch (err: any) {
this.logger.warn(`Unable to extract from pdf ${nameUrl}`, { err, url, nameUrl });
throw new AssertionFailureError(`Unable to process ${nameUrl} as pdf: ${err?.message}`);
}
if (!this.threadLocal.ctx.DNT && !nameUrl.startsWith('blob:')) {
const doc = PDFContent.from({
src: nameUrl,
meta: extracted?.meta || {},
urlDigest: digest,
createdAt: new Date(),
expireAt: new Date(Date.now() + this.cacheRetentionMs)
});
await this.firebaseObjectStorage.saveFile(`pdfs/${doc._id}`,
Buffer.from(JSON.stringify(extracted), 'utf-8'), { contentType: 'application/json' });
this.pdfContentCollection.save(
doc
).catch((r) => {
this.logger.warn(`Unable to cache PDF content for ${nameUrl}`, { err: r });
});
}
return extracted;
}
async formatSnapshotWithPDFSideLoad(mode: string, snapshot: PageSnapshot, nominalUrl?: URL, urlValidMs?: number, scrappingOptions?: ScrappingOptions) { async formatSnapshotWithPDFSideLoad(mode: string, snapshot: PageSnapshot, nominalUrl?: URL, urlValidMs?: number, scrappingOptions?: ScrappingOptions) {
const snapshotCopy = _.cloneDeep(snapshot); const snapshotCopy = _.cloneDeep(snapshot);
if (snapshotCopy.pdfs?.length) { if (snapshotCopy.pdfs?.length) {
// in case of Google Web Cache content
const pdfUrl = snapshotCopy.pdfs[0]; const pdfUrl = snapshotCopy.pdfs[0];
let filePdfUrl = pdfUrl;
if (pdfUrl.startsWith('http')) { if (pdfUrl.startsWith('http')) {
const sideLoaded = scrappingOptions?.sideLoad?.impersonate[pdfUrl]; do {
if (sideLoaded?.status === 200 && sideLoaded.body) { const sideLoaded = scrappingOptions?.sideLoad?.impersonate[pdfUrl];
snapshotCopy.pdfs[0] = pathToFileURL(await sideLoaded?.body.filePath).href; if (sideLoaded?.status === 200 && sideLoaded.body) {
return this.snapshotFormatter.formatSnapshot(mode, snapshotCopy, nominalUrl, urlValidMs); filePdfUrl = pathToFileURL(await sideLoaded?.body.filePath).href;
} break;
const r = await this.curlControl.sideLoad(new URL(pdfUrl), scrappingOptions).catch((err) => {
if (err instanceof ServiceBadAttemptError) {
return Promise.reject(new AssertionFailureError(`Failed to load PDF(${pdfUrl}): ${err.message}`));
} }
return Promise.reject(err); const r = await this.curlControl.sideLoad(new URL(pdfUrl), scrappingOptions).catch((err) => {
}); if (err instanceof ServiceBadAttemptError) {
if (r.status !== 200) { return Promise.reject(new AssertionFailureError(`Failed to load PDF(${pdfUrl}): ${err.message}`));
throw new AssertionFailureError(`Failed to load PDF(${pdfUrl}): Server responded status ${r.status}`); }
}
if (!r.contentType.includes('application/pdf')) { return Promise.reject(err);
throw new AssertionFailureError(`Failed to load PDF(${pdfUrl}): Server responded with wrong content type ${r.contentType}`); });
} if (r.status !== 200) {
if (!r.file) { throw new AssertionFailureError(`Failed to load PDF(${pdfUrl}): Server responded status ${r.status}`);
throw new AssertionFailureError(`Failed to load PDF(${pdfUrl}): Server did not return a body`); }
} if (!r.contentType.includes('application/pdf')) {
snapshotCopy.pdfs[0] = pathToFileURL(await r.file.filePath).href; throw new AssertionFailureError(`Failed to load PDF(${pdfUrl}): Server responded with wrong content type ${r.contentType}`);
}
if (!r.file) {
throw new AssertionFailureError(`Failed to load PDF(${pdfUrl}): Server did not return a body`);
}
filePdfUrl = pathToFileURL(await r.file.filePath).href;
} while (false);
}
const pdf = await this.cachedPDFExtract(filePdfUrl,
this.threadLocal.get('cacheTolerance'),
pdfUrl
);
if (pdf) {
snapshot.title = pdf.meta?.Title;
snapshot.text = pdf.text || snapshot.text;
snapshot.parsed = {
content: pdf.content,
textContent: pdf.content,
length: pdf.content?.length,
byline: pdf.meta?.Author,
lang: pdf.meta?.Language || undefined,
title: pdf.meta?.Title,
publishedTime: this.pdfExtractor.parsePdfDate(pdf.meta?.ModDate || pdf.meta?.CreationDate)?.toISOString(),
};
snapshotCopy.traits ??= [];
snapshotCopy.traits.push('pdf');
} }
} }

View File

@ -45,6 +45,13 @@ function isRotatedByAtLeast35Degrees(transform?: [number, number, number, number
return rotationAngle1 >= 35 || rotationAngle2 >= 35; return rotationAngle1 >= 35 || rotationAngle2 >= 35;
} }
export interface ExtractedPDF {
meta?: Record<string, any>;
content: string;
text: string;
}
@singleton() @singleton()
export class PDFExtractor extends AsyncService { export class PDFExtractor extends AsyncService {
@ -270,85 +277,10 @@ export class PDFExtractor extends AsyncService {
mdChunks[0] = mdChunks[0].trimStart(); mdChunks[0] = mdChunks[0].trimStart();
} }
return { meta: meta.info as Record<string, any>, content: mdChunks.join(''), text: rawChunks.join('') }; return { meta: meta.info as Record<string, any>, content: mdChunks.join(''), text: rawChunks.join('') } as ExtractedPDF;
} }
async cachedExtract(url: string, cacheTolerance: number = 1000 * 3600 * 24, alternativeUrl?: string) {
if (!url) {
return undefined;
}
let nameUrl = alternativeUrl || url;
const digest = md5Hasher.hash(nameUrl);
if (this.isDataUrl(url)) {
nameUrl = `blob://pdf:${digest}`;
}
const cache: PDFContent | undefined = nameUrl.startsWith('blob:') ? undefined :
(await PDFContent.fromFirestoreQuery(PDFContent.COLLECTION.where('urlDigest', '==', digest).orderBy('createdAt', 'desc').limit(1)))?.[0];
if (cache) {
const age = Date.now() - cache?.createdAt.valueOf();
const stale = cache.createdAt.valueOf() < (Date.now() - cacheTolerance);
this.logger.info(`${stale ? 'Stale cache exists' : 'Cache hit'} for PDF ${nameUrl}, normalized digest: ${digest}, ${age}ms old, tolerance ${cacheTolerance}ms`, {
data: url, url: nameUrl, digest, age, stale, cacheTolerance
});
if (!stale) {
if (cache.content && cache.text) {
return {
meta: cache.meta,
content: cache.content,
text: cache.text
};
}
try {
const r = await this.firebaseObjectStorage.downloadFile(`pdfs/${cache._id}`);
let cached = JSON.parse(r.toString('utf-8'));
return {
meta: cached.meta,
content: cached.content,
text: cached.text
};
} catch (err) {
this.logger.warn(`Unable to load cached content for ${nameUrl}`, { err });
return undefined;
}
}
}
let extracted;
try {
extracted = await this.extract(url);
} catch (err: any) {
this.logger.warn(`Unable to extract from pdf ${nameUrl}`, { err, url, nameUrl });
throw new AssertionFailureError(`Unable to process ${nameUrl} as pdf: ${err?.message}`);
}
if (!this.asyncLocalContext.ctx.DNT && !nameUrl.startsWith('blob:')) {
const theID = randomUUID();
await this.firebaseObjectStorage.saveFile(`pdfs/${theID}`,
Buffer.from(JSON.stringify(extracted), 'utf-8'), { contentType: 'application/json' });
PDFContent.save(
PDFContent.from({
_id: theID,
src: nameUrl,
meta: extracted?.meta || {},
urlDigest: digest,
createdAt: new Date(),
expireAt: new Date(Date.now() + this.cacheRetentionMs)
}).degradeForFireStore()
).catch((r) => {
this.logger.warn(`Unable to cache PDF content for ${nameUrl}`, { err: r });
});
}
return extracted;
}
parsePdfDate(pdfDate: string | undefined) { parsePdfDate(pdfDate: string | undefined) {
if (!pdfDate) { if (!pdfDate) {

View File

@ -72,6 +72,7 @@ export interface PageSnapshot {
lastMutationIdle?: number; lastMutationIdle?: number;
lastContentResourceLoaded?: number; lastContentResourceLoaded?: number;
lastMediaResourceLoaded?: number; lastMediaResourceLoaded?: number;
traits?: string[];
} }
export interface ExtendedSnapshot extends PageSnapshot { export interface ExtendedSnapshot extends PageSnapshot {

View File

@ -1,4 +1,3 @@
import { singleton } from 'tsyringe'; import { singleton } from 'tsyringe';
import { GlobalLogger } from '../logger'; import { GlobalLogger } from '../logger';
import { SecretExposer } from '../../shared/services/secrets'; import { SecretExposer } from '../../shared/services/secrets';

View File

@ -84,7 +84,6 @@ export class SnapshotFormatter extends AsyncService {
protected globalLogger: GlobalLogger, protected globalLogger: GlobalLogger,
protected jsdomControl: JSDomControl, protected jsdomControl: JSDomControl,
protected altTextService: AltTextService, protected altTextService: AltTextService,
protected pdfExtractor: PDFExtractor,
protected threadLocal: AsyncContext, protected threadLocal: AsyncContext,
protected firebaseObjectStorage: FirebaseStorageBucketControl, protected firebaseObjectStorage: FirebaseStorageBucketControl,
) { ) {
@ -151,29 +150,6 @@ export class SnapshotFormatter extends AsyncService {
Object.defineProperty(f, 'textRepresentation', { value: snapshot.html, enumerable: false, configurable: true }); Object.defineProperty(f, 'textRepresentation', { value: snapshot.html, enumerable: false, configurable: true });
} }
let pdfMode = false;
// in case of Google Web Cache content
if (snapshot.pdfs?.length && (!snapshot.title || snapshot.title.startsWith('cache:'))) {
const pdf = await this.pdfExtractor.cachedExtract(snapshot.pdfs[0],
this.threadLocal.get('cacheTolerance'),
snapshot.pdfs[0].startsWith('http') ? undefined : snapshot.href,
);
if (pdf) {
pdfMode = true;
snapshot.title = pdf.meta?.Title;
snapshot.text = pdf.text || snapshot.text;
snapshot.parsed = {
content: pdf.content,
textContent: pdf.content,
length: pdf.content?.length,
byline: pdf.meta?.Author,
lang: pdf.meta?.Language || undefined,
title: pdf.meta?.Title,
publishedTime: this.pdfExtractor.parsePdfDate(pdf.meta?.ModDate || pdf.meta?.CreationDate)?.toISOString(),
};
}
}
if (mode.includes('text')) { if (mode.includes('text')) {
modeOK = true; modeOK = true;
Object.assign(f, { Object.assign(f, {
@ -213,7 +189,7 @@ export class SnapshotFormatter extends AsyncService {
const uid = this.threadLocal.get('uid'); const uid = this.threadLocal.get('uid');
do { do {
if (pdfMode) { if (snapshot.traits?.includes('pdf')) {
contentText = (snapshot.parsed?.content || snapshot.text || '').trim(); contentText = (snapshot.parsed?.content || snapshot.text || '').trim();
break; break;
} }