fix: bulk fix multiple issues

This commit is contained in:
Yanlong Wang 2024-05-31 17:30:57 +08:00
parent 62ccacfe13
commit 9ac40606d5
No known key found for this signature in database
GPG Key ID: C0A623C0BADF9F37
8 changed files with 162 additions and 77 deletions

View File

@ -19,7 +19,7 @@ import { randomUUID } from 'crypto';
import { JinaEmbeddingsAuthDTO } from '../shared/dto/jina-embeddings-auth'; import { JinaEmbeddingsAuthDTO } from '../shared/dto/jina-embeddings-auth';
import { countGPTToken as estimateToken } from '../shared/utils/openai'; import { countGPTToken as estimateToken } from '../shared/utils/openai';
import { CrawlerOptions } from '../dto/scrapping-options'; import { CrawlerOptions, CrawlerOptionsHeaderOnly } from '../dto/scrapping-options';
import { JinaEmbeddingsTokenAccount } from '../shared/db/jina-embeddings-token-account'; import { JinaEmbeddingsTokenAccount } from '../shared/db/jina-embeddings-token-account';
import { PDFExtractor } from '../services/pdf-extract'; import { PDFExtractor } from '../services/pdf-extract';
@ -230,7 +230,9 @@ export class CrawlerHost extends RPCHost {
let pdfMode = false; let pdfMode = false;
if (snapshot.pdfs?.length && !snapshot.title) { if (snapshot.pdfs?.length && !snapshot.title) {
const pdf = await this.pdfExtractor.cachedExtract(snapshot.pdfs[0]); const pdf = await this.pdfExtractor.cachedExtract(snapshot.pdfs[0],
this.threadLocal.get('cacheTolerance')
);
if (pdf) { if (pdf) {
pdfMode = true; pdfMode = true;
snapshot.title = pdf.meta?.Title; snapshot.title = pdf.meta?.Title;
@ -432,7 +434,7 @@ ${suffixMixins.length ? `\n${suffixMixins.join('\n\n')}\n` : ''}`;
runtime: { runtime: {
memory: '4GiB', memory: '4GiB',
timeoutSeconds: 300, timeoutSeconds: 300,
concurrency: 4, concurrency: 22,
}, },
tags: ['Crawler'], tags: ['Crawler'],
httpMethod: ['get', 'post'], httpMethod: ['get', 'post'],
@ -442,9 +444,9 @@ ${suffixMixins.length ? `\n${suffixMixins.join('\n\n')}\n` : ''}`;
@CloudHTTPv2({ @CloudHTTPv2({
runtime: { runtime: {
memory: '4GiB', memory: '4GiB',
cpu: 2, cpu: 4,
timeoutSeconds: 300, timeoutSeconds: 300,
concurrency: 11, concurrency: 22,
maxInstances: 455, maxInstances: 455,
}, },
openapi: { openapi: {
@ -543,11 +545,11 @@ ${suffixMixins.length ? `\n${suffixMixins.join('\n\n')}\n` : ''}`;
res: Response, res: Response,
}, },
auth: JinaEmbeddingsAuthDTO, auth: JinaEmbeddingsAuthDTO,
crawlerOptions: CrawlerOptions, crawlerOptions: CrawlerOptionsHeaderOnly,
) { ) {
const uid = await auth.solveUID(); const uid = await auth.solveUID();
let chargeAmount = 0; let chargeAmount = 0;
const noSlashURL = ctx.req.url.slice(1).trimStart(); const noSlashURL = ctx.req.url.slice(1);
if (!noSlashURL) { if (!noSlashURL) {
const latestUser = uid ? await auth.assertUser() : undefined; const latestUser = uid ? await auth.assertUser() : undefined;
if (!ctx.req.accepts('text/plain') && (ctx.req.accepts('text/json') || ctx.req.accepts('application/json'))) { if (!ctx.req.accepts('text/plain') && (ctx.req.accepts('text/json') || ctx.req.accepts('application/json'))) {
@ -911,6 +913,7 @@ ${suffixMixins.length ? `\n${suffixMixins.join('\n\n')}\n` : ''}`;
this.threadLocal.set('withGeneratedAlt', opts.withGeneratedAlt); this.threadLocal.set('withGeneratedAlt', opts.withGeneratedAlt);
this.threadLocal.set('withLinksSummary', opts.withLinksSummary); this.threadLocal.set('withLinksSummary', opts.withLinksSummary);
this.threadLocal.set('withImagesSummary', opts.withImagesSummary); this.threadLocal.set('withImagesSummary', opts.withImagesSummary);
this.threadLocal.set('cacheTolerance', opts.cacheTolerance);
const crawlOpts: ExtraScrappingOptions = { const crawlOpts: ExtraScrappingOptions = {
proxyUrl: opts.proxyUrl, proxyUrl: opts.proxyUrl,

View File

@ -118,6 +118,20 @@ export class DataCrunchingHost extends RPCHost {
}, },
tags: ['DataCrunching'], tags: ['DataCrunching'],
}) })
/**
 * Walks every chunk yielded by `iterPageCacheChunks()` and enqueues one
 * `crunchPageCacheWorker` task per chunk. Each enqueue is awaited before the
 * next chunk is pulled, so tasks are dispatched strictly one after another.
 *
 * @returns `true` once the chunk iterator is exhausted.
 */
async dispatchPageCacheCrunching() {
    const workerName = 'crunchPageCacheWorker';

    for await (const chunk of this.iterPageCacheChunks()) {
        this.logger.info(`Dispatching ${chunk.fileName}...`);

        // Resolve the queue first and the worker URL second, per chunk.
        const queue = getFunctions().taskQueue(workerName);
        const uri = await getFunctionUrl(workerName);

        await queue.enqueue(
            { date: chunk.date, offset: chunk.offset },
            { dispatchDeadlineSeconds: 1800, uri },
        );
    }

    return true;
}
// @CloudHTTPv2({ // @CloudHTTPv2({
// runtime: { // runtime: {
// cpu: 2, // cpu: 2,
@ -128,29 +142,28 @@ export class DataCrunchingHost extends RPCHost {
// }, // },
// tags: ['DataCrunching'], // tags: ['DataCrunching'],
// }) // })
async dispatchPageCacheCrunching( // async dispatchPageCacheCrunching(
@RPCReflect() rpcReflect: RPCReflection, // @RPCReflect() rpcReflect: RPCReflection
) { // ) {
const sse = new OutputServerEventStream({ highWaterMark: 4096 }); // const sse = new OutputServerEventStream({ highWaterMark: 4096 });
rpcReflect.return(sse); // rpcReflect.return(sse);
rpcReflect.catch((err) => { // rpcReflect.catch((err) => {
sse.end({ data: `Error: ${err.message}` }); // sse.end({ data: `Error: ${err.message}` });
}); // });
for await (const { fileName, date, offset } of this.iterPageCacheChunks()) { // for await (const { fileName, date, offset } of this.iterPageCacheChunks()) {
this.logger.info(`Dispatching ${fileName}...`); // this.logger.info(`Dispatching ${fileName}...`);
sse.write({ data: `Dispatching ${fileName}...` }); // sse.write({ data: `Dispatching ${fileName}...` });
await getFunctions().taskQueue('crunchPageCacheWorker').enqueue({ date, offset }, { // await getFunctions().taskQueue('crunchPageCacheWorker').enqueue({ date, offset }, {
dispatchDeadlineSeconds: 1800, // dispatchDeadlineSeconds: 1800,
uri: await getFunctionUrl('crunchPageCacheWorker'), // uri: await getFunctionUrl('crunchPageCacheWorker'),
}); // });
} // }
sse.end({ data: 'done' }); // sse.end({ data: 'done' });
sse.resume();
return true; // return true;
} // }
async* iterPageCacheRecords(date?: string, inputOffset?: number | string) { async* iterPageCacheRecords(date?: string, inputOffset?: number | string) {
const startOfToday = dayjs().utc().startOf('day'); const startOfToday = dayjs().utc().startOf('day');
@ -234,8 +247,6 @@ export class DataCrunchingHost extends RPCHost {
if (nRecords) { if (nRecords) {
yield { fileName, date: theDay.toISOString(), offset }; yield { fileName, date: theDay.toISOString(), offset };
} }
continue;
} }
} }

View File

@ -53,6 +53,7 @@ export class SearcherHost extends RPCHost {
@CloudHTTPv2({ @CloudHTTPv2({
name: 'search2', name: 'search2',
runtime: { runtime: {
cpu: 4,
memory: '4GiB', memory: '4GiB',
timeoutSeconds: 300, timeoutSeconds: 300,
concurrency: 4, concurrency: 4,
@ -64,10 +65,10 @@ export class SearcherHost extends RPCHost {
}) })
@CloudHTTPv2({ @CloudHTTPv2({
runtime: { runtime: {
cpu: 4, cpu: 8,
memory: '8GiB', memory: '8GiB',
timeoutSeconds: 300, timeoutSeconds: 300,
concurrency: 4, concurrency: 6,
maxInstances: 200, maxInstances: 200,
}, },
openapi: { openapi: {
@ -265,7 +266,12 @@ export class SearcherHost extends RPCHost {
let lastScrapped: any[] | undefined; let lastScrapped: any[] | undefined;
let earlyReturn = false; let earlyReturn = false;
if (!ctx.req.accepts('text/plain') && (ctx.req.accepts('text/json') || ctx.req.accepts('application/json'))) { if (!ctx.req.accepts('text/plain') && (ctx.req.accepts('text/json') || ctx.req.accepts('application/json'))) {
const earlyReturnTimer = setTimeout(() => { let earlyReturnTimer: ReturnType<typeof setTimeout> | undefined;
const setEarlyReturnTimer = () => {
if (earlyReturnTimer) {
return;
}
earlyReturnTimer = setTimeout(() => {
if (!lastScrapped) { if (!lastScrapped) {
return; return;
} }
@ -273,20 +279,27 @@ export class SearcherHost extends RPCHost {
rpcReflect.return(lastScrapped); rpcReflect.return(lastScrapped);
earlyReturn = true; earlyReturn = true;
}, this.reasonableDelayMs); }, this.reasonableDelayMs);
};
for await (const scrapped of it) { for await (const scrapped of it) {
lastScrapped = scrapped; lastScrapped = scrapped;
if (_.some(scrapped, (x) => this.pageQualified(x))) {
setEarlyReturnTimer();
}
if (!this.searchResultsQualified(scrapped)) { if (!this.searchResultsQualified(scrapped)) {
continue; continue;
} }
if (earlyReturnTimer) {
clearTimeout(earlyReturnTimer); clearTimeout(earlyReturnTimer);
}
chargeAmount = this.getChargeAmount(scrapped); chargeAmount = this.getChargeAmount(scrapped);
return scrapped; return scrapped;
} }
if (earlyReturnTimer) {
clearTimeout(earlyReturnTimer); clearTimeout(earlyReturnTimer);
}
if (!lastScrapped) { if (!lastScrapped) {
throw new AssertionFailureError(`No content available for query ${searchQuery}`); throw new AssertionFailureError(`No content available for query ${searchQuery}`);
@ -299,7 +312,12 @@ export class SearcherHost extends RPCHost {
return lastScrapped; return lastScrapped;
} }
const earlyReturnTimer = setTimeout(() => { let earlyReturnTimer: ReturnType<typeof setTimeout> | undefined;
const setEarlyReturnTimer = () => {
if (earlyReturnTimer) {
return;
}
earlyReturnTimer = setTimeout(() => {
if (!lastScrapped) { if (!lastScrapped) {
return; return;
} }
@ -307,21 +325,31 @@ export class SearcherHost extends RPCHost {
rpcReflect.return(assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null })); rpcReflect.return(assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null }));
earlyReturn = true; earlyReturn = true;
}, this.reasonableDelayMs); }, this.reasonableDelayMs);
};
for await (const scrapped of it) { for await (const scrapped of it) {
lastScrapped = scrapped; lastScrapped = scrapped;
if (_.some(scrapped, (x) => this.pageQualified(x))) {
setEarlyReturnTimer();
}
if (!this.searchResultsQualified(scrapped)) { if (!this.searchResultsQualified(scrapped)) {
continue; continue;
} }
if (earlyReturnTimer) {
clearTimeout(earlyReturnTimer); clearTimeout(earlyReturnTimer);
}
chargeAmount = this.getChargeAmount(scrapped); chargeAmount = this.getChargeAmount(scrapped);
return assignTransferProtocolMeta(`${scrapped}`, { contentType: 'text/plain', envelope: null }); return assignTransferProtocolMeta(`${scrapped}`, { contentType: 'text/plain', envelope: null });
} }
if (earlyReturnTimer) {
clearTimeout(earlyReturnTimer); clearTimeout(earlyReturnTimer);
}
if (!lastScrapped) { if (!lastScrapped) {
throw new AssertionFailureError(`No content available for query ${searchQuery}`); throw new AssertionFailureError(`No content available for query ${searchQuery}`);
@ -331,7 +359,6 @@ export class SearcherHost extends RPCHost {
chargeAmount = this.getChargeAmount(lastScrapped); chargeAmount = this.getChargeAmount(lastScrapped);
} }
return assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null }); return assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null });
} }

View File

@ -52,44 +52,44 @@ export class CrawlerOptions extends AutoCastable {
const ctx = Reflect.get(input, RPC_CALL_ENVIRONMENT) as { const ctx = Reflect.get(input, RPC_CALL_ENVIRONMENT) as {
req: Request, req: Request,
res: Response, res: Response,
}; } | undefined;
const customMode = ctx.req.get('x-respond-with') || ctx.req.get('x-return-format'); const customMode = ctx?.req.get('x-respond-with') || ctx?.req.get('x-return-format');
if (customMode !== undefined) { if (customMode !== undefined) {
instance.respondWith = customMode; instance.respondWith = customMode;
} }
const withGeneratedAlt = ctx.req.get('x-with-generated-alt'); const withGeneratedAlt = ctx?.req.get('x-with-generated-alt');
if (withGeneratedAlt !== undefined) { if (withGeneratedAlt !== undefined) {
instance.withGeneratedAlt = Boolean(withGeneratedAlt); instance.withGeneratedAlt = Boolean(withGeneratedAlt);
} }
const withLinksSummary = ctx.req.get('x-with-links-summary'); const withLinksSummary = ctx?.req.get('x-with-links-summary');
if (withLinksSummary !== undefined) { if (withLinksSummary !== undefined) {
instance.withLinksSummary = Boolean(withLinksSummary); instance.withLinksSummary = Boolean(withLinksSummary);
} }
const withImagesSummary = ctx.req.get('x-with-images-summary'); const withImagesSummary = ctx?.req.get('x-with-images-summary');
if (withImagesSummary !== undefined) { if (withImagesSummary !== undefined) {
instance.withImagesSummary = Boolean(withImagesSummary); instance.withImagesSummary = Boolean(withImagesSummary);
} }
const noCache = ctx.req.get('x-no-cache'); const noCache = ctx?.req.get('x-no-cache');
if (noCache !== undefined) { if (noCache !== undefined) {
instance.noCache = Boolean(noCache); instance.noCache = Boolean(noCache);
}
if (instance.noCache && instance.cacheTolerance === undefined) { if (instance.noCache && instance.cacheTolerance === undefined) {
instance.cacheTolerance = 0; instance.cacheTolerance = 0;
} }
} let cacheTolerance = parseInt(ctx?.req.get('x-cache-tolerance') || '');
let cacheTolerance = parseInt(ctx.req.get('x-cache-tolerance') || '');
if (!isNaN(cacheTolerance)) { if (!isNaN(cacheTolerance)) {
instance.cacheTolerance = cacheTolerance; instance.cacheTolerance = cacheTolerance;
} }
const targetSelector = ctx.req.get('x-target-selector'); const targetSelector = ctx?.req.get('x-target-selector');
instance.targetSelector ??= targetSelector; instance.targetSelector ??= targetSelector;
const waitForSelector = ctx.req.get('x-wait-for-selector'); const waitForSelector = ctx?.req.get('x-wait-for-selector');
instance.waitForSelector ??= waitForSelector || instance.targetSelector; instance.waitForSelector ??= waitForSelector || instance.targetSelector;
const cookies: CookieParam[] = []; const cookies: CookieParam[] = [];
const setCookieHeaders = ctx.req.headers['x-set-cookie'] || (instance.setCookies as any as string[]); const setCookieHeaders = ctx?.req.headers['x-set-cookie'] || (instance.setCookies as any as string[]);
if (Array.isArray(setCookieHeaders)) { if (Array.isArray(setCookieHeaders)) {
for (const setCookie of setCookieHeaders) { for (const setCookie of setCookieHeaders) {
cookies.push({ cookies.push({
@ -102,9 +102,23 @@ export class CrawlerOptions extends AutoCastable {
}); });
} }
const proxyUrl = ctx.req.get('x-proxy-url'); const proxyUrl = ctx?.req.get('x-proxy-url');
instance.proxyUrl ??= proxyUrl; instance.proxyUrl ??= proxyUrl;
if (instance.cacheTolerance) {
instance.cacheTolerance = instance.cacheTolerance * 1000;
}
return instance;
}
}
export class CrawlerOptionsHeaderOnly extends CrawlerOptions {
static override from(input: any) {
const instance = super.from({
[RPC_CALL_ENVIRONMENT]: Reflect.get(input, RPC_CALL_ENVIRONMENT),
}) as CrawlerOptionsHeaderOnly;
return instance; return instance;
} }
} }

View File

@ -13,6 +13,7 @@ Object.assign(exports, registry.exportGrouped({
memory: '4GiB', memory: '4GiB',
timeoutSeconds: 540, timeoutSeconds: 540,
})); }));
registry.allHandsOnDeck().catch(() => void 0);
registry.title = 'reader'; registry.title = 'reader';
registry.version = '0.1.0'; registry.version = '0.1.0';

View File

@ -6,6 +6,8 @@ import { AsyncService, HashManager } from 'civkit';
import { Logger } from '../shared/services/logger'; import { Logger } from '../shared/services/logger';
import { PDFContent } from '../db/pdf'; import { PDFContent } from '../db/pdf';
import dayjs from 'dayjs'; import dayjs from 'dayjs';
import { FirebaseStorageBucketControl } from '../shared';
import { randomUUID } from 'crypto';
const utc = require('dayjs/plugin/utc'); // Import the UTC plugin const utc = require('dayjs/plugin/utc'); // Import the UTC plugin
dayjs.extend(utc); // Extend dayjs with the UTC plugin dayjs.extend(utc); // Extend dayjs with the UTC plugin
const timezone = require('dayjs/plugin/timezone'); const timezone = require('dayjs/plugin/timezone');
@ -46,6 +48,7 @@ export class PDFExtractor extends AsyncService {
constructor( constructor(
protected globalLogger: Logger, protected globalLogger: Logger,
protected firebaseObjectStorage: FirebaseStorageBucketControl,
) { ) {
super(...arguments); super(...arguments);
} }
@ -225,24 +228,48 @@ export class PDFExtractor extends AsyncService {
return { meta: meta.info as Record<string, any>, content: mdChunks.join(''), text: rawChunks.join('') }; return { meta: meta.info as Record<string, any>, content: mdChunks.join(''), text: rawChunks.join('') };
} }
async cachedExtract(url: string | URL) { async cachedExtract(url: string | URL, cacheTolerance: number = 1000 * 3600 * 24) {
if (!url) { if (!url) {
return undefined; return undefined;
} }
const digest = md5Hasher.hash(url.toString()); const digest = md5Hasher.hash(url.toString());
const shortDigest = Buffer.from(digest, 'hex').toString('base64url');
const existing = await PDFContent.fromFirestore(shortDigest); const cache: PDFContent | undefined = (await PDFContent.fromFirestoreQuery(PDFContent.COLLECTION.where('urlDigest', '==', digest).orderBy('createdAt', 'desc').limit(1)))?.[0];
if (existing) { if (cache) {
const age = Date.now() - cache?.createdAt.valueOf();
const stale = cache.createdAt.valueOf() < (Date.now() - cacheTolerance);
this.logger.info(`${stale ? 'Stale cache exists' : 'Cache hit'} for PDF ${url}, normalized digest: ${digest}, ${age}ms old, tolerance ${cacheTolerance}ms`, {
url, digest, age, stale, cacheTolerance
});
if (!stale) {
if (cache.content && cache.text) {
return { return {
meta: existing.meta, meta: cache.meta,
content: existing.content, content: cache.content,
text: existing.text text: cache.text
}; };
} }
try {
const r = await this.firebaseObjectStorage.downloadFile(`pdfs/${cache._id}`);
let cached = JSON.parse(r.toString('utf-8'));
return {
meta: cached.meta,
content: cached.content,
text: cached.text
};
} catch (err) {
this.logger.warn(`Unable to load cached content for ${url}`, { err });
return undefined;
}
}
}
let extracted; let extracted;
try { try {
@ -253,14 +280,16 @@ export class PDFExtractor extends AsyncService {
// Don't try again until the next day // Don't try again until the next day
const expireMixin = extracted ? {} : { expireAt: new Date(Date.now() + 1000 * 3600 * 24) }; const expireMixin = extracted ? {} : { expireAt: new Date(Date.now() + 1000 * 3600 * 24) };
const theID = randomUUID();
await this.firebaseObjectStorage.saveFile(`pdfs/${theID}`,
Buffer.from(JSON.stringify(extracted), 'utf-8'), { contentType: 'application/json' });
await PDFContent.COLLECTION.doc(shortDigest).set( await PDFContent.COLLECTION.doc(theID).set(
{ {
_id: shortDigest,
src: url.toString(), src: url.toString(),
meta: extracted?.meta || {}, meta: extracted?.meta || {},
content: extracted?.content || '',
text: extracted?.text || '', text: extracted?.text || '',
content: extracted?.content || '',
urlDigest: digest, urlDigest: digest,
createdAt: new Date(), createdAt: new Date(),
...expireMixin ...expireMixin

@ -1 +1 @@
Subproject commit b0b597800a36e2aa8ee3d52715aa7c998b388f47 Subproject commit a3a13b13fbef8e9f5d388bde6fca6b459e6f92a6