diff --git a/src/api/crawler.ts b/src/api/crawler.ts index 6b6b8cf..c4423eb 100644 --- a/src/api/crawler.ts +++ b/src/api/crawler.ts @@ -82,6 +82,8 @@ export class CrawlerHost extends RPCHost { abuseBlockMs = 1000 * 3600; domainProfileRetentionMs = 1000 * 3600 * 24 * 30; + batchedCaches: Crawled[] = []; + constructor( protected globalLogger: GlobalLogger, protected puppeteerControl: PuppeteerControl, @@ -152,6 +154,27 @@ export class CrawlerHost extends RPCHost { }); }); + + setInterval(() => { + const thisBatch = this.batchedCaches; + this.batchedCaches = []; + if (!thisBatch.length) { + return; + } + const batch = Crawled.DB.batch(); + + for (const x of thisBatch) { + batch.set(Crawled.COLLECTION.doc(x._id), x.degradeForFireStore(), { merge: true }); + } + + batch.commit() + .then(() => { + this.logger.debug(`Saved ${thisBatch.length} caches by batch`); + }) + .catch((err) => { + this.logger.warn(`Failed to save cache in batch`, { err }); + }); + }, 1000 * 10 + Math.round(1000 * Math.random())).unref(); } override async init() { @@ -633,13 +656,14 @@ export class CrawlerHost extends RPCHost { cache.pageshotAvailable = true; } await savingOfSnapshot; - const r = await Crawled.save(cache.degradeForFireStore()).catch((err) => { - this.logger.error(`Failed to save cache for ${urlToCrawl}`, { err: marshalErrorLike(err) }); + this.batchedCaches.push(cache); + // const r = await Crawled.save(cache.degradeForFireStore()).catch((err) => { + // this.logger.error(`Failed to save cache for ${urlToCrawl}`, { err: marshalErrorLike(err) }); - return undefined; - }); + // return undefined; + // }); - return r; + return cache; } async *iterSnapshots(urlToCrawl: URL, crawlOpts?: ExtraScrappingOptions, crawlerOpts?: CrawlerOptions) { diff --git a/src/api/searcher.ts b/src/api/searcher.ts index fe00c30..387ddd4 100644 --- a/src/api/searcher.ts +++ b/src/api/searcher.ts @@ -61,6 +61,8 @@ export class SearcherHost extends RPCHost { updateAgeOnHas: false, }); + batchedCaches: SERPResult[] = []; + constructor( protected globalLogger: GlobalLogger, protected rateLimitControl: RateLimitControl, @@ -72,6 +74,26 @@ export class SearcherHost extends RPCHost { protected jinaSerp: InternalJinaSerpService, ) { super(...arguments); + + setInterval(() => { + const thisBatch = this.batchedCaches; + this.batchedCaches = []; + if (!thisBatch.length) { + return; + } + const batch = SERPResult.DB.batch(); + + for (const x of thisBatch) { + batch.set(SERPResult.COLLECTION.doc(), x.degradeForFireStore()); + } + batch.commit() + .then(() => { + this.logger.debug(`Saved ${thisBatch.length} caches by batch`); + }) + .catch((err) => { + this.logger.warn(`Failed to cache search result in batch`, { err }); + }); + }, 1000 * 60 * 10 + Math.round(1000 * Math.random())).unref(); } override async init() { @@ -780,9 +802,8 @@ export class SearcherHost extends RPCHost { createdAt: nowDate, expireAt: new Date(nowDate.valueOf() + this.cacheRetentionMs) }); - SERPResult.save(record.degradeForFireStore()).catch((err) => { - this.logger.warn(`Failed to cache search result`, { err }); - }); + + this.batchedCaches.push(record); } else if (lastError) { throw lastError; } diff --git a/src/api/serp.ts b/src/api/serp.ts index e11fc6c..2779e5a 100644 --- a/src/api/serp.ts +++ b/src/api/serp.ts @@ -63,6 +63,8 @@ export class SerpHost extends RPCHost { updateAgeOnHas: false, }); + batchedCaches: SERPResult[] = []; + async getIndex(ctx: Context, auth?: JinaEmbeddingsAuthDTO) { const indexObject: Record = Object.create(indexProto); Object.assign(indexObject, { @@ -92,6 +94,26 @@ export class SerpHost extends RPCHost { protected serperBing: SerperBingSearchService, ) { super(...arguments); + + setInterval(() => { + const thisBatch = this.batchedCaches; + this.batchedCaches = []; + if (!thisBatch.length) { + return; + } + const batch = SERPResult.DB.batch(); + + for (const x of thisBatch) { + batch.set(SERPResult.COLLECTION.doc(), x.degradeForFireStore()); + } + batch.commit() + .then(() => { + this.logger.debug(`Saved ${thisBatch.length} caches by batch`); + }) + .catch((err) => { + this.logger.warn(`Failed to cache search result in batch`, { err }); + }); + }, 1000 * 60 * 10 + Math.round(1000 * Math.random())).unref(); } override async init() { @@ -516,9 +538,7 @@ export class SerpHost extends RPCHost { createdAt: nowDate, expireAt: new Date(nowDate.valueOf() + this.cacheRetentionMs) }); - SERPResult.save(record.degradeForFireStore()).catch((err) => { - this.logger.warn(`Failed to cache search result`, { err }); - }); + this.batchedCaches.push(record); } else if (lastError) { throw lastError; } diff --git a/thinapps-shared b/thinapps-shared index c48c226..a23636b 160000 --- a/thinapps-shared +++ b/thinapps-shared @@ -1 +1 @@ -Subproject commit c48c226fbb595773cb08baee26a9fce299dc275e +Subproject commit a23636b2161908eefd897b6976c10a5924e2cd57