From d50aa2e9233f5ca9c41da128b7a4dfc3c33a7262 Mon Sep 17 00:00:00 2001 From: Aaron Ji Date: Mon, 31 Mar 2025 19:28:03 +0800 Subject: [PATCH] chore: add LRU cache for high rate limit user --- src/api/searcher-serper.ts | 161 +++++++++++++++++++++++++++++-------- 1 file changed, 126 insertions(+), 35 deletions(-) diff --git a/src/api/searcher-serper.ts b/src/api/searcher-serper.ts index 2c18e1a..2ee3361 100644 --- a/src/api/searcher-serper.ts +++ b/src/api/searcher-serper.ts @@ -1,6 +1,7 @@ import { singleton } from 'tsyringe'; import { assignTransferProtocolMeta, RPCHost, RPCReflection, AssertionFailureError, assignMeta, RawString, + AuthenticationFailedError, } from 'civkit/civ-rpc'; import { marshalErrorLike } from 'civkit/lang'; import { objHashMd5B64Of } from 'civkit/hash'; @@ -22,6 +23,8 @@ import { JinaEmbeddingsAuthDTO } from '../dto/jina-embeddings-auth'; import { InsufficientBalanceError } from '../services/errors'; import { SerperImageSearchResponse, SerperNewsSearchResponse, SerperSearchQueryParams, SerperSearchResponse, SerperWebSearchResponse, WORLD_COUNTRIES, WORLD_LANGUAGES } from '../shared/3rd-party/serper-search'; import { toAsyncGenerator } from '../utils/misc'; +import { LRUCache } from 'lru-cache'; +import { JinaEmbeddingsTokenAccount } from '../shared/db/jina-embeddings-token-account'; const WORLD_COUNTRY_CODES = Object.keys(WORLD_COUNTRIES).map((x) => x.toLowerCase()); @@ -30,6 +33,12 @@ interface FormattedPage extends RealFormattedPage { date?: string; } +type RateLimitCache = { + error?: Error; + timeout?: NodeJS.Timeout; + banned: boolean; +}; + @singleton() export class SearcherHost extends RPCHost { logger = this.globalLogger.child({ service: this.constructor.name }); @@ -42,6 +51,20 @@ export class SearcherHost extends RPCHost { targetResultCount = 5; + rateLimitCache = new LRUCache<{}, RateLimitCache>({ + max: 256, + ttl: 60 * 60 * 1000, + updateAgeOnGet: false, + updateAgeOnHas: false, + }); + + userCache = new LRUCache<{}, JinaEmbeddingsTokenAccount>({ + max: 256, + ttl: 60 * 60 * 1000, + updateAgeOnGet: false, + updateAgeOnHas: false, + }); + constructor( protected globalLogger: GlobalLogger, protected rateLimitControl: RateLimitControl, @@ -106,13 +129,17 @@ export class SearcherHost extends RPCHost { count = (num !== undefined ? num : count) ?? 10; const uid = await auth.solveUID(); + const authToken = auth.bearerToken; // Return content by default const crawlWithoutContent = crawlerOptions.respondWith.includes('no-content'); const withFavicon = Boolean(ctx.get('X-With-Favicons')); this.threadLocal.set('collect-favicon', withFavicon); crawlerOptions.respondTiming ??= RESPOND_TIMING.VISIBLE_CONTENT; - let chargeAmount = 0; + const rpcReflectPayload = { + chargeAmount: 0 + }; + const noSlashPath = decodeURIComponent(ctx.path).slice(1); if (!noSlashPath && !q) { const index = await this.crawler.getIndex(auth); @@ -129,36 +156,31 @@ export class SearcherHost extends RPCHost { ); } - const user = await auth.assertUser(); - if (!(user.wallet.total_balance > 0)) { - throw new InsufficientBalanceError(`Account balance not enough to run this query, please recharge.`); + if (!authToken) { + throw new AuthenticationFailedError({ + message: 'Invalid API key, please get a new one from https://jina.ai' + }); } - const rateLimitPolicy = auth.getRateLimits(rpcReflect.name.toUpperCase()) || [ - parseInt(user.metadata?.speed_level) >= 2 ? - RateLimitDesc.from({ - occurrence: 400, - periodSeconds: 60 - }) : - RateLimitDesc.from({ - occurrence: 40, - periodSeconds: 60 - }) - ]; + let user: JinaEmbeddingsTokenAccount | undefined = this.userCache.get(authToken); + const userPromise = this.assertUser(auth); + if (!user) { + user = await userPromise; + } else { + // resolve uncaught error + userPromise.catch(() => {}); + } - const apiRoll = await this.rateLimitControl.simpleRPCUidBasedLimit( - rpcReflect, uid!, [rpcReflect.name.toUpperCase()], - ...rateLimitPolicy - ); - - rpcReflect.finally(() => { - if (chargeAmount) { - auth.reportUsage(chargeAmount, `reader-${rpcReflect.name}`).catch((err) => { - this.logger.warn(`Unable to report usage for ${uid}`, { err: marshalErrorLike(err) }); - }); - apiRoll.chargeAmount = chargeAmount; - } - }); + let rateLimitResult: RateLimitCache | undefined = this.rateLimitCache.get(authToken); + const rateLimitPromise = this.assertRateLimit(rpcReflect, auth, user, rpcReflectPayload); + if (rateLimitResult !== undefined) { + // resolve uncaught error + rateLimitPromise.catch(() => {}); + // throw rate limit error when cache is error result from last time. + if (rateLimitResult.error instanceof Error) throw rateLimitResult; + } else { + await rateLimitPromise; + } delete crawlerOptions.html; @@ -257,7 +279,7 @@ export class SearcherHost extends RPCHost { break; } - chargeAmount = this.assignChargeAmount(scrapped, count, chargeAmountScaler); + rpcReflectPayload.chargeAmount = this.assignChargeAmount(scrapped, count, chargeAmountScaler); lastScrapped = scrapped; sseStream.write({ event: 'data', @@ -291,7 +313,7 @@ export class SearcherHost extends RPCHost { return; } await assigningOfGeneralMixins; - chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler); + rpcReflectPayload.chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler); rpcReflect.return(lastScrapped); earlyReturn = true; }, ((crawlerOptions.timeout || 0) * 1000) || this.reasonableDelayMs); @@ -312,7 +334,7 @@ export class SearcherHost extends RPCHost { clearTimeout(earlyReturnTimer); } await assigningOfGeneralMixins; - chargeAmount = this.assignChargeAmount(scrapped, count, chargeAmountScaler); + rpcReflectPayload.chargeAmount = this.assignChargeAmount(scrapped, count, chargeAmountScaler); return scrapped; } @@ -327,7 +349,7 @@ export class SearcherHost extends RPCHost { if (!earlyReturn) { await assigningOfGeneralMixins; - chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler); + rpcReflectPayload.chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler); } return lastScrapped; @@ -343,7 +365,7 @@ export class SearcherHost extends RPCHost { return; } await assigningOfGeneralMixins; - chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler); + rpcReflectPayload.chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler); rpcReflect.return(assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null })); earlyReturn = true; }, ((crawlerOptions.timeout || 0) * 1000) || this.reasonableDelayMs); @@ -366,7 +388,7 @@ export class SearcherHost extends RPCHost { clearTimeout(earlyReturnTimer); } await assigningOfGeneralMixins; - chargeAmount = this.assignChargeAmount(scrapped, count, chargeAmountScaler); + rpcReflectPayload.chargeAmount = this.assignChargeAmount(scrapped, count, chargeAmountScaler); return assignTransferProtocolMeta(`${scrapped}`, { contentType: 'text/plain', envelope: null }); } @@ -381,7 +403,7 @@ export class SearcherHost extends RPCHost { if (!earlyReturn) { await assigningOfGeneralMixins; - chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler); + rpcReflectPayload.chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler); } return assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null }); @@ -603,6 +625,75 @@ export class SearcherHost extends RPCHost { Object.setPrototypeOf(result, searchResultProto); } + + async assertUser(auth: JinaEmbeddingsAuthDTO) { + let user: JinaEmbeddingsTokenAccount | undefined = undefined; + try { + user = await auth.assertUser(); + if (!(user.wallet.total_balance > 0)) { + throw new InsufficientBalanceError(`Account balance not enough to run this query, please recharge.`); + } + this.userCache.set(auth.bearerToken!, user); + return user; + } catch (e: any) { + throw e; + } + } + + async assertRateLimit(rpcReflect: RPCReflection, auth: JinaEmbeddingsAuthDTO, user: JinaEmbeddingsTokenAccount, rpcReflectPayload: {chargeAmount: number}) { + const uid = auth.uid!; + const rateLimitPolicy = auth.getRateLimits(rpcReflect.name.toUpperCase()) || [ + parseInt(user.metadata?.speed_level) >= 2 ? + RateLimitDesc.from({ + occurrence: 400, + periodSeconds: 60 + }) : + RateLimitDesc.from({ + occurrence: 40, + periodSeconds: 60 + }) + ]; + + const isBigRateLimit = rateLimitPolicy.some(x => x.occurrence >= 500); + let cacheValue: RateLimitCache | undefined = undefined; + try { + const apiRoll = await this.rateLimitControl.simpleRPCUidBasedLimit( + rpcReflect, uid, [rpcReflect.name.toUpperCase()], + ...rateLimitPolicy + ); + + rpcReflect.finally(() => { + const chargeAmount = rpcReflectPayload.chargeAmount; + if (chargeAmount) { + auth.reportUsage(chargeAmount, `reader-${rpcReflect.name}`).catch((err) => { + this.logger.warn(`Unable to report usage for ${uid}`, { err: marshalErrorLike(err) }); + }); + apiRoll.chargeAmount = chargeAmount; + } + }); + + cacheValue = { + banned: false, + }; + } catch (e: any) { + const cache: RateLimitCache | undefined = this.rateLimitCache.get(auth.bearerToken!); + if (cache && cache.banned !== false) { + clearTimeout(cache.timeout); + } + cacheValue = { + error: e, + timeout: setTimeout(() => { + this.rateLimitCache.set(auth.bearerToken!, {banned: false}); + }, e.retryAfter * 1000), + banned: true, + }; + throw e; + } finally { + if (isBigRateLimit) { + this.rateLimitCache.set(auth.bearerToken!, cacheValue); + } + } + } } const dataItems = [