chore: add LRU cache for high rate limit user

This commit is contained in:
Aaron Ji 2025-03-31 19:28:03 +08:00
parent 9fbd751b65
commit d50aa2e923

View File

@ -1,6 +1,7 @@
import { singleton } from 'tsyringe'; import { singleton } from 'tsyringe';
import { import {
assignTransferProtocolMeta, RPCHost, RPCReflection, AssertionFailureError, assignMeta, RawString, assignTransferProtocolMeta, RPCHost, RPCReflection, AssertionFailureError, assignMeta, RawString,
AuthenticationFailedError,
} from 'civkit/civ-rpc'; } from 'civkit/civ-rpc';
import { marshalErrorLike } from 'civkit/lang'; import { marshalErrorLike } from 'civkit/lang';
import { objHashMd5B64Of } from 'civkit/hash'; import { objHashMd5B64Of } from 'civkit/hash';
@ -22,6 +23,8 @@ import { JinaEmbeddingsAuthDTO } from '../dto/jina-embeddings-auth';
import { InsufficientBalanceError } from '../services/errors'; import { InsufficientBalanceError } from '../services/errors';
import { SerperImageSearchResponse, SerperNewsSearchResponse, SerperSearchQueryParams, SerperSearchResponse, SerperWebSearchResponse, WORLD_COUNTRIES, WORLD_LANGUAGES } from '../shared/3rd-party/serper-search'; import { SerperImageSearchResponse, SerperNewsSearchResponse, SerperSearchQueryParams, SerperSearchResponse, SerperWebSearchResponse, WORLD_COUNTRIES, WORLD_LANGUAGES } from '../shared/3rd-party/serper-search';
import { toAsyncGenerator } from '../utils/misc'; import { toAsyncGenerator } from '../utils/misc';
import { LRUCache } from 'lru-cache';
import { JinaEmbeddingsTokenAccount } from '../shared/db/jina-embeddings-token-account';
const WORLD_COUNTRY_CODES = Object.keys(WORLD_COUNTRIES).map((x) => x.toLowerCase()); const WORLD_COUNTRY_CODES = Object.keys(WORLD_COUNTRIES).map((x) => x.toLowerCase());
@ -30,6 +33,12 @@ interface FormattedPage extends RealFormattedPage {
date?: string; date?: string;
} }
type RateLimitCache = {
error?: Error;
timeout?: NodeJS.Timeout;
banned: boolean;
};
@singleton() @singleton()
export class SearcherHost extends RPCHost { export class SearcherHost extends RPCHost {
logger = this.globalLogger.child({ service: this.constructor.name }); logger = this.globalLogger.child({ service: this.constructor.name });
@ -42,6 +51,20 @@ export class SearcherHost extends RPCHost {
targetResultCount = 5; targetResultCount = 5;
rateLimitCache = new LRUCache<{}, RateLimitCache>({
max: 256,
ttl: 60 * 60 * 1000,
updateAgeOnGet: false,
updateAgeOnHas: false,
});
userCache = new LRUCache<{}, JinaEmbeddingsTokenAccount>({
max: 256,
ttl: 60 * 60 * 1000,
updateAgeOnGet: false,
updateAgeOnHas: false,
});
constructor( constructor(
protected globalLogger: GlobalLogger, protected globalLogger: GlobalLogger,
protected rateLimitControl: RateLimitControl, protected rateLimitControl: RateLimitControl,
@ -106,13 +129,17 @@ export class SearcherHost extends RPCHost {
count = (num !== undefined ? num : count) ?? 10; count = (num !== undefined ? num : count) ?? 10;
const uid = await auth.solveUID(); const uid = await auth.solveUID();
const authToken = auth.bearerToken;
// Return content by default // Return content by default
const crawlWithoutContent = crawlerOptions.respondWith.includes('no-content'); const crawlWithoutContent = crawlerOptions.respondWith.includes('no-content');
const withFavicon = Boolean(ctx.get('X-With-Favicons')); const withFavicon = Boolean(ctx.get('X-With-Favicons'));
this.threadLocal.set('collect-favicon', withFavicon); this.threadLocal.set('collect-favicon', withFavicon);
crawlerOptions.respondTiming ??= RESPOND_TIMING.VISIBLE_CONTENT; crawlerOptions.respondTiming ??= RESPOND_TIMING.VISIBLE_CONTENT;
let chargeAmount = 0; const rpcReflectPayload = {
chargeAmount: 0
};
const noSlashPath = decodeURIComponent(ctx.path).slice(1); const noSlashPath = decodeURIComponent(ctx.path).slice(1);
if (!noSlashPath && !q) { if (!noSlashPath && !q) {
const index = await this.crawler.getIndex(auth); const index = await this.crawler.getIndex(auth);
@ -129,36 +156,31 @@ export class SearcherHost extends RPCHost {
); );
} }
const user = await auth.assertUser(); if (!authToken) {
if (!(user.wallet.total_balance > 0)) { throw new AuthenticationFailedError({
throw new InsufficientBalanceError(`Account balance not enough to run this query, please recharge.`); message: 'Invalid API key, please get a new one from https://jina.ai'
});
} }
const rateLimitPolicy = auth.getRateLimits(rpcReflect.name.toUpperCase()) || [ let user: JinaEmbeddingsTokenAccount | undefined = this.userCache.get(authToken);
parseInt(user.metadata?.speed_level) >= 2 ? const userPromise = this.assertUser(auth);
RateLimitDesc.from({ if (!user) {
occurrence: 400, user = await userPromise;
periodSeconds: 60 } else {
}) : // resolve uncaught error
RateLimitDesc.from({ userPromise.catch(() => {});
occurrence: 40, }
periodSeconds: 60
}) let rateLimitResult: RateLimitCache | undefined = this.rateLimitCache.get(authToken);
]; const rateLimitPromise = this.assertRateLimit(rpcReflect, auth, user, rpcReflectPayload);
if (rateLimitResult !== undefined) {
const apiRoll = await this.rateLimitControl.simpleRPCUidBasedLimit( // resolve uncaught error
rpcReflect, uid!, [rpcReflect.name.toUpperCase()], rateLimitPromise.catch(() => {});
...rateLimitPolicy // throw rate limit error when cache is error result from last time.
); if (rateLimitResult.error instanceof Error) throw rateLimitResult;
} else {
rpcReflect.finally(() => { await rateLimitPromise;
if (chargeAmount) {
auth.reportUsage(chargeAmount, `reader-${rpcReflect.name}`).catch((err) => {
this.logger.warn(`Unable to report usage for ${uid}`, { err: marshalErrorLike(err) });
});
apiRoll.chargeAmount = chargeAmount;
} }
});
delete crawlerOptions.html; delete crawlerOptions.html;
@ -257,7 +279,7 @@ export class SearcherHost extends RPCHost {
break; break;
} }
chargeAmount = this.assignChargeAmount(scrapped, count, chargeAmountScaler); rpcReflectPayload.chargeAmount = this.assignChargeAmount(scrapped, count, chargeAmountScaler);
lastScrapped = scrapped; lastScrapped = scrapped;
sseStream.write({ sseStream.write({
event: 'data', event: 'data',
@ -291,7 +313,7 @@ export class SearcherHost extends RPCHost {
return; return;
} }
await assigningOfGeneralMixins; await assigningOfGeneralMixins;
chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler); rpcReflectPayload.chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler);
rpcReflect.return(lastScrapped); rpcReflect.return(lastScrapped);
earlyReturn = true; earlyReturn = true;
}, ((crawlerOptions.timeout || 0) * 1000) || this.reasonableDelayMs); }, ((crawlerOptions.timeout || 0) * 1000) || this.reasonableDelayMs);
@ -312,7 +334,7 @@ export class SearcherHost extends RPCHost {
clearTimeout(earlyReturnTimer); clearTimeout(earlyReturnTimer);
} }
await assigningOfGeneralMixins; await assigningOfGeneralMixins;
chargeAmount = this.assignChargeAmount(scrapped, count, chargeAmountScaler); rpcReflectPayload.chargeAmount = this.assignChargeAmount(scrapped, count, chargeAmountScaler);
return scrapped; return scrapped;
} }
@ -327,7 +349,7 @@ export class SearcherHost extends RPCHost {
if (!earlyReturn) { if (!earlyReturn) {
await assigningOfGeneralMixins; await assigningOfGeneralMixins;
chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler); rpcReflectPayload.chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler);
} }
return lastScrapped; return lastScrapped;
@ -343,7 +365,7 @@ export class SearcherHost extends RPCHost {
return; return;
} }
await assigningOfGeneralMixins; await assigningOfGeneralMixins;
chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler); rpcReflectPayload.chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler);
rpcReflect.return(assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null })); rpcReflect.return(assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null }));
earlyReturn = true; earlyReturn = true;
}, ((crawlerOptions.timeout || 0) * 1000) || this.reasonableDelayMs); }, ((crawlerOptions.timeout || 0) * 1000) || this.reasonableDelayMs);
@ -366,7 +388,7 @@ export class SearcherHost extends RPCHost {
clearTimeout(earlyReturnTimer); clearTimeout(earlyReturnTimer);
} }
await assigningOfGeneralMixins; await assigningOfGeneralMixins;
chargeAmount = this.assignChargeAmount(scrapped, count, chargeAmountScaler); rpcReflectPayload.chargeAmount = this.assignChargeAmount(scrapped, count, chargeAmountScaler);
return assignTransferProtocolMeta(`${scrapped}`, { contentType: 'text/plain', envelope: null }); return assignTransferProtocolMeta(`${scrapped}`, { contentType: 'text/plain', envelope: null });
} }
@ -381,7 +403,7 @@ export class SearcherHost extends RPCHost {
if (!earlyReturn) { if (!earlyReturn) {
await assigningOfGeneralMixins; await assigningOfGeneralMixins;
chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler); rpcReflectPayload.chargeAmount = this.assignChargeAmount(lastScrapped, count, chargeAmountScaler);
} }
return assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null }); return assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null });
@ -603,6 +625,75 @@ export class SearcherHost extends RPCHost {
Object.setPrototypeOf(result, searchResultProto); Object.setPrototypeOf(result, searchResultProto);
} }
async assertUser(auth: JinaEmbeddingsAuthDTO) {
let user: JinaEmbeddingsTokenAccount | undefined = undefined;
try {
user = await auth.assertUser();
if (!(user.wallet.total_balance > 0)) {
throw new InsufficientBalanceError(`Account balance not enough to run this query, please recharge.`);
}
this.userCache.set(auth.bearerToken!, user);
return user;
} catch (e: any) {
throw e;
}
}
async assertRateLimit(rpcReflect: RPCReflection, auth: JinaEmbeddingsAuthDTO, user: JinaEmbeddingsTokenAccount, rpcReflectPayload: {chargeAmount: number}) {
const uid = auth.uid!;
const rateLimitPolicy = auth.getRateLimits(rpcReflect.name.toUpperCase()) || [
parseInt(user.metadata?.speed_level) >= 2 ?
RateLimitDesc.from({
occurrence: 400,
periodSeconds: 60
}) :
RateLimitDesc.from({
occurrence: 40,
periodSeconds: 60
})
];
const isBigRateLimit = rateLimitPolicy.some(x => x.occurrence >= 500);
let cacheValue: RateLimitCache | undefined = undefined;
try {
const apiRoll = await this.rateLimitControl.simpleRPCUidBasedLimit(
rpcReflect, uid, [rpcReflect.name.toUpperCase()],
...rateLimitPolicy
);
rpcReflect.finally(() => {
const chargeAmount = rpcReflectPayload.chargeAmount;
if (chargeAmount) {
auth.reportUsage(chargeAmount, `reader-${rpcReflect.name}`).catch((err) => {
this.logger.warn(`Unable to report usage for ${uid}`, { err: marshalErrorLike(err) });
});
apiRoll.chargeAmount = chargeAmount;
}
});
cacheValue = {
banned: false,
};
} catch (e: any) {
const cache: RateLimitCache | undefined = this.rateLimitCache.get(auth.bearerToken!);
if (cache && cache.banned !== false) {
clearTimeout(cache.timeout);
}
cacheValue = {
error: e,
timeout: setTimeout(() => {
this.rateLimitCache.set(auth.bearerToken!, {banned: false});
}, e.retryAfter * 1000),
banned: true,
};
throw e;
} finally {
if (isBigRateLimit) {
this.rateLimitCache.set(auth.bearerToken!, cacheValue);
}
}
}
} }
const dataItems = [ const dataItems = [