feat: leveraging NODE_COMPILE_CACHE (#1162)

* wip: try to leverage NODE_COMPILE_CACHE

* fix

* fix

* fix

* fix

* fix: black hole detector

* bhd: also tracking curl requests
This commit is contained in:
Yanlong Wang 2025-03-10 09:23:25 +08:00 committed by GitHub
parent d0e20cc086
commit 6e78e38e95
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 101 additions and 81 deletions

View File

@ -29,8 +29,11 @@ COPY licensed ./licensed
RUN rm -rf ~/.config/chromium && mkdir -p ~/.config/chromium
RUN NODE_COMPILE_CACHE=node_modules npm run dry-run
ENV OVERRIDE_CHROME_EXECUTABLE_PATH=/usr/bin/google-chrome-stable
ENV LD_PRELOAD=/usr/local/lib/libcurl-impersonate.so CURL_IMPERSONATE=chrome116 CURL_IMPERSONATE_HEADERS=no
ENV NODE_CACHE_DIR=node_modules
ENV PORT=8080
EXPOSE 3000 3001 8080 8081

8
package-lock.json generated
View File

@ -17,7 +17,7 @@
"axios": "^1.3.3",
"bcrypt": "^5.1.0",
"busboy": "^1.6.0",
"civkit": "^0.8.4-5f839a7",
"civkit": "^0.8.4-f65f570",
"core-js": "^3.37.1",
"cors": "^2.8.5",
"dayjs": "^1.11.9",
@ -4095,9 +4095,9 @@
}
},
"node_modules/civkit": {
"version": "0.8.4-5f839a7",
"resolved": "https://registry.npmjs.org/civkit/-/civkit-0.8.4-5f839a7.tgz",
"integrity": "sha512-wF9Sm0dKBNGTXtueYtmwqreciilEw2+H3uAZgJNK/B+MoeQecvQ1alrqPqIP/Xf64H1ik6mD0Z47cez8jkayGA==",
"version": "0.8.4-f65f570",
"resolved": "https://registry.npmjs.org/civkit/-/civkit-0.8.4-f65f570.tgz",
"integrity": "sha512-s04b5ca/wYaBI1cCfSxNS5t170y+vEHXkc7CqriZA+qLJY3mI5hAiFSeWoOBwt4/0nc+lY4UaHuhmjjpRXYSqw==",
"license": "AGPL",
"dependencies": {
"lodash": "^4.17.21",

View File

@ -7,7 +7,8 @@
"build:clean": "rm -rf ./build",
"serve": "npm run build && npm run start",
"debug": "npm run build && npm run dev",
"start": "npm run shell"
"start": "node ./build/stand-alone/crawl.js",
"dry-run": "NODE_ENV=dry-run node ./build/stand-alone/search.js"
},
"engines": {
"node": ">=18"
@ -25,7 +26,7 @@
"axios": "^1.3.3",
"bcrypt": "^5.1.0",
"busboy": "^1.6.0",
"civkit": "^0.8.4-5f839a7",
"civkit": "^0.8.4-f65f570",
"core-js": "^3.37.1",
"cors": "^2.8.5",
"dayjs": "^1.11.9",

View File

@ -137,7 +137,9 @@ export class CrawlerHost extends RPCHost {
override async init() {
await this.dependencyReady();
this.curlControl.impersonateChrome(this.puppeteerControl.ua.replace(/Headless/i, ''));
if (this.puppeteerControl.ua) {
this.curlControl.impersonateChrome(this.puppeteerControl.ua.replace(/Headless/i, ''));
}
this.emit('ready');
}

View File

@ -1,31 +0,0 @@
import 'reflect-metadata';
import './shared/lib/doom-domain';
import { initializeApp } from 'firebase-admin/app';
initializeApp();
import { loadModulesDynamically, registry } from './shared';
import path from 'path';
loadModulesDynamically(path.resolve(__dirname, 'cloud-functions'));
loadModulesDynamically(path.resolve(__dirname, 'shared', 'cloud-functions'));
Object.assign(exports, registry.exportAll());
Object.assign(exports, registry.exportGrouped({
memory: '4GiB',
timeoutSeconds: 540,
}));
registry.allHandsOnDeck().catch(() => void 0);
registry.title = 'reader';
registry.version = '0.1.0';
process.on('unhandledRejection', (_err) => `Somehow is false alarm in firebase`);
process.on('uncaughtException', (err) => {
console.log('Uncaught exception', err);
// Looks like Firebase runtime does not handle error properly.
// Make sure to quit the process.
process.nextTick(() => process.exit(1));
console.error('Uncaught exception, process quit.');
throw err;
});

View File

@ -1,6 +1,7 @@
import { singleton } from 'tsyringe';
import { AsyncService } from 'civkit/async-service';
import { GlobalLogger } from './logger';
import { delay } from 'civkit/timeout';
@singleton()
@ -22,7 +23,7 @@ export class BlackHoleDetector extends AsyncService {
if (process.env.NODE_ENV?.startsWith('prod')) {
setInterval(() => {
this.routine();
}, 1000 * 15).unref();
}, 1000 * 30).unref();
}
}
@ -32,14 +33,16 @@ export class BlackHoleDetector extends AsyncService {
this.emit('ready');
}
routine() {
async routine() {
// We give routine a 3s grace period for potentially paused CPU to spin up and process some requests
await delay(3000);
const now = Date.now();
const lastWorked = this.lastWorkedTs;
if (!lastWorked) {
return;
}
const dt = (now - lastWorked);
if (this.concurrentRequests > 0 &&
if (this.concurrentRequests > 1 &&
this.lastIncomingRequestTs && lastWorked &&
this.lastIncomingRequestTs >= lastWorked &&
(dt > (this.maxDelay * (this.strikes + 1)))

View File

@ -14,6 +14,7 @@ import { ZSTDDecompress } from 'simple-zstd';
import _ from 'lodash';
import { Readable } from 'stream';
import { AsyncLocalContext } from './async-context';
import { BlackHoleDetector } from './blackhole-detector';
export interface CURLScrappingOptions extends ScrappingOptions {
method?: string;
@ -36,6 +37,7 @@ export class CurlControl extends AsyncService {
protected globalLogger: GlobalLogger,
protected tempFileManager: TempFileManager,
protected asyncLocalContext: AsyncLocalContext,
protected blackHoleDetector: BlackHoleDetector,
) {
super(...arguments);
}
@ -349,7 +351,7 @@ export class CurlControl extends AsyncService {
async sideLoad(targetUrl: URL, crawlOpts?: CURLScrappingOptions) {
const curlResult = await this.urlToFile(targetUrl, crawlOpts);
this.blackHoleDetector.itWorked();
let finalURL = targetUrl;
const sideLoadOpts: CURLScrappingOptions['sideLoad'] = {
impersonate: {},
@ -406,13 +408,13 @@ export class CurlControl extends AsyncService {
}
// Retryable errors
case CurlCode.CURLE_RECV_ERROR:
case CurlCode.CURLE_OPERATION_TIMEDOUT:
case CurlCode.CURLE_SSL_CONNECT_ERROR:
case CurlCode.CURLE_QUIC_CONNECT_ERROR:
case CurlCode.CURLE_COULDNT_RESOLVE_PROXY:
case CurlCode.CURLE_COULDNT_CONNECT:
case CurlCode.CURLE_PARTIAL_FILE:
case CurlCode.CURLE_OPERATION_TIMEDOUT: {
case CurlCode.CURLE_PARTIAL_FILE: {
return new ServiceBadAttemptError(msg);
}

View File

@ -3,12 +3,24 @@ import { container, singleton } from 'tsyringe';
import { isMainThread } from 'worker_threads';
import { GlobalLogger } from './logger';
const realProcessExit = process.exit;
process.exit = ((code?: number) => {
if (isMainThread) {
return;
}
return realProcessExit(code);
}) as typeof process.exit;
@singleton()
export class FinalizerService extends AbstractFinalizerService {
container = container;
logger = this.globalLogger.child({ service: this.constructor.name });
override quitProcess(code?: string | number | null | undefined): never {
return realProcessExit(code);
}
constructor(protected globalLogger: GlobalLogger) {
super(...arguments);
}

View File

@ -1,4 +1,3 @@
import os from 'os';
import fs from 'fs';
import { container, singleton } from 'tsyringe';
import { AsyncService, Defer, marshalErrorLike, AssertionFailureError, delay, Deferred, perNextTick, ParamValidationError, FancyFile } from 'civkit';
@ -474,7 +473,7 @@ export class PuppeteerControl extends AsyncService {
protected blackHoleDetector: BlackHoleDetector,
) {
super(...arguments);
this.setMaxListeners(2 * Math.floor(os.totalmem() / (256 * 1024 * 1024)) + 1); 148 - 95;
this.setMaxListeners(Infinity);
let crippledTimes = 0;
this.on('crippled', () => {
@ -496,6 +495,10 @@ export class PuppeteerControl extends AsyncService {
this.__reqCapInterval = undefined;
}
await this.dependencyReady();
if (process.env.NODE_ENV?.includes('dry-run')) {
this.emit('ready');
return;
}
if (this.browser) {
if (this.browser.connected) {

View File

@ -1,6 +1,7 @@
import { AbstractTempFileManger } from 'civkit/temp';
import { unlink } from 'fs/promises';
import { rm } from 'fs/promises';
import { singleton } from 'tsyringe';
import { Finalizer } from './finalizer';
@singleton()
export class TempFileManager extends AbstractTempFileManger {
@ -13,10 +14,10 @@ export class TempFileManager extends AbstractTempFileManger {
this.emit('ready');
}
@Finalizer()
override async standDown() {
await super.standDown();
await unlink(this.rootDir);
await rm(this.rootDir, { recursive: true, force: true });
}
}

View File

@ -3,6 +3,7 @@ import { container, singleton } from 'tsyringe';
import { KoaServer } from 'civkit/civ-rpc/koa';
import http2 from 'http2';
import http from 'http';
import { CrawlerHost } from '../api/crawler';
import { FsWalk, WalkOutEntity } from 'civkit/fswalk';
import path from 'path';
@ -14,21 +15,9 @@ import { AsyncResource } from 'async_hooks';
import { runOnce } from 'civkit/decorators';
import { randomUUID } from 'crypto';
import { ThreadedServiceRegistry } from '../services/threaded';
import globalLogger, { GlobalLogger } from '../services/logger';
import { GlobalLogger } from '../services/logger';
import { AsyncLocalContext } from '../services/async-context';
process.on('unhandledRejection', (err) => {
globalLogger.warn('Unhandled rejection', err);
});
process.on('uncaughtException', (err) => {
globalLogger.error('Uncaught exception', err);
// Looks like Firebase runtime does not handle error properly.
// Make sure to quit the process.
globalLogger.error('Uncaught exception, process quit.');
process.nextTick(() => process.exit(1));
});
import finalizer, { Finalizer } from '../services/finalizer';
@singleton()
export class CrawlStandAloneServer extends KoaServer {
@ -131,9 +120,32 @@ export class CrawlStandAloneServer extends KoaServer {
this.koaApp.use(asyncHookMiddleware);
}
@Finalizer()
override async standDown() {
const tasks: Promise<any>[] = [];
if (this.httpAlternativeServer?.listening) {
(this.httpAlternativeServer as http.Server).closeIdleConnections?.();
this.httpAlternativeServer.close();
tasks.push(new Promise<void>((resolve, reject) => {
this.httpAlternativeServer!.close((err) => {
if (err) {
return reject(err);
}
resolve();
});
}));
}
tasks.push(super.standDown());
await Promise.all(tasks);
}
}
const instance = container.resolve(CrawlStandAloneServer);
export default instance;
instance.serviceReady().then((s) => s.h2c().listen(parseInt(process.env.PORT || '') || 3000));
if (process.env.NODE_ENV?.includes('dry-run')) {
instance.serviceReady().then(() => finalizer.terminate());
} else {
instance.serviceReady().then((s) => s.h2c().listen(parseInt(process.env.PORT || '') || 3000));
}

View File

@ -3,6 +3,7 @@ import { container, singleton } from 'tsyringe';
import { KoaServer } from 'civkit/civ-rpc/koa';
import http2 from 'http2';
import http from 'http';
import { SearcherHost } from '../api/searcher-serper';
import { FsWalk, WalkOutEntity } from 'civkit/fswalk';
import path from 'path';
@ -14,21 +15,9 @@ import { AsyncResource } from 'async_hooks';
import { runOnce } from 'civkit/decorators';
import { randomUUID } from 'crypto';
import { ThreadedServiceRegistry } from '../services/threaded';
import globalLogger, { GlobalLogger } from '../services/logger';
import { GlobalLogger } from '../services/logger';
import { AsyncLocalContext } from '../services/async-context';
process.on('unhandledRejection', (err) => {
globalLogger.warn('Unhandled rejection', err);
});
process.on('uncaughtException', (err) => {
globalLogger.error('Uncaught exception', err);
// Looks like Firebase runtime does not handle error properly.
// Make sure to quit the process.
globalLogger.error('Uncaught exception, process quit.');
process.nextTick(() => process.exit(1));
});
import finalizer, { Finalizer } from '../services/finalizer';
@singleton()
export class SearchStandAloneServer extends KoaServer {
@ -63,7 +52,7 @@ export class SearchStandAloneServer extends KoaServer {
await this.walkForAssets();
await this.dependencyReady();
for (const [k,v] of this.registry.conf.entries()) {
for (const [k, v] of this.registry.conf.entries()) {
if (v.tags?.includes('crawl')) {
this.registry.conf.delete(k);
}
@ -140,9 +129,32 @@ export class SearchStandAloneServer extends KoaServer {
this.koaApp.use(asyncHookMiddleware);
}
@Finalizer()
override async standDown() {
const tasks: Promise<any>[] = [];
if (this.httpAlternativeServer?.listening) {
(this.httpAlternativeServer as http.Server).closeIdleConnections?.();
this.httpAlternativeServer.close();
tasks.push(new Promise<void>((resolve, reject) => {
this.httpAlternativeServer!.close((err) => {
if (err) {
return reject(err);
}
resolve();
});
}));
}
tasks.push(super.standDown());
await Promise.all(tasks);
}
}
const instance = container.resolve(SearchStandAloneServer);
export default instance;
instance.serviceReady().then((s) => s.h2c().listen(parseInt(process.env.PORT || '') || 3000));
if (process.env.NODE_ENV?.includes('dry-run')) {
instance.serviceReady().then(() => finalizer.terminate());
} else {
instance.serviceReady().then((s) => s.h2c().listen(parseInt(process.env.PORT || '') || 3000));
}

@ -1 +1 @@
Subproject commit ed905226dac8e465b85b957f295eab02d260fc83
Subproject commit f7d65a8b12fa32d3d6fa46585d73693cba7b14e3