fix: stop using pool

This commit is contained in:
Yanlong Wang 2024-05-20 01:12:22 +08:00
parent ba8ab88811
commit df71c9a534
No known key found for this signature in database
GPG Key ID: C0A623C0BADF9F37
3 changed files with 96 additions and 50 deletions

View File

@ -20,7 +20,6 @@
"express": "^4.19.2",
"firebase-admin": "^12.1.0",
"firebase-functions": "^4.9.0",
"generic-pool": "^3.9.0",
"htmlparser2": "^9.0.0",
"jose": "^5.1.0",
"jsdom": "^24.0.0",
@ -5796,6 +5795,7 @@
"version": "3.9.0",
"resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz",
"integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==",
"devOptional": true,
"engines": {
"node": ">= 4"
}

View File

@ -40,7 +40,6 @@
"express": "^4.19.2",
"firebase-admin": "^12.1.0",
"firebase-functions": "^4.9.0",
"generic-pool": "^3.9.0",
"htmlparser2": "^9.0.0",
"jose": "^5.1.0",
"jsdom": "^24.0.0",

View File

@ -1,7 +1,6 @@
import os from 'os';
import fs from 'fs';
import { container, singleton } from 'tsyringe';
import genericPool from 'generic-pool';
import { AsyncService, Defer, marshalErrorLike, AssertionFailureError, delay, maxConcurrency } from 'civkit';
import { Logger } from '../shared/services/logger';
import { JSDOM } from 'jsdom';
@ -76,43 +75,32 @@ puppeteer.use(puppeteerPageProxy({
@singleton()
export class PuppeteerControl extends AsyncService {
_sn = 0;
browser!: Browser;
logger = this.globalLogger.child({ service: this.constructor.name });
pagePool = genericPool.createPool({
create: async () => {
const page = await this.newPage();
return page;
},
destroy: async (page) => {
await Promise.race([
(async () => {
const ctx = page.browserContext();
await page.close();
await ctx.close();
})(), delay(5000)
]).catch((err) => {
this.logger.error(`Failed to destroy page`, { err: marshalErrorLike(err) });
});
},
validate: async (page) => {
return page.browser().connected && !page.isClosed();
}
}, {
max: Math.max(1 + Math.floor(os.totalmem() / (256 * 1024 * 1024)), 16),
min: 1,
acquireTimeoutMillis: 60_000,
testOnBorrow: true,
testOnReturn: true,
autostart: false,
priorityRange: 3
});
private __healthCheckInterval?: NodeJS.Timeout;
__loadedPage: Page[] = [];
finalizerMap = new WeakMap<Page, ReturnType<typeof setTimeout>>();
snMap = new WeakMap<Page, number>();
livePages = new Set<Page>();
lastPageCratedAt: number = 0;
constructor(protected globalLogger: Logger) {
super(...arguments);
this.setMaxListeners(2 * this.pagePool.max + 1);
this.setMaxListeners(2 * Math.floor(os.totalmem() / (256 * 1024 * 1024)) + 1); 148 - 95;
this.on('crippled', () => {
this.__loadedPage.length = 0;
this.livePages.clear();
});
}
briefPages() {
this.logger.info(`Status: ${this.livePages.size} pages alive: ${Array.from(this.livePages).map((x) => this.snMap.get(x)).sort().join(', ')}; ${this.__loadedPage.length} idle pages: ${this.__loadedPage.map((x) => this.snMap.get(x)).sort().join(', ')}`);
this.logger.info(``);
}
override async init() {
@ -121,8 +109,6 @@ export class PuppeteerControl extends AsyncService {
this.__healthCheckInterval = undefined;
}
await this.dependencyReady();
this.logger.info(`PuppeteerControl initializing with pool size ${this.pagePool.max}`, { poolSize: this.pagePool.max });
this.pagePool.start();
if (this.browser) {
if (this.browser.connected) {
@ -151,24 +137,33 @@ export class PuppeteerControl extends AsyncService {
this.emit('ready');
this.__healthCheckInterval = setInterval(() => this.healthCheck(), 30_000);
this.newPage().then((r) => this.__loadedPage.push(r));
}
@maxConcurrency(1)
async healthCheck() {
this.pagePool.max += 1;
const healthyPage = await this.pagePool.acquire(3).catch((err) => {
if (Date.now() - this.lastPageCratedAt <= 10_000) {
this.briefPages();
return;
}
const healthyPage = await this.newPage().catch((err) => {
this.logger.warn(`Health check failed`, { err: marshalErrorLike(err) });
return null;
});
this.pagePool.max -= 1;
if (healthyPage) {
this.pagePool.release(healthyPage);
this.__loadedPage.push(healthyPage);
if (this.__loadedPage.length > 3) {
this.ditchPage(this.__loadedPage.shift()!);
}
this.briefPages();
return;
}
this.logger.warn(`Trying to clean up...`);
await this.pagePool.clear();
this.browser.process()?.kill('SIGKILL');
Reflect.deleteProperty(this, 'browser');
this.emit('crippled');
@ -178,7 +173,7 @@ export class PuppeteerControl extends AsyncService {
async newPage() {
await this.serviceReady();
const dedicatedContext = await this.browser.createBrowserContext();
const sn = this._sn++;
const page = await dedicatedContext.newPage();
const preparations = [];
@ -300,18 +295,72 @@ document.addEventListener('readystatechange', handlePageLoad);
document.addEventListener('load', handlePageLoad);
`);
this.snMap.set(page, sn);
this.logger.warn(`Page ${sn} created.`);
this.lastPageCratedAt = Date.now();
this.livePages.add(page);
return page;
}
async getNextPage() {
let thePage;
if (this.__loadedPage.length) {
thePage = this.__loadedPage.shift();
if (this.__loadedPage.length <= 1) {
this.newPage()
.then((r) => this.__loadedPage.push(r))
.catch((err) => {
this.logger.warn(`Failed to load new page ahead of time`, { err: marshalErrorLike(err) });
});
}
}
if (!thePage) {
thePage = await this.newPage();
}
const timer = setTimeout(() => {
this.logger.warn(`Page is not allowed to live past 5 minutes, ditching page ${this.snMap.get(thePage)}...`);
this.ditchPage(thePage);
}, 300 * 1000);
this.finalizerMap.set(thePage, timer);
return thePage;
}
async ditchPage(page: Page) {
if (this.finalizerMap.has(page)) {
clearTimeout(this.finalizerMap.get(page)!);
this.finalizerMap.delete(page);
}
if (page.isClosed()) {
return;
}
const sn = this.snMap.get(page);
this.logger.info(`Closing page ${sn}`);
this.livePages.delete(page);
await Promise.race([
(async () => {
const ctx = page.browserContext();
await page.close();
await ctx.close();
})(), delay(5000)
]).catch((err) => {
this.logger.error(`Failed to destroy page ${sn}`, { err: marshalErrorLike(err) });
});
}
async *scrap(parsedUrl: URL, options?: ScrappingOptions): AsyncGenerator<PageSnapshot | undefined> {
// parsedUrl.search = '';
const url = parsedUrl.toString();
this.logger.info(`Scraping ${url}`, { url });
let snapshot: PageSnapshot | undefined;
let screenshot: Buffer | undefined;
const page = await this.pagePool.acquire();
const page = await this.getNextPage();
const sn = this.snMap.get(page);
this.logger.info(`Page ${sn}: Scraping ${url}`, { url });
if (options?.proxyUrl) {
await page.useProxy(options.proxyUrl);
}
@ -342,7 +391,7 @@ document.addEventListener('load', handlePageLoad);
const gotoPromise = page.goto(url, { waitUntil: ['load', 'domcontentloaded', 'networkidle0'], timeout: 30_000 })
.catch((err) => {
this.logger.warn(`Browsing of ${url} did not fully succeed`, { err: marshalErrorLike(err) });
this.logger.warn(`Page ${sn}: Browsing of ${url} did not fully succeed`, { err: marshalErrorLike(err) });
return Promise.reject(new AssertionFailureError({
message: `Failed to goto ${url}: ${err}`,
cause: err,
@ -362,7 +411,7 @@ document.addEventListener('load', handlePageLoad);
}
}
finalized = true;
this.logger.info(`Snapshot of ${url} done`, { url, title: snapshot?.title, href: snapshot?.href });
this.logger.info(`Page ${sn}: Snapshot of ${url} done`, { url, title: snapshot?.title, href: snapshot?.href });
this.emit(
'crawled',
{ ...snapshot, screenshot },
@ -378,7 +427,7 @@ document.addEventListener('load', handlePageLoad);
nextSnapshotDeferred.resolve(snapshot);
})
.catch((err) => {
this.logger.warn(`Failed to wait for selector ${options.waitForSelector}`, { err: marshalErrorLike(err) });
this.logger.warn(`Page ${sn}: Failed to wait for selector ${options.waitForSelector}`, { err: marshalErrorLike(err) });
});
}
@ -401,9 +450,7 @@ document.addEventListener('load', handlePageLoad);
} finally {
gotoPromise.finally(() => {
page.off('snapshot', hdl);
this.pagePool.destroy(page).catch((err) => {
this.logger.warn(`Failed to destroy page`, { err: marshalErrorLike(err) });
});
this.ditchPage(page);
});
nextSnapshotDeferred.resolve();
}