saas(search): switch to internal serp

This commit is contained in:
Yanlong Wang 2025-04-21 19:14:01 +08:00
parent 0cf8371d1c
commit 1e5e94f3f5
No known key found for this signature in database
GPG Key ID: C0A623C0BADF9F37
4 changed files with 174 additions and 76 deletions

View File

@ -9,10 +9,9 @@ import _ from 'lodash';
import { RateLimitControl, RateLimitDesc, RateLimitTriggeredError } from '../shared/services/rate-limit'; import { RateLimitControl, RateLimitDesc, RateLimitTriggeredError } from '../shared/services/rate-limit';
import { CrawlerHost, ExtraScrappingOptions } from './crawler'; import { CrawlerHost, ExtraScrappingOptions } from './crawler';
import { SerperSearchResult } from '../db/searched';
import { CrawlerOptions, RESPOND_TIMING } from '../dto/crawler-options'; import { CrawlerOptions, RESPOND_TIMING } from '../dto/crawler-options';
import { SnapshotFormatter, FormattedPage as RealFormattedPage } from '../services/snapshot-formatter'; import { SnapshotFormatter, FormattedPage as RealFormattedPage } from '../services/snapshot-formatter';
import { GoogleSearchExplicitOperatorsDto, SerperSearchService } from '../services/serper-search'; import { GoogleSearchExplicitOperatorsDto } from '../services/serper-search';
import { GlobalLogger } from '../services/logger'; import { GlobalLogger } from '../services/logger';
import { AsyncLocalContext } from '../services/async-context'; import { AsyncLocalContext } from '../services/async-context';
@ -20,19 +19,16 @@ import { Context, Ctx, Method, Param, RPCReflect } from '../services/registry';
import { OutputServerEventStream } from '../lib/transform-server-event-stream'; import { OutputServerEventStream } from '../lib/transform-server-event-stream';
import { JinaEmbeddingsAuthDTO } from '../dto/jina-embeddings-auth'; import { JinaEmbeddingsAuthDTO } from '../dto/jina-embeddings-auth';
import { InsufficientBalanceError } from '../services/errors'; import { InsufficientBalanceError } from '../services/errors';
import {
SerperImageSearchResponse, import { SerperBingSearchService, SerperGoogleSearchService } from '../services/serp/serper';
SerperNewsSearchResponse,
SerperSearchQueryParams,
SerperSearchResponse,
SerperWebSearchResponse,
WORLD_COUNTRIES,
WORLD_LANGUAGES
} from '../shared/3rd-party/serper-search';
import { toAsyncGenerator } from '../utils/misc'; import { toAsyncGenerator } from '../utils/misc';
import type { JinaEmbeddingsTokenAccount } from '../shared/db/jina-embeddings-token-account'; import type { JinaEmbeddingsTokenAccount } from '../shared/db/jina-embeddings-token-account';
import { LRUCache } from 'lru-cache'; import { LRUCache } from 'lru-cache';
import { API_CALL_STATUS } from '../shared/db/api-roll'; import { API_CALL_STATUS } from '../shared/db/api-roll';
import { SERPResult } from '../db/searched';
import { SerperSearchQueryParams, WORLD_COUNTRIES, WORLD_LANGUAGES } from '../shared/3rd-party/serper-search';
import { InternalJinaSerpService } from '../services/serp/internal';
import { WebSearchEntry } from '../services/serp/compat';
const WORLD_COUNTRY_CODES = Object.keys(WORLD_COUNTRIES).map((x) => x.toLowerCase()); const WORLD_COUNTRY_CODES = Object.keys(WORLD_COUNTRIES).map((x) => x.toLowerCase());
@ -69,9 +65,11 @@ export class SearcherHost extends RPCHost {
protected globalLogger: GlobalLogger, protected globalLogger: GlobalLogger,
protected rateLimitControl: RateLimitControl, protected rateLimitControl: RateLimitControl,
protected threadLocal: AsyncLocalContext, protected threadLocal: AsyncLocalContext,
protected serperSearchService: SerperSearchService,
protected crawler: CrawlerHost, protected crawler: CrawlerHost,
protected snapshotFormatter: SnapshotFormatter, protected snapshotFormatter: SnapshotFormatter,
protected serperGoogle: SerperGoogleSearchService,
protected serperBing: SerperBingSearchService,
protected jinaSerp: InternalJinaSerpService,
) { ) {
super(...arguments); super(...arguments);
} }
@ -318,9 +316,14 @@ export class SearcherHost extends RPCHost {
throw new AssertionFailureError(`No search results available for query ${searchQuery}`); throw new AssertionFailureError(`No search results available for query ${searchQuery}`);
} }
if (crawlOpts.timeoutMs && crawlOpts.timeoutMs < 30_000) {
delete crawlOpts.timeoutMs;
}
let lastScrapped: any[] | undefined; let lastScrapped: any[] | undefined;
const targetResultCount = crawlWithoutContent ? count : count + 2; const targetResultCount = crawlWithoutContent ? count : count + 2;
const trimmedResults = results.filter((x) => Boolean(x.link)).slice(0, targetResultCount).map((x) => this.mapToFinalResults(x)); const trimmedResults: any[] = results.filter((x) => Boolean(x.link)).slice(0, targetResultCount).map((x) => this.mapToFinalResults(x));
trimmedResults.toString = function () { trimmedResults.toString = function () {
let r = this.map((x, i) => x ? Reflect.apply(x.toString, x, [i]) : '').join('\n\n').trimEnd() + '\n'; let r = this.map((x, i) => x ? Reflect.apply(x.toString, x, [i]) : '').join('\n\n').trimEnd() + '\n';
if (fallbackQuery) { if (fallbackQuery) {
@ -521,7 +524,7 @@ export class SearcherHost extends RPCHost {
// Extract results based on variant // Extract results based on variant
let tryTimes = 1; let tryTimes = 1;
const results = await this.doSearch(params, noCache); const results = await this.cachedSearch(params.variant, params, noCache);
if (results.length || !useFallback) { if (results.length || !useFallback) {
return { results, query: params.q, tryTimes }; return { results, query: params.q, tryTimes };
} }
@ -545,7 +548,7 @@ export class SearcherHost extends RPCHost {
tryTimes += 1; tryTimes += 1;
this.logger.info(`Retrying search with fallback query: "${query}"`); this.logger.info(`Retrying search with fallback query: "${query}"`);
const fallbackParams = { ...params, q: query }; const fallbackParams = { ...params, q: query };
const fallbackResults = await this.doSearch(fallbackParams, noCache); const fallbackResults = await this.cachedSearch(params.variant, fallbackParams, noCache);
if (fallbackResults.length > 0) { if (fallbackResults.length > 0) {
return { results: fallbackResults, query: fallbackParams.q, tryTimes }; return { results: fallbackResults, query: fallbackParams.q, tryTimes };
} }
@ -556,7 +559,7 @@ export class SearcherHost extends RPCHost {
this.logger.info(`Retrying search with fallback query: "${query}"`); this.logger.info(`Retrying search with fallback query: "${query}"`);
const fallbackParams = { ...params, q: query }; const fallbackParams = { ...params, q: query };
tryTimes += 1; tryTimes += 1;
const fallbackResults = await this.doSearch(fallbackParams, noCache); const fallbackResults = await this.cachedSearch(params.variant, fallbackParams, noCache);
if (fallbackResults.length > 0) { if (fallbackResults.length > 0) {
return { results: fallbackResults, query, tryTimes }; return { results: fallbackResults, query, tryTimes };
@ -566,22 +569,6 @@ export class SearcherHost extends RPCHost {
return { results, query: originalQuery, tryTimes }; return { results, query: originalQuery, tryTimes };
} }
async doSearch(
params: SerperSearchQueryParams & { variant: 'web' | 'images' | 'news'; provider?: string; },
noCache: boolean = false,
) {
const response = await this.cachedSearch(params, noCache);
let results = [];
switch (params.variant) {
case 'images': results = (response as SerperImageSearchResponse).images; break;
case 'news': results = (response as SerperNewsSearchResponse).news; break;
case 'web': default: results = (response as SerperWebSearchResponse).organic; break;
}
return results;
}
async *fetchSearchResults( async *fetchSearchResults(
mode: string | 'markdown' | 'html' | 'text' | 'screenshot' | 'favicon' | 'content', mode: string | 'markdown' | 'html' | 'text' | 'screenshot' | 'favicon' | 'content',
searchResults?: FormattedPage[], searchResults?: FormattedPage[],
@ -706,13 +693,36 @@ export class SearcherHost extends RPCHost {
} }
} }
async cachedSearch(query: SerperSearchQueryParams & { variant: 'web' | 'images' | 'news'; provider?: string; }, noCache: boolean = false) { *iterProviders(preference?: string) {
const queryDigest = objHashMd5B64Of(query); if (preference === 'bing') {
yield this.serperBing;
yield this.jinaSerp;
yield this.serperGoogle;
return;
}
if (preference === 'google') {
yield this.jinaSerp;
yield this.serperGoogle;
yield this.serperGoogle;
return;
}
yield this.jinaSerp;
yield this.serperGoogle;
yield this.serperGoogle;
}
async cachedSearch(variant: 'web' | 'news' | 'images', query: Record<string, any>, noCache?: boolean): Promise<WebSearchEntry[]> {
const queryDigest = objHashMd5B64Of({ ...query, variant });
const provider = query.provider;
Reflect.deleteProperty(query, 'provider'); Reflect.deleteProperty(query, 'provider');
let cache; let cache;
if (!noCache) { if (!noCache) {
cache = (await SerperSearchResult.fromFirestoreQuery( cache = (await SERPResult.fromFirestoreQuery(
SerperSearchResult.COLLECTION.where('queryDigest', '==', queryDigest) SERPResult.COLLECTION.where('queryDigest', '==', queryDigest)
.orderBy('createdAt', 'desc') .orderBy('createdAt', 'desc')
.limit(1) .limit(1)
))[0]; ))[0];
@ -724,70 +734,81 @@ export class SearcherHost extends RPCHost {
}); });
if (!stale) { if (!stale) {
return cache.response as SerperSearchResponse; return cache.response as any;
} }
} }
} }
try { try {
let r; let r: any[] | undefined;
const variant = query.variant; let lastError;
Reflect.deleteProperty(query, 'variant'); outerLoop:
for (const client of this.iterProviders(provider)) {
const t0 = Date.now();
try {
switch (variant) { switch (variant) {
case 'images': { case 'images': {
r = await this.serperSearchService.imageSearch(query); r = await Reflect.apply(client.imageSearch, client, [query]);
break; break;
} }
case 'news': { case 'news': {
r = await this.serperSearchService.newsSearch(query); r = await Reflect.apply(client.newsSearch, client, [query]);
break; break;
} }
case 'web': case 'web':
default: { default: {
r = await this.serperSearchService.webSearch(query); r = await Reflect.apply(client.webSearch, client, [query]);
break; break;
} }
} }
const dt = Date.now() - t0;
this.logger.info(`Search took ${dt}ms, ${client.constructor.name}(${variant})`, { searchDt: dt, variant, client: client.constructor.name });
break outerLoop;
} catch (err) {
lastError = err;
const dt = Date.now() - t0;
this.logger.warn(`Failed to do ${variant} search using ${client.constructor.name}`, { err, variant, searchDt: dt, });
}
}
if (r?.length) {
const nowDate = new Date(); const nowDate = new Date();
const record = SerperSearchResult.from({ const record = SERPResult.from({
query, query,
queryDigest, queryDigest,
response: r, response: r,
createdAt: nowDate, createdAt: nowDate,
expireAt: new Date(nowDate.valueOf() + this.cacheRetentionMs) expireAt: new Date(nowDate.valueOf() + this.cacheRetentionMs)
}); });
SerperSearchResult.save(record.degradeForFireStore()).catch((err) => { SERPResult.save(record.degradeForFireStore()).catch((err) => {
this.logger.warn(`Failed to cache search result`, { err }); this.logger.warn(`Failed to cache search result`, { err });
}); });
} else if (lastError) {
throw lastError;
}
return r; return r as WebSearchEntry[];
} catch (err: any) { } catch (err: any) {
if (cache) { if (cache) {
this.logger.warn(`Failed to fetch search result, but a stale cache is available. falling back to stale cache`, { err: marshalErrorLike(err) }); this.logger.warn(`Failed to fetch search result, but a stale cache is available. falling back to stale cache`, { err: marshalErrorLike(err) });
return cache.response as SerperSearchResponse; return cache.response as any;
} }
throw err; throw err;
} }
} }
mapToFinalResults(input: mapToFinalResults(input: WebSearchEntry) {
| SerperImageSearchResponse['images'][0]
| SerperWebSearchResponse['organic'][0]
| SerperNewsSearchResponse['news'][0],
) {
const whitelistedProps = [ const whitelistedProps = [
'imageUrl', 'imageWidth', 'imageHeight', 'source', 'date' 'imageUrl', 'imageWidth', 'imageHeight', 'source', 'date', 'siteLinks'
]; ];
const result = { const result = {
title: input.title, title: input.title,
url: input.link, url: input.link,
description: Reflect.get(input, 'snippet'), description: Reflect.get(input, 'snippet'),
..._.pick(input, whitelistedProps), ..._.pick(input, whitelistedProps),
} as FormattedPage; };
return result; return result;
} }

View File

@ -0,0 +1,77 @@
import { singleton } from 'tsyringe';
import { GlobalLogger } from '../logger';
import { SecretExposer } from '../../shared/services/secrets';
import { AsyncLocalContext } from '../async-context';
import { SerperSearchQueryParams } from '../../shared/3rd-party/serper-search';
import { BlackHoleDetector } from '../blackhole-detector';
import { AsyncService } from 'civkit/async-service';
import { JinaSerpApiHTTP } from '../../shared/3rd-party/internal-serp';
import { WebSearchEntry } from './compat';
/**
 * Search provider backed by the internal Jina SERP HTTP API.
 *
 * Exposes `webSearch` / `imageSearch` / `newsSearch` entry points that all
 * delegate to {@link InternalJinaSerpService.doSearch}. Only the `web`
 * variant is currently implemented against the internal API.
 */
@singleton()
export class InternalJinaSerpService extends AsyncService {

    logger = this.globalLogger.child({ service: this.constructor.name });

    /** HTTP client for the internal SERP API; assigned in `init()`. */
    client!: JinaSerpApiHTTP;

    constructor(
        protected globalLogger: GlobalLogger,
        protected secretExposer: SecretExposer,
        protected threadLocal: AsyncLocalContext,
        protected blackHoleDetector: BlackHoleDetector,
    ) {
        super(...arguments);
    }

    /**
     * Waits for dependencies, builds the API client, then announces readiness.
     */
    override async init() {
        await this.dependencyReady();

        // Assign the client BEFORE emitting 'ready': event listeners run
        // synchronously inside emit(), so anything reacting to 'ready' must
        // already be able to see a usable client.
        this.client = new JinaSerpApiHTTP(this.secretExposer.JINA_SERP_API_KEY);

        this.emit('ready');
    }

    /**
     * Runs a search of the given variant against the internal SERP API.
     *
     * @param variant - Kind of search requested.
     * @param query - Search parameters forwarded to the API client.
     * @returns The result entries, each with `link` populated from the API's
     *          `url` field; an empty array when the response has no results.
     * @throws Error when `variant` is `'images'` or `'news'`, which the
     *         internal API integration does not implement yet.
     */
    async doSearch(variant: 'web' | 'images' | 'news', query: SerperSearchQueryParams): Promise<WebSearchEntry[]> {
        this.logger.debug(`Doing external search`, query);
        let results;
        switch (variant) {
            case 'images':
            case 'news': {
                // Not implemented for the internal API yet. Fail loudly rather
                // than falling through to a web search: returning web hits for
                // an image/news query hands the caller wrongly-shaped results,
                // whereas an error lets a caller with several providers move on
                // to one that supports this variant.
                throw new Error(`Internal SERP does not support '${variant}' search`);
            }
            case 'web':
            default: {
                const r = await this.client.webSearch(query);

                // Copy `url` into `link`, and normalise a missing result list
                // to [] so callers can safely read `.length`.
                results = r.parsed.results?.map((x) => ({ ...x, link: x.url })) ?? [];
                break;
            }
        }

        this.blackHoleDetector.itWorked();

        return results as WebSearchEntry[];
    }

    /** Web search via the internal SERP API. */
    async webSearch(query: SerperSearchQueryParams) {
        return this.doSearch('web', query);
    }

    /** Image search; currently rejects because the variant is unsupported. */
    async imageSearch(query: SerperSearchQueryParams) {
        return this.doSearch('images', query);
    }

    /** News search; currently rejects because the variant is unsupported. */
    async newsSearch(query: SerperSearchQueryParams) {
        return this.doSearch('news', query);
    }
}

View File

@ -4,7 +4,7 @@ import { container, singleton } from 'tsyringe';
import { KoaServer } from 'civkit/civ-rpc/koa'; import { KoaServer } from 'civkit/civ-rpc/koa';
import http2 from 'http2'; import http2 from 'http2';
import http from 'http'; import http from 'http';
import { SearcherHost } from '../api/searcher-serper'; import { SearcherHost } from '../api/searcher';
import { FsWalk, WalkOutEntity } from 'civkit/fswalk'; import { FsWalk, WalkOutEntity } from 'civkit/fswalk';
import path from 'path'; import path from 'path';
import fs from 'fs'; import fs from 'fs';

@ -1 +1 @@
Subproject commit 08ded7b8eceee7e931d52e77c87103f28c3ba9e8 Subproject commit 3238f911b51c28960d94d3683076e48c17a57610