From df71c9a534b35dd2aa3c19a8774baacfe3f3dac8 Mon Sep 17 00:00:00 2001 From: Yanlong Wang Date: Mon, 20 May 2024 01:12:22 +0800 Subject: [PATCH] fix: stop using pool --- backend/functions/package-lock.json | 2 +- backend/functions/package.json | 1 - backend/functions/src/services/puppeteer.ts | 143 +++++++++++++------- 3 files changed, 96 insertions(+), 50 deletions(-) diff --git a/backend/functions/package-lock.json b/backend/functions/package-lock.json index 4f46e9a..a87bde6 100644 --- a/backend/functions/package-lock.json +++ b/backend/functions/package-lock.json @@ -20,7 +20,6 @@ "express": "^4.19.2", "firebase-admin": "^12.1.0", "firebase-functions": "^4.9.0", - "generic-pool": "^3.9.0", "htmlparser2": "^9.0.0", "jose": "^5.1.0", "jsdom": "^24.0.0", @@ -5796,6 +5795,7 @@ "version": "3.9.0", "resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz", "integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==", + "devOptional": true, "engines": { "node": ">= 4" } diff --git a/backend/functions/package.json b/backend/functions/package.json index 800e061..fc146b5 100644 --- a/backend/functions/package.json +++ b/backend/functions/package.json @@ -40,7 +40,6 @@ "express": "^4.19.2", "firebase-admin": "^12.1.0", "firebase-functions": "^4.9.0", - "generic-pool": "^3.9.0", "htmlparser2": "^9.0.0", "jose": "^5.1.0", "jsdom": "^24.0.0", diff --git a/backend/functions/src/services/puppeteer.ts b/backend/functions/src/services/puppeteer.ts index b472f38..148eab4 100644 --- a/backend/functions/src/services/puppeteer.ts +++ b/backend/functions/src/services/puppeteer.ts @@ -1,7 +1,6 @@ import os from 'os'; import fs from 'fs'; import { container, singleton } from 'tsyringe'; -import genericPool from 'generic-pool'; import { AsyncService, Defer, marshalErrorLike, AssertionFailureError, delay, maxConcurrency } from 'civkit'; import { Logger } from '../shared/services/logger'; import { JSDOM } from 'jsdom'; @@ -76,43 +75,32 @@ puppeteer.use(puppeteerPageProxy({ @singleton() export class PuppeteerControl extends AsyncService { + _sn = 0; browser!: Browser; logger = this.globalLogger.child({ service: this.constructor.name }); - pagePool = genericPool.createPool({ - create: async () => { - const page = await this.newPage(); - return page; - }, - destroy: async (page) => { - await Promise.race([ - (async () => { - const ctx = page.browserContext(); - await page.close(); - await ctx.close(); - })(), delay(5000) - ]).catch((err) => { - this.logger.error(`Failed to destroy page`, { err: marshalErrorLike(err) }); - }); - }, - validate: async (page) => { - return page.browser().connected && !page.isClosed(); - } - }, { - max: Math.max(1 + Math.floor(os.totalmem() / (256 * 1024 * 1024)), 16), - min: 1, - acquireTimeoutMillis: 60_000, - testOnBorrow: true, - testOnReturn: true, - autostart: false, - priorityRange: 3 - }); - private __healthCheckInterval?: NodeJS.Timeout; + __loadedPage: Page[] = []; + + finalizerMap = new WeakMap>(); + snMap = new WeakMap(); + livePages = new Set(); + lastPageCratedAt: number = 0; + constructor(protected globalLogger: Logger) { super(...arguments); - this.setMaxListeners(2 * this.pagePool.max + 1); + this.setMaxListeners(2 * Math.floor(os.totalmem() / (256 * 1024 * 1024)) + 1); 148 - 95; + + this.on('crippled', () => { + this.__loadedPage.length = 0; + this.livePages.clear(); + }); + } + + briefPages() { + this.logger.info(`Status: ${this.livePages.size} pages alive: ${Array.from(this.livePages).map((x) => this.snMap.get(x)).sort().join(', ')}; ${this.__loadedPage.length} idle pages: ${this.__loadedPage.map((x) => this.snMap.get(x)).sort().join(', ')}`); + this.logger.info(``); } override async init() { @@ -121,8 +109,6 @@ export class PuppeteerControl extends AsyncService { this.__healthCheckInterval = undefined; } await this.dependencyReady(); - this.logger.info(`PuppeteerControl initializing with pool size ${this.pagePool.max}`, { poolSize: this.pagePool.max }); - this.pagePool.start(); if (this.browser) { if (this.browser.connected) { @@ -151,24 +137,33 @@ export class PuppeteerControl extends AsyncService { this.emit('ready'); this.__healthCheckInterval = setInterval(() => this.healthCheck(), 30_000); + this.newPage().then((r) => this.__loadedPage.push(r)); } @maxConcurrency(1) async healthCheck() { - this.pagePool.max += 1; - const healthyPage = await this.pagePool.acquire(3).catch((err) => { + if (Date.now() - this.lastPageCratedAt <= 10_000) { + this.briefPages(); + return; + } + const healthyPage = await this.newPage().catch((err) => { this.logger.warn(`Health check failed`, { err: marshalErrorLike(err) }); return null; }); - this.pagePool.max -= 1; if (healthyPage) { - this.pagePool.release(healthyPage); + this.__loadedPage.push(healthyPage); + + if (this.__loadedPage.length > 3) { + this.ditchPage(this.__loadedPage.shift()!); + } + + this.briefPages(); + return; } this.logger.warn(`Trying to clean up...`); - await this.pagePool.clear(); this.browser.process()?.kill('SIGKILL'); Reflect.deleteProperty(this, 'browser'); this.emit('crippled'); @@ -178,7 +173,7 @@ export class PuppeteerControl extends AsyncService { async newPage() { await this.serviceReady(); const dedicatedContext = await this.browser.createBrowserContext(); - + const sn = this._sn++; const page = await dedicatedContext.newPage(); const preparations = []; @@ -300,18 +295,72 @@ document.addEventListener('readystatechange', handlePageLoad); document.addEventListener('load', handlePageLoad); `); + this.snMap.set(page, sn); + this.logger.warn(`Page ${sn} created.`); + this.lastPageCratedAt = Date.now(); + this.livePages.add(page); + return page; } + async getNextPage() { + let thePage; + if (this.__loadedPage.length) { + thePage = this.__loadedPage.shift(); + if (this.__loadedPage.length <= 1) { + this.newPage() + .then((r) => this.__loadedPage.push(r)) + .catch((err) => { + this.logger.warn(`Failed to load new page ahead of time`, { err: marshalErrorLike(err) }); + }); + } + } + + if (!thePage) { + thePage = await this.newPage(); + } + + const timer = setTimeout(() => { + this.logger.warn(`Page is not allowed to live past 5 minutes, ditching page ${this.snMap.get(thePage)}...`); + this.ditchPage(thePage); + }, 300 * 1000); + + this.finalizerMap.set(thePage, timer); + + return thePage; + } + + async ditchPage(page: Page) { + if (this.finalizerMap.has(page)) { + clearTimeout(this.finalizerMap.get(page)!); + this.finalizerMap.delete(page); + } + if (page.isClosed()) { + return; + } + const sn = this.snMap.get(page); + this.logger.info(`Closing page ${sn}`); + this.livePages.delete(page); + await Promise.race([ + (async () => { + const ctx = page.browserContext(); + await page.close(); + await ctx.close(); + })(), delay(5000) + ]).catch((err) => { + this.logger.error(`Failed to destroy page ${sn}`, { err: marshalErrorLike(err) }); + }); + } + async *scrap(parsedUrl: URL, options?: ScrappingOptions): AsyncGenerator { // parsedUrl.search = ''; const url = parsedUrl.toString(); - this.logger.info(`Scraping ${url}`, { url }); let snapshot: PageSnapshot | undefined; let screenshot: Buffer | undefined; - - const page = await this.pagePool.acquire(); + const page = await this.getNextPage(); + const sn = this.snMap.get(page); + this.logger.info(`Page ${sn}: Scraping ${url}`, { url }); if (options?.proxyUrl) { await page.useProxy(options.proxyUrl); } @@ -342,7 +391,7 @@ document.addEventListener('load', handlePageLoad); const gotoPromise = page.goto(url, { waitUntil: ['load', 'domcontentloaded', 'networkidle0'], timeout: 30_000 }) .catch((err) => { - this.logger.warn(`Browsing of ${url} did not fully succeed`, { err: marshalErrorLike(err) }); + this.logger.warn(`Page ${sn}: Browsing of ${url} did not fully succeed`, { err: marshalErrorLike(err) }); return Promise.reject(new AssertionFailureError({ message: `Failed to goto ${url}: ${err}`, cause: err, @@ -362,7 +411,7 @@ document.addEventListener('load', handlePageLoad); } } finalized = true; - this.logger.info(`Snapshot of ${url} done`, { url, title: snapshot?.title, href: snapshot?.href }); + this.logger.info(`Page ${sn}: Snapshot of ${url} done`, { url, title: snapshot?.title, href: snapshot?.href }); this.emit( 'crawled', { ...snapshot, screenshot }, @@ -378,7 +427,7 @@ document.addEventListener('load', handlePageLoad); nextSnapshotDeferred.resolve(snapshot); }) .catch((err) => { - this.logger.warn(`Failed to wait for selector ${options.waitForSelector}`, { err: marshalErrorLike(err) }); + this.logger.warn(`Page ${sn}: Failed to wait for selector ${options.waitForSelector}`, { err: marshalErrorLike(err) }); }); } @@ -401,9 +450,7 @@ document.addEventListener('load', handlePageLoad); } finally { gotoPromise.finally(() => { page.off('snapshot', hdl); - this.pagePool.destroy(page).catch((err) => { - this.logger.warn(`Failed to destroy page`, { err: marshalErrorLike(err) }); - }); + this.ditchPage(page); }); nextSnapshotDeferred.resolve(); }