feat: serp endpoint (#1180)

* wip

* wip

* fix

* wip

* fix: add jitter to user cache

* cd

* fix

* fix

* fix: user cache age comparison

* fix: try to partition apiroll query

* bump: deps

* wip

* cd

* feat: fallback for serp

* fix

* cd

* fix

* fix

* serp: stop hiding expense

* serp: enable fallback by default
This commit is contained in:
Yanlong Wang 2025-04-02 14:58:13 +08:00 committed by GitHub
parent 919a81d7c9
commit 12ba1bcfad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 1988 additions and 521 deletions

View File

@ -75,9 +75,15 @@ jobs:
- name: Deploy SEARCH with Tag
run: |
gcloud beta run deploy search --image us-docker.pkg.dev/reader-6b7dc/jina-reader/reader@${{steps.container.outputs.imageid}} --tag ${{ env.RELEASE_VERSION }} --command '' --args build/stand-alone/search.js --region us-central1 --async --min-instances 0 --deploy-health-check --use-http2
- name: Deploy SERP with Tag
run: |
gcloud beta run deploy serp --image us-docker.pkg.dev/reader-6b7dc/jina-reader/reader@${{steps.container.outputs.imageid}} --tag ${{ env.RELEASE_VERSION }} --command '' --args build/stand-alone/serp.js --region us-central1 --async --min-instances 0 --deploy-health-check --use-http2
- name: Deploy CRAWL-EU with Tag
run: |
gcloud beta run deploy crawl-eu --image us-docker.pkg.dev/reader-6b7dc/jina-reader/reader@${{steps.container.outputs.imageid}} --tag ${{ env.RELEASE_VERSION }} --command '' --args build/stand-alone/crawl.js --region europe-west1 --async --min-instances 0 --deploy-health-check --use-http2
- name: Deploy SEARCH-EU with Tag
run: |
gcloud beta run deploy search-eu --image us-docker.pkg.dev/reader-6b7dc/jina-reader/reader@${{steps.container.outputs.imageid}} --tag ${{ env.RELEASE_VERSION }} --command '' --args build/stand-alone/search.js --region europe-west1 --async --min-instances 0 --deploy-health-check --use-http2
gcloud beta run deploy search-eu --image us-docker.pkg.dev/reader-6b7dc/jina-reader/reader@${{steps.container.outputs.imageid}} --tag ${{ env.RELEASE_VERSION }} --command '' --args build/stand-alone/search.js --region europe-west1 --async --min-instances 0 --deploy-health-check --use-http2
- name: Deploy SERP-JP with Tag
run: |
gcloud beta run deploy serp-jp --image us-docker.pkg.dev/reader-6b7dc/jina-reader/reader@${{steps.container.outputs.imageid}} --tag ${{ env.RELEASE_VERSION }} --command '' --args build/stand-alone/serp.js --region asia-northeast1 --async --min-instances 0 --deploy-health-check --use-http2

22
.vscode/launch.json vendored
View File

@ -102,5 +102,27 @@
"preLaunchTask": "Backend:build:watch",
"killBehavior": "forceful"
},
{
"name": "Debug Stand Alone SERP",
"request": "launch",
"runtimeArgs": [
"--env-file=.secret.local",
],
"env": {
"GCLOUD_PROJECT": "reader-6b7dc",
"PREFERRED_PROXY_COUNTRY": "hk",
"OVERRIDE_GOOGLE_DOMAIN": "www.google.com.hk",
"LD_PRELOAD": "/usr/local/lib/libcurl-impersonate-chrome.dylib"
},
"cwd": "${workspaceFolder}",
"program": "build/stand-alone/serp.js",
"skipFiles": [
"<node_internals>/**"
],
"type": "node",
"outputCapture": "std",
"preLaunchTask": "Backend:build:watch",
"killBehavior": "forceful"
},
]
}

View File

@ -48,6 +48,7 @@ import { RobotsTxtService } from '../services/robots-text';
import { TempFileManager } from '../services/temp-file';
import { MiscService } from '../services/misc';
import { HTTPServiceError } from 'civkit';
import { GeoIPService } from '../services/geoip';
export interface ExtraScrappingOptions extends ScrappingOptions {
withIframe?: boolean | 'quoted';
@ -58,6 +59,7 @@ export interface ExtraScrappingOptions extends ScrappingOptions {
engine?: string;
allocProxy?: string;
private?: boolean;
countryHint?: string;
}
const indexProto = {
@ -94,6 +96,7 @@ export class CrawlerHost extends RPCHost {
protected threadLocal: AsyncLocalContext,
protected robotsTxtService: RobotsTxtService,
protected tempFileManager: TempFileManager,
protected geoIpService: GeoIPService,
protected miscService: MiscService,
) {
super(...arguments);
@ -511,15 +514,16 @@ export class CrawlerHost extends RPCHost {
});
}
const result = await this.miscService.assertNormalizedUrl(url);
if (this.puppeteerControl.circuitBreakerHosts.has(result.hostname.toLowerCase())) {
const { url: safeURL, ips } = await this.miscService.assertNormalizedUrl(url);
if (this.puppeteerControl.circuitBreakerHosts.has(safeURL.hostname.toLowerCase())) {
throw new SecurityCompromiseError({
message: `Circular hostname: ${result.protocol}`,
message: `Circular hostname: ${safeURL.protocol}`,
path: 'url'
});
}
crawlerOptions._hintIps = ips;
return result;
return safeURL;
}
getUrlDigest(urlToCrawl: URL) {
@ -886,7 +890,11 @@ export class CrawlerHost extends RPCHost {
}
}
} else if (crawlOpts?.allocProxy && crawlOpts.allocProxy !== 'none' && !crawlOpts.proxyUrl) {
crawlOpts.proxyUrl = (await this.proxyProvider.alloc(crawlOpts.allocProxy)).href;
const proxyUrl = await this.proxyProvider.alloc(this.figureOutBestProxyCountry(crawlOpts));
if (proxyUrl.protocol === 'socks5h:') {
proxyUrl.protocol = 'socks5:';
}
crawlOpts.proxyUrl = proxyUrl.href;
}
try {
@ -1030,6 +1038,7 @@ export class CrawlerHost extends RPCHost {
proxyResources: (opts.proxyUrl || opts.proxy?.endsWith('+')) ? true : false,
private: Boolean(opts.doNotTrack),
};
if (crawlOpts.targetSelector?.length) {
if (typeof crawlOpts.targetSelector === 'string') {
crawlOpts.targetSelector = [crawlOpts.targetSelector];
@ -1046,6 +1055,18 @@ export class CrawlerHost extends RPCHost {
}
}
if (opts._hintIps?.length) {
const hints = await this.geoIpService.lookupCities(opts._hintIps);
const board: Record<string, number> = {};
for (const x of hints) {
if (x.country?.code) {
board[x.country.code] = (board[x.country.code] || 0) + 1;
}
}
const hintCountry = _.maxBy(Array.from(Object.entries(board)), 1)?.[0];
crawlOpts.countryHint = hintCountry?.toLowerCase();
}
if (opts.locale) {
crawlOpts.extraHeaders ??= {};
crawlOpts.extraHeaders['Accept-Language'] = opts.locale;
@ -1221,6 +1242,7 @@ export class CrawlerHost extends RPCHost {
};
}
retryDet = new WeakSet<ExtraScrappingOptions>();
@retryWith((err) => {
if (err instanceof ServiceBadApproachError) {
return false;
@ -1239,7 +1261,14 @@ export class CrawlerHost extends RPCHost {
if (opts?.allocProxy === 'none') {
return this.curlControl.sideLoad(url, opts);
}
const proxy = await this.proxyProvider.alloc(opts?.allocProxy);
const proxy = await this.proxyProvider.alloc(this.figureOutBestProxyCountry(opts));
if (opts) {
if (this.retryDet.has(opts) && proxy.protocol === 'socks5h:') {
proxy.protocol = 'socks5:';
}
this.retryDet.add(opts);
}
const r = await this.curlControl.sideLoad(url, {
...opts,
proxyUrl: proxy.href,
@ -1252,6 +1281,34 @@ export class CrawlerHost extends RPCHost {
return { ...r, proxy };
}
/**
 * Picks the proxy country/pool name to hand to `proxyProvider.alloc`.
 *
 * Precedence:
 *  1. no options at all -> 'auto'
 *  2. an explicitly requested `allocProxy` that the provider supports
 *  3. an explicit 'none' that the provider does not list as supported -> 'none'
 *  4. the `countryHint` if the provider supports it; an unsupported 'cn' hint maps to 'hk'
 *  5. whatever was requested in `allocProxy`, or 'auto' when nothing was requested
 *
 * @param opts scrapping options carrying `allocProxy` and/or `countryHint`
 * @returns the name to pass to the proxy provider
 */
protected figureOutBestProxyCountry(opts?: ExtraScrappingOptions) {
    if (!opts) {
        return 'auto';
    }

    const requested = opts.allocProxy;
    const hinted = opts.countryHint;
    let choice;

    if (requested) {
        if (this.proxyProvider.supports(requested)) {
            choice = requested;
        } else if (requested === 'none') {
            return 'none';
        }
    }

    if (hinted) {
        // An explicit, supported request always wins over the hint (hence `??=`).
        if (this.proxyProvider.supports(hinted)) {
            choice ??= hinted;
        } else if (hinted === 'cn') {
            choice ??= 'hk';
        }
    }

    return choice ?? (requested || 'auto');
}
knownUrlThatSideLoadingWouldCrashTheBrowser(url: URL) {
if (url.hostname === 'chromewebstore.google.com') {
return true;

View File

@ -1,503 +0,0 @@
import { singleton } from 'tsyringe';
import _ from 'lodash';
import {
assignTransferProtocolMeta, RPCHost, RPCReflection,
AssertionFailureError,
RawString,
} from 'civkit/civ-rpc';
import { marshalErrorLike } from 'civkit/lang';
import { objHashMd5B64Of } from 'civkit/hash';
import { RateLimitControl, RateLimitDesc } from '../shared/services/rate-limit';
import { WebSearchApiResponse, SearchResult as WebSearchResult } from '../shared/3rd-party/brave-types';
import { WebSearchQueryParams } from '../shared/3rd-party/brave-search';
import { CrawlerHost, ExtraScrappingOptions } from './crawler';
import { SearchResult } from '../db/searched';
import { JinaEmbeddingsAuthDTO } from '../dto/jina-embeddings-auth';
import { CrawlerOptions } from '../dto/crawler-options';
import { BraveSearchExplicitOperatorsDto, BraveSearchService } from '../services/brave-search';
import { SnapshotFormatter, FormattedPage } from '../services/snapshot-formatter';
import { GlobalLogger } from '../services/logger';
import { AsyncLocalContext } from '../services/async-context';
import { OutputServerEventStream } from '../lib/transform-server-event-stream';
import { Context, Ctx, Method, Param, RPCReflect } from '../services/registry';
import { InsufficientBalanceError } from '../services/errors';
@singleton()
export class SearcherHost extends RPCHost {
// Logger scoped to this service's class name.
logger = this.globalLogger.child({ service: this.constructor.name });
// How long a stored search response is kept (7 days); used for `expireAt` in cachedWebSearch.
cacheRetentionMs = 1000 * 3600 * 24 * 7;
// Cached search responses older than this (1 hour) are treated as stale in cachedWebSearch.
cacheValidMs = 1000 * 3600;
// Default `cacheTolerance` (24 hours) applied to page crawls when the caller did not set one.
pageCacheToleranceMs = 1000 * 3600 * 24;
// Default delay (15 s) of the early-return timer in `search` when no timeout option is given.
reasonableDelayMs = 15_000;
// Default number of results to aim for when no count is supplied.
targetResultCount = 5;
// Dependencies are declared as protected parameter properties; all received
// arguments are forwarded unchanged to the RPCHost constructor.
constructor(
protected globalLogger: GlobalLogger,
protected rateLimitControl: RateLimitControl,
protected threadLocal: AsyncLocalContext,
protected braveSearchService: BraveSearchService,
protected crawler: CrawlerHost,
protected snapshotFormatter: SnapshotFormatter,
) {
super(...arguments);
}
// Waits for `dependencyReady()` to resolve, then emits 'ready'.
override async init() {
await this.dependencyReady();
this.emit('ready');
}
/**
 * Search endpoint, registered both as `searchIndex` on `/search` and on the `::q` path.
 *
 * Flow: resolve the caller, return the index when no query is given, check balance and
 * rate limits, run the (cached) web search, crawl the hits, then answer in one of three
 * shapes depending on the Accept header: server-sent events, JSON, or plain text.
 * `chargeAmount` is reported through the `rpcReflect.finally` hook when it is non-zero.
 *
 * Throws InsufficientBalanceError when the wallet balance is not positive and
 * AssertionFailureError when the search or the crawl yields nothing.
 */
@Method({
name: 'searchIndex',
ext: {
http: {
action: ['get', 'post'],
path: '/search'
}
},
tags: ['search'],
returnType: [String, OutputServerEventStream],
})
@Method({
ext: {
http: {
action: ['get', 'post'],
path: '::q'
}
},
tags: ['search'],
returnType: [String, OutputServerEventStream, RawString],
})
async search(
@RPCReflect() rpcReflect: RPCReflection,
@Ctx() ctx: Context,
auth: JinaEmbeddingsAuthDTO,
@Param('count', { default: 5, validate: (v) => v >= 0 && v <= 10 })
count: number,
crawlerOptions: CrawlerOptions,
braveSearchExplicitOperators: BraveSearchExplicitOperatorsDto,
@Param('q') q?: string,
) {
const uid = await auth.solveUID();
let chargeAmount = 0;
// The query may also be supplied as the request path (without the leading slash).
const noSlashPath = decodeURIComponent(ctx.path).slice(1);
// Neither a path query nor `q`: answer with the index instead of searching.
if (!noSlashPath && !q) {
const index = await this.crawler.getIndex(auth);
if (!uid) {
index.note = 'Authentication is required to use this endpoint. Please provide a valid API key via Authorization header.';
}
if (!ctx.accepts('text/plain') && (ctx.accepts('text/json') || ctx.accepts('application/json'))) {
return index;
}
return assignTransferProtocolMeta(`${index}`,
{ contentType: 'text/plain', envelope: null }
);
}
const user = await auth.assertUser();
if (!(user.wallet.total_balance > 0)) {
throw new InsufficientBalanceError(`Account balance not enough to run this query, please recharge.`);
}
// Per-key limits from the auth record take precedence; otherwise 100/min for
// speed_level >= 2 and 40/min for everyone else.
const rateLimitPolicy = auth.getRateLimits(rpcReflect.name.toUpperCase()) || [
parseInt(user.metadata?.speed_level) >= 2 ?
RateLimitDesc.from({
occurrence: 100,
periodSeconds: 60
}) :
RateLimitDesc.from({
occurrence: 40,
periodSeconds: 60
})
];
const apiRoll = await this.rateLimitControl.simpleRPCUidBasedLimit(
rpcReflect, uid!, [rpcReflect.name.toUpperCase()],
...rateLimitPolicy
);
// Usage is reported once the RPC finishes, using whatever chargeAmount was last assigned.
rpcReflect.finally(() => {
if (chargeAmount) {
auth.reportUsage(chargeAmount, `reader-${rpcReflect.name}`).catch((err) => {
this.logger.warn(`Unable to report usage for ${uid}`, { err: marshalErrorLike(err) });
});
apiRoll.chargeAmount = chargeAmount;
}
});
delete crawlerOptions.html;
const crawlOpts = await this.crawler.configure(crawlerOptions);
const searchQuery = braveSearchExplicitOperators.addTo(q || noSlashPath);
// Ask upstream for two more hits than requested; a count of 0 asks for 20.
const r = await this.cachedWebSearch({
q: searchQuery,
count: count ? Math.floor(count + 2) : 20
}, crawlerOptions.noCache);
if (!r.web?.results.length) {
throw new AssertionFailureError(`No search results available for query ${searchQuery}`);
}
// Timeouts shorter than 30 s are dropped from the crawl options.
if (crawlOpts.timeoutMs && crawlOpts.timeoutMs < 30_000) {
delete crawlOpts.timeoutMs;
}
const it = this.fetchSearchResults(crawlerOptions.respondWith, r.web?.results.slice(0, count + 2), crawlOpts,
CrawlerOptions.from({ ...crawlerOptions, cacheTolerance: crawlerOptions.cacheTolerance ?? this.pageCacheToleranceMs }),
count,
);
// Response shape 1: server-sent events, one 'data' event per intermediate result set.
if (!ctx.accepts('text/plain') && ctx.accepts('text/event-stream')) {
const sseStream = new OutputServerEventStream();
rpcReflect.return(sseStream);
try {
for await (const scrapped of it) {
if (!scrapped) {
continue;
}
chargeAmount = this.assignChargeAmount(scrapped);
sseStream.write({
event: 'data',
data: scrapped,
});
}
} catch (err: any) {
this.logger.error(`Failed to collect search result for query ${searchQuery}`,
{ err: marshalErrorLike(err) }
);
sseStream.write({
event: 'error',
data: marshalErrorLike(err),
});
}
sseStream.end();
return sseStream;
}
let lastScrapped: any[] | undefined;
let earlyReturn = false;
// Response shape 2: JSON.
if (!ctx.accepts('text/plain') && (ctx.accepts('text/json') || ctx.accepts('application/json'))) {
let earlyReturnTimer: ReturnType<typeof setTimeout> | undefined;
// Armed once, as soon as at least one page qualifies; when it fires, the latest
// intermediate result set is returned and charged.
const setEarlyReturnTimer = () => {
if (earlyReturnTimer) {
return;
}
earlyReturnTimer = setTimeout(() => {
if (!lastScrapped) {
return;
}
chargeAmount = this.assignChargeAmount(lastScrapped);
rpcReflect.return(lastScrapped);
earlyReturn = true;
}, ((crawlerOptions.timeout || 0) * 1000) || this.reasonableDelayMs);
};
for await (const scrapped of it) {
lastScrapped = scrapped;
if (_.some(scrapped, (x) => this.pageQualified(x))) {
setEarlyReturnTimer();
}
// Keep iterating until every page qualifies and there are enough of them.
if (!this.searchResultsQualified(scrapped, count)) {
continue;
}
if (earlyReturnTimer) {
clearTimeout(earlyReturnTimer);
}
chargeAmount = this.assignChargeAmount(scrapped);
return scrapped;
}
if (earlyReturnTimer) {
clearTimeout(earlyReturnTimer);
}
if (!lastScrapped) {
throw new AssertionFailureError(`No content available for query ${searchQuery}`);
}
// Skip re-charging when the timer callback already assigned the charge.
if (!earlyReturn) {
chargeAmount = this.assignChargeAmount(lastScrapped);
}
return lastScrapped;
}
// Response shape 3: plain text; same loop as the JSON branch, but results are stringified.
let earlyReturnTimer: ReturnType<typeof setTimeout> | undefined;
const setEarlyReturnTimer = () => {
if (earlyReturnTimer) {
return;
}
earlyReturnTimer = setTimeout(() => {
if (!lastScrapped) {
return;
}
chargeAmount = this.assignChargeAmount(lastScrapped);
rpcReflect.return(assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null }));
earlyReturn = true;
}, ((crawlerOptions.timeout || 0) * 1000) || this.reasonableDelayMs);
};
for await (const scrapped of it) {
lastScrapped = scrapped;
if (_.some(scrapped, (x) => this.pageQualified(x))) {
setEarlyReturnTimer();
}
if (!this.searchResultsQualified(scrapped, count)) {
continue;
}
if (earlyReturnTimer) {
clearTimeout(earlyReturnTimer);
}
chargeAmount = this.assignChargeAmount(scrapped);
return assignTransferProtocolMeta(`${scrapped}`, { contentType: 'text/plain', envelope: null });
}
if (earlyReturnTimer) {
clearTimeout(earlyReturnTimer);
}
if (!lastScrapped) {
throw new AssertionFailureError(`No content available for query ${searchQuery}`);
}
if (!earlyReturn) {
chargeAmount = this.assignChargeAmount(lastScrapped);
}
return assignTransferProtocolMeta(`${lastScrapped}`, { contentType: 'text/plain', envelope: null });
}
/**
 * Async generator turning upstream search hits into formatted pages.
 *
 * - No `searchResults`: yields nothing.
 * - `count === 0`: yields a single array built from the upstream title/url/description
 *   only, without crawling.
 * - Otherwise: crawls every hit through `crawler.scrapMany` and, for each intermediate
 *   batch it produces, yields the re-organized list of formatted pages.
 *
 * @param mode respond-with mode; 'html', 'text' and 'screenshot' get `content: undefined`
 *             on placeholder entries, any other mode gets an empty string
 * @param searchResults upstream hits, index-aligned with the crawled snapshots
 * @param options scrapping options forwarded to the crawler
 * @param crawlerOptions crawler options forwarded to the crawler
 * @param count requested number of results
 */
async *fetchSearchResults(
mode: string | 'markdown' | 'html' | 'text' | 'screenshot',
searchResults?: WebSearchResult[],
options?: ExtraScrappingOptions,
crawlerOptions?: CrawlerOptions,
count?: number,
) {
if (!searchResults) {
return;
}
if (count === 0) {
const resultArray = searchResults.map((upstreamSearchResult, i) => ({
url: upstreamSearchResult.url,
title: upstreamSearchResult.title,
description: upstreamSearchResult.description,
content: ['html', 'text', 'screenshot'].includes(mode) ? undefined : '',
toString() {
return `[${i + 1}] Title: ${this.title}
[${i + 1}] URL Source: ${this.url}
[${i + 1}] Description: ${this.description}
`;
}
})) as FormattedPage[];
resultArray.toString = function () {
return this.map((x, i) => x ? x.toString() : '').join('\n\n').trimEnd() + '\n';
};
yield resultArray;
return;
}
const urls = searchResults.map((x) => new URL(x.url));
// Remembers the formatted page per snapshot object so a snapshot that shows up again
// in a later batch is not formatted twice.
const snapshotMap = new WeakMap();
for await (const scrapped of this.crawler.scrapMany(urls, options, crawlerOptions)) {
const mapped = scrapped.map((x, i) => {
const upstreamSearchResult = searchResults[i];
// No snapshot for this hit yet: fall back to the upstream metadata.
if (!x) {
return {
url: upstreamSearchResult.url,
title: upstreamSearchResult.title,
description: upstreamSearchResult.description,
content: ['html', 'text', 'screenshot'].includes(mode) ? undefined : ''
};
}
if (snapshotMap.has(x)) {
return snapshotMap.get(x);
}
return this.snapshotFormatter.formatSnapshot(mode, x, urls[i]).then((r) => {
// The upstream title only fills a missing title; the upstream description always wins.
r.title ??= upstreamSearchResult.title;
r.description = upstreamSearchResult.description;
snapshotMap.set(x, r);
return r;
}).catch((err) => {
this.logger.error(`Failed to format snapshot for ${urls[i].href}`, { err: marshalErrorLike(err) });
return {
url: upstreamSearchResult.url,
title: upstreamSearchResult.title,
description: upstreamSearchResult.description,
content: x.text,
};
});
});
const resultArray = await Promise.all(mapped) as FormattedPage[];
yield this.reOrganizeSearchResults(resultArray, count);
}
}
/**
 * Trims a list of formatted pages to the target size and attaches text renderers.
 *
 * Qualified pages (see `pageQualified`) are always accepted; unqualified pages are
 * accepted only as far as needed to reach the target. Accepted pages keep their
 * original relative order and the list is cut to the target length. Each entry, and
 * the array itself, gets a `toString` used for plain-text output.
 *
 * @param searchResults formatted pages in upstream order
 * @param count desired number of results; falls back to `this.targetResultCount` when falsy
 */
reOrganizeSearchResults(searchResults: FormattedPage[], count?: number) {
const targetResultCount = count || this.targetResultCount;
const [qualifiedPages, unqualifiedPages] = _.partition(searchResults, (x) => this.pageQualified(x));
const acceptSet = new Set(qualifiedPages);
// Number of unqualified pages needed to fill up to the target (never negative).
const n = targetResultCount - qualifiedPages.length;
for (const x of unqualifiedPages.slice(0, n >= 0 ? n : 0)) {
acceptSet.add(x);
}
const filtered = searchResults.filter((x) => acceptSet.has(x)).slice(0, targetResultCount);
const resultArray = filtered.map((x, i) => {
return {
...x,
toString(this: any) {
// No page content but a description: render the short form.
if (!this.content && this.description) {
if (this.title || x.textRepresentation) {
const textRep = x.textRepresentation ? `\n[${i + 1}] Content: \n${x.textRepresentation}` : '';
return `[${i + 1}] Title: ${this.title}
[${i + 1}] URL Source: ${this.url}
[${i + 1}] Description: ${this.description}${textRep}
`;
}
return `[${i + 1}] No content available for ${this.url}`;
}
// Lines placed between the URL line and the content.
const mixins = [];
if (this.description) {
mixins.push(`[${i + 1}] Description: ${this.description}`);
}
if (this.publishedTime) {
mixins.push(`[${i + 1}] Published Time: ${this.publishedTime}`);
}
// Sections placed after the content.
const suffixMixins = [];
if (this.images) {
const imageSummaryChunks = [`[${i + 1}] Images:`];
for (const [k, v] of Object.entries(this.images)) {
imageSummaryChunks.push(`- ![${k}](${v})`);
}
if (imageSummaryChunks.length === 1) {
imageSummaryChunks.push('This page does not seem to contain any images.');
}
suffixMixins.push(imageSummaryChunks.join('\n'));
}
if (this.links) {
const linkSummaryChunks = [`[${i + 1}] Links/Buttons:`];
for (const [k, v] of Object.entries(this.links)) {
linkSummaryChunks.push(`- [${k}](${v})`);
}
if (linkSummaryChunks.length === 1) {
linkSummaryChunks.push('This page does not seem to contain any buttons/links.');
}
suffixMixins.push(linkSummaryChunks.join('\n'));
}
return `[${i + 1}] Title: ${this.title}
[${i + 1}] URL Source: ${this.url}${mixins.length ? `\n${mixins.join('\n')}` : ''}
[${i + 1}] Markdown Content:
${this.content}
${suffixMixins.length ? `\n${suffixMixins.join('\n')}\n` : ''}`;
}
};
});
// Whole-list rendering: entries separated by a blank line, with one trailing newline.
resultArray.toString = function () {
return this.map((x, i) => x ? x.toString() : `[${i + 1}] No content available for ${this[i].url}`).join('\n\n').trimEnd() + '\n';
};
return resultArray;
}
/**
 * Sums the per-page charge computed by the crawler for every formatted page.
 * A page for which the crawler reports a falsy amount counts as 0.
 *
 * @param formatted pages to charge for
 * @returns the total charge amount
 */
assignChargeAmount(formatted: FormattedPage[]) {
    const perPageCharges = formatted.map((page) => this.crawler.assignChargeAmount(page) || 0);

    return _.sum(perPageCharges);
}
/**
 * Tells whether a formatted page carries usable output.
 *
 * A page qualifies when it has both a title and content, or otherwise any of
 * screenshotUrl / pageshotUrl / text / html. The first truthy value is returned
 * as-is (not coerced to boolean); callers only rely on its truthiness.
 */
pageQualified(formattedPage: FormattedPage) {
    const titledContent = formattedPage.title && formattedPage.content;
    if (titledContent) {
        return titledContent;
    }

    return formattedPage.screenshotUrl ||
        formattedPage.pageshotUrl ||
        formattedPage.text ||
        formattedPage.html;
}
/**
 * A result set is complete when every page in it qualifies and it holds at
 * least `targetResultCount` pages.
 *
 * @param results pages to check
 * @param targetResultCount minimum number of pages; defaults to `this.targetResultCount`
 */
searchResultsQualified(results: FormattedPage[], targetResultCount = this.targetResultCount) {
    const allQualified = _.every(results, (page) => this.pageQualified(page));
    if (!allQualified) {
        return false;
    }

    return results.length >= targetResultCount;
}
/**
 * Runs a web search through Brave, backed by a Firestore cache keyed on the query digest.
 *
 * - A cache entry younger than `cacheValidMs` is returned directly.
 * - A stale entry is kept as a fallback in case the upstream search throws.
 * - A fresh upstream response is saved in the background with a `cacheRetentionMs` expiry.
 *
 * @param query upstream query parameters
 * @param noCache when true, the cache lookup is skipped entirely
 * @returns the upstream (or cached) web search response
 * @throws whatever the upstream search throws when no stale cache entry is available
 */
async cachedWebSearch(query: WebSearchQueryParams, noCache: boolean = false) {
    const queryDigest = objHashMd5B64Of(query);
    let cache;

    if (!noCache) {
        const latestRecords = await SearchResult.fromFirestoreQuery(
            SearchResult.COLLECTION.where('queryDigest', '==', queryDigest)
                .orderBy('createdAt', 'desc')
                .limit(1)
        );
        cache = latestRecords[0];

        if (cache) {
            const createdAtMs = cache.createdAt.valueOf();
            const age = Date.now() - createdAtMs;
            const stale = createdAtMs < (Date.now() - this.cacheValidMs);
            const verdict = stale ? 'Stale cache exists' : 'Cache hit';
            this.logger.info(`${verdict} for search query "${query.q}", normalized digest: ${queryDigest}, ${age}ms old`, {
                query, digest: queryDigest, age, stale
            });

            if (!stale) {
                return cache.response as WebSearchApiResponse;
            }
        }
    }

    try {
        const upstreamResponse = await this.braveSearchService.webSearch(query);

        const createdAt = new Date();
        const expireAt = new Date(createdAt.valueOf() + this.cacheRetentionMs);
        const record = SearchResult.from({
            query,
            queryDigest,
            response: upstreamResponse,
            createdAt,
            expireAt,
        });

        // Saving is fire-and-forget; a failed write only produces a warning.
        SearchResult.save(record.degradeForFireStore()).catch((err) => {
            this.logger.warn(`Failed to cache search result`, { err });
        });

        return upstreamResponse;
    } catch (err: any) {
        if (!cache) {
            throw err;
        }
        this.logger.warn(`Failed to fetch search result, but a stale cache is available. falling back to stale cache`, { err: marshalErrorLike(err) });

        return cache.response as WebSearchApiResponse;
    }
}
}

505
src/api/serp.ts Normal file
View File

@ -0,0 +1,505 @@
import { singleton } from 'tsyringe';
import {
RPCHost, RPCReflection, assignMeta, RawString,
ParamValidationError,
assignTransferProtocolMeta,
} from 'civkit/civ-rpc';
import { marshalErrorLike } from 'civkit/lang';
import _ from 'lodash';
import { RateLimitControl, RateLimitDesc } from '../shared/services/rate-limit';
import { GlobalLogger } from '../services/logger';
import { AsyncLocalContext } from '../services/async-context';
import { Context, Ctx, Method, Param, RPCReflect } from '../services/registry';
import { OutputServerEventStream } from '../lib/transform-server-event-stream';
import { JinaEmbeddingsAuthDTO } from '../dto/jina-embeddings-auth';
import { InsufficientBalanceError, RateLimitTriggeredError } from '../services/errors';
import { WORLD_COUNTRIES, WORLD_LANGUAGES } from '../shared/3rd-party/serper-search';
import { GoogleSERP } from '../services/serp/google';
import { WebSearchEntry } from '../services/serp/compat';
import { CrawlerOptions } from '../dto/crawler-options';
import { ScrappingOptions } from '../services/serp/puppeteer';
import { objHashMd5B64Of } from 'civkit/hash';
import { SERPResult } from '../db/searched';
import { SerperBingSearchService, SerperGoogleSearchService } from '../services/serp/serper';
import type { JinaEmbeddingsTokenAccount } from '../shared/db/jina-embeddings-token-account';
import { LRUCache } from 'lru-cache';
// Lower-cased keys of WORLD_COUNTRIES; the `gl` parameter of SerpHost.search is validated against this list.
const WORLD_COUNTRY_CODES = Object.keys(WORLD_COUNTRIES).map((x) => x.toLowerCase());
// Entry stored in SerpHost.highFreqKeyCache, keyed by bearer token.
type RateLimitCache = {
// While this lies in the future, SerpHost.search rejects requests for the token with a rate-limit error.
blockedUntil?: Date;
// Account record copied onto the auth DTO when the token is found in the cache.
user?: JinaEmbeddingsTokenAccount;
};
/**
 * Prototype for index objects: renders own properties as `[Key name] value`
 * lines, one per property, followed by a trailing newline. A property with an
 * empty key renders as an empty line.
 */
const indexProto = {
    toString(): string {
        const lines = _.toPairs(this).map(([key, value]) => {
            if (!key) {
                return '';
            }

            return `[${_.upperFirst(_.lowerCase(key))}] ${value}`;
        });

        return `${lines.join('\n')}\n`;
    }
};
@singleton()
export class SerpHost extends RPCHost {
// Logger scoped to this service's class name.
logger = this.globalLogger.child({ service: this.constructor.name });
// How long a stored SERP response is kept (7 days); used for `expireAt` in cachedSearch.
cacheRetentionMs = 1000 * 3600 * 24 * 7;
// Cached SERP responses older than this (1 hour) are treated as stale in cachedSearch.
cacheValidMs = 1000 * 3600;
// NOTE(review): the next three values are not referenced in the visible part of this
// class; presumably kept for parity with the searcher host. Confirm before removing.
pageCacheToleranceMs = 1000 * 3600 * 24;
reasonableDelayMs = 15_000;
targetResultCount = 5;
// Per-bearer-token cache (at most 256 entries, 1 hour TTL, age not refreshed on get/has)
// holding the account record and any active block for keys on the high-frequency path.
highFreqKeyCache = new LRUCache<string, RateLimitCache>({
max: 256,
ttl: 60 * 60 * 1000,
updateAgeOnGet: false,
updateAgeOnHas: false,
});
/**
 * Builds the index/usage object served when no query is supplied.
 *
 * The object inherits `toString` from `indexProto`, so it can be returned as
 * JSON or stringified for plain-text output. When the caller is authenticated
 * an empty-key separator plus account id/name and balance are appended;
 * otherwise a note asking for an API key is added.
 *
 * @param ctx request context; its origin is used to build the third usage line
 * @param auth optional auth DTO of the caller
 */
async getIndex(ctx: Context, auth?: JinaEmbeddingsAuthDTO) {
    const index: Record<string, string | number | undefined> = Object.create(indexProto);

    // Insertion order defines the order of the rendered lines.
    index.usage1 = 'https://r.jina.ai/YOUR_URL';
    index.usage2 = 'https://s.jina.ai/YOUR_SEARCH_QUERY';
    index.usage3 = `${ctx.origin}/search/YOUR_SEARCH_QUERY`;
    index.homepage = 'https://jina.ai/reader';
    index.sourceCode = 'https://github.com/jina-ai/reader';

    const account = auth?.user;
    if (!account) {
        index.note = 'Authentication is required to use this endpoint. Please provide a valid API key via Authorization header.';

        return index;
    }

    // The empty key renders as a blank separator line.
    index[''] = undefined;
    index.authenticatedAs = `${account.user_id} (${account.full_name})`;
    index.balanceLeft = account.wallet.total_balance;

    return index;
}
// Dependencies are declared as protected parameter properties; all received
// arguments are forwarded unchanged to the RPCHost constructor.
constructor(
protected globalLogger: GlobalLogger,
protected rateLimitControl: RateLimitControl,
protected threadLocal: AsyncLocalContext,
protected googleSerp: GoogleSERP,
protected serperGoogle: SerperGoogleSearchService,
protected serperBing: SerperBingSearchService,
) {
super(...arguments);
}
// Waits for `dependencyReady()` to resolve, then emits 'ready'.
override async init() {
await this.dependencyReady();
this.emit('ready');
}
/**
 * SERP endpoint: returns search engine results (web, images or news) for `q`.
 *
 * Flow:
 *  1. Tokens found in `highFreqKeyCache` get their cached account attached to `auth`.
 *  2. Without `q`: the index is served on `/`; any other path is a parameter error.
 *  3. Balance check, then rate limiting. On the normal path the limiter is awaited;
 *     on the high-frequency path it runs in the background and only records a
 *     `blockedUntil` for subsequent requests.
 *  4. The (cached) search runs; with `fallback` enabled and an empty first page, the
 *     query is shortened one term at a time until something is found.
 *  5. Results are mapped, decorated, charged and returned with query/fallback meta.
 *
 * @param variant result type: 'web' (default), 'images' or 'news'
 * @param q search query
 * @param searchEngine preferred provider ('google' or 'bing')
 * @param num requested number of results (0-20)
 * @param gl country code, validated against WORLD_COUNTRY_CODES
 * @param hl language code, validated against WORLD_LANGUAGES
 * @param location free-form location passed to the provider
 * @param page result page number
 * @param fallback whether to retry with a shortened query on empty results (default true)
 * @throws ParamValidationError when `q` is missing on a non-index path
 * @throws InsufficientBalanceError when the wallet balance is not positive
 * @throws RateLimitTriggeredError when the key is currently blocked or rate limited
 */
@Method({
    name: 'searchIndex',
    ext: {
        http: {
            action: ['get', 'post'],
            path: '/'
        }
    },
    tags: ['search'],
    returnType: [String, OutputServerEventStream, RawString],
})
@Method({
    ext: {
        http: {
            action: ['get', 'post'],
        }
    },
    tags: ['search'],
    returnType: [String, OutputServerEventStream, RawString],
})
async search(
    @RPCReflect() rpcReflect: RPCReflection,
    @Ctx() ctx: Context,
    crawlerOptions: CrawlerOptions,
    auth: JinaEmbeddingsAuthDTO,
    @Param('type', { type: new Set(['web', 'images', 'news']), default: 'web' })
    variant: 'web' | 'images' | 'news',
    @Param('q') q?: string,
    @Param('provider', { type: new Set(['google', 'bing']) })
    searchEngine?: 'google' | 'bing',
    @Param('num', { validate: (v: number) => v >= 0 && v <= 20 })
    num?: number,
    @Param('gl', { validate: (v: string) => WORLD_COUNTRY_CODES.includes(v?.toLowerCase()) }) gl?: string,
    @Param('hl', { validate: (v: string) => WORLD_LANGUAGES.some(l => l.code === v) }) hl?: string,
    @Param('location') location?: string,
    @Param('page') page?: number,
    @Param('fallback', { default: true }) fallback?: boolean,
) {
    const authToken = auth.bearerToken;
    let highFreqKey: RateLimitCache | undefined;
    if (authToken && this.highFreqKeyCache.has(authToken)) {
        highFreqKey = this.highFreqKeyCache.get(authToken)!;
        auth.user = highFreqKey.user;
        auth.uid = highFreqKey.user?.user_id;
    }

    const uid = await auth.solveUID();
    if (!q) {
        if (ctx.path === '/') {
            // getIndex is async: without the await the plain-text branch below would
            // stringify the promise itself ("[object Promise]") instead of the index.
            const indexObject = await this.getIndex(ctx, auth);
            if (!ctx.accepts('text/plain') && (ctx.accepts('text/json') || ctx.accepts('application/json'))) {
                return indexObject;
            }

            return assignTransferProtocolMeta(`${indexObject}`,
                { contentType: 'text/plain; charset=utf-8', envelope: null }
            );
        }
        throw new ParamValidationError({
            path: 'q',
            message: `Required but not provided`
        });
    }

    // Return content by default
    const user = await auth.assertUser();
    if (!(user.wallet.total_balance > 0)) {
        throw new InsufficientBalanceError(`Account balance not enough to run this query, please recharge.`);
    }

    // A block recorded by an earlier background limiter run is enforced here.
    if (highFreqKey?.blockedUntil) {
        const now = new Date();
        const blockedTimeRemaining = (highFreqKey.blockedUntil.valueOf() - now.valueOf());
        if (blockedTimeRemaining > 0) {
            throw RateLimitTriggeredError.from({
                message: `Per UID rate limit exceeded (async)`,
                retryAfter: Math.ceil(blockedTimeRemaining / 1000),
            });
        }
    }

    const PREMIUM_KEY_LIMIT = 400;
    const rateLimitPolicy = auth.getRateLimits('SEARCH') || [
        parseInt(user.metadata?.speed_level) >= 2 ?
            RateLimitDesc.from({
                occurrence: PREMIUM_KEY_LIMIT,
                periodSeconds: 60
            }) :
            RateLimitDesc.from({
                occurrence: 40,
                periodSeconds: 60
            })
    ];

    const apiRollPromise = this.rateLimitControl.simpleRPCUidBasedLimit(
        rpcReflect, uid!, ['SEARCH'],
        ...rateLimitPolicy
    );

    if (!highFreqKey) {
        // Normal path
        await apiRollPromise;

        // Keys whose policy allows at least PREMIUM_KEY_LIMIT requests per minute are
        // promoted to the high-frequency path for subsequent requests.
        if (rateLimitPolicy.some(
            (x) => {
                const rpm = x.occurrence / (x.periodSeconds / 60);
                if (rpm >= PREMIUM_KEY_LIMIT) {
                    return true;
                }

                return false;
            })
        ) {
            this.highFreqKeyCache.set(auth.bearerToken!, {
                user,
            });
        }

    } else {
        // High freq key path
        apiRollPromise.then(
            // Rate limit not triggered, make sure not blocking.
            () => {
                delete highFreqKey.blockedUntil;
            },
            // Rate limit triggered
            (err) => {
                if (!(err instanceof RateLimitTriggeredError)) {
                    return;
                }
                const now = Date.now();
                let tgtDate;
                if (err.retryAfter) {
                    tgtDate = new Date(now + err.retryAfter * 1000);
                } else if (err.retryAfterDate) {
                    tgtDate = err.retryAfterDate;
                }

                if (tgtDate) {
                    const dt = tgtDate.valueOf() - now;
                    highFreqKey.blockedUntil = tgtDate;
                    // Lift the block when it expires, unless a newer one replaced it.
                    setTimeout(() => {
                        if (highFreqKey.blockedUntil === tgtDate) {
                            delete highFreqKey.blockedUntil;
                        }
                    }, dt).unref();
                }
            }
        ).finally(async () => {
            // Always asynchronously update user(wallet);
            const user = await auth.getBrief().catch(() => undefined);
            if (user) {
                highFreqKey.user = user;
            }
        });
    }

    let chargeAmount = 0;
    rpcReflect.finally(async () => {
        if (chargeAmount) {
            auth.reportUsage(chargeAmount, `reader-serp`).catch((err) => {
                this.logger.warn(`Unable to report usage for ${uid}`, { err: marshalErrorLike(err) });
            });
            // On the high-frequency path the limiter promise may have rejected; that
            // must not surface as an unhandled rejection from this hook.
            const apiRoll = await apiRollPromise.catch(() => undefined);
            if (apiRoll) {
                apiRoll.chargeAmount = chargeAmount;
            }
        }
    });

    let chargeAmountScaler = 1;
    if (searchEngine === 'bing') {
        chargeAmountScaler = 3;
    }
    if (variant !== 'web') {
        chargeAmountScaler = 5;
    }

    let realQuery = q;
    let results = await this.cachedSearch(variant, {
        provider: searchEngine,
        q,
        num,
        gl,
        hl,
        location,
        page,
    }, crawlerOptions);

    if (fallback && !results?.length && (!page || page === 1)) {
        let tryTimes = 1;
        const containsRTL = /[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF\u0590-\u05FF\uFB1D-\uFB4F\u0700-\u074F\u0780-\u07BF\u07C0-\u07FF]/.test(q);
        const terms = q.split(/\s+/g).filter((x) => !!x);
        while (terms.length > 1) {
            containsRTL ? terms.shift() : terms.pop(); // reduce the query by one term at a time
            realQuery = terms.join(' ').trim();
            if (!realQuery) {
                break;
            }
            this.logger.info(`Retrying search with fallback query: "${realQuery}"`);
            results = await this.cachedSearch(variant, {
                provider: searchEngine,
                q: realQuery,
                num,
                gl,
                hl,
                location,
            }, crawlerOptions);
            tryTimes += 1;
            if (results?.length) {
                break;
            }
        }
        // Every attempt, including the initial one, multiplies the charge.
        chargeAmountScaler *= tryTimes;
    }

    if (!results?.length) {
        results = [];
    }

    const finalResults = results.map((x: any) => this.mapToFinalResults(x));

    await Promise.all(finalResults.map((x: any) => this.assignGeneralMixin(x)));

    // Keep the computed amount: the finally hook above reports usage only when
    // chargeAmount is non-zero, so discarding the return value means nothing is billed.
    chargeAmount = this.assignChargeAmount(finalResults, chargeAmountScaler);
    assignMeta(finalResults, {
        query: realQuery,
        fallback: realQuery === q ? undefined : realQuery,
    });

    return finalResults;
}
/**
 * Computes the token charge for a result list and records it as `usage.tokens`
 * meta on the list.
 *
 * Every started batch of 10 items costs 10000 tokens, multiplied by `scaler`;
 * an empty list costs nothing.
 *
 * @param items the result list to charge for (meta is attached to it)
 * @param scaler multiplier applied to the base charge
 * @returns the number of tokens charged
 */
assignChargeAmount(items: unknown[], scaler: number) {
    const startedBatches = Math.ceil(items.length / 10);
    const tokens = startedBatches * 10000 * scaler;

    assignMeta(items, { usage: { tokens } });

    return tokens;
}
/**
 * Fetches the 32px favicon for `domain` from Google's s2 favicon endpoint and
 * returns it as a base64 data URI labelled `image/png`.
 *
 * Best effort: a non-OK response or any thrown error yields an empty string;
 * thrown errors are logged as warnings.
 *
 * @param domain value placed in the `domain_url` query parameter as-is
 */
async getFavicon(domain: string) {
    const faviconEndpoint = `https://www.google.com/s2/favicons?sz=32&domain_url=${domain}`;

    try {
        const res = await fetch(faviconEndpoint);
        if (res.ok) {
            const encoded = Buffer.from(await res.arrayBuffer()).toString('base64');

            return `data:image/png;base64,${encoded}`;
        }

        return '';
    } catch (error: any) {
        this.logger.warn(`Failed to get favicon base64 string`, { err: marshalErrorLike(error) });

        return '';
    }
}
/**
 * Translates request-level crawler options into the scrapping options handed
 * to the SERP providers.
 *
 * - `timeout` (seconds) becomes `timeoutMs`; a falsy timeout stays undefined.
 * - A `proxy` value ending in '+' turns on `proxyResources`, and the '+' is
 *   stripped for `allocProxy`. An explicit `proxyUrl` also turns on `proxyResources`.
 * - A `locale` is additionally sent as the `Accept-Language` header.
 *
 * @param opts crawler options from the request
 * @returns scrapping options for the provider call
 */
async configure(opts: CrawlerOptions) {
    const timeoutMs = opts.timeout ? opts.timeout * 1000 : undefined;

    const scrapOpts: ScrappingOptions = {
        proxyUrl: opts.proxyUrl,
        cookies: opts.setCookies,
        overrideUserAgent: opts.userAgent,
        timeoutMs,
        locale: opts.locale,
        referer: opts.referer,
        viewport: opts.viewport,
        proxyResources: Boolean(opts.proxyUrl || opts.proxy?.endsWith('+')),
        allocProxy: opts.proxy?.endsWith('+') ? opts.proxy.slice(0, -1) : opts.proxy,
    };

    if (!opts.locale) {
        return scrapOpts;
    }

    scrapOpts.extraHeaders ??= {};
    scrapOpts.extraHeaders['Accept-Language'] = opts.locale;

    return scrapOpts;
}
/**
 * Converts a provider search entry into the public result shape:
 * `link` becomes `url`, `snippet` becomes `description`, and only a fixed
 * set of optional extra properties is passed through.
 *
 * @param input entry as returned by a SERP provider
 */
mapToFinalResults(input: WebSearchEntry) {
    const passthroughKeys = [
        'imageUrl', 'imageWidth', 'imageHeight', 'source', 'date', 'siteLinks'
    ];
    const extras = _.pick(input, passthroughKeys);

    return {
        title: input.title,
        url: input.link,
        description: Reflect.get(input, 'snippet'),
        ...extras,
    };
}
/**
 * Yields the search providers to try, in order, for the given preference.
 *
 * - 'bing':   Serper Bing, Serper Google, then the Google SERP service
 * - 'google': the Google SERP service twice, then Serper Google
 * - anything else: Serper Google, then the Google SERP service twice
 *
 * The Google SERP service is yielded twice where listed twice, so a caller that
 * moves on after a failure tries it a second time.
 *
 * @param preference preferred provider name, if any
 */
*iterProviders(preference?: string) {
    let attemptOrder;
    if (preference === 'bing') {
        attemptOrder = [this.serperBing, this.serperGoogle, this.googleSerp];
    } else if (preference === 'google') {
        attemptOrder = [this.googleSerp, this.googleSerp, this.serperGoogle];
    } else {
        attemptOrder = [this.serperGoogle, this.googleSerp, this.googleSerp];
    }

    for (const provider of attemptOrder) {
        yield provider;
    }
}
/**
 * Runs a SERP query through the provider chain, backed by a Firestore cache.
 *
 * - The cache key is a digest of the query (including `provider`) plus the variant.
 * - A cache entry younger than `cacheValidMs` is returned directly; a stale entry is
 *   kept as a fallback in case the live search fails.
 * - Providers from `iterProviders` are tried in order; the first call that does not
 *   throw ends the loop, even if it returned no results.
 * - Non-empty results are saved in the background with a `cacheRetentionMs` expiry.
 *
 * Note: `provider` is deleted from the passed-in `query` object, i.e. the caller's
 * object is mutated.
 *
 * @param variant which provider method to call: web, news or images search
 * @param query provider query parameters, optionally carrying a `provider` preference
 * @param opts crawler options; `noCache` skips the cache lookup
 * @returns the result list from the provider or the cache (may be undefined or empty)
 * @throws the last provider error when there are no results and no stale cache entry
 */
async cachedSearch(variant: 'web' | 'news' | 'images', query: Record<string, any>, opts: CrawlerOptions) {
// The digest is taken before `provider` is removed, so the preference is part of the key.
const queryDigest = objHashMd5B64Of({ ...query, variant });
const provider = query.provider;
Reflect.deleteProperty(query, 'provider');
const noCache = opts.noCache;
let cache;
if (!noCache) {
cache = (await SERPResult.fromFirestoreQuery(
SERPResult.COLLECTION.where('queryDigest', '==', queryDigest)
.orderBy('createdAt', 'desc')
.limit(1)
))[0];
if (cache) {
const age = Date.now() - cache.createdAt.valueOf();
const stale = cache.createdAt.valueOf() < (Date.now() - this.cacheValidMs);
this.logger.info(`${stale ? 'Stale cache exists' : 'Cache hit'} for search query "${query.q}", normalized digest: ${queryDigest}, ${age}ms old`, {
query, digest: queryDigest, age, stale
});
if (!stale) {
return cache.response as any;
}
}
}
const scrappingOptions = await this.configure(opts);
try {
let r: any[] | undefined;
let lastError;
// A provider that throws is logged and skipped; the first provider whose call
// resolves breaks out of the whole loop.
outerLoop:
for (const client of this.iterProviders(provider)) {
try {
switch (variant) {
case 'images': {
r = await Reflect.apply(client.imageSearch, client, [query, scrappingOptions]);
break outerLoop;
}
case 'news': {
r = await Reflect.apply(client.newsSearch, client, [query, scrappingOptions]);
break outerLoop;
}
case 'web':
default: {
r = await Reflect.apply(client.webSearch, client, [query, scrappingOptions]);
break outerLoop;
}
}
} catch (err) {
lastError = err;
this.logger.warn(`Failed to do ${variant} search using ${client.constructor.name}`, { err });
}
}
if (r?.length) {
const nowDate = new Date();
const record = SERPResult.from({
query,
queryDigest,
response: r,
createdAt: nowDate,
expireAt: new Date(nowDate.valueOf() + this.cacheRetentionMs)
});
// Saving is fire-and-forget; a failed write only produces a warning.
SERPResult.save(record.degradeForFireStore()).catch((err) => {
this.logger.warn(`Failed to cache search result`, { err });
});
} else if (lastError) {
// Empty result and at least one provider failed: surface that failure
// (the outer catch may still fall back to a stale cache entry).
throw lastError;
}
return r;
} catch (err: any) {
if (cache) {
this.logger.warn(`Failed to fetch search result, but a stale cache is available. falling back to stale cache`, { err: marshalErrorLike(err) });
return cache.response as any;
}
throw err;
}
}
/**
 * Adds optional, request-controlled extras to a single search entry (currently the favicon).
 *
 * When the thread-local `collect-favicon` flag is truthy and the entry has a `link`, the favicon
 * for the link's origin is fetched and stored on the entry as `favicon`.
 *
 * @param result - Search entry to augment in place.
 */
async assignGeneralMixin(result: Partial<WebSearchEntry>) {
    if (!this.threadLocal.get('collect-favicon') || !result.link) {
        return;
    }

    const { origin } = new URL(result.link);
    const favicon = await this.getFavicon(origin);
    Reflect.set(result, 'favicon', favicon);
}
}

View File

@ -62,3 +62,7 @@ export class SearchResult extends FirestoreRecord {
/**
 * Search result record kept in the `serperSearchResults` collection.
 * Behaves exactly like `SearchResult`; only the collection name differs.
 */
export class SerperSearchResult extends SearchResult {
static override collectionName = 'serperSearchResults';
}
/**
 * Search result record kept in the `SERPResults` collection.
 * Behaves exactly like `SearchResult`; only the collection name differs.
 */
export class SERPResult extends SearchResult {
static override collectionName = 'SERPResults';
}

View File

@ -429,6 +429,8 @@ export class CrawlerOptions extends AutoCastable {
})
respondTiming?: RESPOND_TIMING;
_hintIps?: string[];
static override from(input: any) {
const instance = super.from(input) as CrawlerOptions;
const ctx = Reflect.get(input, RPC_CALL_ENVIRONMENT) as Context | undefined;

View File

@ -1,8 +1,9 @@
import _ from 'lodash';
import {
Also, AuthenticationFailedError, AuthenticationRequiredError,
DownstreamServiceFailureError, RPC_CALL_ENVIRONMENT,
RPC_CALL_ENVIRONMENT,
AutoCastable,
DownstreamServiceError,
} from 'civkit/civ-rpc';
import { htmlEscape } from 'civkit/escape';
import { marshalErrorLike } from 'civkit/lang';
@ -96,12 +97,14 @@ export class JinaEmbeddingsAuthDTO extends AutoCastable {
});
}
let firestoreDegradation = false;
let account;
try {
account = await JinaEmbeddingsTokenAccount.fromFirestore(this.bearerToken);
} catch (err) {
// FireStore would not accept any string as input and may throw if not happy with it
void 0;
firestoreDegradation = true;
logger.warn(`Firestore issue`, { err });
}
@ -109,7 +112,7 @@ export class JinaEmbeddingsAuthDTO extends AutoCastable {
const jitter = Math.ceil(Math.random() * 30 * 1000);
if (account && !ignoreCache) {
if (account && (age < (180_000 - jitter))) {
if ((age < (180_000 - jitter)) && (account.wallet?.total_balance > 0)) {
this.user = account;
this.uid = this.user?.user_id;
@ -117,6 +120,20 @@ export class JinaEmbeddingsAuthDTO extends AutoCastable {
}
}
if (firestoreDegradation) {
logger.debug(`Using remote UC cached user`);
const r = await this.jinaEmbeddingsDashboard.authorization(this.bearerToken);
const brief = r.data;
const draftAccount = JinaEmbeddingsTokenAccount.from({
...account, ...brief, _id: this.bearerToken,
lastSyncedAt: new Date()
});
this.user = draftAccount;
this.uid = this.user?.user_id;
return draftAccount;
}
try {
const r = await this.jinaEmbeddingsDashboard.validateToken(this.bearerToken);
const brief = r.data;
@ -148,7 +165,7 @@ export class JinaEmbeddingsAuthDTO extends AutoCastable {
}
throw new DownstreamServiceFailureError(`Failed to authenticate: ${err}`);
throw new DownstreamServiceError(`Failed to authenticate: ${err}`);
}
}

View File

@ -4,6 +4,7 @@ import { CityResponse, Reader } from 'maxmind';
import { AsyncService, AutoCastable, Prop, runOnce } from 'civkit';
import { GlobalLogger } from './logger';
import path from 'path';
import { Threaded } from './threaded';
export enum GEOIP_SUPPORTED_LANGUAGES {
EN = 'en',
@ -85,6 +86,7 @@ export class GeoIPService extends AsyncService {
}
@Threaded()
async lookupCity(ip: string, lang: GEOIP_SUPPORTED_LANGUAGES = GEOIP_SUPPORTED_LANGUAGES.EN) {
await this._lazyload();
@ -116,6 +118,13 @@ export class GeoIPService extends AsyncService {
});
}
/**
 * Looks up several IP addresses concurrently via `lookupCity`.
 *
 * Falsy lookup results are dropped, so the returned array may be shorter than `ips`.
 *
 * @param ips - IP addresses to look up.
 * @param lang - Language passed through to `lookupCity`; defaults to English.
 * @returns The truthy lookup results, in input order.
 */
@Threaded()
async lookupCities(ips: string[], lang: GEOIP_SUPPORTED_LANGUAGES = GEOIP_SUPPORTED_LANGUAGES.EN) {
    const lookups = ips.map((ip) => this.lookupCity(ip, lang));
    const resolved = await Promise.all(lookups);

    return resolved.filter(Boolean) as GeoIPCityResponse[];
}
}
const instance = container.resolve(GeoIPService);

View File

@ -57,7 +57,11 @@ export class MiscService extends AsyncService {
}
const normalizedHostname = result.hostname.startsWith('[') ? result.hostname.slice(1, -1) : result.hostname;
let ips: string[] = [];
const isIp = isIP(normalizedHostname);
if (isIp) {
ips.push(normalizedHostname);
}
if (
(result.hostname === 'localhost') ||
(isIp && isIPInNonPublicRange(normalizedHostname))
@ -88,12 +92,16 @@ export class MiscService extends AsyncService {
path: 'url'
});
}
ips.push(x.address);
}
}
}
return result;
return {
url: result,
ips
};
}
}

View File

@ -562,7 +562,8 @@ export class PuppeteerControl extends AsyncService {
headless: !Boolean(process.env.DEBUG_BROWSER),
executablePath: process.env.OVERRIDE_CHROME_EXECUTABLE_PATH,
args: [
'--disable-dev-shm-usage', '--disable-blink-features=AutomationControlled'
'--disable-dev-shm-usage',
'--disable-blink-features=AutomationControlled'
]
}).catch((err: any) => {
this.logger.error(`Unknown firebase issue, just die fast.`, { err });
@ -1618,11 +1619,7 @@ export class PuppeteerControl extends AsyncService {
}
}
try {
const pSubFrameSnapshots = this.snapshotChildFrames(page);
snapshot = await page.evaluate('giveSnapshot(true)') as PageSnapshot;
if (snapshot) {
snapshot.childFrames = await pSubFrameSnapshots;
}
} catch (err: any) {
this.logger.warn(`Page ${sn}: Failed to finalize ${url}`, { err });
if (stuff instanceof Error) {

View File

@ -0,0 +1,12 @@
/**
 * One search hit as produced by the SERP providers, before it is mapped to the public response shape.
 *
 * NOTE(review): the Google image search mapper also emits `imageWidth` / `imageHeight`,
 * which are not declared here - confirm whether they should be part of this interface.
 */
export interface WebSearchEntry {
// Target URL of the hit.
link: string;
// Title text of the hit.
title: string;
// Site / publisher label, when the provider exposes one.
source?: string;
// Date as text; no particular format is enforced by this type.
date?: string;
// Short excerpt describing the hit.
snippet?: string;
// URL of an image associated with the hit.
imageUrl?: string;
// Additional links listed under the hit.
siteLinks?: {
link: string; title: string; snippet?: string;
}[];
// Which kind of search produced the entry.
variant?: 'web' | 'images' | 'news';
}

314
src/services/serp/google.ts Normal file
View File

@ -0,0 +1,314 @@
import { singleton } from 'tsyringe';
import { AsyncService } from 'civkit/async-service';
import { GlobalLogger } from '../logger';
import { JSDomControl } from '../jsdom';
import { isMainThread } from 'worker_threads';
import _ from 'lodash';
import { WebSearchEntry } from './compat';
import { ScrappingOptions, SERPSpecializedPuppeteerControl } from './puppeteer';
import { CurlControl } from '../curl';
import { readFile } from 'fs/promises';
import { ApplicationError } from 'civkit/civ-rpc';
import { ServiceBadApproachError, ServiceBadAttemptError } from '../errors';
import { parseJSONText } from 'civkit/vectorize';
import { retryWith } from 'civkit/decorators';
import { ProxyProvider } from '../../shared/services/proxy-provider';
/**
 * Search provider that scrapes Google result pages directly.
 *
 * Web and news searches pre-fetch the result page through curl (optionally via an allocated
 * proxy) and then let the SERP-specialized browser evaluate an in-page extractor. Image
 * search uses Google's JSON endpoint and never opens a browser page.
 */
@singleton()
export class GoogleSERP extends AsyncService {

    /** Host used for all search URLs; overridable through `OVERRIDE_GOOGLE_DOMAIN`. */
    googleDomain = process.env.OVERRIDE_GOOGLE_DOMAIN || 'www.google.com';

    constructor(
        protected globalLogger: GlobalLogger,
        protected puppeteerControl: SERPSpecializedPuppeteerControl,
        protected jsDomControl: JSDomControl,
        protected curlControl: CurlControl,
        protected proxyProvider: ProxyProvider,
    ) {
        // Outside the main thread the puppeteer control is left out of the dependency list passed to the base class.
        const filteredDeps = isMainThread ? arguments : _.without(arguments, puppeteerControl);
        super(...filteredDeps);
    }

    override async init() {
        await this.dependencyReady();
        this.emit('ready');
    }

    /** Options objects that already went through `sideLoadWithAllocatedProxy` at least once. */
    retryDet = new WeakSet<ScrappingOptions>();

    /**
     * Fetches `url` through curl, using a freshly allocated proxy unless `opts.allocProxy` is `'none'`.
     *
     * When the same `opts` object is seen again (i.e. on a retry), a `socks5h:` proxy is
     * switched to `socks5:`. An HTTP 429 is turned into a `ServiceBadAttemptError`, which the
     * retry predicate below treats as "keep trying". If `opts.allocProxy` is set, the proxy
     * that succeeded is stored on `opts.proxyUrl` (unless one is already there).
     *
     * @param url - Page to fetch.
     * @param opts - Scrapping options; may be mutated (`proxyUrl`).
     * @returns The curl side-load result, plus the `proxy` used when one was allocated.
     */
    @retryWith((err) => {
        if (err instanceof ServiceBadApproachError) {
            return false;
        }
        if (err instanceof ServiceBadAttemptError) {
            // Keep trying
            return true;
        }
        if (err instanceof ApplicationError) {
            // Quit with this error
            return false;
        }
        return undefined;
    }, 3)
    async sideLoadWithAllocatedProxy(url: URL, opts?: ScrappingOptions) {
        if (opts?.allocProxy === 'none') {
            return this.curlControl.sideLoad(url, opts);
        }

        const proxy = await this.proxyProvider.alloc(
            process.env.PREFERRED_PROXY_COUNTRY || 'auto'
        );
        if (opts) {
            if (this.retryDet.has(opts) && proxy.protocol === 'socks5h:') {
                proxy.protocol = 'socks5:';
            }
            this.retryDet.add(opts);
        }
        const r = await this.curlControl.sideLoad(url, {
            ...opts,
            proxyUrl: proxy.href,
        });

        if (r.status === 429) {
            throw new ServiceBadAttemptError('Google returned a 429 error. This may happen due to various reasons, including rate limiting or other issues.');
        }

        if (opts && opts.allocProxy) {
            opts.proxyUrl ??= proxy.href;
        }

        return { ...r, proxy };
    }

    /**
     * Builds the Google search URL for a query object.
     *
     * `page` (1-based) is converted to Google's `start` offset using `num` (default 10) as the
     * page size; `location` is dropped; `null`/`undefined` values are skipped; everything else
     * is stringified into the query string. The input object is not modified.
     *
     * A `page` that does not yield a positive, finite offset (page 1, non-numeric or
     * non-positive input) results in no `start` parameter at all, instead of leaking
     * `start=NaN` or a negative offset into the URL.
     *
     * @param query - Search parameters.
     * @returns The search URL.
     */
    digestQuery(query: { [k: string]: any; }) {
        const url = new URL(`https://${this.googleDomain}/search`);
        const clone = { ...query };
        const num = clone.num || 10;
        if (clone.page) {
            // Always parse as decimal so inputs like '0x10' or '08' are not misread.
            const page = parseInt(clone.page, 10);
            delete clone.page;
            const start = (page - 1) * num;
            if (Number.isFinite(start) && start > 0) {
                clone.start = start;
            } else {
                delete clone.start;
            }
        }
        if (clone.location) {
            delete clone.location;
        }

        for (const [k, v] of Object.entries(clone)) {
            if (v === undefined || v === null) {
                continue;
            }
            url.searchParams.set(k, `${v}`);
        }

        return url;
    }

    /**
     * Runs a Google web search and extracts the results inside the browser.
     *
     * @param query - Search parameters, see `digestQuery`.
     * @param opts - Scrapping options; `sideLoad` is filled from the pre-fetch when available.
     * @returns Whatever the in-page extractor `getWebSearchResults` reports.
     */
    async webSearch(query: { [k: string]: any; }, opts?: ScrappingOptions) {
        const url = this.digestQuery(query);

        const sideLoaded = await this.sideLoadWithAllocatedProxy(url, opts);
        if (opts && sideLoaded.sideLoadOpts) {
            opts.sideLoad = sideLoaded.sideLoadOpts;
        }

        const snapshot = await this.puppeteerControl.controlledScrap(url, getWebSearchResults, opts);

        return snapshot;
    }

    /**
     * Runs a Google news search (`tbm=nws`) and extracts the results inside the browser.
     *
     * @param query - Search parameters, see `digestQuery`.
     * @param opts - Scrapping options; `sideLoad` is filled from the pre-fetch when available.
     * @returns Whatever the in-page extractor `getNewsSearchResults` reports.
     */
    async newsSearch(query: { [k: string]: any; }, opts?: ScrappingOptions) {
        const url = this.digestQuery(query);

        url.searchParams.set('tbm', 'nws');

        const sideLoaded = await this.sideLoadWithAllocatedProxy(url, opts);
        if (opts && sideLoaded.sideLoadOpts) {
            opts.sideLoad = sideLoaded.sideLoadOpts;
        }

        const snapshot = await this.puppeteerControl.controlledScrap(url, getNewsSearchResults, opts);

        return snapshot;
    }

    /**
     * Runs a Google image search through the JSON (`_fmt:json`) endpoint.
     *
     * @param query - Search parameters, see `digestQuery`.
     * @param opts - Scrapping options for the curl fetch.
     * @returns The image hits mapped to `WebSearchEntry`-like objects (with `imageWidth`/`imageHeight`).
     * @throws ServiceBadAttemptError when Google does not answer with a 200 and a body, or when
     *         the body does not contain the expected `ischj.metadata` list.
     */
    async imageSearch(query: { [k: string]: any; }, opts?: ScrappingOptions) {
        const url = this.digestQuery(query);

        url.searchParams.set('tbm', 'isch');
        url.searchParams.set('asearch', 'isch');
        // NOTE(review): `ijn` is derived from `query.start` only; a `page` parameter is not reflected here - confirm intended.
        url.searchParams.set('async', `_fmt:json,p:1,ijn:${query.start ? Math.floor(query.start / (query.num || 10)) : 0}`);

        const sideLoaded = await this.sideLoadWithAllocatedProxy(url, opts);

        if (sideLoaded.status !== 200 || !sideLoaded.file) {
            throw new ServiceBadAttemptError('Google returned an error page. This may happen due to various reasons, including rate limiting or other issues.');
        }

        const jsonTxt = (await readFile((await sideLoaded.file.filePath))).toString();
        // The JSON payload is located by its leading key; anything before it is skipped.
        const jsonStart = jsonTxt.indexOf('{"ischj":');
        const rJSON = jsonStart >= 0 ? parseJSONText(jsonTxt.slice(jsonStart)) : undefined;
        const metadata = _.get(rJSON, 'ischj.metadata');
        if (!Array.isArray(metadata)) {
            // Fail with a descriptive, typed error rather than a TypeError from calling `.map` on undefined.
            throw new ServiceBadAttemptError('Google returned an unexpected image search payload. This may happen due to various reasons, including rate limiting or other issues.');
        }

        return metadata.map((x: any) => {

            return {
                link: _.get(x, 'result.referrer_url'),
                title: _.get(x, 'result.page_title'),
                snippet: _.get(x, 'text_in_grid.snippet'),
                source: _.get(x, 'result.site_title'),
                imageWidth: _.get(x, 'original_image.width'),
                imageHeight: _.get(x, 'original_image.height'),
                imageUrl: _.get(x, 'original_image.url'),
                variant: 'images',
            };
        }) as WebSearchEntry[];
    }
}
/**
 * In-page extractor for Google web search results.
 *
 * This function is passed to `controlledScrap`, which injects it into the page by its source
 * text (`func.toString()`). It therefore has to stay fully self-contained: it may only use
 * browser globals and helpers declared inside its own body.
 *
 * @returns The extracted entries, or `undefined` when the result container (or the query it
 *          echoes) cannot be found.
 * @throws Error when the browser ended up on Google's `/sorry` or `/error` page.
 */
async function getWebSearchResults() {
    const onErrorPage = ['/sorry', '/error'].some((prefix) => location.pathname.startsWith(prefix));
    if (onErrorPage) {
        throw new Error('Google returned an error page. This may happen due to various reasons, including rate limiting or other issues.');
    }

    const wrapperSelector = 'div[data-async-context^="query"]';
    // @ts-ignore
    await Promise.race([window.waitForSelector(wrapperSelector), window.waitForSelector('#botstuff .mnr-c')]);

    const resultsRoot = document.querySelector(wrapperSelector);
    if (!resultsRoot) {
        return undefined;
    }
    // The container echoes the query; an empty echo is treated as "no results available".
    const echoedQuery = decodeURIComponent(resultsRoot.getAttribute('data-async-context')?.split('query:')[1] || '');
    if (!echoedQuery) {
        return undefined;
    }

    // Maps one result block to an entry; blocks that are not plain web results yield undefined.
    const toEntry = (block: Element) => {
        const anchor = block.querySelector('a:not([href="#"])');
        if (!anchor) {
            return undefined;
        }
        const href = anchor.getAttribute('href');
        if (anchor.querySelector('div[role="heading"]')) {
            // Links wrapping a heading div are skipped on purpose (no mapping exists for them).
            return undefined;
        }

        const title = anchor.querySelector('h3')?.textContent;
        const source = Array.from(anchor.querySelectorAll('span')).find((span) => span.textContent)?.textContent;
        const citeText = anchor.querySelector('cite[role=text]')?.textContent;

        const snippetSpans = Array.from(block.querySelectorAll('div[data-sncf*="1"] span'));
        const lastSpanText = snippetSpans[snippetSpans.length - 1]?.textContent;
        const snippet = lastSpanText
            ? lastSpanText
            : (block.querySelector('div.IsZvec')?.textContent?.trim() || null);

        // Prefer the date after the separator in the cite text, else the second-to-last snippet span.
        const date = citeText?.split('·')[1]?.trim()
            ?? snippetSpans[snippetSpans.length - 2]?.textContent?.trim();

        const imageUrl = block.querySelector('div[data-sncf*="1"] img[src]:not(img[src^="data"])')?.getAttribute('src');

        let siteLinks = Array.from(block.querySelectorAll('div[data-sncf*="3"] a[href]')).map((a) => {
            return {
                link: a.getAttribute('href'),
                title: a.textContent,
            };
        });
        const enclosing = block.parentElement?.closest('div[data-hveid]');
        if (!siteLinks?.length && enclosing) {
            // Fallback: site links laid out as table cells in the enclosing block.
            const headings = Array.from(enclosing.querySelectorAll('td h3'));
            if (headings.length) {
                siteLinks = headings.map((h) => {
                    const a = h.querySelector('a');
                    if (!a) {
                        return undefined;
                    }

                    return {
                        link: a.getAttribute('href'),
                        title: a.textContent,
                        snippet: h.nextElementSibling?.textContent,
                    };
                }).filter(Boolean) as any;
            }
        }

        return {
            link: href,
            title,
            source,
            date,
            snippet: snippet ?? undefined,
            imageUrl: imageUrl?.startsWith('data:') ? undefined : imageUrl,
            siteLinks: siteLinks.length ? siteLinks : undefined,
            variant: 'web',
        };
    };

    const blocks = Array.from(resultsRoot.querySelectorAll('div[lang],div[data-surl]'));

    return blocks.map((block) => toEntry(block)).filter(Boolean) as WebSearchEntry[];
}
/**
 * In-page extractor for Google news search results.
 *
 * This function is passed to `controlledScrap`, which injects it into the page by its source
 * text (`func.toString()`). It therefore has to stay fully self-contained: it may only use
 * browser globals and helpers declared inside its own body.
 *
 * @returns The extracted entries, or `undefined` when the result container (or the query it
 *          echoes) cannot be found.
 * @throws Error when the browser ended up on Google's `/sorry` or `/error` page.
 */
async function getNewsSearchResults() {
    const onErrorPage = ['/sorry', '/error'].some((prefix) => location.pathname.startsWith(prefix));
    if (onErrorPage) {
        throw new Error('Google returned an error page. This may happen due to various reasons, including rate limiting or other issues.');
    }

    const wrapperSelector = 'div[data-async-context^="query"]';
    // @ts-ignore
    await Promise.race([window.waitForSelector(wrapperSelector), window.waitForSelector('#botstuff .mnr-c')]);

    const resultsRoot = document.querySelector(wrapperSelector);
    if (!resultsRoot) {
        return undefined;
    }
    // The container echoes the query; an empty echo is treated as "no results available".
    const echoedQuery = decodeURIComponent(resultsRoot.getAttribute('data-async-context')?.split('query:')[1] || '');
    if (!echoedQuery) {
        return undefined;
    }

    // Maps one news card to an entry; cards without a usable link or heading yield undefined.
    const toEntry = (card: Element) => {
        const anchor = card.querySelector('a:not([href="#"])');
        if (!anchor) {
            return undefined;
        }
        const href = anchor.getAttribute('href');
        const heading = anchor.querySelector('div[role="heading"]');
        if (!heading) {
            return undefined;
        }

        // Source sits right before the heading, the snippet right after; the date is the last span nearby.
        const nearbySpans = Array.from(heading.parentElement?.querySelectorAll('span') || []);

        return {
            link: href,
            title: heading.textContent?.trim(),
            source: heading.previousElementSibling?.textContent?.trim(),
            date: nearbySpans[nearbySpans.length - 1]?.textContent?.trim(),
            snippet: heading.nextElementSibling?.textContent?.trim(),
            variant: 'news',
        };
    };

    const cards = Array.from(resultsRoot.querySelectorAll('div[data-news-doc-id]'));

    return cards.map((card) => toEntry(card)).filter(Boolean) as WebSearchEntry[];
}

View File

@ -0,0 +1,692 @@
import _ from 'lodash';
import { readFile } from 'fs/promises';
import { container, singleton } from 'tsyringe';
import type { Browser, CookieParam, GoToOptions, Page, Viewport } from 'puppeteer';
import type { Cookie } from 'set-cookie-parser';
import puppeteer, { TimeoutError } from 'puppeteer';
import { Defer } from 'civkit/defer';
import { AssertionFailureError, ParamValidationError } from 'civkit/civ-rpc';
import { AsyncService } from 'civkit/async-service';
import { FancyFile } from 'civkit/fancy-file';
import { delay } from 'civkit/timeout';
import { SecurityCompromiseError, ServiceCrashedError, ServiceNodeResourceDrainError } from '../../shared/lib/errors';
import { CurlControl } from '../curl';
import { AsyncLocalContext } from '../async-context';
import { GlobalLogger } from '../logger';
import { minimalStealth } from '../minimal-stealth';
import { BlackHoleDetector } from '../blackhole-detector';
/**
 * Per-request options understood by the SERP-specialized browser control and the SERP providers.
 */
export interface ScrappingOptions {
// Proxy used when browser requests are re-issued through curl.
proxyUrl?: string;
// Cookies installed on the page before navigation.
cookies?: Cookie[];
// Replaces the page's default user agent when set.
overrideUserAgent?: string;
// Navigation timeout in milliseconds; the browser control falls back to 30_000 when unset.
timeoutMs?: number;
// Locale reported to the page through `navigator.language(s)`.
locale?: string;
// Referer passed to the page navigation.
referer?: string;
// Extra request headers merged into requests the browser is allowed to continue.
extraHeaders?: Record<string, string>;
// Viewport applied to the page before navigation.
viewport?: Viewport;
// When falsy, requests other than document-like ones bypass the side-load / proxy path.
proxyResources?: boolean;
// Proxy allocation hint; the Google provider skips proxy allocation when this is 'none'.
allocProxy?: string;
// Pre-fetched material the browser control uses to answer requests itself.
sideLoad?: {
// Canned responses, keyed by the full request URL.
impersonate: {
[url: string]: {
status: number;
headers: { [k: string]: string | string[]; };
contentType?: string;
body?: FancyFile;
};
};
// Proxy to use per request origin.
proxyOrigin: { [origin: string]: string; };
};
}
/**
 * Browser-side script (kept as source text) that lets the page behave as if it had been scrolled.
 *
 * It replaces `window.IntersectionObserver` with a subclass that keeps track of every live
 * observer, the targets it observes and the last real entry delivered per target. The exposed
 * `window.simulateScroll()` then calls each observer's callback with a synthetic
 * "fully intersecting" entry for every observed target and, on a following timer tick,
 * replays the last real entry (with a fresh timestamp) if one was recorded.
 *
 * The string content is evaluated inside the page; do not reformat it.
 */
const SIMULATE_SCROLL = `
(function () {
function createIntersectionObserverEntry(target, isIntersecting, timestamp) {
const targetRect = target.getBoundingClientRect();
const record = {
target,
isIntersecting,
time: timestamp,
// If intersecting, intersectionRect matches boundingClientRect
// If not intersecting, intersectionRect is empty (0x0)
intersectionRect: isIntersecting
? targetRect
: new DOMRectReadOnly(0, 0, 0, 0),
// Current bounding client rect of the target
boundingClientRect: targetRect,
// Intersection ratio is either 0 (not intersecting) or 1 (fully intersecting)
intersectionRatio: isIntersecting ? 1 : 0,
// Root bounds (viewport in our case)
rootBounds: new DOMRectReadOnly(
0,
0,
window.innerWidth,
window.innerHeight
)
};
Object.setPrototypeOf(record, window.IntersectionObserverEntry.prototype);
return record;
}
function cloneIntersectionObserverEntry(entry) {
const record = {
target: entry.target,
isIntersecting: entry.isIntersecting,
time: entry.time,
intersectionRect: entry.intersectionRect,
boundingClientRect: entry.boundingClientRect,
intersectionRatio: entry.intersectionRatio,
rootBounds: entry.rootBounds
};
Object.setPrototypeOf(record, window.IntersectionObserverEntry.prototype);
return record;
}
const orig = window.IntersectionObserver;
const kCallback = Symbol('callback');
const kLastEntryMap = Symbol('lastEntryMap');
const liveObservers = new Map();
class MangledIntersectionObserver extends orig {
constructor(callback, options) {
super((entries, observer) => {
const lastEntryMap = observer[kLastEntryMap];
const lastEntry = entries[entries.length - 1];
lastEntryMap.set(lastEntry.target, lastEntry);
return callback(entries, observer);
}, options);
this[kCallback] = callback;
this[kLastEntryMap] = new WeakMap();
liveObservers.set(this, new Set());
}
disconnect() {
liveObservers.get(this)?.clear();
liveObservers.delete(this);
return super.disconnect();
}
observe(target) {
const observer = liveObservers.get(this);
observer?.add(target);
return super.observe(target);
}
unobserve(target) {
const observer = liveObservers.get(this);
observer?.delete(target);
return super.unobserve(target);
}
}
Object.defineProperty(MangledIntersectionObserver, 'name', { value: 'IntersectionObserver', writable: false });
window.IntersectionObserver = MangledIntersectionObserver;
function simulateScroll() {
for (const [observer, targets] of liveObservers.entries()) {
const t0 = performance.now();
for (const target of targets) {
const entry = createIntersectionObserverEntry(target, true, t0);
observer[kCallback]([entry], observer);
setTimeout(() => {
const t1 = performance.now();
const lastEntry = observer[kLastEntryMap].get(target);
if (!lastEntry) {
return;
}
const entry2 = { ...cloneIntersectionObserverEntry(lastEntry), time: t1 };
observer[kCallback]([entry2], observer);
});
}
}
}
window.simulateScroll = simulateScroll;
})();
`;
/**
 * Browser-side script (kept as source text) that signals when the DOM has stopped changing.
 *
 * Starting at `DOMContentLoaded`, it watches child-list mutations on the whole document and
 * dispatches a `mutationIdle` CustomEvent on `document` 200ms after the most recent mutation
 * (the timer is restarted on every mutation).
 *
 * The string content is evaluated inside the page; do not reformat it.
 */
const MUTATION_IDLE_WATCH = `
(function () {
let timeout;
const sendMsg = ()=> {
document.dispatchEvent(new CustomEvent('mutationIdle'));
};
const cb = () => {
if (timeout) {
clearTimeout(timeout);
timeout = setTimeout(sendMsg, 200);
}
};
const mutationObserver = new MutationObserver(cb);
document.addEventListener('DOMContentLoaded', () => {
mutationObserver.observe(document.documentElement, {
childList: true,
subtree: true,
});
timeout = setTimeout(sendMsg, 200);
}, { once: true })
})();
`;
/**
 * Complete script registered on every new page via `page.evaluateOnNewDocument`.
 *
 * It concatenates the scroll simulation and mutation-idle scripts, runs `minimalStealth`
 * (embedded by its source text), and defines `window.waitForSelector(selector)`: a promise
 * that resolves with the first element matching the selector, either immediately or as soon
 * as a DOM mutation makes it appear. The promise never rejects and has no timeout of its own.
 *
 * The string content is evaluated inside the page; do not reformat it.
 */
const SCRIPT_TO_INJECT_INTO_FRAME = `
${SIMULATE_SCROLL}
${MUTATION_IDLE_WATCH}
(${minimalStealth.toString()})();
(function(){
let lastMutationIdle = 0;
let initialAnalytics;
document.addEventListener('mutationIdle', ()=> lastMutationIdle = Date.now());
function waitForSelector(selectorText) {
return new Promise((resolve) => {
const existing = document.querySelector(selectorText);
if (existing) {
resolve(existing);
return;
}
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', () => {
const observer = new MutationObserver(() => {
const elem = document.querySelector(selectorText);
if (elem) {
resolve(document.querySelector(selectorText));
observer.disconnect();
}
});
observer.observe(document.documentElement, {
childList: true,
subtree: true
});
});
return;
}
const observer = new MutationObserver(() => {
const elem = document.querySelector(selectorText);
if (elem) {
resolve(document.querySelector(selectorText));
observer.disconnect();
}
});
observer.observe(document.documentElement, {
childList: true,
subtree: true
});
});
}
window.waitForSelector = waitForSelector;
})();
`;
@singleton()
export class SERPSpecializedPuppeteerControl extends AsyncService {
// Running serial number handed out to newly created pages (used in log messages).
_sn = 0;
// The launched browser; assigned in init().
browser!: Browser;
logger = this.globalLogger.child({ service: this.constructor.name });
// Pages created ahead of time and waiting to be handed out.
__loadedPage: Page[] = [];
// Per-page timer that ditches a page which lives too long.
finalizerMap = new WeakMap<Page, ReturnType<typeof setTimeout>>();
// Serial number of each page.
snMap = new WeakMap<Page, number>();
// Pages that were created and not yet ditched.
livePages = new Set<Page>();
// Timestamp (ms since epoch) of the most recent page creation.
lastPageCratedAt: number = 0;
// User agent string reported by the launched browser.
ua: string = '';
// Name of the function exposed to pages for reporting results back; also used as the page event name.
protected _REPORT_FUNCTION_NAME = 'bingo';
// Async-local context captured per page when a scrape starts.
lifeCycleTrack = new WeakMap();
/**
 * Wires up the service and its crash bookkeeping.
 *
 * Every `crippled` event empties the pre-loaded page pool and the live page set. Once the
 * event has fired more than 5 times, an `error` event is emitted on the next tick.
 */
constructor(
    protected globalLogger: GlobalLogger,
    protected asyncLocalContext: AsyncLocalContext,
    protected curlControl: CurlControl,
    protected blackHoleDetector: BlackHoleDetector,
) {
    super(...arguments);
    this.setMaxListeners(Infinity);

    let crashCount = 0;
    const onCrippled = () => {
        crashCount++;
        // Pages of a crashed browser are unusable; forget all of them.
        this.__loadedPage.length = 0;
        this.livePages.clear();
        if (crashCount <= 5) {
            return;
        }
        process.nextTick(() => {
            this.emit('error', new Error('Browser crashed too many times, quitting...'));
            // process.exit(1);
        });
    };
    this.on('crippled', onCrippled);
}
/**
 * (Re)initializes the service: launches the browser and pre-loads one page.
 *
 * - In a `dry-run` environment (`NODE_ENV` contains `dry-run`) no browser is launched.
 * - A previously launched browser is closed (or its process killed when already disconnected) first.
 * - When the browser disconnects, `crippled` is emitted and the service is asked to become
 *   ready again on the next tick.
 * - The browser's user agent, minus the `Headless` marker, is handed to the curl control.
 *
 * A failed launch is logged, re-emitted as an `error` event on the next tick, and rejects this call.
 */
override async init() {
    await this.dependencyReady();
    if (process.env.NODE_ENV?.includes('dry-run')) {
        this.emit('ready');
        return;
    }

    if (this.browser) {
        if (this.browser.connected) {
            await this.browser.close();
        } else {
            this.browser.process()?.kill('SIGKILL');
        }
    }
    this.browser = await puppeteer.launch({
        timeout: 10_000,
        headless: !Boolean(process.env.DEBUG_BROWSER),
        executablePath: process.env.OVERRIDE_CHROME_EXECUTABLE_PATH,
        args: [
            '--disable-dev-shm-usage', '--disable-blink-features=AutomationControlled'
        ]
    }).catch((err: any) => {
        // This is a browser launch failure; the message names it as such.
        this.logger.error(`Failed to launch browser, just die fast.`, { err });
        process.nextTick(() => {
            this.emit('error', err);
            // process.exit(1);
        });
        return Promise.reject(err);
    });
    this.browser.once('disconnected', () => {
        this.logger.warn(`Browser disconnected`);
        if (this.browser) {
            this.emit('crippled');
        }
        process.nextTick(() => this.serviceReady());
    });
    this.ua = await this.browser.userAgent();
    this.logger.info(`Browser launched: ${this.browser.process()?.pid}, ${this.ua}`);
    this.curlControl.impersonateChrome(this.ua.replace(/Headless/i, ''));

    // A truthy argument makes newPage() skip waiting for serviceReady(), which cannot resolve before init() finishes.
    await this.newPage('beware_deadlock').then((r) => this.__loadedPage.push(r));

    this.emit('ready');
}
/**
 * Creates a fresh page in its own browser context and prepares it for scraping.
 *
 * Preparation (all started together): user agent without the `Headless` marker, CSP bypass,
 * a 1024x1024 viewport, the result-reporting function, a page-callable `setViewport`, and the
 * frame script registered for every new document.
 *
 * @param bewareDeadLock - Pass a truthy value to skip waiting for `serviceReady()`.
 * @returns The prepared page.
 * @throws ServiceNodeResourceDrainError when the page cannot be created; the browser process
 *         is killed in that case.
 */
async newPage<T>(bewareDeadLock: any = false) {
    if (!bewareDeadLock) {
        await this.serviceReady();
    }
    const sn = this._sn++;

    let page: Page;
    try {
        const isolatedContext = await this.browser.createBrowserContext();
        page = await isolatedContext.newPage();
    } catch (err: any) {
        this.logger.warn(`Failed to create page ${sn}`, { err });
        this.browser.process()?.kill('SIGKILL');
        throw new ServiceNodeResourceDrainError(`This specific worker node failed to open a new page, try again.`);
    }

    const reportEvent = this._REPORT_FUNCTION_NAME;
    await Promise.all([
        page.setUserAgent(this.ua.replace(/Headless/i, '')),
        page.setBypassCSP(true),
        page.setViewport({ width: 1024, height: 1024 }),
        // Results reported from inside the page are re-emitted as an event on the page object.
        page.exposeFunction(reportEvent, (thing: T) => {
            page.emit(this._REPORT_FUNCTION_NAME, thing);
        }),
        page.exposeFunction('setViewport', (viewport: Viewport | null) => {
            page.setViewport(viewport).catch(() => undefined);
        }),
        page.evaluateOnNewDocument(SCRIPT_TO_INJECT_INTO_FRAME),
    ]);

    this.snMap.set(page, sn);
    this.logger.debug(`Page ${sn} created.`);
    this.lastPageCratedAt = Date.now();
    this.livePages.add(page);

    return page;
}
/**
 * Hands out a page for one scrape.
 *
 * A pre-loaded page is used when available (and a replacement is created in the background
 * once at most one is left); otherwise a new page is created on the spot. Every page handed
 * out gets a timer that ditches it after 5 minutes.
 *
 * @returns A page ready for use.
 */
async getNextPage() {
    const pooled = this.__loadedPage.shift();
    if (pooled && this.__loadedPage.length <= 1) {
        // Refill the pool without blocking the caller.
        process.nextTick(() => {
            this.newPage()
                .then((r) => this.__loadedPage.push(r))
                .catch((err) => {
                    this.logger.warn(`Failed to load new page ahead of time`, { err });
                });
        });
    }

    const thePage: Page = pooled ? pooled : await this.newPage();

    const lifetimeMs = 300 * 1000;
    const timer = setTimeout(() => {
        this.logger.warn(`Page is not allowed to live past 5 minutes, ditching page ${this.snMap.get(thePage)}...`);
        this.ditchPage(thePage);
    }, lifetimeMs);
    this.finalizerMap.set(thePage, timer);

    return thePage;
}
/**
 * Disposes of a page: cancels its lifetime timer, closes the page and its browser context,
 * and removes it from the live page set.
 *
 * Closing is given at most 5 seconds; a failure is logged, never thrown. A page that is
 * already closed only has its timer cancelled.
 *
 * @param page - The page to dispose of.
 */
async ditchPage(page: Page) {
    const finalizer = this.finalizerMap.get(page);
    if (finalizer !== undefined) {
        clearTimeout(finalizer);
        this.finalizerMap.delete(page);
    }
    if (page.isClosed()) {
        return;
    }

    const sn = this.snMap.get(page);
    this.logger.debug(`Closing page ${sn}`);

    // The context is closed even when closing the page itself fails.
    const closePageAndContext = async () => {
        const ctx = page.browserContext();
        try {
            await page.close();
        } finally {
            await ctx.close();
        }
    };

    await Promise.race([closePageAndContext(), delay(5000)]).catch((err) => {
        this.logger.error(`Failed to destroy page ${sn}`, { err });
    });

    this.livePages.delete(page);
}
async controlledScrap<T>(parsedUrl: URL, func: (this: void) => Promise<T>, options: ScrappingOptions = {}): Promise<T> {
// parsedUrl.search = '';
const url = parsedUrl.toString();
const page = await this.getNextPage();
this.lifeCycleTrack.set(page, this.asyncLocalContext.ctx);
page.on('response', (_resp) => {
this.blackHoleDetector.itWorked();
});
page.on('request', async (req) => {
if (req.isInterceptResolutionHandled()) {
return;
};
const reqUrlParsed = new URL(req.url());
if (!reqUrlParsed.protocol.startsWith('http')) {
const overrides = req.continueRequestOverrides();
return req.continue(overrides, 0);
}
const typ = req.resourceType();
if (typ === 'media' || typ === 'font' || typ === 'image' || typ === 'stylesheet') {
// Non-cooperative answer to block all media requests.
return req.abort('blockedbyclient');
}
if (!options.proxyResources) {
const isDocRequest = ['document', 'xhr', 'fetch', 'websocket', 'prefetch', 'eventsource', 'ping'].includes(typ);
if (!isDocRequest) {
if (options.extraHeaders) {
const overrides = req.continueRequestOverrides();
const continueArgs = [{
...overrides,
headers: {
...req.headers(),
...overrides?.headers,
...options.extraHeaders,
}
}, 1] as const;
return req.continue(continueArgs[0], continueArgs[1]);
}
const overrides = req.continueRequestOverrides();
return req.continue(overrides, 0);
}
}
const sideload = options.sideLoad;
const impersonate = sideload?.impersonate[reqUrlParsed.href];
if (impersonate) {
let body;
if (impersonate.body) {
body = await readFile(await impersonate.body.filePath);
if (req.isInterceptResolutionHandled()) {
return;
}
}
return req.respond({
status: impersonate.status,
headers: impersonate.headers,
contentType: impersonate.contentType,
body: body ? Uint8Array.from(body) : undefined,
}, 999);
}
const proxy = options.proxyUrl || sideload?.proxyOrigin?.[reqUrlParsed.origin];
const ctx = this.lifeCycleTrack.get(page);
if (proxy && ctx) {
return await this.asyncLocalContext.bridge(ctx, async () => {
try {
const curled = await this.curlControl.sideLoad(reqUrlParsed, {
...options,
method: req.method(),
body: req.postData(),
extraHeaders: {
...req.headers(),
...options.extraHeaders,
},
proxyUrl: proxy
});
if (req.isInterceptResolutionHandled()) {
return;
};
if (curled.chain.length === 1) {
if (!curled.file) {
return req.respond({
status: curled.status,
headers: _.omit(curled.headers, 'result'),
contentType: curled.contentType,
}, 3);
}
const body = await readFile(await curled.file.filePath);
if (req.isInterceptResolutionHandled()) {
return;
};
return req.respond({
status: curled.status,
headers: _.omit(curled.headers, 'result'),
contentType: curled.contentType,
body: Uint8Array.from(body),
}, 3);
}
options.sideLoad ??= curled.sideLoadOpts;
_.merge(options.sideLoad, curled.sideLoadOpts);
const firstReq = curled.chain[0];
return req.respond({
status: firstReq.result!.code,
headers: _.omit(firstReq, 'result'),
}, 3);
} catch (err: any) {
this.logger.warn(`Failed to sideload browser request ${reqUrlParsed.origin}`, { href: reqUrlParsed.href, err, proxy });
}
if (req.isInterceptResolutionHandled()) {
return;
};
const overrides = req.continueRequestOverrides();
const continueArgs = [{
...overrides,
headers: {
...req.headers(),
...overrides?.headers,
...options.extraHeaders,
}
}, 1] as const;
return req.continue(continueArgs[0], continueArgs[1]);
});
}
if (req.isInterceptResolutionHandled()) {
return;
};
const overrides = req.continueRequestOverrides();
const continueArgs = [{
...overrides,
headers: {
...req.headers(),
...overrides?.headers,
...options.extraHeaders,
}
}, 1] as const;
return req.continue(continueArgs[0], continueArgs[1]);
});
await page.setRequestInterception(true);
const sn = this.snMap.get(page);
this.logger.info(`Page ${sn}: Scraping ${url}`, { url });
await page.evaluateOnNewDocument(`(function () {
if (window.top !== window.self) {
return;
}
const func = ${func.toString()};
func().then((result) => {
window.${this._REPORT_FUNCTION_NAME}({data: result});
}).catch((err) => {
window.${this._REPORT_FUNCTION_NAME}({err: err});
});
})();`);
if (options.locale) {
// Add headers via request interception to walk around this bug
// https://github.com/puppeteer/puppeteer/issues/10235
// await page.setExtraHTTPHeaders({
// 'Accept-Language': options.locale
// });
await page.evaluateOnNewDocument(() => {
Object.defineProperty(navigator, "language", {
get: function () {
return options.locale;
}
});
Object.defineProperty(navigator, "languages", {
get: function () {
return [options.locale];
}
});
});
}
if (options.cookies) {
const mapped = options.cookies.map((x) => {
const draft: CookieParam = {
name: x.name,
value: encodeURIComponent(x.value),
secure: x.secure,
domain: x.domain,
path: x.path,
expires: x.expires ? Math.floor(x.expires.valueOf() / 1000) : undefined,
sameSite: x.sameSite as any,
};
if (!draft.expires && x.maxAge) {
draft.expires = Math.floor(Date.now() / 1000) + x.maxAge;
}
if (!draft.domain) {
draft.url = parsedUrl.toString();
}
return draft;
});
try {
await page.setCookie(...mapped);
} catch (err: any) {
this.logger.warn(`Page ${sn}: Failed to set cookies`, { err });
throw new ParamValidationError({
path: 'cookies',
message: `Failed to set cookies: ${err?.message}`
});
}
}
if (options.overrideUserAgent) {
await page.setUserAgent(options.overrideUserAgent);
}
if (options.viewport) {
await page.setViewport(options.viewport);
}
const resultDeferred = Defer<T>();
const crippleListener = () => resultDeferred.reject(new ServiceCrashedError({ message: `Browser crashed, try again` }));
this.once('crippled', crippleListener);
resultDeferred.promise.finally(() => {
this.off('crippled', crippleListener);
});
const hdl = (s: {
err?: any;
data?: T;
}) => {
if (s.err) {
resultDeferred.reject(s.err);
}
resultDeferred.resolve(s.data);
};
page.on(this._REPORT_FUNCTION_NAME, hdl as any);
page.once('abuse', (event: any) => {
this.emit('abuse', { ...event, url: parsedUrl });
resultDeferred.reject(
new SecurityCompromiseError(`Abuse detected: ${event.reason}`)
);
});
const timeout = options.timeoutMs || 30_000;
const goToOptions: GoToOptions = {
waitUntil: ['load', 'domcontentloaded', 'networkidle0'],
timeout,
};
if (options.referer) {
goToOptions.referer = options.referer;
}
const gotoPromise = page.goto(url, goToOptions)
.catch((err) => {
if (err instanceof TimeoutError) {
this.logger.warn(`Page ${sn}: Browsing of ${url} timed out`, { err });
return new AssertionFailureError({
message: `Failed to goto ${url}: ${err}`,
cause: err,
});
}
this.logger.warn(`Page ${sn}: Browsing of ${url} aborted`, { err });
return undefined;
}).then(async (r) => {
await delay(5000);
resultDeferred.reject(new TimeoutError(`Control function did not respond in time`));
return r;
});
try {
await Promise.race([resultDeferred.promise, gotoPromise]);
return resultDeferred.promise;
} finally {
page.off(this._REPORT_FUNCTION_NAME, hdl as any);
this.ditchPage(page);
resultDeferred.resolve();
}
}
}
// Resolve a SERPSpecializedPuppeteerControl instance through `container`
// and expose it as this module's default export.
const puppeteerControl = container.resolve(SERPSpecializedPuppeteerControl);
export default puppeteerControl;

165
src/services/serp/serper.ts Normal file
View File

@ -0,0 +1,165 @@
import { singleton } from 'tsyringe';
import { GlobalLogger } from '../logger';
import { SecretExposer } from '../../shared/services/secrets';
import { AsyncLocalContext } from '../async-context';
import { SerperBingHTTP, SerperGoogleHTTP, SerperImageSearchResponse, SerperNewsSearchResponse, SerperSearchQueryParams, SerperWebSearchResponse } from '../../shared/3rd-party/serper-search';
import { BlackHoleDetector } from '../blackhole-detector';
import { Context } from '../registry';
import { AsyncService } from 'civkit/async-service';
import { AutoCastable, Prop, RPC_CALL_ENVIRONMENT } from 'civkit/civ-rpc';
/**
 * Search service that delegates to a `SerperGoogleHTTP` client.
 *
 * Supports the `web`, `images` and `news` variants; each call resolves to the
 * result list of the parsed response (`organic`, `images` or `news`).
 */
@singleton()
export class SerperGoogleSearchService extends AsyncService {
    logger = this.globalLogger.child({ service: this.constructor.name });

    /** HTTP client, created in `init()` once dependencies are ready. */
    client!: SerperGoogleHTTP;

    constructor(
        protected globalLogger: GlobalLogger,
        protected secretExposer: SecretExposer,
        protected threadLocal: AsyncLocalContext,
        protected blackHoleDetector: BlackHoleDetector,
    ) {
        super(...arguments);
    }

    /**
     * Waits for dependencies, builds the HTTP client, then announces readiness.
     *
     * The client is constructed BEFORE `'ready'` is emitted: event listeners run
     * synchronously inside `emit`, so emitting first would let a listener observe
     * an undefined `client`, and a throwing client constructor would leave the
     * service flagged as ready although it is unusable.
     */
    override async init() {
        await this.dependencyReady();
        this.client = new SerperGoogleHTTP(this.secretExposer.SERPER_SEARCH_API_KEY);
        this.emit('ready');
    }

    doSearch(variant: 'web', query: SerperSearchQueryParams): Promise<SerperWebSearchResponse['organic']>;
    doSearch(variant: 'images', query: SerperSearchQueryParams): Promise<SerperImageSearchResponse['images']>;
    doSearch(variant: 'news', query: SerperSearchQueryParams): Promise<SerperNewsSearchResponse['news']>;
    /**
     * Runs a search of the requested variant through the client.
     *
     * @param variant Which search to run; anything other than `images` / `news`
     *                falls back to the web search.
     * @param query   Query parameters forwarded unchanged to the client.
     * @returns The result list extracted from the parsed response.
     */
    async doSearch(variant: 'web' | 'images' | 'news', query: SerperSearchQueryParams) {
        this.logger.debug(`Doing external search`, query);
        let results;
        switch (variant) {
            case 'images': {
                const r = await this.client.imageSearch(query);
                results = r.parsed.images;
                break;
            }
            case 'news': {
                const r = await this.client.newsSearch(query);
                results = r.parsed.news;
                break;
            }
            case 'web':
            default: {
                const r = await this.client.webSearch(query);
                results = r.parsed.organic;
                break;
            }
        }

        // Only reached when the client call did not throw.
        this.blackHoleDetector.itWorked();

        return results;
    }

    /** Shorthand for `doSearch('web', query)`. */
    async webSearch(query: SerperSearchQueryParams) {
        return this.doSearch('web', query);
    }

    /** Shorthand for `doSearch('images', query)`. */
    async imageSearch(query: SerperSearchQueryParams) {
        return this.doSearch('images', query);
    }

    /** Shorthand for `doSearch('news', query)`. */
    async newsSearch(query: SerperSearchQueryParams) {
        return this.doSearch('news', query);
    }
}
/**
 * Variant of `SerperGoogleSearchService` whose client is a `SerperBingHTTP`.
 * All search methods are inherited; only the client construction differs.
 */
@singleton()
export class SerperBingSearchService extends SerperGoogleSearchService {
    override client!: SerperBingHTTP;

    /**
     * Waits for dependencies, builds the Bing client, then announces readiness.
     *
     * The client is assigned before `'ready'` is emitted so that listeners,
     * which run synchronously inside `emit`, never see an undefined `client`.
     */
    override async init() {
        await this.dependencyReady();
        this.client = new SerperBingHTTP(this.secretExposer.SERPER_SEARCH_API_KEY);
        this.emit('ready');
    }
}
/**
 * DTO carrying explicit search operators (`ext`, `filetype`, `intitle`, `loc`,
 * `site`) that can be appended to a plain search term.
 */
export class GoogleSearchExplicitOperatorsDto extends AutoCastable {
    @Prop({
        arrayOf: String,
        desc: `Returns web pages with a specific file extension. Example: to find the Honda GX120 Owners manual in PDF, type “Honda GX120 ownners manual ext:pdf”.`
    })
    ext?: string | string[];

    @Prop({
        arrayOf: String,
        desc: `Returns web pages created in the specified file type. Example: to find a web page created in PDF format about the evaluation of age-related cognitive changes, type “evaluation of age cognitive changes filetype:pdf”.`
    })
    filetype?: string | string[];

    @Prop({
        arrayOf: String,
        desc: `Returns webpages containing the specified term in the title of the page. Example: to find pages about SEO conferences making sure the results contain 2023 in the title, type “seo conference intitle:2023”.`
    })
    intitle?: string | string[];

    // NOTE(review): the description below talks about a `lang:` operator while
    // the property is named `loc` — confirm which one is intended.
    @Prop({
        arrayOf: String,
        desc: `Returns web pages written in the specified language. The language code must be in the ISO 639-1 two-letter code format. Example: to find information on visas only in Spanish, type “visas lang:es”.`
    })
    loc?: string | string[];

    @Prop({
        arrayOf: String,
        desc: `Returns web pages coming only from a specific web site. Example: to find information about Goggles only on Brave pages, type “goggles site:brave.com”.`
    })
    site?: string | string[];

    /**
     * Appends the operators held by this instance to `searchTerm`.
     *
     * Every truthy own property becomes one clause of the form
     * `key:v1 OR key:v2`. A single clause is appended as-is; several clauses
     * are each parenthesised and joined with ` AND `. Without any clause the
     * search term is returned unchanged.
     */
    addTo(searchTerm: string) {
        const clauses: string[] = [];

        Object.entries(this).forEach(([operator, raw]) => {
            if (!raw) {
                return;
            }
            const operands = Array.isArray(raw) ? raw : [raw];
            const clause = operands.map((operand) => `${operator}:${operand}`).join(' OR ');
            if (clause) {
                clauses.push(clause);
            }
        });

        if (clauses.length === 0) {
            return searchTerm;
        }

        const suffix = clauses.length === 1
            ? clauses[0]
            : clauses.map((clause) => `(${clause})`).join(' AND ');

        return [searchTerm, suffix].join(' ');
    }

    /**
     * Builds the DTO from `input`, then overrides operators with values read
     * from the call environment attached to `input` under `RPC_CALL_ENVIRONMENT`.
     *
     * For each operator name the `x-<name>` entry is consulted first, then the
     * bare `<name>` entry; a found value is split on `', '` and, when at least
     * one non-empty piece remains, stored on the instance as an array.
     */
    static override from(input: any) {
        const dto = super.from(input) as GoogleSearchExplicitOperatorsDto;
        const env = Reflect.get(input, RPC_CALL_ENVIRONMENT) as Context | undefined;

        ['ext', 'filetype', 'intitle', 'loc', 'site'].forEach((name) => {
            const rawValue = env?.get(`x-${name}`) || env?.get(`${name}`);
            if (!rawValue) {
                return;
            }
            const pieces = rawValue.split(', ').filter(Boolean);
            if (pieces.length) {
                Reflect.set(dto, name, pieces);
            }
        });

        return dto;
    }
}

160
src/stand-alone/serp.ts Normal file
View File

@ -0,0 +1,160 @@
import 'reflect-metadata';
import { container, singleton } from 'tsyringe';
import { KoaServer } from 'civkit/civ-rpc/koa';
import http2 from 'http2';
import http from 'http';
import { FsWalk, WalkOutEntity } from 'civkit/fswalk';
import path from 'path';
import fs from 'fs';
import { mimeOfExt } from 'civkit/mime';
import { Context, Next } from 'koa';
import { RPCRegistry } from '../services/registry';
import { AsyncResource } from 'async_hooks';
import { runOnce } from 'civkit/decorators';
import { randomUUID } from 'crypto';
import { ThreadedServiceRegistry } from '../services/threaded';
import { GlobalLogger } from '../services/logger';
import { AsyncLocalContext } from '../services/async-context';
import finalizer, { Finalizer } from '../services/finalizer';
import { SerpHost } from '../api/serp';
/**
 * Stand-alone Koa server for the SERP API.
 *
 * After `h2c()` the main `httpServer` is a server created with
 * `http2.createServer`, and the server that was previously in place is kept as
 * `httpAlternativeServer`, which `listen()` binds to `port + 1`.
 * Static files found under `public` are served before RPC routes.
 */
@singleton()
export class SERPStandAloneServer extends KoaServer {
    logger = this.globalLogger.child({ service: this.constructor.name });

    /** The server `httpServer` referred to before `h2c()` replaced it. */
    httpAlternativeServer?: typeof this['httpServer'];

    /** Files discovered by `walkForAssets()`, keyed by their relative path. */
    assets = new Map<string, WalkOutEntity>();

    constructor(
        protected globalLogger: GlobalLogger,
        protected registry: RPCRegistry,
        // Not referenced in this class; presumably injected so the host gets
        // instantiated and registered — TODO confirm.
        protected serpHost: SerpHost,
        protected threadLocal: AsyncLocalContext,
        protected threads: ThreadedServiceRegistry,
    ) {
        super(...arguments);
    }

    /**
     * Swaps the main server for one created by `http2.createServer`, keeping the
     * previous server as `httpAlternativeServer`.
     *
     * Each incoming request is dispatched to the Koa callback inside its own
     * `AsyncResource` scope.
     *
     * @returns `this`, to allow chaining with `listen()`.
     */
    h2c() {
        this.httpAlternativeServer = this.httpServer;
        const fn = this.koaApp.callback();
        this.httpServer = http2.createServer((req, res) => {
            const ar = new AsyncResource('HTTP2ServerRequest');
            ar.runInAsyncScope(fn, this.koaApp, req, res);
        });
        // useResourceBasedDefaultTracker();

        return this;
    }

    /**
     * Indexes static assets, waits for dependencies, removes every registry
     * entry tagged `crawl`, then runs the base initialisation.
     */
    override async init() {
        await this.walkForAssets();
        await this.dependencyReady();

        // Deleting from a Map while iterating over it is safe in JavaScript.
        for (const [k, v] of this.registry.conf.entries()) {
            if (v.tags?.includes('crawl')) {
                this.registry.conf.delete(k);
            }
        }

        await super.init();
    }

    /** Walks the `public` directory (two levels above `__dirname`) and records every regular file in `assets`. */
    async walkForAssets() {
        const files = await FsWalk.walkOut(path.resolve(__dirname, '..', '..', 'public'));

        for (const file of files) {
            if (file.type !== 'file') {
                continue;
            }
            this.assets.set(file.relativePath.toString(), file);
        }
    }

    /**
     * Starts the main server on `port` and, when an alternative server exists,
     * starts that one on `port + 1`.
     */
    override listen(port: number) {
        const r = super.listen(port);
        if (this.httpAlternativeServer) {
            const altPort = port + 1;
            this.httpAlternativeServer.listen(altPort, () => {
                this.logger.info(`Alternative ${this.httpAlternativeServer!.constructor.name} listening on port ${altPort}`);
            });
        }

        return r;
    }

    /**
     * Creates a Koa middleware that streams a known asset when the request path
     * (without its leading slash) matches one; otherwise it defers to `next()`.
     */
    makeAssetsServingController() {
        return (ctx: Context, next: Next) => {
            const requestPath = ctx.path;
            const file = requestPath.slice(1);
            if (!file) {
                return next();
            }

            const asset = this.assets.get(file);
            if (asset?.type !== 'file') {
                return next();
            }

            ctx.body = fs.createReadStream(asset.path);
            ctx.type = mimeOfExt(path.extname(asset.path.toString())) || 'application/octet-stream';
            ctx.set('Content-Length', asset.stats.size.toString());

            return;
        };
    }

    /** Installs the asset middleware first, then the registry's shim controller. */
    registerRoutes(): void {
        this.koaApp.use(this.makeAssetsServingController());
        this.koaApp.use(this.registry.makeShimController());
    }

    // Using an h2c server implies that multiple requests may share the same connection and x-cloud-trace-context.
    // TraceId is expected to be request-bound and unique, so the two have to be kept distinct:
    // a fresh UUID becomes `traceId`, while the part of the header before the first '/' is kept as `googleTraceId`.
    @runOnce()
    override insertAsyncHookMiddleware() {
        const asyncHookMiddleware = async (ctx: Context, next: () => Promise<void>) => {
            const googleTraceId = ctx.get('x-cloud-trace-context').split('/')?.[0];
            this.threadLocal.setup({
                traceId: randomUUID(),
                traceT0: new Date(),
                googleTraceId,
            });

            return next();
        };

        this.koaApp.use(asyncHookMiddleware);
    }

    /**
     * Shuts down the alternative server (if it is listening) together with the
     * base server, resolving once both have finished.
     *
     * `close()` is called exactly once on the alternative server. Calling it a
     * second time with a callback — after an earlier bare `close()` — makes Node
     * invoke that callback with `ERR_SERVER_NOT_RUNNING`, which would turn every
     * otherwise clean shutdown into a rejection.
     */
    @Finalizer()
    override async standDown() {
        const tasks: Promise<any>[] = [];
        const altServer = this.httpAlternativeServer;
        if (altServer?.listening) {
            (altServer as http.Server).closeIdleConnections?.();
            tasks.push(new Promise<void>((resolve, reject) => {
                altServer.close((err) => {
                    if (err) {
                        return reject(err);
                    }
                    resolve();
                });
            }));
        }
        tasks.push(super.standDown());
        await Promise.all(tasks);
    }
}
// Resolve the server through the container and expose it as the default export.
const instance = container.resolve(SERPStandAloneServer);

export default instance;

// In a dry run (NODE_ENV contains 'dry-run') the process only waits for the
// service to become ready and then terminates through the finalizer; otherwise
// the server switches to h2c and listens on PORT, defaulting to 3000.
const dryRun = process.env.NODE_ENV?.includes('dry-run');

instance.serviceReady().then((server) => {
    if (dryRun) {
        return finalizer.terminate();
    }

    return server.h2c().listen(parseInt(process.env.PORT || '') || 3000);
});

@ -1 +1 @@
Subproject commit 2f45bd58ddfc007d04dfdb9cb0814d74dc25e3f3
Subproject commit ca09ea8fcbb84aeea4eb8015bf8e98eef1813048