diff --git a/src/services/puppeteer.ts b/src/services/puppeteer.ts index 07f3007..b9d06af 100644 --- a/src/services/puppeteer.ts +++ b/src/services/puppeteer.ts @@ -1,21 +1,26 @@ +import _ from 'lodash'; +import { isIP } from 'net'; +import { readFile } from 'fs/promises'; import fs from 'fs'; import { container, singleton } from 'tsyringe'; -import { AsyncService, Defer, marshalErrorLike, AssertionFailureError, delay, Deferred, perNextTick, ParamValidationError, FancyFile } from 'civkit'; -import { GlobalLogger } from './logger'; -import type { Browser, CookieParam, GoToOptions, HTTPResponse, Page, Viewport } from 'puppeteer'; +import type { Browser, CookieParam, GoToOptions, HTTPRequest, HTTPResponse, Page, Viewport } from 'puppeteer'; import type { Cookie } from 'set-cookie-parser'; import puppeteer from 'puppeteer-extra'; +import { TimeoutError } from 'puppeteer'; + +import { Defer, Deferred } from 'civkit/defer'; +import { AssertionFailureError, ParamValidationError } from 'civkit/civ-rpc'; +import { AsyncService } from 'civkit/async-service'; +import { FancyFile } from 'civkit/fancy-file'; +import { delay } from 'civkit/timeout'; import puppeteerBlockResources from 'puppeteer-extra-plugin-block-resources'; import { SecurityCompromiseError, ServiceCrashedError, ServiceNodeResourceDrainError } from '../shared/lib/errors'; -import { TimeoutError } from 'puppeteer'; -import _ from 'lodash'; -import { isIP } from 'net'; import { CurlControl } from './curl'; -import { readFile } from 'fs/promises'; import { BlackHoleDetector } from './blackhole-detector'; import { AsyncLocalContext } from './async-context'; +import { GlobalLogger } from './logger'; const tldExtract = require('tld-extract'); const READABILITY_JS = fs.readFileSync(require.resolve('@mozilla/readability/Readability.js'), 'utf-8'); @@ -440,6 +445,39 @@ window.briefImgs = briefImgs; })(); `; +class PageReqCtrlKit { + reqSet: Set = new Set(); + blockers: Deferred[] = []; + + constructor( + public concurrency: number, + ) { + if (isNaN(concurrency) || concurrency < 1) { + throw new AssertionFailureError(`Invalid concurrency: ${concurrency}`); + } + } + + onNewRequest(req: HTTPRequest) { + this.reqSet.add(req); + if (this.reqSet.size <= this.concurrency) { + return Promise.resolve(); + } + const deferred = Defer(); + this.blockers.push(deferred); + + return deferred.promise; + } + + onFinishRequest(req: HTTPRequest) { + this.reqSet.delete(req); + if (this.reqSet.size > this.concurrency) { + return; + } + const deferred = this.blockers.shift(); + deferred?.resolve(); + } +} + @singleton() export class PuppeteerControl extends AsyncService { @@ -447,8 +485,6 @@ export class PuppeteerControl extends AsyncService { browser!: Browser; logger = this.globalLogger.child({ service: this.constructor.name }); - private __reqCapInterval?: NodeJS.Timeout; - __loadedPage: Page[] = []; finalizerMap = new WeakMap>(); @@ -458,9 +494,10 @@ export class PuppeteerControl extends AsyncService { lastPageCratedAt: number = 0; ua: string = ''; - rpsCap: number = 500; + concurrentRequestsPerPage: number = 16; + pageReqCtrl = new WeakMap(); + lastReqSentAt: number = 0; - requestDeferredQueue: Deferred[] = []; circuitBreakerHosts: Set = new Set(); @@ -490,10 +527,6 @@ export class PuppeteerControl extends AsyncService { } override async init() { - if (this.__reqCapInterval) { - clearInterval(this.__reqCapInterval); - this.__reqCapInterval = undefined; - } await this.dependencyReady(); if (process.env.NODE_ENV?.includes('dry-run')) { this.emit('ready'); @@ -536,22 +569,14 @@ export class PuppeteerControl extends AsyncService { this.emit('ready'); } - @perNextTick() - reqCapRoutine() { - const now = Date.now(); - const numToPass = Math.round((now - this.lastReqSentAt) / 1000 * this.rpsCap); - this.requestDeferredQueue.splice(0, numToPass).forEach((x) => x.resolve(true)); - if (numToPass) { - this.lastReqSentAt = now; - } - if (!this.requestDeferredQueue.length) { - if (this.__reqCapInterval) { - clearInterval(this.__reqCapInterval); - this.__reqCapInterval = undefined; - } - } else if (!this.__reqCapInterval) { - this.__reqCapInterval = setInterval(() => this.reqCapRoutine(), 1000 / this.rpsCap).unref(); + protected getRpsControlKit(page: Page) { + let kit = this.pageReqCtrl.get(page); + if (!kit) { + kit = new PageReqCtrlKit(this.concurrentRequestsPerPage); + this.pageReqCtrl.set(page, kit); } + + return kit; } async newPage(bewareDeadLock: any = false) { @@ -564,7 +589,7 @@ export class PuppeteerControl extends AsyncService { const dedicatedContext = await this.browser.createBrowserContext(); page = await dedicatedContext.newPage(); } catch (err: any) { - this.logger.warn(`Failed to create page ${sn}`, { err: marshalErrorLike(err) }); + this.logger.warn(`Failed to create page ${sn}`, { err }); this.browser.process()?.kill('SIGKILL'); throw new ServiceNodeResourceDrainError(`This specific worker node failed to open a new page, try again.`); } @@ -661,10 +686,8 @@ export class PuppeteerControl extends AsyncService { } if (requestUrl.startsWith('http')) { - const d = Defer(); - this.requestDeferredQueue.push(d); - this.reqCapRoutine(); - await d.promise; + const kit = this.getRpsControlKit(page); + await kit.onNewRequest(req); } if (req.isInterceptResolutionHandled()) { @@ -677,6 +700,13 @@ export class PuppeteerControl extends AsyncService { return req.continue(continueArgs[0], continueArgs[1]); }); + const reqFinishHandler = (req: HTTPRequest) => { + const kit = this.getRpsControlKit(page); + kit.onFinishRequest(req); + }; + page.on('requestfinished', reqFinishHandler); + page.on('requestfailed', reqFinishHandler); + page.on('requestservedfromcache', reqFinishHandler); await page.evaluateOnNewDocument(` (function () { @@ -721,7 +751,7 @@ export class PuppeteerControl extends AsyncService { this.newPage() .then((r) => this.__loadedPage.push(r)) .catch((err) => { - this.logger.warn(`Failed to load new page ahead of time`, { err: marshalErrorLike(err) }); + this.logger.warn(`Failed to load new page ahead of time`, { err }); }); } } @@ -761,7 +791,7 @@ export class PuppeteerControl extends AsyncService { })(), delay(5000) ]).catch((err) => { - this.logger.error(`Failed to destroy page ${sn}`, { err: marshalErrorLike(err) }); + this.logger.error(`Failed to destroy page ${sn}`, { err }); }); this.livePages.delete(page); this.pagePhase.delete(page); @@ -997,7 +1027,7 @@ export class PuppeteerControl extends AsyncService { try { await page.setCookie(...mapped); } catch (err: any) { - this.logger.warn(`Page ${sn}: Failed to set cookies`, { err: marshalErrorLike(err) }); + this.logger.warn(`Page ${sn}: Failed to set cookies`, { err }); throw new ParamValidationError({ path: 'cookies', message: `Failed to set cookies: ${err?.message}` @@ -1062,7 +1092,7 @@ export class PuppeteerControl extends AsyncService { const gotoPromise = page.goto(url, goToOptions) .catch((err) => { if (err instanceof TimeoutError) { - this.logger.warn(`Page ${sn}: Browsing of ${url} timed out`, { err: marshalErrorLike(err) }); + this.logger.warn(`Page ${sn}: Browsing of ${url} timed out`, { err }); return new AssertionFailureError({ message: `Failed to goto ${url}: ${err}`, cause: err, @@ -1075,7 +1105,7 @@ export class PuppeteerControl extends AsyncService { } } - this.logger.warn(`Page ${sn}: Browsing of ${url} failed`, { err: marshalErrorLike(err) }); + this.logger.warn(`Page ${sn}: Browsing of ${url} failed`, { err }); return new AssertionFailureError({ message: `Failed to goto ${url}: ${err}`, cause: err, @@ -1126,7 +1156,7 @@ export class PuppeteerControl extends AsyncService { // } // } // } catch (err: any) { - // this.logger.warn(`Page ${sn}: Failed to salvage ${url}`, { err: marshalErrorLike(err) }); + // this.logger.warn(`Page ${sn}: Failed to salvage ${url}`, { err }); // } finalized = true; @@ -1166,7 +1196,7 @@ export class PuppeteerControl extends AsyncService { finalized = true; }) .catch((err) => { - this.logger.warn(`Page ${sn}: Failed to wait for selector ${options.waitForSelector}`, { err: marshalErrorLike(err) }); + this.logger.warn(`Page ${sn}: Failed to wait for selector ${options.waitForSelector}`, { err }); waitForPromise = undefined; }); return p as any; @@ -1243,7 +1273,7 @@ export class PuppeteerControl extends AsyncService { // } // await page.goto(googleArchiveUrl, { waitUntil: ['load', 'domcontentloaded', 'networkidle0'], timeout: 15_000 }).catch((err) => { - // this.logger.warn(`Page salvation did not fully succeed.`, { err: marshalErrorLike(err) }); + // this.logger.warn(`Page salvation did not fully succeed.`, { err }); // }); // this.logger.info(`Salvation completed.`);