From 12ba1bcfad9d1334a3598fbbd396d88a89ed3c82 Mon Sep 17 00:00:00 2001 From: Yanlong Wang Date: Wed, 2 Apr 2025 14:58:13 +0800 Subject: [PATCH] feat: serp endpoint (#1180) * wip * wip * fix * wip * fix: add jitter to user cache * cd * fix * fix * fix: user cache age comparison * fix: try to partition apiroll query * bump: deps * wip * cd * feat: fallback for serp * fix * cd * fix * fix * serp: stop hiding expense * serp: enable fallback by default --- .github/workflows/cd.yml | 8 +- .vscode/launch.json | 22 + src/api/crawler.ts | 69 +++- src/api/searcher.ts | 503 ----------------------- src/api/serp.ts | 505 +++++++++++++++++++++++ src/db/searched.ts | 4 + src/dto/crawler-options.ts | 2 + src/dto/jina-embeddings-auth.ts | 25 +- src/services/geoip.ts | 9 + src/services/misc.ts | 10 +- src/services/puppeteer.ts | 7 +- src/services/serp/compat.ts | 12 + src/services/serp/google.ts | 314 +++++++++++++++ src/services/serp/puppeteer.ts | 692 ++++++++++++++++++++++++++++++++ src/services/serp/serper.ts | 165 ++++++++ src/stand-alone/serp.ts | 160 ++++++++ thinapps-shared | 2 +- 17 files changed, 1988 insertions(+), 521 deletions(-) delete mode 100644 src/api/searcher.ts create mode 100644 src/api/serp.ts create mode 100644 src/services/serp/compat.ts create mode 100644 src/services/serp/google.ts create mode 100644 src/services/serp/puppeteer.ts create mode 100644 src/services/serp/serper.ts create mode 100644 src/stand-alone/serp.ts diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index a963167..a039641 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -75,9 +75,15 @@ jobs: - name: Deploy SEARCH with Tag run: | gcloud beta run deploy search --image us-docker.pkg.dev/reader-6b7dc/jina-reader/reader@${{steps.container.outputs.imageid}} --tag ${{ env.RELEASE_VERSION }} --command '' --args build/stand-alone/search.js --region us-central1 --async --min-instances 0 --deploy-health-check --use-http2 + - name: Deploy SERP with Tag + run: | + gcloud beta run deploy serp --image us-docker.pkg.dev/reader-6b7dc/jina-reader/reader@${{steps.container.outputs.imageid}} --tag ${{ env.RELEASE_VERSION }} --command '' --args build/stand-alone/serp.js --region us-central1 --async --min-instances 0 --deploy-health-check --use-http2 - name: Deploy CRAWL-EU with Tag run: | gcloud beta run deploy crawl-eu --image us-docker.pkg.dev/reader-6b7dc/jina-reader/reader@${{steps.container.outputs.imageid}} --tag ${{ env.RELEASE_VERSION }} --command '' --args build/stand-alone/crawl.js --region europe-west1 --async --min-instances 0 --deploy-health-check --use-http2 - name: Deploy SEARCH-EU with Tag run: | - gcloud beta run deploy search-eu --image us-docker.pkg.dev/reader-6b7dc/jina-reader/reader@${{steps.container.outputs.imageid}} --tag ${{ env.RELEASE_VERSION }} --command '' --args build/stand-alone/search.js --region europe-west1 --async --min-instances 0 --deploy-health-check --use-http2 \ No newline at end of file + gcloud beta run deploy search-eu --image us-docker.pkg.dev/reader-6b7dc/jina-reader/reader@${{steps.container.outputs.imageid}} --tag ${{ env.RELEASE_VERSION }} --command '' --args build/stand-alone/search.js --region europe-west1 --async --min-instances 0 --deploy-health-check --use-http2 + - name: Deploy SERP-JP with Tag + run: | + gcloud beta run deploy serp-jp --image us-docker.pkg.dev/reader-6b7dc/jina-reader/reader@${{steps.container.outputs.imageid}} --tag ${{ env.RELEASE_VERSION }} --command '' --args build/stand-alone/serp.js --region asia-northeast1 --async --min-instances 0 --deploy-health-check --use-http2 \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index 350938a..c8a4a5c 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -102,5 +102,27 @@ "preLaunchTask": "Backend:build:watch", "killBehavior": "forceful" }, + { + "name": "Debug Stand Alone SERP", + "request": "launch", + "runtimeArgs": [ + "--env-file=.secret.local", + ], + "env": { + "GCLOUD_PROJECT": "reader-6b7dc", + "PREFERRED_PROXY_COUNTRY": "hk", + "OVERRIDE_GOOGLE_DOMAIN": "www.google.com.hk", + "LD_PRELOAD": "/usr/local/lib/libcurl-impersonate-chrome.dylib" + }, + "cwd": "${workspaceFolder}", + "program": "build/stand-alone/serp.js", + "skipFiles": [ + "/**" + ], + "type": "node", + "outputCapture": "std", + "preLaunchTask": "Backend:build:watch", + "killBehavior": "forceful" + }, ] } \ No newline at end of file diff --git a/src/api/crawler.ts b/src/api/crawler.ts index a82c0d3..bafdd4e 100644 --- a/src/api/crawler.ts +++ b/src/api/crawler.ts @@ -48,6 +48,7 @@ import { RobotsTxtService } from '../services/robots-text'; import { TempFileManager } from '../services/temp-file'; import { MiscService } from '../services/misc'; import { HTTPServiceError } from 'civkit'; +import { GeoIPService } from '../services/geoip'; export interface ExtraScrappingOptions extends ScrappingOptions { withIframe?: boolean | 'quoted'; @@ -58,6 +59,7 @@ export interface ExtraScrappingOptions extends ScrappingOptions { engine?: string; allocProxy?: string; private?: boolean; + countryHint?: string; } const indexProto = { @@ -94,6 +96,7 @@ export class CrawlerHost extends RPCHost { protected threadLocal: AsyncLocalContext, protected robotsTxtService: RobotsTxtService, protected tempFileManager: TempFileManager, + protected geoIpService: GeoIPService, protected miscService: MiscService, ) { super(...arguments); @@ -511,15 +514,16 @@ export class CrawlerHost extends RPCHost { }); } - const result = await this.miscService.assertNormalizedUrl(url); - if (this.puppeteerControl.circuitBreakerHosts.has(result.hostname.toLowerCase())) { + const { url: safeURL, ips } = await this.miscService.assertNormalizedUrl(url); + if (this.puppeteerControl.circuitBreakerHosts.has(safeURL.hostname.toLowerCase())) { throw new SecurityCompromiseError({ - message: `Circular hostname: ${result.protocol}`, + message: `Circular hostname: ${safeURL.protocol}`, path: 'url' }); } + crawlerOptions._hintIps = ips; - return result; + return safeURL; } getUrlDigest(urlToCrawl: URL) { @@ -886,7 +890,11 @@ export class CrawlerHost extends RPCHost { } } } else if (crawlOpts?.allocProxy && crawlOpts.allocProxy !== 'none' && !crawlOpts.proxyUrl) { - crawlOpts.proxyUrl = (await this.proxyProvider.alloc(crawlOpts.allocProxy)).href; + const proxyUrl = await this.proxyProvider.alloc(this.figureOutBestProxyCountry(crawlOpts)); + if (proxyUrl.protocol === 'socks5h:') { + proxyUrl.protocol = 'socks5:'; + } + crawlOpts.proxyUrl = proxyUrl.href; } try { @@ -1030,6 +1038,7 @@ export class CrawlerHost extends RPCHost { proxyResources: (opts.proxyUrl || opts.proxy?.endsWith('+')) ? true : false, private: Boolean(opts.doNotTrack), }; + if (crawlOpts.targetSelector?.length) { if (typeof crawlOpts.targetSelector === 'string') { crawlOpts.targetSelector = [crawlOpts.targetSelector]; @@ -1046,6 +1055,18 @@ export class CrawlerHost extends RPCHost { } } + if (opts._hintIps?.length) { + const hints = await this.geoIpService.lookupCities(opts._hintIps); + const board: Record = {}; + for (const x of hints) { + if (x.country?.code) { + board[x.country.code] = (board[x.country.code] || 0) + 1; + } + } + const hintCountry = _.maxBy(Array.from(Object.entries(board)), 1)?.[0]; + crawlOpts.countryHint = hintCountry?.toLowerCase(); + } + if (opts.locale) { crawlOpts.extraHeaders ??= {}; crawlOpts.extraHeaders['Accept-Language'] = opts.locale; @@ -1221,6 +1242,7 @@ export class CrawlerHost extends RPCHost { }; } + retryDet = new WeakSet(); @retryWith((err) => { if (err instanceof ServiceBadApproachError) { return false; @@ -1239,7 +1261,14 @@ export class CrawlerHost extends RPCHost { if (opts?.allocProxy === 'none') { return this.curlControl.sideLoad(url, opts); } - const proxy = await this.proxyProvider.alloc(opts?.allocProxy); + + const proxy = await this.proxyProvider.alloc(this.figureOutBestProxyCountry(opts)); + if (opts) { + if (this.retryDet.has(opts) && proxy.protocol === 'socks5h:') { + proxy.protocol = 'socks5:'; + } + this.retryDet.add(opts); + } const r = await this.curlControl.sideLoad(url, { ...opts, proxyUrl: proxy.href, @@ -1252,6 +1281,34 @@ export class CrawlerHost extends RPCHost { return { ...r, proxy }; } + protected figureOutBestProxyCountry(opts?: ExtraScrappingOptions) { + if (!opts) { + return 'auto'; + } + + let draft; + + if (opts.allocProxy) { + if (this.proxyProvider.supports(opts.allocProxy)) { + draft = opts.allocProxy; + } else if (opts.allocProxy === 'none') { + return 'none'; + } + } + + if (opts.countryHint) { + if (this.proxyProvider.supports(opts.countryHint)) { + draft ??= opts.countryHint; + } else if (opts.countryHint === 'cn') { + draft ??= 'hk'; + } + } + + draft ??= opts.allocProxy || 'auto'; + + return draft; + } + knownUrlThatSideLoadingWouldCrashTheBrowser(url: URL) { if (url.hostname === 'chromewebstore.google.com') { return true; diff --git a/src/api/searcher.ts b/src/api/searcher.ts deleted file mode 100644 index ad765e0..0000000 --- a/src/api/searcher.ts +++ /dev/null @@ -1,503 +0,0 @@ -import { singleton } from 'tsyringe'; -import _ from 'lodash'; - -import { - assignTransferProtocolMeta, RPCHost, RPCReflection, - AssertionFailureError, - RawString, -} from 'civkit/civ-rpc'; -import { marshalErrorLike } from 'civkit/lang'; -import { objHashMd5B64Of } from 'civkit/hash'; - -import { RateLimitControl, RateLimitDesc } from '../shared/services/rate-limit'; -import { WebSearchApiResponse, SearchResult as WebSearchResult } from '../shared/3rd-party/brave-types'; -import { WebSearchQueryParams } from '../shared/3rd-party/brave-search'; - -import { CrawlerHost, ExtraScrappingOptions } from './crawler'; -import { SearchResult } from '../db/searched'; -import { JinaEmbeddingsAuthDTO } from '../dto/jina-embeddings-auth'; -import { CrawlerOptions } from '../dto/crawler-options'; -import { BraveSearchExplicitOperatorsDto, BraveSearchService } from '../services/brave-search'; - -import { SnapshotFormatter, FormattedPage } from '../services/snapshot-formatter'; -import { GlobalLogger } from '../services/logger'; -import { AsyncLocalContext } from '../services/async-context'; -import { OutputServerEventStream } from '../lib/transform-server-event-stream'; -import { Context, Ctx, Method, Param, RPCReflect } from '../services/registry'; -import { InsufficientBalanceError } from '../services/errors'; - - -@singleton() -export class SearcherHost extends RPCHost { - logger = this.globalLogger.child({ service: this.constructor.name }); - - cacheRetentionMs = 1000 * 3600 * 24 * 7; - cacheValidMs = 1000 * 3600; - pageCacheToleranceMs = 1000 * 3600 * 24; - - reasonableDelayMs = 15_000; - - targetResultCount = 5; - - constructor( - protected globalLogger: GlobalLogger, - protected rateLimitControl: RateLimitControl, - protected threadLocal: AsyncLocalContext, - protected braveSearchService: BraveSearchService, - protected crawler: CrawlerHost, - protected snapshotFormatter: SnapshotFormatter, - ) { - super(...arguments); - } - - override async init() { - await this.dependencyReady(); - - this.emit('ready'); - } - - @Method({ - name: 'searchIndex', - ext: { - http: { - action: ['get', 'post'], - path: '/search' - } - }, - tags: ['search'], - returnType: [String, OutputServerEventStream], - }) - @Method({ - ext: { - http: { - action: ['get', 'post'], - path: '::q' - } - }, - tags: ['search'], - returnType: [String, OutputServerEventStream, RawString], - }) - async search( - @RPCReflect() rpcReflect: RPCReflection, - @Ctx() ctx: Context, - auth: JinaEmbeddingsAuthDTO, - @Param('count', { default: 5, validate: (v) => v >= 0 && v <= 10 }) - count: number, - crawlerOptions: CrawlerOptions, - braveSearchExplicitOperators: BraveSearchExplicitOperatorsDto, - @Param('q') q?: string, - ) { - const uid = await auth.solveUID(); - let chargeAmount = 0; - const noSlashPath = decodeURIComponent(ctx.path).slice(1); - if (!noSlashPath && !q) { - const index = await this.crawler.getIndex(auth); - if (!uid) { - index.note = 'Authentication is required to use this endpoint. Please provide a valid API key via Authorization header.'; - } - if (!ctx.accepts('text/plain') && (ctx.accepts('text/json') || ctx.accepts('application/json'))) { - - return index; - } - - return assignTransferProtocolMeta(`${index}`, - { contentType: 'text/plain', envelope: null } - ); - } - - const user = await auth.assertUser(); - if (!(user.wallet.total_balance > 0)) { - throw new InsufficientBalanceError(`Account balance not enough to run this query, please recharge.`); - } - - const rateLimitPolicy = auth.getRateLimits(rpcReflect.name.toUpperCase()) || [ - parseInt(user.metadata?.speed_level) >= 2 ? - RateLimitDesc.from({ - occurrence: 100, - periodSeconds: 60 - }) : - RateLimitDesc.from({ - occurrence: 40, - periodSeconds: 60 - }) - ]; - - const apiRoll = await this.rateLimitControl.simpleRPCUidBasedLimit( - rpcReflect, uid!, [rpcReflect.name.toUpperCase()], - ...rateLimitPolicy - ); - - rpcReflect.finally(() => { - if (chargeAmount) { - auth.reportUsage(chargeAmount, `reader-${rpcReflect.name}`).catch((err) => { - this.logger.warn(`Unable to report usage for ${uid}`, { err: marshalErrorLike(err) }); - }); - apiRoll.chargeAmount = chargeAmount; - } - }); - - delete crawlerOptions.html; - - const crawlOpts = await this.crawler.configure(crawlerOptions); - const searchQuery = braveSearchExplicitOperators.addTo(q || noSlashPath); - const r = await this.cachedWebSearch({ - q: searchQuery, - count: count ? Math.floor(count + 2) : 20 - }, crawlerOptions.noCache); - - if (!r.web?.results.length) { - throw new AssertionFailureError(`No search results available for query ${searchQuery}`); - } - - if (crawlOpts.timeoutMs && crawlOpts.timeoutMs < 30_000) { - delete crawlOpts.timeoutMs; - } - - const it = this.fetchSearchResults(crawlerOptions.respondWith, r.web?.results.slice(0, count + 2), crawlOpts, - CrawlerOptions.from({ ...crawlerOptions, cacheTolerance: crawlerOptions.cacheTolerance ?? this.pageCacheToleranceMs }), - count, - ); - - if (!ctx.accepts('text/plain') && ctx.accepts('text/event-stream')) { - const sseStream = new OutputServerEventStream(); - rpcReflect.return(sseStream); - - try { - for await (const scrapped of it) { - if (!scrapped) { - continue; - } - - chargeAmount = this.assignChargeAmount(scrapped); - sseStream.write({ - event: 'data', - data: scrapped, - }); - } - } catch (err: any) { - this.logger.error(`Failed to collect search result for query ${searchQuery}`, - { err: marshalErrorLike(err) } - ); - sseStream.write({ - event: 'error', - data: marshalErrorLike(err), - }); - } - - sseStream.end(); - - return sseStream; - } - - let lastScrapped: any[] | undefined; - let earlyReturn = false; - if (!ctx.accepts('text/plain') && (ctx.accepts('text/json') || ctx.accepts('application/json'))) { - let earlyReturnTimer: ReturnType | undefined; - const setEarlyReturnTimer = () => { - if (earlyReturnTimer) { - return; - } - earlyReturnTimer = setTimeout(() => { - if (!lastScrapped) { - return; - } - chargeAmount = this.assignChargeAmount(lastScrapped); - rpcReflect.return(lastScrapped); - earlyReturn = true; - }, ((crawlerOptions.timeout || 0) * 1000) || this.reasonableDelayMs); - }; - - for await (const scrapped of it) { - lastScrapped = scrapped; - if (_.some(scrapped, (x) => this.pageQualified(x))) { - setEarlyReturnTimer(); - } - if (!this.searchResultsQualified(scrapped, count)) { - continue; - } - if (earlyReturnTimer) { - clearTimeout(earlyReturnTimer); - } - chargeAmount = this.assignChargeAmount(scrapped); - - return scrapped; - } - - if (earlyReturnTimer) { - clearTimeout(earlyReturnTimer); - } - - if (!lastScrapped) { - throw new AssertionFailureError(`No content available for query ${searchQuery}`); - } - - if (!earlyReturn) { - chargeAmount = this.assignChargeAmount(lastScrapped); - } - - return lastScrapped; - } - - let earlyReturnTimer: ReturnType | undefined; - const setEarlyReturnTimer = () => { - if (earlyReturnTimer) { - return; - } - earlyReturnTimer = setTimeout(() => { - if (!lastScrapped) { - return; - } - chargeAmount = this.assignChargeAmount(lastScrapped); - rpcReflect.return(assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null })); - earlyReturn = true; - }, ((crawlerOptions.timeout || 0) * 1000) || this.reasonableDelayMs); - }; - - for await (const scrapped of it) { - lastScrapped = scrapped; - - if (_.some(scrapped, (x) => this.pageQualified(x))) { - setEarlyReturnTimer(); - } - - if (!this.searchResultsQualified(scrapped, count)) { - continue; - } - - if (earlyReturnTimer) { - clearTimeout(earlyReturnTimer); - } - - chargeAmount = this.assignChargeAmount(scrapped); - - return assignTransferProtocolMeta(`${scrapped}`, { contentType: 'text/plain', envelope: null }); - } - - if (earlyReturnTimer) { - clearTimeout(earlyReturnTimer); - } - - if (!lastScrapped) { - throw new AssertionFailureError(`No content available for query ${searchQuery}`); - } - - if (!earlyReturn) { - chargeAmount = this.assignChargeAmount(lastScrapped); - } - - return assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null }); - } - - async *fetchSearchResults( - mode: string | 'markdown' | 'html' | 'text' | 'screenshot', - searchResults?: WebSearchResult[], - options?: ExtraScrappingOptions, - crawlerOptions?: CrawlerOptions, - count?: number, - ) { - if (!searchResults) { - return; - } - if (count === 0) { - const resultArray = searchResults.map((upstreamSearchResult, i) => ({ - url: upstreamSearchResult.url, - title: upstreamSearchResult.title, - description: upstreamSearchResult.description, - content: ['html', 'text', 'screenshot'].includes(mode) ? undefined : '', - toString() { - return `[${i + 1}] Title: ${this.title} -[${i + 1}] URL Source: ${this.url} -[${i + 1}] Description: ${this.description} -`; - } - - })) as FormattedPage[]; - resultArray.toString = function () { - return this.map((x, i) => x ? x.toString() : '').join('\n\n').trimEnd() + '\n'; - }; - yield resultArray; - return; - } - const urls = searchResults.map((x) => new URL(x.url)); - const snapshotMap = new WeakMap(); - for await (const scrapped of this.crawler.scrapMany(urls, options, crawlerOptions)) { - const mapped = scrapped.map((x, i) => { - const upstreamSearchResult = searchResults[i]; - if (!x) { - return { - url: upstreamSearchResult.url, - title: upstreamSearchResult.title, - description: upstreamSearchResult.description, - content: ['html', 'text', 'screenshot'].includes(mode) ? undefined : '' - }; - } - if (snapshotMap.has(x)) { - return snapshotMap.get(x); - } - return this.snapshotFormatter.formatSnapshot(mode, x, urls[i]).then((r) => { - r.title ??= upstreamSearchResult.title; - r.description = upstreamSearchResult.description; - snapshotMap.set(x, r); - - return r; - }).catch((err) => { - this.logger.error(`Failed to format snapshot for ${urls[i].href}`, { err: marshalErrorLike(err) }); - - return { - url: upstreamSearchResult.url, - title: upstreamSearchResult.title, - description: upstreamSearchResult.description, - content: x.text, - }; - }); - }); - - const resultArray = await Promise.all(mapped) as FormattedPage[]; - - yield this.reOrganizeSearchResults(resultArray, count); - } - } - - reOrganizeSearchResults(searchResults: FormattedPage[], count?: number) { - const targetResultCount = count || this.targetResultCount; - const [qualifiedPages, unqualifiedPages] = _.partition(searchResults, (x) => this.pageQualified(x)); - const acceptSet = new Set(qualifiedPages); - - const n = targetResultCount - qualifiedPages.length; - for (const x of unqualifiedPages.slice(0, n >= 0 ? n : 0)) { - acceptSet.add(x); - } - - const filtered = searchResults.filter((x) => acceptSet.has(x)).slice(0, targetResultCount); - - const resultArray = filtered.map((x, i) => { - return { - ...x, - toString(this: any) { - if (!this.content && this.description) { - if (this.title || x.textRepresentation) { - const textRep = x.textRepresentation ? `\n[${i + 1}] Content: \n${x.textRepresentation}` : ''; - return `[${i + 1}] Title: ${this.title} -[${i + 1}] URL Source: ${this.url} -[${i + 1}] Description: ${this.description}${textRep} -`; - } - - return `[${i + 1}] No content available for ${this.url}`; - } - - const mixins = []; - if (this.description) { - mixins.push(`[${i + 1}] Description: ${this.description}`); - } - if (this.publishedTime) { - mixins.push(`[${i + 1}] Published Time: ${this.publishedTime}`); - } - - const suffixMixins = []; - if (this.images) { - const imageSummaryChunks = [`[${i + 1}] Images:`]; - for (const [k, v] of Object.entries(this.images)) { - imageSummaryChunks.push(`- ![${k}](${v})`); - } - if (imageSummaryChunks.length === 1) { - imageSummaryChunks.push('This page does not seem to contain any images.'); - } - suffixMixins.push(imageSummaryChunks.join('\n')); - } - if (this.links) { - const linkSummaryChunks = [`[${i + 1}] Links/Buttons:`]; - for (const [k, v] of Object.entries(this.links)) { - linkSummaryChunks.push(`- [${k}](${v})`); - } - if (linkSummaryChunks.length === 1) { - linkSummaryChunks.push('This page does not seem to contain any buttons/links.'); - } - suffixMixins.push(linkSummaryChunks.join('\n')); - } - - return `[${i + 1}] Title: ${this.title} -[${i + 1}] URL Source: ${this.url}${mixins.length ? `\n${mixins.join('\n')}` : ''} -[${i + 1}] Markdown Content: -${this.content} -${suffixMixins.length ? `\n${suffixMixins.join('\n')}\n` : ''}`; - } - }; - }); - - resultArray.toString = function () { - return this.map((x, i) => x ? x.toString() : `[${i + 1}] No content available for ${this[i].url}`).join('\n\n').trimEnd() + '\n'; - }; - - return resultArray; - } - - assignChargeAmount(formatted: FormattedPage[]) { - return _.sum( - formatted.map((x) => this.crawler.assignChargeAmount(x) || 0) - ); - } - - pageQualified(formattedPage: FormattedPage) { - return formattedPage.title && - formattedPage.content || - formattedPage.screenshotUrl || - formattedPage.pageshotUrl || - formattedPage.text || - formattedPage.html; - } - - searchResultsQualified(results: FormattedPage[], targetResultCount = this.targetResultCount) { - return _.every(results, (x) => this.pageQualified(x)) && results.length >= targetResultCount; - } - - async cachedWebSearch(query: WebSearchQueryParams, noCache: boolean = false) { - const queryDigest = objHashMd5B64Of(query); - let cache; - if (!noCache) { - cache = (await SearchResult.fromFirestoreQuery( - SearchResult.COLLECTION.where('queryDigest', '==', queryDigest) - .orderBy('createdAt', 'desc') - .limit(1) - ))[0]; - if (cache) { - const age = Date.now() - cache.createdAt.valueOf(); - const stale = cache.createdAt.valueOf() < (Date.now() - this.cacheValidMs); - this.logger.info(`${stale ? 'Stale cache exists' : 'Cache hit'} for search query "${query.q}", normalized digest: ${queryDigest}, ${age}ms old`, { - query, digest: queryDigest, age, stale - }); - - if (!stale) { - return cache.response as WebSearchApiResponse; - } - } - } - - try { - const r = await this.braveSearchService.webSearch(query); - - const nowDate = new Date(); - const record = SearchResult.from({ - query, - queryDigest, - response: r, - createdAt: nowDate, - expireAt: new Date(nowDate.valueOf() + this.cacheRetentionMs) - }); - SearchResult.save(record.degradeForFireStore()).catch((err) => { - this.logger.warn(`Failed to cache search result`, { err }); - }); - - return r; - } catch (err: any) { - if (cache) { - this.logger.warn(`Failed to fetch search result, but a stale cache is available. falling back to stale cache`, { err: marshalErrorLike(err) }); - - return cache.response as WebSearchApiResponse; - } - - throw err; - } - - } -} diff --git a/src/api/serp.ts b/src/api/serp.ts new file mode 100644 index 0000000..37106c1 --- /dev/null +++ b/src/api/serp.ts @@ -0,0 +1,505 @@ +import { singleton } from 'tsyringe'; +import { + RPCHost, RPCReflection, assignMeta, RawString, + ParamValidationError, + assignTransferProtocolMeta, +} from 'civkit/civ-rpc'; +import { marshalErrorLike } from 'civkit/lang'; +import _ from 'lodash'; + +import { RateLimitControl, RateLimitDesc } from '../shared/services/rate-limit'; + +import { GlobalLogger } from '../services/logger'; +import { AsyncLocalContext } from '../services/async-context'; +import { Context, Ctx, Method, Param, RPCReflect } from '../services/registry'; +import { OutputServerEventStream } from '../lib/transform-server-event-stream'; +import { JinaEmbeddingsAuthDTO } from '../dto/jina-embeddings-auth'; +import { InsufficientBalanceError, RateLimitTriggeredError } from '../services/errors'; +import { WORLD_COUNTRIES, WORLD_LANGUAGES } from '../shared/3rd-party/serper-search'; +import { GoogleSERP } from '../services/serp/google'; +import { WebSearchEntry } from '../services/serp/compat'; +import { CrawlerOptions } from '../dto/crawler-options'; +import { ScrappingOptions } from '../services/serp/puppeteer'; +import { objHashMd5B64Of } from 'civkit/hash'; +import { SERPResult } from '../db/searched'; +import { SerperBingSearchService, SerperGoogleSearchService } from '../services/serp/serper'; +import type { JinaEmbeddingsTokenAccount } from '../shared/db/jina-embeddings-token-account'; +import { LRUCache } from 'lru-cache'; + +const WORLD_COUNTRY_CODES = Object.keys(WORLD_COUNTRIES).map((x) => x.toLowerCase()); + +type RateLimitCache = { + blockedUntil?: Date; + user?: JinaEmbeddingsTokenAccount; +}; + +const indexProto = { + toString: function (): string { + return _(this) + .toPairs() + .map(([k, v]) => k ? `[${_.upperFirst(_.lowerCase(k))}] ${v}` : '') + .value() + .join('\n') + '\n'; + } +}; + +@singleton() +export class SerpHost extends RPCHost { + logger = this.globalLogger.child({ service: this.constructor.name }); + + cacheRetentionMs = 1000 * 3600 * 24 * 7; + cacheValidMs = 1000 * 3600; + pageCacheToleranceMs = 1000 * 3600 * 24; + + reasonableDelayMs = 15_000; + + targetResultCount = 5; + + highFreqKeyCache = new LRUCache({ + max: 256, + ttl: 60 * 60 * 1000, + updateAgeOnGet: false, + updateAgeOnHas: false, + }); + + async getIndex(ctx: Context, auth?: JinaEmbeddingsAuthDTO) { + const indexObject: Record = Object.create(indexProto); + Object.assign(indexObject, { + usage1: 'https://r.jina.ai/YOUR_URL', + usage2: 'https://s.jina.ai/YOUR_SEARCH_QUERY', + usage3: `${ctx.origin}/search/YOUR_SEARCH_QUERY`, + homepage: 'https://jina.ai/reader', + sourceCode: 'https://github.com/jina-ai/reader', + }); + + if (auth && auth.user) { + indexObject[''] = undefined; + indexObject.authenticatedAs = `${auth.user.user_id} (${auth.user.full_name})`; + indexObject.balanceLeft = auth.user.wallet.total_balance; + } else { + indexObject.note = 'Authentication is required to use this endpoint. Please provide a valid API key via Authorization header.'; + } + + return indexObject; + } + + constructor( + protected globalLogger: GlobalLogger, + protected rateLimitControl: RateLimitControl, + protected threadLocal: AsyncLocalContext, + protected googleSerp: GoogleSERP, + protected serperGoogle: SerperGoogleSearchService, + protected serperBing: SerperBingSearchService, + ) { + super(...arguments); + } + + override async init() { + await this.dependencyReady(); + + this.emit('ready'); + } + + @Method({ + name: 'searchIndex', + ext: { + http: { + action: ['get', 'post'], + path: '/' + } + }, + tags: ['search'], + returnType: [String, OutputServerEventStream, RawString], + }) + @Method({ + ext: { + http: { + action: ['get', 'post'], + } + }, + tags: ['search'], + returnType: [String, OutputServerEventStream, RawString], + }) + async search( + @RPCReflect() rpcReflect: RPCReflection, + @Ctx() ctx: Context, + crawlerOptions: CrawlerOptions, + auth: JinaEmbeddingsAuthDTO, + @Param('type', { type: new Set(['web', 'images', 'news']), default: 'web' }) + variant: 'web' | 'images' | 'news', + @Param('q') q?: string, + @Param('provider', { type: new Set(['google', 'bing']) }) + searchEngine?: 'google' | 'bing', + @Param('num', { validate: (v: number) => v >= 0 && v <= 20 }) + num?: number, + @Param('gl', { validate: (v: string) => WORLD_COUNTRY_CODES.includes(v?.toLowerCase()) }) gl?: string, + @Param('hl', { validate: (v: string) => WORLD_LANGUAGES.some(l => l.code === v) }) hl?: string, + @Param('location') location?: string, + @Param('page') page?: number, + @Param('fallback', { default: true }) fallback?: boolean, + ) { + const authToken = auth.bearerToken; + let highFreqKey: RateLimitCache | undefined; + if (authToken && this.highFreqKeyCache.has(authToken)) { + highFreqKey = this.highFreqKeyCache.get(authToken)!; + auth.user = highFreqKey.user; + auth.uid = highFreqKey.user?.user_id; + } + + const uid = await auth.solveUID(); + if (!q) { + if (ctx.path === '/') { + const indexObject = this.getIndex(ctx, auth); + if (!ctx.accepts('text/plain') && (ctx.accepts('text/json') || ctx.accepts('application/json'))) { + return indexObject; + } + + return assignTransferProtocolMeta(`${indexObject}`, + { contentType: 'text/plain; charset=utf-8', envelope: null } + ); + } + throw new ParamValidationError({ + path: 'q', + message: `Required but not provided` + }); + } + // Return content by default + const user = await auth.assertUser(); + if (!(user.wallet.total_balance > 0)) { + throw new InsufficientBalanceError(`Account balance not enough to run this query, please recharge.`); + } + + if (highFreqKey?.blockedUntil) { + const now = new Date(); + const blockedTimeRemaining = (highFreqKey.blockedUntil.valueOf() - now.valueOf()); + if (blockedTimeRemaining > 0) { + throw RateLimitTriggeredError.from({ + message: `Per UID rate limit exceeded (async)`, + retryAfter: Math.ceil(blockedTimeRemaining / 1000), + }); + } + } + + const PREMIUM_KEY_LIMIT = 400; + const rateLimitPolicy = auth.getRateLimits('SEARCH') || [ + parseInt(user.metadata?.speed_level) >= 2 ? + RateLimitDesc.from({ + occurrence: PREMIUM_KEY_LIMIT, + periodSeconds: 60 + }) : + RateLimitDesc.from({ + occurrence: 40, + periodSeconds: 60 + }) + ]; + + const apiRollPromise = this.rateLimitControl.simpleRPCUidBasedLimit( + rpcReflect, uid!, ['SEARCH'], + ...rateLimitPolicy + ); + + if (!highFreqKey) { + // Normal path + await apiRollPromise; + + if (rateLimitPolicy.some( + (x) => { + const rpm = x.occurrence / (x.periodSeconds / 60); + if (rpm >= PREMIUM_KEY_LIMIT) { + return true; + } + + return false; + }) + ) { + this.highFreqKeyCache.set(auth.bearerToken!, { + user, + }); + } + } else { + // High freq key path + apiRollPromise.then( + // Rate limit not triggered, make sure not blocking. + () => { + delete highFreqKey.blockedUntil; + }, + // Rate limit triggered + (err) => { + if (!(err instanceof RateLimitTriggeredError)) { + return; + } + const now = Date.now(); + let tgtDate; + if (err.retryAfter) { + tgtDate = new Date(now + err.retryAfter * 1000); + } else if (err.retryAfterDate) { + tgtDate = err.retryAfterDate; + } + + if (tgtDate) { + const dt = tgtDate.valueOf() - now; + highFreqKey.blockedUntil = tgtDate; + setTimeout(() => { + if (highFreqKey.blockedUntil === tgtDate) { + delete highFreqKey.blockedUntil; + } + }, dt).unref(); + } + } + ).finally(async () => { + // Always asynchronously update user(wallet); + const user = await auth.getBrief().catch(() => undefined); + if (user) { + highFreqKey.user = user; + } + }); + } + + let chargeAmount = 0; + rpcReflect.finally(async () => { + if (chargeAmount) { + auth.reportUsage(chargeAmount, `reader-serp`).catch((err) => { + this.logger.warn(`Unable to report usage for ${uid}`, { err: marshalErrorLike(err) }); + }); + const apiRoll = await apiRollPromise; + apiRoll.chargeAmount = chargeAmount; + } + }); + + let chargeAmountScaler = 1; + if (searchEngine === 'bing') { + chargeAmountScaler = 3; + } + if (variant !== 'web') { + chargeAmountScaler = 5; + } + + let realQuery = q; + let results = await this.cachedSearch(variant, { + provider: searchEngine, + q, + num, + gl, + hl, + location, + page, + }, crawlerOptions); + + + if (fallback && !results?.length && (!page || page === 1)) { + let tryTimes = 1; + const containsRTL = /[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF\u0590-\u05FF\uFB1D-\uFB4F\u0700-\u074F\u0780-\u07BF\u07C0-\u07FF]/.test(q); + const terms = q.split(/\s+/g).filter((x) => !!x); + while (terms.length > 1) { + containsRTL ? terms.shift() : terms.pop(); // reduce the query by one term at a time + realQuery = terms.join(' ').trim(); + if (!realQuery) { + break; + } + this.logger.info(`Retrying search with fallback query: "${realQuery}"`); + results = await this.cachedSearch(variant, { + provider: searchEngine, + q: realQuery, + num, + gl, + hl, + location, + }, crawlerOptions); + tryTimes += 1; + if (results?.length) { + break; + } + } + chargeAmountScaler *= tryTimes; + } + + if (!results?.length) { + results = []; + } + + const finalResults = results.map((x: any) => this.mapToFinalResults(x)); + + await Promise.all(finalResults.map((x: any) => this.assignGeneralMixin(x))); + + this.assignChargeAmount(finalResults, chargeAmountScaler); + assignMeta(finalResults, { + query: realQuery, + fallback: realQuery === q ? undefined : realQuery, + }); + + return finalResults; + } + + + assignChargeAmount(items: unknown[], scaler: number) { + const numCharge = Math.ceil(items.length / 10) * 10000 * scaler; + assignMeta(items, { usage: { tokens: numCharge } }); + + return numCharge; + } + + async getFavicon(domain: string) { + const url = `https://www.google.com/s2/favicons?sz=32&domain_url=${domain}`; + + try { + const response = await fetch(url); + if (!response.ok) { + return ''; + } + const ab = await response.arrayBuffer(); + const buffer = Buffer.from(ab); + const base64 = buffer.toString('base64'); + return `data:image/png;base64,${base64}`; + } catch (error: any) { + this.logger.warn(`Failed to get favicon base64 string`, { err: marshalErrorLike(error) }); + return ''; + } + } + + async configure(opts: CrawlerOptions) { + const crawlOpts: ScrappingOptions = { + proxyUrl: opts.proxyUrl, + cookies: opts.setCookies, + overrideUserAgent: opts.userAgent, + timeoutMs: opts.timeout ? opts.timeout * 1000 : undefined, + locale: opts.locale, + referer: opts.referer, + viewport: opts.viewport, + proxyResources: (opts.proxyUrl || opts.proxy?.endsWith('+')) ? true : false, + allocProxy: opts.proxy?.endsWith('+') ? opts.proxy.slice(0, -1) : opts.proxy, + }; + + if (opts.locale) { + crawlOpts.extraHeaders ??= {}; + crawlOpts.extraHeaders['Accept-Language'] = opts.locale; + } + + return crawlOpts; + } + + mapToFinalResults(input: WebSearchEntry) { + const whitelistedProps = [ + 'imageUrl', 'imageWidth', 'imageHeight', 'source', 'date', 'siteLinks' + ]; + const result = { + title: input.title, + url: input.link, + description: Reflect.get(input, 'snippet'), + ..._.pick(input, whitelistedProps), + }; + + return result; + } + + *iterProviders(preference?: string) { + if (preference === 'bing') { + yield this.serperBing; + yield this.serperGoogle; + yield this.googleSerp; + + return; + } + + if (preference === 'google') { + yield this.googleSerp; + yield this.googleSerp; + yield this.serperGoogle; + + return; + } + + yield this.serperGoogle; + yield this.googleSerp; + yield this.googleSerp; + } + + async cachedSearch(variant: 'web' | 'news' | 'images', query: Record, opts: CrawlerOptions) { + const queryDigest = objHashMd5B64Of({ ...query, variant }); + const provider = query.provider; + Reflect.deleteProperty(query, 'provider'); + const noCache = opts.noCache; + let cache; + if (!noCache) { + cache = (await SERPResult.fromFirestoreQuery( + SERPResult.COLLECTION.where('queryDigest', '==', queryDigest) + .orderBy('createdAt', 'desc') + .limit(1) + ))[0]; + if (cache) { + const age = Date.now() - cache.createdAt.valueOf(); + const stale = cache.createdAt.valueOf() < (Date.now() - this.cacheValidMs); + this.logger.info(`${stale ? 'Stale cache exists' : 'Cache hit'} for search query "${query.q}", normalized digest: ${queryDigest}, ${age}ms old`, { + query, digest: queryDigest, age, stale + }); + + if (!stale) { + return cache.response as any; + } + } + } + const scrappingOptions = await this.configure(opts); + + try { + let r: any[] | undefined; + let lastError; + outerLoop: + for (const client of this.iterProviders(provider)) { + try { + switch (variant) { + case 'images': { + r = await Reflect.apply(client.imageSearch, client, [query, scrappingOptions]); + break outerLoop; + } + case 'news': { + r = await Reflect.apply(client.newsSearch, client, [query, scrappingOptions]); + break outerLoop; + } + case 'web': + default: { + r = await Reflect.apply(client.webSearch, client, [query, scrappingOptions]); + break outerLoop; + } + } + } catch (err) { + lastError = err; + this.logger.warn(`Failed to do ${variant} search using ${client.constructor.name}`, { err }); + } + } + + if (r?.length) { + const nowDate = new Date(); + const record = SERPResult.from({ + query, + queryDigest, + response: r, + createdAt: nowDate, + expireAt: new Date(nowDate.valueOf() + this.cacheRetentionMs) + }); + SERPResult.save(record.degradeForFireStore()).catch((err) => { + this.logger.warn(`Failed to cache search result`, { err }); + }); + } else if (lastError) { + throw lastError; + } + + return r; + } catch (err: any) { + if (cache) { + this.logger.warn(`Failed to fetch search result, but a stale cache is available. falling back to stale cache`, { err: marshalErrorLike(err) }); + + return cache.response as any; + } + + throw err; + } + } + + async assignGeneralMixin(result: Partial) { + const collectFavicon = this.threadLocal.get('collect-favicon'); + + if (collectFavicon && result.link) { + const url = new URL(result.link); + Reflect.set(result, 'favicon', await this.getFavicon(url.origin)); + } + } +} diff --git a/src/db/searched.ts b/src/db/searched.ts index 8098bd8..aa05ad0 100644 --- a/src/db/searched.ts +++ b/src/db/searched.ts @@ -62,3 +62,7 @@ export class SearchResult extends FirestoreRecord { export class SerperSearchResult extends SearchResult { static override collectionName = 'serperSearchResults'; } + +export class SERPResult extends SearchResult { + static override collectionName = 'SERPResults'; +} \ No newline at end of file diff --git a/src/dto/crawler-options.ts b/src/dto/crawler-options.ts index ce5ccef..2004706 100644 --- a/src/dto/crawler-options.ts +++ b/src/dto/crawler-options.ts @@ -429,6 +429,8 @@ export class CrawlerOptions extends AutoCastable { }) respondTiming?: RESPOND_TIMING; + _hintIps?: string[]; + static override from(input: any) { const instance = super.from(input) as CrawlerOptions; const ctx = Reflect.get(input, RPC_CALL_ENVIRONMENT) as Context | undefined; diff --git a/src/dto/jina-embeddings-auth.ts b/src/dto/jina-embeddings-auth.ts index 120bd5e..246087d 100644 --- a/src/dto/jina-embeddings-auth.ts +++ b/src/dto/jina-embeddings-auth.ts @@ -1,8 +1,9 @@ import _ from 'lodash'; import { Also, AuthenticationFailedError, AuthenticationRequiredError, - DownstreamServiceFailureError, RPC_CALL_ENVIRONMENT, + RPC_CALL_ENVIRONMENT, AutoCastable, + DownstreamServiceError, } from 'civkit/civ-rpc'; import { htmlEscape } from 'civkit/escape'; import { marshalErrorLike } from 'civkit/lang'; @@ -96,12 +97,14 @@ export class JinaEmbeddingsAuthDTO extends AutoCastable { }); } + let firestoreDegradation = false; let account; try { account = await JinaEmbeddingsTokenAccount.fromFirestore(this.bearerToken); } catch (err) { // FireStore would not accept any string as input and may throw if not happy with it - void 0; + firestoreDegradation = true; + logger.warn(`Firestore issue`, { err }); } @@ -109,7 +112,7 @@ export class JinaEmbeddingsAuthDTO extends AutoCastable { const jitter = Math.ceil(Math.random() * 30 * 1000); if (account && !ignoreCache) { - if (account && (age < (180_000 - jitter))) { + if ((age < (180_000 - jitter)) && (account.wallet?.total_balance > 0)) { this.user = account; this.uid = this.user?.user_id; @@ -117,6 +120,20 @@ export class JinaEmbeddingsAuthDTO extends AutoCastable { } } + if (firestoreDegradation) { + logger.debug(`Using remote UC cached user`); + const r = await this.jinaEmbeddingsDashboard.authorization(this.bearerToken); + const brief = r.data; + const draftAccount = JinaEmbeddingsTokenAccount.from({ + ...account, ...brief, _id: this.bearerToken, + lastSyncedAt: new Date() + }); + this.user = draftAccount; + this.uid = this.user?.user_id; + + return draftAccount; + } + try { const r = await this.jinaEmbeddingsDashboard.validateToken(this.bearerToken); const brief = r.data; @@ -148,7 +165,7 @@ export class JinaEmbeddingsAuthDTO extends AutoCastable { } - throw new DownstreamServiceFailureError(`Failed to authenticate: ${err}`); + throw new DownstreamServiceError(`Failed to authenticate: ${err}`); } } diff --git a/src/services/geoip.ts b/src/services/geoip.ts index 5ff38ee..f6501d7 100644 --- a/src/services/geoip.ts +++ b/src/services/geoip.ts @@ -4,6 +4,7 @@ import { CityResponse, Reader } from 'maxmind'; import { AsyncService, AutoCastable, Prop, runOnce } from 'civkit'; import { GlobalLogger } from './logger'; import path from 'path'; +import { Threaded } from './threaded'; export enum GEOIP_SUPPORTED_LANGUAGES { EN = 'en', @@ -85,6 +86,7 @@ export class GeoIPService extends AsyncService { } + @Threaded() async lookupCity(ip: string, lang: GEOIP_SUPPORTED_LANGUAGES = GEOIP_SUPPORTED_LANGUAGES.EN) { await this._lazyload(); @@ -116,6 +118,13 @@ export class GeoIPService extends AsyncService { }); } + @Threaded() + async lookupCities(ips: string[], lang: GEOIP_SUPPORTED_LANGUAGES = GEOIP_SUPPORTED_LANGUAGES.EN) { + const r = (await Promise.all(ips.map((ip) => this.lookupCity(ip, lang)))).filter(Boolean) as GeoIPCityResponse[]; + + return r; + } + } const instance = container.resolve(GeoIPService); diff --git a/src/services/misc.ts b/src/services/misc.ts index 993ae7f..82ea5b5 100644 --- a/src/services/misc.ts +++ b/src/services/misc.ts @@ -57,7 +57,11 @@ export class MiscService extends AsyncService { } const normalizedHostname = result.hostname.startsWith('[') ? result.hostname.slice(1, -1) : result.hostname; + let ips: string[] = []; const isIp = isIP(normalizedHostname); + if (isIp) { + ips.push(normalizedHostname); + } if ( (result.hostname === 'localhost') || (isIp && isIPInNonPublicRange(normalizedHostname)) @@ -88,12 +92,16 @@ export class MiscService extends AsyncService { path: 'url' }); } + ips.push(x.address); } } } - return result; + return { + url: result, + ips + }; } } \ No newline at end of file diff --git a/src/services/puppeteer.ts b/src/services/puppeteer.ts index 49fdfc6..72f588d 100644 --- a/src/services/puppeteer.ts +++ b/src/services/puppeteer.ts @@ -562,7 +562,8 @@ export class PuppeteerControl extends AsyncService { headless: !Boolean(process.env.DEBUG_BROWSER), executablePath: process.env.OVERRIDE_CHROME_EXECUTABLE_PATH, args: [ - '--disable-dev-shm-usage', '--disable-blink-features=AutomationControlled' + '--disable-dev-shm-usage', + '--disable-blink-features=AutomationControlled' ] }).catch((err: any) => { this.logger.error(`Unknown firebase issue, just die fast.`, { err }); @@ -1618,11 +1619,7 @@ export class PuppeteerControl extends AsyncService { } } try { - const pSubFrameSnapshots = this.snapshotChildFrames(page); snapshot = await page.evaluate('giveSnapshot(true)') as PageSnapshot; - if (snapshot) { - snapshot.childFrames = await pSubFrameSnapshots; - } } catch (err: any) { this.logger.warn(`Page ${sn}: Failed to finalize ${url}`, { err }); if (stuff instanceof Error) { diff --git a/src/services/serp/compat.ts b/src/services/serp/compat.ts new file mode 100644 index 0000000..1543c34 --- /dev/null +++ b/src/services/serp/compat.ts @@ -0,0 +1,12 @@ +export interface WebSearchEntry { + link: string; + title: string; + source?: string; + date?: string; + snippet?: string; + imageUrl?: string; + siteLinks?: { + link: string; title: string; snippet?: string; + }[]; + variant?: 'web' | 'images' | 'news'; +} \ No newline at end of file diff --git a/src/services/serp/google.ts b/src/services/serp/google.ts new file mode 100644 index 0000000..11b4ac4 --- /dev/null +++ b/src/services/serp/google.ts @@ -0,0 +1,314 @@ +import { singleton } from 'tsyringe'; +import { AsyncService } from 'civkit/async-service'; +import { GlobalLogger } from '../logger'; +import { JSDomControl } from '../jsdom'; +import { isMainThread } from 'worker_threads'; +import _ from 'lodash'; +import { WebSearchEntry } from './compat'; +import { ScrappingOptions, SERPSpecializedPuppeteerControl } from './puppeteer'; +import { CurlControl } from '../curl'; +import { readFile } from 'fs/promises'; +import { ApplicationError } from 'civkit/civ-rpc'; +import { ServiceBadApproachError, ServiceBadAttemptError } from '../errors'; +import { parseJSONText } from 'civkit/vectorize'; +import { retryWith } from 'civkit/decorators'; +import { ProxyProvider } from '../../shared/services/proxy-provider'; + +@singleton() +export class GoogleSERP extends AsyncService { + + googleDomain = process.env.OVERRIDE_GOOGLE_DOMAIN || 'www.google.com'; + + constructor( + protected globalLogger: GlobalLogger, + protected puppeteerControl: SERPSpecializedPuppeteerControl, + protected jsDomControl: JSDomControl, + protected curlControl: CurlControl, + protected proxyProvider: ProxyProvider, + ) { + const filteredDeps = isMainThread ? arguments : _.without(arguments, puppeteerControl); + super(...filteredDeps); + } + + override async init() { + await this.dependencyReady(); + + this.emit('ready'); + } + + retryDet = new WeakSet(); + @retryWith((err) => { + if (err instanceof ServiceBadApproachError) { + return false; + } + if (err instanceof ServiceBadAttemptError) { + // Keep trying + return true; + } + if (err instanceof ApplicationError) { + // Quit with this error + return false; + } + return undefined; + }, 3) + async sideLoadWithAllocatedProxy(url: URL, opts?: ScrappingOptions) { + if (opts?.allocProxy === 'none') { + return this.curlControl.sideLoad(url, opts); + } + + const proxy = await this.proxyProvider.alloc( + process.env.PREFERRED_PROXY_COUNTRY || 'auto' + ); + if (opts) { + if (this.retryDet.has(opts) && proxy.protocol === 'socks5h:') { + proxy.protocol = 'socks5:'; + } + this.retryDet.add(opts); + } + const r = await this.curlControl.sideLoad(url, { + ...opts, + proxyUrl: proxy.href, + }); + + if (r.status === 429) { + throw new ServiceBadAttemptError('Google returned a 429 error. This may happen due to various reasons, including rate limiting or other issues.'); + } + + if (opts && opts.allocProxy) { + opts.proxyUrl ??= proxy.href; + } + + return { ...r, proxy }; + } + + digestQuery(query: { [k: string]: any; }) { + const url = new URL(`https://${this.googleDomain}/search`); + const clone = { ...query }; + const num = clone.num || 10; + if (clone.page) { + const page = parseInt(clone.page); + delete clone.page; + clone.start = (page - 1) * num; + if (clone.start === 0) { + delete clone.start; + } + } + if (clone.location) { + delete clone.location; + } + + for (const [k, v] of Object.entries(clone)) { + if (v === undefined || v === null) { + continue; + } + url.searchParams.set(k, `${v}`); + } + + return url; + } + + async webSearch(query: { [k: string]: any; }, opts?: ScrappingOptions) { + const url = this.digestQuery(query); + + const sideLoaded = await this.sideLoadWithAllocatedProxy(url, opts); + if (opts && sideLoaded.sideLoadOpts) { + opts.sideLoad = sideLoaded.sideLoadOpts; + } + + const snapshot = await this.puppeteerControl.controlledScrap(url, getWebSearchResults, opts); + + return snapshot; + } + + async newsSearch(query: { [k: string]: any; }, opts?: ScrappingOptions) { + const url = this.digestQuery(query); + + url.searchParams.set('tbm', 'nws'); + + const sideLoaded = await this.sideLoadWithAllocatedProxy(url, opts); + if (opts && sideLoaded.sideLoadOpts) { + opts.sideLoad = sideLoaded.sideLoadOpts; + } + + const snapshot = await this.puppeteerControl.controlledScrap(url, getNewsSearchResults, opts); + + return snapshot; + } + + async imageSearch(query: { [k: string]: any; }, opts?: ScrappingOptions) { + const url = this.digestQuery(query); + + url.searchParams.set('tbm', 'isch'); + url.searchParams.set('asearch', 'isch'); + url.searchParams.set('async', `_fmt:json,p:1,ijn:${query.start ? Math.floor(query.start / (query.num || 10)) : 0}`); + + const sideLoaded = await this.sideLoadWithAllocatedProxy(url, opts); + + if (sideLoaded.status !== 200 || !sideLoaded.file) { + throw new ServiceBadAttemptError('Google returned an error page. This may happen due to various reasons, including rate limiting or other issues.'); + } + + const jsonTxt = (await readFile((await sideLoaded.file.filePath))).toString(); + const rJSON = parseJSONText(jsonTxt.slice(jsonTxt.indexOf('{"ischj":'))); + + return _.get(rJSON, 'ischj.metadata').map((x: any) => { + + return { + link: _.get(x, 'result.referrer_url'), + title: _.get(x, 'result.page_title'), + snippet: _.get(x, 'text_in_grid.snippet'), + source: _.get(x, 'result.site_title'), + imageWidth: _.get(x, 'original_image.width'), + imageHeight: _.get(x, 'original_image.height'), + imageUrl: _.get(x, 'original_image.url'), + variant: 'images', + }; + }) as WebSearchEntry[]; + } +} + +async function getWebSearchResults() { + if (location.pathname.startsWith('/sorry') || location.pathname.startsWith('/error')) { + throw new Error('Google returned an error page. This may happen due to various reasons, including rate limiting or other issues.'); + } + + // @ts-ignore + await Promise.race([window.waitForSelector('div[data-async-context^="query"]'), window.waitForSelector('#botstuff .mnr-c')]); + + const wrapper1 = document.querySelector('div[data-async-context^="query"]'); + + if (!wrapper1) { + return undefined; + } + + const query = decodeURIComponent(wrapper1.getAttribute('data-async-context')?.split('query:')[1] || ''); + + if (!query) { + return undefined; + } + + const candidates = Array.from(wrapper1.querySelectorAll('div[lang],div[data-surl]')); + + return candidates.map((x, pos) => { + const primaryLink = x.querySelector('a:not([href="#"])'); + if (!primaryLink) { + return undefined; + } + const url = primaryLink.getAttribute('href'); + + if (primaryLink.querySelector('div[role="heading"]')) { + // const spans = primaryLink.querySelectorAll('span'); + // const title = spans[0]?.textContent; + // const source = spans[1]?.textContent; + // const date = spans[spans.length - 1].textContent; + + // return { + // link: url, + // title, + // source, + // date, + // variant: 'video' + // }; + return undefined; + } + + const title = primaryLink.querySelector('h3')?.textContent; + const source = Array.from(primaryLink.querySelectorAll('span')).find((x) => x.textContent)?.textContent; + const cite = primaryLink.querySelector('cite[role=text]')?.textContent; + let date = cite?.split('·')[1]?.trim(); + const snippets = Array.from(x.querySelectorAll('div[data-sncf*="1"] span')); + let snippet = snippets[snippets.length - 1]?.textContent; + if (!snippet) { + snippet = x.querySelector('div.IsZvec')?.textContent?.trim() || null; + } + date ??= snippets[snippets.length - 2]?.textContent?.trim(); + const imageUrl = x.querySelector('div[data-sncf*="1"] img[src]:not(img[src^="data"])')?.getAttribute('src'); + let siteLinks = Array.from(x.querySelectorAll('div[data-sncf*="3"] a[href]')).map((l) => { + return { + link: l.getAttribute('href'), + title: l.textContent, + }; + }); + const perhapsParent = x.parentElement?.closest('div[data-hveid]'); + if (!siteLinks?.length && perhapsParent) { + const candidates = Array.from(perhapsParent.querySelectorAll('td h3')); + if (candidates.length) { + siteLinks = candidates.map((l) => { + const link = l.querySelector('a'); + if (!link) { + return undefined; + } + const snippet = l.nextElementSibling?.textContent; + return { + link: link.getAttribute('href'), + title: link.textContent, + snippet, + }; + }).filter(Boolean) as any; + } + } + + return { + link: url, + title, + source, + date, + snippet: snippet ?? undefined, + imageUrl: imageUrl?.startsWith('data:') ? undefined : imageUrl, + siteLinks: siteLinks.length ? siteLinks : undefined, + variant: 'web', + }; + }).filter(Boolean) as WebSearchEntry[]; +} + +async function getNewsSearchResults() { + if (location.pathname.startsWith('/sorry') || location.pathname.startsWith('/error')) { + throw new Error('Google returned an error page. This may happen due to various reasons, including rate limiting or other issues.'); + } + + // @ts-ignore + await Promise.race([window.waitForSelector('div[data-async-context^="query"]'), window.waitForSelector('#botstuff .mnr-c')]); + + const wrapper1 = document.querySelector('div[data-async-context^="query"]'); + + if (!wrapper1) { + return undefined; + } + + const query = decodeURIComponent(wrapper1.getAttribute('data-async-context')?.split('query:')[1] || ''); + + if (!query) { + return undefined; + } + + const candidates = Array.from(wrapper1.querySelectorAll('div[data-news-doc-id]')); + + return candidates.map((x) => { + const primaryLink = x.querySelector('a:not([href="#"])'); + if (!primaryLink) { + return undefined; + } + const url = primaryLink.getAttribute('href'); + const titleElem = primaryLink.querySelector('div[role="heading"]'); + + if (!titleElem) { + return undefined; + } + + const title = titleElem.textContent?.trim(); + const source = titleElem.previousElementSibling?.textContent?.trim(); + const snippet = titleElem.nextElementSibling?.textContent?.trim(); + + const innerSpans = Array.from(titleElem.parentElement?.querySelectorAll('span') || []); + const date = innerSpans[innerSpans.length - 1]?.textContent?.trim(); + + return { + link: url, + title, + source, + date, + snippet, + variant: 'news', + }; + }).filter(Boolean) as WebSearchEntry[]; +} \ No newline at end of file diff --git a/src/services/serp/puppeteer.ts b/src/services/serp/puppeteer.ts new file mode 100644 index 0000000..5157879 --- /dev/null +++ b/src/services/serp/puppeteer.ts @@ -0,0 +1,692 @@ +import _ from 'lodash'; +import { readFile } from 'fs/promises'; +import { container, singleton } from 'tsyringe'; + +import type { Browser, CookieParam, GoToOptions, Page, Viewport } from 'puppeteer'; +import type { Cookie } from 'set-cookie-parser'; +import puppeteer, { TimeoutError } from 'puppeteer'; + +import { Defer } from 'civkit/defer'; +import { AssertionFailureError, ParamValidationError } from 'civkit/civ-rpc'; +import { AsyncService } from 'civkit/async-service'; +import { FancyFile } from 'civkit/fancy-file'; +import { delay } from 'civkit/timeout'; + +import { SecurityCompromiseError, ServiceCrashedError, ServiceNodeResourceDrainError } from '../../shared/lib/errors'; +import { CurlControl } from '../curl'; +import { AsyncLocalContext } from '../async-context'; +import { GlobalLogger } from '../logger'; +import { minimalStealth } from '../minimal-stealth'; +import { BlackHoleDetector } from '../blackhole-detector'; + + +export interface ScrappingOptions { + proxyUrl?: string; + cookies?: Cookie[]; + overrideUserAgent?: string; + timeoutMs?: number; + locale?: string; + referer?: string; + extraHeaders?: Record; + viewport?: Viewport; + proxyResources?: boolean; + allocProxy?: string; + + sideLoad?: { + impersonate: { + [url: string]: { + status: number; + headers: { [k: string]: string | string[]; }; + contentType?: string; + body?: FancyFile; + }; + }; + proxyOrigin: { [origin: string]: string; }; + }; + +} + +const SIMULATE_SCROLL = ` +(function () { + function createIntersectionObserverEntry(target, isIntersecting, timestamp) { + const targetRect = target.getBoundingClientRect(); + const record = { + target, + isIntersecting, + time: timestamp, + // If intersecting, intersectionRect matches boundingClientRect + // If not intersecting, intersectionRect is empty (0x0) + intersectionRect: isIntersecting + ? targetRect + : new DOMRectReadOnly(0, 0, 0, 0), + // Current bounding client rect of the target + boundingClientRect: targetRect, + // Intersection ratio is either 0 (not intersecting) or 1 (fully intersecting) + intersectionRatio: isIntersecting ? 1 : 0, + // Root bounds (viewport in our case) + rootBounds: new DOMRectReadOnly( + 0, + 0, + window.innerWidth, + window.innerHeight + ) + }; + Object.setPrototypeOf(record, window.IntersectionObserverEntry.prototype); + return record; + } + function cloneIntersectionObserverEntry(entry) { + const record = { + target: entry.target, + isIntersecting: entry.isIntersecting, + time: entry.time, + intersectionRect: entry.intersectionRect, + boundingClientRect: entry.boundingClientRect, + intersectionRatio: entry.intersectionRatio, + rootBounds: entry.rootBounds + }; + Object.setPrototypeOf(record, window.IntersectionObserverEntry.prototype); + return record; + } + const orig = window.IntersectionObserver; + const kCallback = Symbol('callback'); + const kLastEntryMap = Symbol('lastEntryMap'); + const liveObservers = new Map(); + class MangledIntersectionObserver extends orig { + constructor(callback, options) { + super((entries, observer) => { + const lastEntryMap = observer[kLastEntryMap]; + const lastEntry = entries[entries.length - 1]; + lastEntryMap.set(lastEntry.target, lastEntry); + return callback(entries, observer); + }, options); + this[kCallback] = callback; + this[kLastEntryMap] = new WeakMap(); + liveObservers.set(this, new Set()); + } + disconnect() { + liveObservers.get(this)?.clear(); + liveObservers.delete(this); + return super.disconnect(); + } + observe(target) { + const observer = liveObservers.get(this); + observer?.add(target); + return super.observe(target); + } + unobserve(target) { + const observer = liveObservers.get(this); + observer?.delete(target); + return super.unobserve(target); + } + } + Object.defineProperty(MangledIntersectionObserver, 'name', { value: 'IntersectionObserver', writable: false }); + window.IntersectionObserver = MangledIntersectionObserver; + function simulateScroll() { + for (const [observer, targets] of liveObservers.entries()) { + const t0 = performance.now(); + for (const target of targets) { + const entry = createIntersectionObserverEntry(target, true, t0); + observer[kCallback]([entry], observer); + setTimeout(() => { + const t1 = performance.now(); + const lastEntry = observer[kLastEntryMap].get(target); + if (!lastEntry) { + return; + } + const entry2 = { ...cloneIntersectionObserverEntry(lastEntry), time: t1 }; + observer[kCallback]([entry2], observer); + }); + } + } + } + window.simulateScroll = simulateScroll; +})(); +`; + +const MUTATION_IDLE_WATCH = ` +(function () { + let timeout; + const sendMsg = ()=> { + document.dispatchEvent(new CustomEvent('mutationIdle')); + }; + + const cb = () => { + if (timeout) { + clearTimeout(timeout); + timeout = setTimeout(sendMsg, 200); + } + }; + const mutationObserver = new MutationObserver(cb); + + document.addEventListener('DOMContentLoaded', () => { + mutationObserver.observe(document.documentElement, { + childList: true, + subtree: true, + }); + timeout = setTimeout(sendMsg, 200); + }, { once: true }) +})(); +`; + +const SCRIPT_TO_INJECT_INTO_FRAME = ` +${SIMULATE_SCROLL} +${MUTATION_IDLE_WATCH} +(${minimalStealth.toString()})(); + +(function(){ + +let lastMutationIdle = 0; +let initialAnalytics; +document.addEventListener('mutationIdle', ()=> lastMutationIdle = Date.now()); + +function waitForSelector(selectorText) { + return new Promise((resolve) => { + const existing = document.querySelector(selectorText); + if (existing) { + resolve(existing); + return; + } + if (document.readyState === 'loading') { + document.addEventListener('DOMContentLoaded', () => { + const observer = new MutationObserver(() => { + const elem = document.querySelector(selectorText); + if (elem) { + resolve(document.querySelector(selectorText)); + observer.disconnect(); + } + }); + observer.observe(document.documentElement, { + childList: true, + subtree: true + }); + }); + return; + } + const observer = new MutationObserver(() => { + const elem = document.querySelector(selectorText); + if (elem) { + resolve(document.querySelector(selectorText)); + observer.disconnect(); + } + }); + observer.observe(document.documentElement, { + childList: true, + subtree: true + }); + }); +} +window.waitForSelector = waitForSelector; +})(); +`; + +@singleton() +export class SERPSpecializedPuppeteerControl extends AsyncService { + + _sn = 0; + browser!: Browser; + logger = this.globalLogger.child({ service: this.constructor.name }); + + __loadedPage: Page[] = []; + + finalizerMap = new WeakMap>(); + snMap = new WeakMap(); + livePages = new Set(); + lastPageCratedAt: number = 0; + ua: string = ''; + + protected _REPORT_FUNCTION_NAME = 'bingo'; + + lifeCycleTrack = new WeakMap(); + + constructor( + protected globalLogger: GlobalLogger, + protected asyncLocalContext: AsyncLocalContext, + protected curlControl: CurlControl, + protected blackHoleDetector: BlackHoleDetector, + ) { + super(...arguments); + this.setMaxListeners(Infinity); + + let crippledTimes = 0; + this.on('crippled', () => { + crippledTimes += 1; + this.__loadedPage.length = 0; + this.livePages.clear(); + if (crippledTimes > 5) { + process.nextTick(() => { + this.emit('error', new Error('Browser crashed too many times, quitting...')); + // process.exit(1); + }); + } + }); + } + + override async init() { + await this.dependencyReady(); + if (process.env.NODE_ENV?.includes('dry-run')) { + this.emit('ready'); + return; + } + + if (this.browser) { + if (this.browser.connected) { + await this.browser.close(); + } else { + this.browser.process()?.kill('SIGKILL'); + } + } + this.browser = await puppeteer.launch({ + timeout: 10_000, + headless: !Boolean(process.env.DEBUG_BROWSER), + executablePath: process.env.OVERRIDE_CHROME_EXECUTABLE_PATH, + args: [ + '--disable-dev-shm-usage', '--disable-blink-features=AutomationControlled' + ] + }).catch((err: any) => { + this.logger.error(`Unknown firebase issue, just die fast.`, { err }); + process.nextTick(() => { + this.emit('error', err); + // process.exit(1); + }); + return Promise.reject(err); + }); + this.browser.once('disconnected', () => { + this.logger.warn(`Browser disconnected`); + if (this.browser) { + this.emit('crippled'); + } + process.nextTick(() => this.serviceReady()); + }); + this.ua = await this.browser.userAgent(); + this.logger.info(`Browser launched: ${this.browser.process()?.pid}, ${this.ua}`); + this.curlControl.impersonateChrome(this.ua.replace(/Headless/i, '')); + + await this.newPage('beware_deadlock').then((r) => this.__loadedPage.push(r)); + + this.emit('ready'); + } + + async newPage(bewareDeadLock: any = false) { + if (!bewareDeadLock) { + await this.serviceReady(); + } + const sn = this._sn++; + let page; + try { + const dedicatedContext = await this.browser.createBrowserContext(); + page = await dedicatedContext.newPage(); + } catch (err: any) { + this.logger.warn(`Failed to create page ${sn}`, { err }); + this.browser.process()?.kill('SIGKILL'); + throw new ServiceNodeResourceDrainError(`This specific worker node failed to open a new page, try again.`); + } + const preparations = []; + + preparations.push(page.setUserAgent(this.ua.replace(/Headless/i, ''))); + // preparations.push(page.setUserAgent(`Slackbot-LinkExpanding 1.0 (+https://api.slack.com/robots)`)); + // preparations.push(page.setUserAgent(`Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; GPTBot/1.0; +https://openai.com/gptbot)`)); + preparations.push(page.setBypassCSP(true)); + preparations.push(page.setViewport({ width: 1024, height: 1024 })); + preparations.push(page.exposeFunction(this._REPORT_FUNCTION_NAME, (thing: T) => { + page.emit(this._REPORT_FUNCTION_NAME, thing); + })); + preparations.push(page.exposeFunction('setViewport', (viewport: Viewport | null) => { + page.setViewport(viewport).catch(() => undefined); + })); + preparations.push(page.evaluateOnNewDocument(SCRIPT_TO_INJECT_INTO_FRAME)); + + await Promise.all(preparations); + + this.snMap.set(page, sn); + this.logger.debug(`Page ${sn} created.`); + this.lastPageCratedAt = Date.now(); + this.livePages.add(page); + + return page; + } + + async getNextPage() { + let thePage: Page | undefined; + if (this.__loadedPage.length) { + thePage = this.__loadedPage.shift(); + if (this.__loadedPage.length <= 1) { + process.nextTick(() => { + this.newPage() + .then((r) => this.__loadedPage.push(r)) + .catch((err) => { + this.logger.warn(`Failed to load new page ahead of time`, { err }); + }); + }); + } + } + + if (!thePage) { + thePage = await this.newPage(); + } + + const timer = setTimeout(() => { + this.logger.warn(`Page is not allowed to live past 5 minutes, ditching page ${this.snMap.get(thePage!)}...`); + this.ditchPage(thePage!); + }, 300 * 1000); + + this.finalizerMap.set(thePage, timer); + + return thePage; + } + + async ditchPage(page: Page) { + if (this.finalizerMap.has(page)) { + clearTimeout(this.finalizerMap.get(page)!); + this.finalizerMap.delete(page); + } + if (page.isClosed()) { + return; + } + const sn = this.snMap.get(page); + this.logger.debug(`Closing page ${sn}`); + await Promise.race([ + (async () => { + const ctx = page.browserContext(); + try { + await page.close(); + } finally { + await ctx.close(); + } + })(), + delay(5000) + ]).catch((err) => { + this.logger.error(`Failed to destroy page ${sn}`, { err }); + }); + this.livePages.delete(page); + } + + async controlledScrap(parsedUrl: URL, func: (this: void) => Promise, options: ScrappingOptions = {}): Promise { + // parsedUrl.search = ''; + const url = parsedUrl.toString(); + const page = await this.getNextPage(); + this.lifeCycleTrack.set(page, this.asyncLocalContext.ctx); + page.on('response', (_resp) => { + this.blackHoleDetector.itWorked(); + }); + page.on('request', async (req) => { + if (req.isInterceptResolutionHandled()) { + return; + }; + const reqUrlParsed = new URL(req.url()); + if (!reqUrlParsed.protocol.startsWith('http')) { + const overrides = req.continueRequestOverrides(); + + return req.continue(overrides, 0); + } + const typ = req.resourceType(); + if (typ === 'media' || typ === 'font' || typ === 'image' || typ === 'stylesheet') { + // Non-cooperative answer to block all media requests. + return req.abort('blockedbyclient'); + } + if (!options.proxyResources) { + const isDocRequest = ['document', 'xhr', 'fetch', 'websocket', 'prefetch', 'eventsource', 'ping'].includes(typ); + if (!isDocRequest) { + if (options.extraHeaders) { + const overrides = req.continueRequestOverrides(); + const continueArgs = [{ + ...overrides, + headers: { + ...req.headers(), + ...overrides?.headers, + ...options.extraHeaders, + } + }, 1] as const; + + return req.continue(continueArgs[0], continueArgs[1]); + } + const overrides = req.continueRequestOverrides(); + + return req.continue(overrides, 0); + } + } + const sideload = options.sideLoad; + + const impersonate = sideload?.impersonate[reqUrlParsed.href]; + if (impersonate) { + let body; + if (impersonate.body) { + body = await readFile(await impersonate.body.filePath); + if (req.isInterceptResolutionHandled()) { + return; + } + } + return req.respond({ + status: impersonate.status, + headers: impersonate.headers, + contentType: impersonate.contentType, + body: body ? Uint8Array.from(body) : undefined, + }, 999); + } + + const proxy = options.proxyUrl || sideload?.proxyOrigin?.[reqUrlParsed.origin]; + const ctx = this.lifeCycleTrack.get(page); + if (proxy && ctx) { + return await this.asyncLocalContext.bridge(ctx, async () => { + try { + const curled = await this.curlControl.sideLoad(reqUrlParsed, { + ...options, + method: req.method(), + body: req.postData(), + extraHeaders: { + ...req.headers(), + ...options.extraHeaders, + }, + proxyUrl: proxy + }); + if (req.isInterceptResolutionHandled()) { + return; + }; + + if (curled.chain.length === 1) { + if (!curled.file) { + return req.respond({ + status: curled.status, + headers: _.omit(curled.headers, 'result'), + contentType: curled.contentType, + }, 3); + } + const body = await readFile(await curled.file.filePath); + if (req.isInterceptResolutionHandled()) { + return; + }; + return req.respond({ + status: curled.status, + headers: _.omit(curled.headers, 'result'), + contentType: curled.contentType, + body: Uint8Array.from(body), + }, 3); + } + options.sideLoad ??= curled.sideLoadOpts; + _.merge(options.sideLoad, curled.sideLoadOpts); + const firstReq = curled.chain[0]; + + return req.respond({ + status: firstReq.result!.code, + headers: _.omit(firstReq, 'result'), + }, 3); + } catch (err: any) { + this.logger.warn(`Failed to sideload browser request ${reqUrlParsed.origin}`, { href: reqUrlParsed.href, err, proxy }); + } + if (req.isInterceptResolutionHandled()) { + return; + }; + const overrides = req.continueRequestOverrides(); + const continueArgs = [{ + ...overrides, + headers: { + ...req.headers(), + ...overrides?.headers, + ...options.extraHeaders, + } + }, 1] as const; + + return req.continue(continueArgs[0], continueArgs[1]); + }); + } + + if (req.isInterceptResolutionHandled()) { + return; + }; + const overrides = req.continueRequestOverrides(); + const continueArgs = [{ + ...overrides, + headers: { + ...req.headers(), + ...overrides?.headers, + ...options.extraHeaders, + } + }, 1] as const; + + return req.continue(continueArgs[0], continueArgs[1]); + }); + await page.setRequestInterception(true); + + const sn = this.snMap.get(page); + this.logger.info(`Page ${sn}: Scraping ${url}`, { url }); + + await page.evaluateOnNewDocument(`(function () { +if (window.top !== window.self) { + return; +} +const func = ${func.toString()}; + +func().then((result) => { + window.${this._REPORT_FUNCTION_NAME}({data: result}); +}).catch((err) => { + window.${this._REPORT_FUNCTION_NAME}({err: err}); +}); + +})();`); + + if (options.locale) { + // Add headers via request interception to walk around this bug + // https://github.com/puppeteer/puppeteer/issues/10235 + // await page.setExtraHTTPHeaders({ + // 'Accept-Language': options.locale + // }); + + await page.evaluateOnNewDocument(() => { + Object.defineProperty(navigator, "language", { + get: function () { + return options.locale; + } + }); + Object.defineProperty(navigator, "languages", { + get: function () { + return [options.locale]; + } + }); + }); + } + + if (options.cookies) { + const mapped = options.cookies.map((x) => { + const draft: CookieParam = { + name: x.name, + value: encodeURIComponent(x.value), + secure: x.secure, + domain: x.domain, + path: x.path, + expires: x.expires ? Math.floor(x.expires.valueOf() / 1000) : undefined, + sameSite: x.sameSite as any, + }; + if (!draft.expires && x.maxAge) { + draft.expires = Math.floor(Date.now() / 1000) + x.maxAge; + } + if (!draft.domain) { + draft.url = parsedUrl.toString(); + } + + return draft; + }); + try { + await page.setCookie(...mapped); + } catch (err: any) { + this.logger.warn(`Page ${sn}: Failed to set cookies`, { err }); + throw new ParamValidationError({ + path: 'cookies', + message: `Failed to set cookies: ${err?.message}` + }); + } + } + if (options.overrideUserAgent) { + await page.setUserAgent(options.overrideUserAgent); + } + if (options.viewport) { + await page.setViewport(options.viewport); + } + + const resultDeferred = Defer(); + const crippleListener = () => resultDeferred.reject(new ServiceCrashedError({ message: `Browser crashed, try again` })); + this.once('crippled', crippleListener); + resultDeferred.promise.finally(() => { + this.off('crippled', crippleListener); + }); + const hdl = (s: { + err?: any; + data?: T; + }) => { + if (s.err) { + resultDeferred.reject(s.err); + } + resultDeferred.resolve(s.data); + }; + page.on(this._REPORT_FUNCTION_NAME, hdl as any); + page.once('abuse', (event: any) => { + this.emit('abuse', { ...event, url: parsedUrl }); + + resultDeferred.reject( + new SecurityCompromiseError(`Abuse detected: ${event.reason}`) + ); + }); + + const timeout = options.timeoutMs || 30_000; + const goToOptions: GoToOptions = { + waitUntil: ['load', 'domcontentloaded', 'networkidle0'], + timeout, + }; + + if (options.referer) { + goToOptions.referer = options.referer; + } + + + const gotoPromise = page.goto(url, goToOptions) + .catch((err) => { + if (err instanceof TimeoutError) { + this.logger.warn(`Page ${sn}: Browsing of ${url} timed out`, { err }); + return new AssertionFailureError({ + message: `Failed to goto ${url}: ${err}`, + cause: err, + }); + } + + this.logger.warn(`Page ${sn}: Browsing of ${url} aborted`, { err }); + return undefined; + }).then(async (r) => { + await delay(5000); + resultDeferred.reject(new TimeoutError(`Control function did not respond in time`)); + return r; + }); + + try { + await Promise.race([resultDeferred.promise, gotoPromise]); + + return resultDeferred.promise; + } finally { + page.off(this._REPORT_FUNCTION_NAME, hdl as any); + this.ditchPage(page); + resultDeferred.resolve(); + } + } + +} + +const puppeteerControl = container.resolve(SERPSpecializedPuppeteerControl); + +export default puppeteerControl; diff --git a/src/services/serp/serper.ts b/src/services/serp/serper.ts new file mode 100644 index 0000000..51317ea --- /dev/null +++ b/src/services/serp/serper.ts @@ -0,0 +1,165 @@ + +import { singleton } from 'tsyringe'; +import { GlobalLogger } from '../logger'; +import { SecretExposer } from '../../shared/services/secrets'; +import { AsyncLocalContext } from '../async-context'; +import { SerperBingHTTP, SerperGoogleHTTP, SerperImageSearchResponse, SerperNewsSearchResponse, SerperSearchQueryParams, SerperWebSearchResponse } from '../../shared/3rd-party/serper-search'; +import { BlackHoleDetector } from '../blackhole-detector'; +import { Context } from '../registry'; +import { AsyncService } from 'civkit/async-service'; +import { AutoCastable, Prop, RPC_CALL_ENVIRONMENT } from 'civkit/civ-rpc'; + +@singleton() +export class SerperGoogleSearchService extends AsyncService { + + logger = this.globalLogger.child({ service: this.constructor.name }); + + client!: SerperGoogleHTTP; + + constructor( + protected globalLogger: GlobalLogger, + protected secretExposer: SecretExposer, + protected threadLocal: AsyncLocalContext, + protected blackHoleDetector: BlackHoleDetector, + ) { + super(...arguments); + } + + override async init() { + await this.dependencyReady(); + this.emit('ready'); + + this.client = new SerperGoogleHTTP(this.secretExposer.SERPER_SEARCH_API_KEY); + } + + + doSearch(variant: 'web', query: SerperSearchQueryParams): Promise; + doSearch(variant: 'images', query: SerperSearchQueryParams): Promise; + doSearch(variant: 'news', query: SerperSearchQueryParams): Promise; + async doSearch(variant: 'web' | 'images' | 'news', query: SerperSearchQueryParams) { + this.logger.debug(`Doing external search`, query); + let results; + switch (variant) { + case 'images': { + const r = await this.client.imageSearch(query); + + results = r.parsed.images; + break; + } + case 'news': { + const r = await this.client.newsSearch(query); + + results = r.parsed.news; + break; + } + case 'web': + default: { + const r = await this.client.webSearch(query); + + results = r.parsed.organic; + break; + } + } + + this.blackHoleDetector.itWorked(); + + return results; + } + + + async webSearch(query: SerperSearchQueryParams) { + return this.doSearch('web', query); + } + async imageSearch(query: SerperSearchQueryParams) { + return this.doSearch('images', query); + } + async newsSearch(query: SerperSearchQueryParams) { + return this.doSearch('news', query); + } + +} + +@singleton() +export class SerperBingSearchService extends SerperGoogleSearchService { + override client!: SerperBingHTTP; + + override async init() { + await this.dependencyReady(); + this.emit('ready'); + + this.client = new SerperBingHTTP(this.secretExposer.SERPER_SEARCH_API_KEY); + } +} + +export class GoogleSearchExplicitOperatorsDto extends AutoCastable { + @Prop({ + arrayOf: String, + desc: `Returns web pages with a specific file extension. Example: to find the Honda GX120 Owner’s manual in PDF, type “Honda GX120 ownners manual ext:pdf”.` + }) + ext?: string | string[]; + + @Prop({ + arrayOf: String, + desc: `Returns web pages created in the specified file type. Example: to find a web page created in PDF format about the evaluation of age-related cognitive changes, type “evaluation of age cognitive changes filetype:pdf”.` + }) + filetype?: string | string[]; + + @Prop({ + arrayOf: String, + desc: `Returns webpages containing the specified term in the title of the page. Example: to find pages about SEO conferences making sure the results contain 2023 in the title, type “seo conference intitle:2023”.` + }) + intitle?: string | string[]; + + @Prop({ + arrayOf: String, + desc: `Returns web pages written in the specified language. The language code must be in the ISO 639-1 two-letter code format. Example: to find information on visas only in Spanish, type “visas lang:es”.` + }) + loc?: string | string[]; + + @Prop({ + arrayOf: String, + desc: `Returns web pages coming only from a specific web site. Example: to find information about Goggles only on Brave pages, type “goggles site:brave.com”.` + }) + site?: string | string[]; + + addTo(searchTerm: string) { + const chunks = []; + for (const [key, value] of Object.entries(this)) { + if (value) { + const values = Array.isArray(value) ? value : [value]; + const textValue = values.map((v) => `${key}:${v}`).join(' OR '); + if (textValue) { + chunks.push(textValue); + } + } + } + const opPart = chunks.length > 1 ? chunks.map((x) => `(${x})`).join(' AND ') : chunks; + + if (opPart.length) { + return [searchTerm, opPart].join(' '); + } + + return searchTerm; + } + + static override from(input: any) { + const instance = super.from(input) as GoogleSearchExplicitOperatorsDto; + const ctx = Reflect.get(input, RPC_CALL_ENVIRONMENT) as Context | undefined; + + const params = ['ext', 'filetype', 'intitle', 'loc', 'site']; + + for (const p of params) { + const customValue = ctx?.get(`x-${p}`) || ctx?.get(`${p}`); + if (!customValue) { + continue; + } + + const filtered = customValue.split(', ').filter(Boolean); + if (filtered.length) { + Reflect.set(instance, p, filtered); + } + } + + return instance; + } +} diff --git a/src/stand-alone/serp.ts b/src/stand-alone/serp.ts new file mode 100644 index 0000000..557b4e6 --- /dev/null +++ b/src/stand-alone/serp.ts @@ -0,0 +1,160 @@ +import 'reflect-metadata'; +import { container, singleton } from 'tsyringe'; + +import { KoaServer } from 'civkit/civ-rpc/koa'; +import http2 from 'http2'; +import http from 'http'; +import { FsWalk, WalkOutEntity } from 'civkit/fswalk'; +import path from 'path'; +import fs from 'fs'; +import { mimeOfExt } from 'civkit/mime'; +import { Context, Next } from 'koa'; +import { RPCRegistry } from '../services/registry'; +import { AsyncResource } from 'async_hooks'; +import { runOnce } from 'civkit/decorators'; +import { randomUUID } from 'crypto'; +import { ThreadedServiceRegistry } from '../services/threaded'; +import { GlobalLogger } from '../services/logger'; +import { AsyncLocalContext } from '../services/async-context'; +import finalizer, { Finalizer } from '../services/finalizer'; +import { SerpHost } from '../api/serp'; + +@singleton() +export class SERPStandAloneServer extends KoaServer { + logger = this.globalLogger.child({ service: this.constructor.name }); + + httpAlternativeServer?: typeof this['httpServer']; + assets = new Map(); + + constructor( + protected globalLogger: GlobalLogger, + protected registry: RPCRegistry, + protected serpHost: SerpHost, + protected threadLocal: AsyncLocalContext, + protected threads: ThreadedServiceRegistry, + ) { + super(...arguments); + } + + h2c() { + this.httpAlternativeServer = this.httpServer; + const fn = this.koaApp.callback(); + this.httpServer = http2.createServer((req, res) => { + const ar = new AsyncResource('HTTP2ServerRequest'); + ar.runInAsyncScope(fn, this.koaApp, req, res); + }); + // useResourceBasedDefaultTracker(); + + return this; + } + + override async init() { + await this.walkForAssets(); + await this.dependencyReady(); + + for (const [k, v] of this.registry.conf.entries()) { + if (v.tags?.includes('crawl')) { + this.registry.conf.delete(k); + } + } + + await super.init(); + } + + async walkForAssets() { + const files = await FsWalk.walkOut(path.resolve(__dirname, '..', '..', 'public')); + + for (const file of files) { + if (file.type !== 'file') { + continue; + } + this.assets.set(file.relativePath.toString(), file); + } + } + + override listen(port: number) { + const r = super.listen(port); + if (this.httpAlternativeServer) { + const altPort = port + 1; + this.httpAlternativeServer.listen(altPort, () => { + this.logger.info(`Alternative ${this.httpAlternativeServer!.constructor.name} listening on port ${altPort}`); + }); + } + + return r; + } + + makeAssetsServingController() { + return (ctx: Context, next: Next) => { + const requestPath = ctx.path; + const file = requestPath.slice(1); + if (!file) { + return next(); + } + + const asset = this.assets.get(file); + if (asset?.type !== 'file') { + return next(); + } + + ctx.body = fs.createReadStream(asset.path); + ctx.type = mimeOfExt(path.extname(asset.path.toString())) || 'application/octet-stream'; + ctx.set('Content-Length', asset.stats.size.toString()); + + return; + }; + } + + registerRoutes(): void { + this.koaApp.use(this.makeAssetsServingController()); + this.koaApp.use(this.registry.makeShimController()); + } + + + // Using h2c server has an implication that multiple requests may share the same connection and x-cloud-trace-context + // TraceId is expected to be request-bound and unique. So these two has to be distinguished. + @runOnce() + override insertAsyncHookMiddleware() { + const asyncHookMiddleware = async (ctx: Context, next: () => Promise) => { + const googleTraceId = ctx.get('x-cloud-trace-context').split('/')?.[0]; + this.threadLocal.setup({ + traceId: randomUUID(), + traceT0: new Date(), + googleTraceId, + }); + + return next(); + }; + + this.koaApp.use(asyncHookMiddleware); + } + + @Finalizer() + override async standDown() { + const tasks: Promise[] = []; + if (this.httpAlternativeServer?.listening) { + (this.httpAlternativeServer as http.Server).closeIdleConnections?.(); + this.httpAlternativeServer.close(); + tasks.push(new Promise((resolve, reject) => { + this.httpAlternativeServer!.close((err) => { + if (err) { + return reject(err); + } + resolve(); + }); + })); + } + tasks.push(super.standDown()); + await Promise.all(tasks); + } + +} +const instance = container.resolve(SERPStandAloneServer); + +export default instance; + +if (process.env.NODE_ENV?.includes('dry-run')) { + instance.serviceReady().then(() => finalizer.terminate()); +} else { + instance.serviceReady().then((s) => s.h2c().listen(parseInt(process.env.PORT || '') || 3000)); +} diff --git a/thinapps-shared b/thinapps-shared index 2f45bd5..ca09ea8 160000 --- a/thinapps-shared +++ b/thinapps-shared @@ -1 +1 @@ -Subproject commit 2f45bd58ddfc007d04dfdb9cb0814d74dc25e3f3 +Subproject commit ca09ea8fcbb84aeea4eb8015bf8e98eef1813048