saas: save cache in batch

This commit is contained in:
Yanlong Wang 2025-04-28 15:14:36 +08:00
parent 481d1a21af
commit 131375b8a8
No known key found for this signature in database
GPG Key ID: C0A623C0BADF9F37
4 changed files with 77 additions and 12 deletions

View File

@ -82,6 +82,8 @@ export class CrawlerHost extends RPCHost {
abuseBlockMs = 1000 * 3600; abuseBlockMs = 1000 * 3600;
domainProfileRetentionMs = 1000 * 3600 * 24 * 30; domainProfileRetentionMs = 1000 * 3600 * 24 * 30;
batchedCaches: Crawled[] = [];
constructor( constructor(
protected globalLogger: GlobalLogger, protected globalLogger: GlobalLogger,
protected puppeteerControl: PuppeteerControl, protected puppeteerControl: PuppeteerControl,
@ -152,6 +154,27 @@ export class CrawlerHost extends RPCHost {
}); });
}); });
// Periodically flush the crawl caches queued in `this.batchedCaches` with a single batched write.
// The period is 10s plus up to 1s of jitter, computed once when the timer is created.
// The timer is unref()'d so it never keeps the process alive on its own; the queue lives only in
// memory, so anything still queued when the process exits is not written.
// NOTE(review): the number/size of writes per commit is not capped here — confirm against the
// Firestore request limits for busy instances.
setInterval(() => {
    // Swap the queue out first so records pushed while this flush is running go to the next flush.
    const thisBatch = this.batchedCaches;
    this.batchedCaches = [];
    if (!thisBatch.length) {
        return;
    }
    try {
        const batch = Crawled.DB.batch();
        let queued = 0;
        for (const x of thisBatch) {
            // Guard each record: a synchronous throw while preparing one record must not
            // discard the rest of the batch, nor escape the timer callback as an uncaught
            // exception (which would bring the process down by default).
            try {
                batch.set(Crawled.COLLECTION.doc(x._id), x.degradeForFireStore(), { merge: true });
                queued += 1;
            } catch (err) {
                this.logger.warn(`Failed to add cache to batch`, { err });
            }
        }
        if (!queued) {
            return;
        }
        batch.commit()
            .then(() => {
                this.logger.debug(`Saved ${queued} caches by batch`);
            })
            .catch((err) => {
                // Best effort: a failed commit is logged and the records are dropped, not retried.
                this.logger.warn(`Failed to save cache in batch`, { err });
            });
    } catch (err) {
        // Covers synchronous failures from creating or committing the batch.
        this.logger.warn(`Failed to save cache in batch`, { err });
    }
}, 1000 * 10 + Math.round(1000 * Math.random())).unref();
} }
override async init() { override async init() {
@ -633,13 +656,14 @@ export class CrawlerHost extends RPCHost {
cache.pageshotAvailable = true; cache.pageshotAvailable = true;
} }
await savingOfSnapshot; await savingOfSnapshot;
const r = await Crawled.save(cache.degradeForFireStore()).catch((err) => { this.batchedCaches.push(cache);
this.logger.error(`Failed to save cache for ${urlToCrawl}`, { err: marshalErrorLike(err) }); // const r = await Crawled.save(cache.degradeForFireStore()).catch((err) => {
// this.logger.error(`Failed to save cache for ${urlToCrawl}`, { err: marshalErrorLike(err) });
return undefined; // return undefined;
}); // });
return r; return cache;
} }
async *iterSnapshots(urlToCrawl: URL, crawlOpts?: ExtraScrappingOptions, crawlerOpts?: CrawlerOptions) { async *iterSnapshots(urlToCrawl: URL, crawlOpts?: ExtraScrappingOptions, crawlerOpts?: CrawlerOptions) {

View File

@ -61,6 +61,8 @@ export class SearcherHost extends RPCHost {
updateAgeOnHas: false, updateAgeOnHas: false,
}); });
batchedCaches: SERPResult[] = [];
constructor( constructor(
protected globalLogger: GlobalLogger, protected globalLogger: GlobalLogger,
protected rateLimitControl: RateLimitControl, protected rateLimitControl: RateLimitControl,
@ -72,6 +74,26 @@ export class SearcherHost extends RPCHost {
protected jinaSerp: InternalJinaSerpService, protected jinaSerp: InternalJinaSerpService,
) { ) {
super(...arguments); super(...arguments);
// Periodically flush the search results queued in `this.batchedCaches` with a single batched write.
// Each record is written to a newly created document reference (`doc()` with no id).
// The period is 10 minutes plus up to 1s of jitter, computed once when the timer is created.
// The timer is unref()'d so it never keeps the process alive on its own; the queue lives only in
// memory, so anything still queued when the process exits is not written.
// NOTE(review): the number/size of writes per commit is not capped here — confirm against the
// Firestore request limits, since up to ten minutes of results accumulate per flush.
setInterval(() => {
    // Swap the queue out first so records pushed while this flush is running go to the next flush.
    const thisBatch = this.batchedCaches;
    this.batchedCaches = [];
    if (!thisBatch.length) {
        return;
    }
    try {
        const batch = SERPResult.DB.batch();
        let queued = 0;
        for (const x of thisBatch) {
            // Guard each record: a synchronous throw while preparing one record must not
            // discard the rest of the batch, nor escape the timer callback as an uncaught
            // exception (which would bring the process down by default).
            try {
                batch.set(SERPResult.COLLECTION.doc(), x.degradeForFireStore());
                queued += 1;
            } catch (err) {
                this.logger.warn(`Failed to add search result to batch`, { err });
            }
        }
        if (!queued) {
            return;
        }
        batch.commit()
            .then(() => {
                this.logger.debug(`Saved ${queued} caches by batch`);
            })
            .catch((err) => {
                // Best effort: a failed commit is logged and the records are dropped, not retried.
                this.logger.warn(`Failed to cache search result in batch`, { err });
            });
    } catch (err) {
        // Covers synchronous failures from creating or committing the batch.
        this.logger.warn(`Failed to cache search result in batch`, { err });
    }
}, 1000 * 60 * 10 + Math.round(1000 * Math.random())).unref();
} }
override async init() { override async init() {
@ -780,9 +802,8 @@ export class SearcherHost extends RPCHost {
createdAt: nowDate, createdAt: nowDate,
expireAt: new Date(nowDate.valueOf() + this.cacheRetentionMs) expireAt: new Date(nowDate.valueOf() + this.cacheRetentionMs)
}); });
SERPResult.save(record.degradeForFireStore()).catch((err) => {
this.logger.warn(`Failed to cache search result`, { err }); this.batchedCaches.push(record);
});
} else if (lastError) { } else if (lastError) {
throw lastError; throw lastError;
} }

View File

@ -63,6 +63,8 @@ export class SerpHost extends RPCHost {
updateAgeOnHas: false, updateAgeOnHas: false,
}); });
batchedCaches: SERPResult[] = [];
async getIndex(ctx: Context, auth?: JinaEmbeddingsAuthDTO) { async getIndex(ctx: Context, auth?: JinaEmbeddingsAuthDTO) {
const indexObject: Record<string, string | number | undefined> = Object.create(indexProto); const indexObject: Record<string, string | number | undefined> = Object.create(indexProto);
Object.assign(indexObject, { Object.assign(indexObject, {
@ -92,6 +94,26 @@ export class SerpHost extends RPCHost {
protected serperBing: SerperBingSearchService, protected serperBing: SerperBingSearchService,
) { ) {
super(...arguments); super(...arguments);
// Periodically flush the SERP results queued in `this.batchedCaches` with a single batched write.
// Each record is written to a newly created document reference (`doc()` with no id).
// The period is 10 minutes plus up to 1s of jitter, computed once when the timer is created.
// The timer is unref()'d so it never keeps the process alive on its own; the queue lives only in
// memory, so anything still queued when the process exits is not written.
// NOTE(review): the number/size of writes per commit is not capped here — confirm against the
// Firestore request limits, since up to ten minutes of results accumulate per flush.
setInterval(() => {
    // Swap the queue out first so records pushed while this flush is running go to the next flush.
    const thisBatch = this.batchedCaches;
    this.batchedCaches = [];
    if (!thisBatch.length) {
        return;
    }
    try {
        const batch = SERPResult.DB.batch();
        let queued = 0;
        for (const x of thisBatch) {
            // Guard each record: a synchronous throw while preparing one record must not
            // discard the rest of the batch, nor escape the timer callback as an uncaught
            // exception (which would bring the process down by default).
            try {
                batch.set(SERPResult.COLLECTION.doc(), x.degradeForFireStore());
                queued += 1;
            } catch (err) {
                this.logger.warn(`Failed to add search result to batch`, { err });
            }
        }
        if (!queued) {
            return;
        }
        batch.commit()
            .then(() => {
                this.logger.debug(`Saved ${queued} caches by batch`);
            })
            .catch((err) => {
                // Best effort: a failed commit is logged and the records are dropped, not retried.
                this.logger.warn(`Failed to cache search result in batch`, { err });
            });
    } catch (err) {
        // Covers synchronous failures from creating or committing the batch.
        this.logger.warn(`Failed to cache search result in batch`, { err });
    }
}, 1000 * 60 * 10 + Math.round(1000 * Math.random())).unref();
} }
override async init() { override async init() {
@ -516,9 +538,7 @@ export class SerpHost extends RPCHost {
createdAt: nowDate, createdAt: nowDate,
expireAt: new Date(nowDate.valueOf() + this.cacheRetentionMs) expireAt: new Date(nowDate.valueOf() + this.cacheRetentionMs)
}); });
SERPResult.save(record.degradeForFireStore()).catch((err) => { this.batchedCaches.push(record);
this.logger.warn(`Failed to cache search result`, { err });
});
} else if (lastError) { } else if (lastError) {
throw lastError; throw lastError;
} }

@ -1 +1 @@
Subproject commit c48c226fbb595773cb08baee26a9fce299dc275e Subproject commit a23636b2161908eefd897b6976c10a5924e2cd57