From 1e5e94f3f578091d5c3ba09a5d0f8145500c7390 Mon Sep 17 00:00:00 2001 From: Yanlong Wang Date: Mon, 21 Apr 2025 19:14:01 +0800 Subject: [PATCH 1/2] saas(search): switch to internal serp --- src/api/{searcher-serper.ts => searcher.ts} | 169 +++++++++++--------- src/services/serp/internal.ts | 77 +++++++++ src/stand-alone/search.ts | 2 +- thinapps-shared | 2 +- 4 files changed, 174 insertions(+), 76 deletions(-) rename src/api/{searcher-serper.ts => searcher.ts} (87%) create mode 100644 src/services/serp/internal.ts diff --git a/src/api/searcher-serper.ts b/src/api/searcher.ts similarity index 87% rename from src/api/searcher-serper.ts rename to src/api/searcher.ts index 8cb8c67..7486331 100644 --- a/src/api/searcher-serper.ts +++ b/src/api/searcher.ts @@ -9,10 +9,9 @@ import _ from 'lodash'; import { RateLimitControl, RateLimitDesc, RateLimitTriggeredError } from '../shared/services/rate-limit'; import { CrawlerHost, ExtraScrappingOptions } from './crawler'; -import { SerperSearchResult } from '../db/searched'; import { CrawlerOptions, RESPOND_TIMING } from '../dto/crawler-options'; import { SnapshotFormatter, FormattedPage as RealFormattedPage } from '../services/snapshot-formatter'; -import { GoogleSearchExplicitOperatorsDto, SerperSearchService } from '../services/serper-search'; +import { GoogleSearchExplicitOperatorsDto } from '../services/serper-search'; import { GlobalLogger } from '../services/logger'; import { AsyncLocalContext } from '../services/async-context'; @@ -20,19 +19,16 @@ import { Context, Ctx, Method, Param, RPCReflect } from '../services/registry'; import { OutputServerEventStream } from '../lib/transform-server-event-stream'; import { JinaEmbeddingsAuthDTO } from '../dto/jina-embeddings-auth'; import { InsufficientBalanceError } from '../services/errors'; -import { - SerperImageSearchResponse, - SerperNewsSearchResponse, - SerperSearchQueryParams, - SerperSearchResponse, - SerperWebSearchResponse, - WORLD_COUNTRIES, - WORLD_LANGUAGES -} from '../shared/3rd-party/serper-search'; + +import { SerperBingSearchService, SerperGoogleSearchService } from '../services/serp/serper'; import { toAsyncGenerator } from '../utils/misc'; import type { JinaEmbeddingsTokenAccount } from '../shared/db/jina-embeddings-token-account'; import { LRUCache } from 'lru-cache'; import { API_CALL_STATUS } from '../shared/db/api-roll'; +import { SERPResult } from '../db/searched'; +import { SerperSearchQueryParams, WORLD_COUNTRIES, WORLD_LANGUAGES } from '../shared/3rd-party/serper-search'; +import { InternalJinaSerpService } from '../services/serp/internal'; +import { WebSearchEntry } from '../services/serp/compat'; const WORLD_COUNTRY_CODES = Object.keys(WORLD_COUNTRIES).map((x) => x.toLowerCase()); @@ -69,9 +65,11 @@ export class SearcherHost extends RPCHost { protected globalLogger: GlobalLogger, protected rateLimitControl: RateLimitControl, protected threadLocal: AsyncLocalContext, - protected serperSearchService: SerperSearchService, protected crawler: CrawlerHost, protected snapshotFormatter: SnapshotFormatter, + protected serperGoogle: SerperGoogleSearchService, + protected serperBing: SerperBingSearchService, + protected jinaSerp: InternalJinaSerpService, ) { super(...arguments); } @@ -318,9 +316,14 @@ export class SearcherHost extends RPCHost { throw new AssertionFailureError(`No search results available for query ${searchQuery}`); } + if (crawlOpts.timeoutMs && crawlOpts.timeoutMs < 30_000) { + delete crawlOpts.timeoutMs; + } + + let lastScrapped: any[] | undefined; const targetResultCount = crawlWithoutContent ? count : count + 2; - const trimmedResults = results.filter((x) => Boolean(x.link)).slice(0, targetResultCount).map((x) => this.mapToFinalResults(x)); + const trimmedResults: any[] = results.filter((x) => Boolean(x.link)).slice(0, targetResultCount).map((x) => this.mapToFinalResults(x)); trimmedResults.toString = function () { let r = this.map((x, i) => x ? Reflect.apply(x.toString, x, [i]) : '').join('\n\n').trimEnd() + '\n'; if (fallbackQuery) { @@ -521,7 +524,7 @@ export class SearcherHost extends RPCHost { // Extract results based on variant let tryTimes = 1; - const results = await this.doSearch(params, noCache); + const results = await this.cachedSearch(params.variant, params, noCache); if (results.length || !useFallback) { return { results, query: params.q, tryTimes }; } @@ -545,7 +548,7 @@ export class SearcherHost extends RPCHost { tryTimes += 1; this.logger.info(`Retrying search with fallback query: "${query}"`); const fallbackParams = { ...params, q: query }; - const fallbackResults = await this.doSearch(fallbackParams, noCache); + const fallbackResults = await this.cachedSearch(params.variant, fallbackParams, noCache); if (fallbackResults.length > 0) { return { results: fallbackResults, query: fallbackParams.q, tryTimes }; } @@ -556,7 +559,7 @@ export class SearcherHost extends RPCHost { this.logger.info(`Retrying search with fallback query: "${query}"`); const fallbackParams = { ...params, q: query }; tryTimes += 1; - const fallbackResults = await this.doSearch(fallbackParams, noCache); + const fallbackResults = await this.cachedSearch(params.variant, fallbackParams, noCache); if (fallbackResults.length > 0) { return { results: fallbackResults, query, tryTimes }; @@ -566,22 +569,6 @@ export class SearcherHost extends RPCHost { return { results, query: originalQuery, tryTimes }; } - async doSearch( - params: SerperSearchQueryParams & { variant: 'web' | 'images' | 'news'; provider?: string; }, - noCache: boolean = false, - ) { - const response = await this.cachedSearch(params, noCache); - - let results = []; - switch (params.variant) { - case 'images': results = (response as SerperImageSearchResponse).images; break; - case 'news': results = (response as SerperNewsSearchResponse).news; break; - case 'web': default: results = (response as SerperWebSearchResponse).organic; break; - } - - return results; - } - async *fetchSearchResults( mode: string | 'markdown' | 'html' | 'text' | 'screenshot' | 'favicon' | 'content', searchResults?: FormattedPage[], @@ -706,13 +693,36 @@ export class SearcherHost extends RPCHost { } } - async cachedSearch(query: SerperSearchQueryParams & { variant: 'web' | 'images' | 'news'; provider?: string; }, noCache: boolean = false) { - const queryDigest = objHashMd5B64Of(query); + *iterProviders(preference?: string) { + if (preference === 'bing') { + yield this.serperBing; + yield this.jinaSerp; + yield this.serperGoogle; + + return; + } + + if (preference === 'google') { + yield this.jinaSerp; + yield this.serperGoogle; + yield this.serperGoogle; + + return; + } + + yield this.jinaSerp; + yield this.serperGoogle; + yield this.serperGoogle; + } + + async cachedSearch(variant: 'web' | 'news' | 'images', query: Record, noCache?: boolean): Promise { + const queryDigest = objHashMd5B64Of({ ...query, variant }); + const provider = query.provider; Reflect.deleteProperty(query, 'provider'); let cache; if (!noCache) { - cache = (await SerperSearchResult.fromFirestoreQuery( - SerperSearchResult.COLLECTION.where('queryDigest', '==', queryDigest) + cache = (await SERPResult.fromFirestoreQuery( + SERPResult.COLLECTION.where('queryDigest', '==', queryDigest) .orderBy('createdAt', 'desc') .limit(1) ))[0]; @@ -724,70 +734,81 @@ export class SearcherHost extends RPCHost { }); if (!stale) { - return cache.response as SerperSearchResponse; + return cache.response as any; } } } try { - let r; - const variant = query.variant; - Reflect.deleteProperty(query, 'variant'); - switch (variant) { - case 'images': { - r = await this.serperSearchService.imageSearch(query); - break; - } - case 'news': { - r = await this.serperSearchService.newsSearch(query); - break; - } - case 'web': - default: { - r = await this.serperSearchService.webSearch(query); - break; + let r: any[] | undefined; + let lastError; + outerLoop: + for (const client of this.iterProviders(provider)) { + const t0 = Date.now(); + try { + switch (variant) { + case 'images': { + r = await Reflect.apply(client.imageSearch, client, [query]); + break; + } + case 'news': { + r = await Reflect.apply(client.newsSearch, client, [query]); + break; + } + case 'web': + default: { + r = await Reflect.apply(client.webSearch, client, [query]); + break; + } + } + const dt = Date.now() - t0; + this.logger.info(`Search took ${dt}ms, ${client.constructor.name}(${variant})`, { searchDt: dt, variant, client: client.constructor.name }); + break outerLoop; + } catch (err) { + lastError = err; + const dt = Date.now() - t0; + this.logger.warn(`Failed to do ${variant} search using ${client.constructor.name}`, { err, variant, searchDt: dt, }); } } - const nowDate = new Date(); - const record = SerperSearchResult.from({ - query, - queryDigest, - response: r, - createdAt: nowDate, - expireAt: new Date(nowDate.valueOf() + this.cacheRetentionMs) - }); - SerperSearchResult.save(record.degradeForFireStore()).catch((err) => { - this.logger.warn(`Failed to cache search result`, { err }); - }); + if (r?.length) { + const nowDate = new Date(); + const record = SERPResult.from({ + query, + queryDigest, + response: r, + createdAt: nowDate, + expireAt: new Date(nowDate.valueOf() + this.cacheRetentionMs) + }); + SERPResult.save(record.degradeForFireStore()).catch((err) => { + this.logger.warn(`Failed to cache search result`, { err }); + }); + } else if (lastError) { + throw lastError; + } - return r; + return r as WebSearchEntry[]; } catch (err: any) { if (cache) { this.logger.warn(`Failed to fetch search result, but a stale cache is available. falling back to stale cache`, { err: marshalErrorLike(err) }); - return cache.response as SerperSearchResponse; + return cache.response as any; } throw err; } - } - mapToFinalResults(input: - | SerperImageSearchResponse['images'][0] - | SerperWebSearchResponse['organic'][0] - | SerperNewsSearchResponse['news'][0], - ) { + mapToFinalResults(input: WebSearchEntry) { const whitelistedProps = [ - 'imageUrl', 'imageWidth', 'imageHeight', 'source', 'date' + 'imageUrl', 'imageWidth', 'imageHeight', 'source', 'date', 'siteLinks' ]; const result = { title: input.title, url: input.link, description: Reflect.get(input, 'snippet'), ..._.pick(input, whitelistedProps), - } as FormattedPage; + }; return result; } diff --git a/src/services/serp/internal.ts b/src/services/serp/internal.ts new file mode 100644 index 0000000..1303369 --- /dev/null +++ b/src/services/serp/internal.ts @@ -0,0 +1,77 @@ + +import { singleton } from 'tsyringe'; +import { GlobalLogger } from '../logger'; +import { SecretExposer } from '../../shared/services/secrets'; +import { AsyncLocalContext } from '../async-context'; +import { SerperSearchQueryParams } from '../../shared/3rd-party/serper-search'; +import { BlackHoleDetector } from '../blackhole-detector'; +import { AsyncService } from 'civkit/async-service'; +import { JinaSerpApiHTTP } from '../../shared/3rd-party/internal-serp'; +import { WebSearchEntry } from './compat'; + +@singleton() +export class InternalJinaSerpService extends AsyncService { + + logger = this.globalLogger.child({ service: this.constructor.name }); + + client!: JinaSerpApiHTTP; + + constructor( + protected globalLogger: GlobalLogger, + protected secretExposer: SecretExposer, + protected threadLocal: AsyncLocalContext, + protected blackHoleDetector: BlackHoleDetector, + ) { + super(...arguments); + } + + override async init() { + await this.dependencyReady(); + this.emit('ready'); + + this.client = new JinaSerpApiHTTP(this.secretExposer.JINA_SERP_API_KEY); + } + + + async doSearch(variant: 'web' | 'images' | 'news', query: SerperSearchQueryParams) { + this.logger.debug(`Doing external search`, query); + let results; + switch (variant) { + // case 'images': { + // const r = await this.client.imageSearch(query); + + // results = r.parsed.images; + // break; + // } + // case 'news': { + // const r = await this.client.newsSearch(query); + + // results = r.parsed.news; + // break; + // } + case 'web': + default: { + const r = await this.client.webSearch(query); + + results = r.parsed.results?.map((x) => ({ ...x, link: x.url })); + break; + } + } + + this.blackHoleDetector.itWorked(); + + return results as WebSearchEntry[]; + } + + + async webSearch(query: SerperSearchQueryParams) { + return this.doSearch('web', query); + } + async imageSearch(query: SerperSearchQueryParams) { + return this.doSearch('images', query); + } + async newsSearch(query: SerperSearchQueryParams) { + return this.doSearch('news', query); + } + +} diff --git a/src/stand-alone/search.ts b/src/stand-alone/search.ts index f817223..673a3cb 100644 --- a/src/stand-alone/search.ts +++ b/src/stand-alone/search.ts @@ -4,7 +4,7 @@ import { container, singleton } from 'tsyringe'; import { KoaServer } from 'civkit/civ-rpc/koa'; import http2 from 'http2'; import http from 'http'; -import { SearcherHost } from '../api/searcher-serper'; +import { SearcherHost } from '../api/searcher'; import { FsWalk, WalkOutEntity } from 'civkit/fswalk'; import path from 'path'; import fs from 'fs'; diff --git a/thinapps-shared b/thinapps-shared index 08ded7b..3238f91 160000 --- a/thinapps-shared +++ b/thinapps-shared @@ -1 +1 @@ -Subproject commit 08ded7b8eceee7e931d52e77c87103f28c3ba9e8 +Subproject commit 3238f911b51c28960d94d3683076e48c17a57610 From 034be009a4f585eadae8e2f189fd1f04f7233145 Mon Sep 17 00:00:00 2001 From: Yanlong Wang Date: Mon, 21 Apr 2025 19:21:24 +0800 Subject: [PATCH 2/2] bump: deps --- thinapps-shared | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thinapps-shared b/thinapps-shared index 3238f91..1a7dca4 160000 --- a/thinapps-shared +++ b/thinapps-shared @@ -1 +1 @@ -Subproject commit 3238f911b51c28960d94d3683076e48c17a57610 +Subproject commit 1a7dca40c52569d455237497c7285bd25eb2e3d2