chore: refactor code

This commit is contained in:
Aaron Ji 2025-04-01 10:53:47 +08:00
parent d50aa2e923
commit 0febf765df

View File

@ -36,9 +36,14 @@ interface FormattedPage extends RealFormattedPage {
type RateLimitCache = { type RateLimitCache = {
error?: Error; error?: Error;
timeout?: NodeJS.Timeout; timeout?: NodeJS.Timeout;
banned: boolean; rejected: boolean;
}; };
type UserCache = {
error?: Error;
user?: JinaEmbeddingsTokenAccount;
}
@singleton() @singleton()
export class SearcherHost extends RPCHost { export class SearcherHost extends RPCHost {
logger = this.globalLogger.child({ service: this.constructor.name }); logger = this.globalLogger.child({ service: this.constructor.name });
@ -58,7 +63,7 @@ export class SearcherHost extends RPCHost {
updateAgeOnHas: false, updateAgeOnHas: false,
}); });
userCache = new LRUCache<{}, JinaEmbeddingsTokenAccount>({ userCache = new LRUCache<{}, UserCache>({
max: 256, max: 256,
ttl: 60 * 60 * 1000, ttl: 60 * 60 * 1000,
updateAgeOnGet: false, updateAgeOnGet: false,
@ -162,24 +167,31 @@ export class SearcherHost extends RPCHost {
}); });
} }
let user: JinaEmbeddingsTokenAccount | undefined = this.userCache.get(authToken); let userCache: UserCache | undefined = this.userCache.get(authToken);
let rateLimitResult: RateLimitCache | undefined = this.rateLimitCache.get(authToken);
let user: JinaEmbeddingsTokenAccount;
const userPromise = this.assertUser(auth); const userPromise = this.assertUser(auth);
if (!user) { if (!userCache?.user) {
user = await userPromise; user = await userPromise;
} else { } else {
// resolve uncaught error // resolve uncaught error
userPromise.catch(() => {}); userPromise.catch(() => {});
if (userCache.error instanceof Error) {
throw userCache.error;
}
user = userCache.user;
} }
let rateLimitResult: RateLimitCache | undefined = this.rateLimitCache.get(authToken);
const rateLimitPromise = this.assertRateLimit(rpcReflect, auth, user, rpcReflectPayload); const rateLimitPromise = this.assertRateLimit(rpcReflect, auth, user, rpcReflectPayload);
if (rateLimitResult !== undefined) { if (!rateLimitResult) {
await rateLimitPromise;
} else {
// resolve uncaught error // resolve uncaught error
rateLimitPromise.catch(() => {}); rateLimitPromise.catch(() => {});
// throw rate limit error when cache is error result from last time. // throw rate limit error when cache is error result from last time.
if (rateLimitResult.error instanceof Error) throw rateLimitResult; if (rateLimitResult.error instanceof Error) throw rateLimitResult.error;
} else {
await rateLimitPromise;
} }
delete crawlerOptions.html; delete crawlerOptions.html;
@ -627,16 +639,29 @@ export class SearcherHost extends RPCHost {
} }
async assertUser(auth: JinaEmbeddingsAuthDTO) { async assertUser(auth: JinaEmbeddingsAuthDTO) {
let user: JinaEmbeddingsTokenAccount | undefined = undefined; let cacheValue: UserCache | undefined = undefined;
const cacheUser = !!this.rateLimitCache.get(auth.bearerToken!);
try { try {
user = await auth.assertUser(); const user = await auth.assertUser();
if (!(user.wallet.total_balance > 0)) { if (!(user.wallet.total_balance > 0)) {
throw new InsufficientBalanceError(`Account balance not enough to run this query, please recharge.`); throw new InsufficientBalanceError(`Account balance not enough to run this query, please recharge.`);
} }
this.userCache.set(auth.bearerToken!, user); cacheValue = {
user,
error: undefined,
};
return user; return user;
} catch (e: any) { } catch (e: any) {
cacheValue = {
error: e,
user: undefined,
}
throw e; throw e;
} finally {
if (cacheUser) {
this.userCache.set(auth.bearerToken!, cacheValue);
}
} }
} }
@ -654,7 +679,7 @@ export class SearcherHost extends RPCHost {
}) })
]; ];
const isBigRateLimit = rateLimitPolicy.some(x => x.occurrence >= 500); const isHighRateLimit = rateLimitPolicy.some(x => x.occurrence >= 2);
let cacheValue: RateLimitCache | undefined = undefined; let cacheValue: RateLimitCache | undefined = undefined;
try { try {
const apiRoll = await this.rateLimitControl.simpleRPCUidBasedLimit( const apiRoll = await this.rateLimitControl.simpleRPCUidBasedLimit(
@ -673,27 +698,55 @@ export class SearcherHost extends RPCHost {
}); });
cacheValue = { cacheValue = {
banned: false, rejected: false,
}; };
} catch (e: any) { } catch (e: any) {
const cache: RateLimitCache | undefined = this.rateLimitCache.get(auth.bearerToken!); const cache: RateLimitCache | undefined = this.rateLimitCache.get(auth.bearerToken!);
if (cache && cache.banned !== false) { if (cache && cache.rejected !== false) {
clearTimeout(cache.timeout); clearTimeout(cache.timeout);
} }
const timeoutId = setTimeout(() => {
this.rateLimitCache.set(auth.bearerToken!, {rejected: false});
}, e.retryAfter * 1000);
cacheValue = { cacheValue = {
error: e, error: e,
timeout: setTimeout(() => { timeout: timeoutId,
this.rateLimitCache.set(auth.bearerToken!, {banned: false}); rejected: true,
}, e.retryAfter * 1000),
banned: true,
}; };
throw e; throw e;
} finally { } finally {
if (isBigRateLimit) { if (isHighRateLimit) {
this.rateLimitCache.set(auth.bearerToken!, cacheValue); this.rateLimitCache.set(auth.bearerToken!, cacheValue);
} }
} }
} }
/**
* @deprecated
* @param cacheStack
* @param key
* @param assertFn
* @returns
*/
async assertCache(cacheStack: any, key: string, assertFn: Function) {
let cache = cacheStack.get(key);
const assertPromise = assertFn();
let assertResult: any;
if (!cache) {
assertResult = await assertPromise;
} else {
assertPromise.catch(() => {});
if (cache.error instanceof Error) {
throw cache.error;
}
assertResult = cache;
}
return assertResult;
}
} }
const dataItems = [ const dataItems = [