From 7e002a8b06a27327759949c96c210749f97c0dcd Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 30 Jul 2024 13:27:23 -0400 Subject: [PATCH] Nick: bull mq --- apps/api/package.json | 4 +- apps/api/pnpm-lock.yaml | 60 +++++----- apps/api/src/controllers/admin/queue.ts | 2 +- apps/api/src/controllers/crawl-cancel.ts | 19 +++- apps/api/src/controllers/crawl-status.ts | 18 ++- apps/api/src/controllers/crawl.ts | 2 +- apps/api/src/controllers/scrape.ts | 2 +- apps/api/src/controllers/status.ts | 21 +++- apps/api/src/index.ts | 14 +-- apps/api/src/lib/scrape-events.ts | 4 +- apps/api/src/main/runWebScraper.ts | 16 +-- apps/api/src/services/alerts/index.ts | 3 +- apps/api/src/services/queue-jobs.ts | 4 +- apps/api/src/services/queue-service.ts | 44 +++++--- apps/api/src/services/queue-worker.ts | 135 ++++++++++++++++++++--- apps/api/src/services/system-monitor.ts | 81 ++++++++++++++ 16 files changed, 330 insertions(+), 99 deletions(-) create mode 100644 apps/api/src/services/system-monitor.ts diff --git a/apps/api/package.json b/apps/api/package.json index 15e97377..4815b612 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -29,7 +29,6 @@ "@jest/globals": "^29.7.0", "@tsconfig/recommended": "^1.0.3", "@types/body-parser": "^1.19.2", - "@types/bull": "^4.10.0", "@types/cors": "^2.8.13", "@types/express": "^4.17.17", "@types/jest": "^29.5.12", @@ -63,7 +62,7 @@ "async-mutex": "^0.5.0", "axios": "^1.3.4", "bottleneck": "^2.19.5", - "bull": "^4.15.0", + "bullmq": "^5.11.0", "cacheable-lookup": "^6.1.0", "cheerio": "^1.0.0-rc.12", "cohere": "^1.1.1", @@ -98,6 +97,7 @@ "robots-parser": "^3.0.1", "scrapingbee": "^1.7.4", "stripe": "^16.1.0", + "systeminformation": "^5.22.11", "turndown": "^7.1.3", "turndown-plugin-gfm": "^1.0.2", "typesense": "^1.5.4", diff --git a/apps/api/pnpm-lock.yaml b/apps/api/pnpm-lock.yaml index ec83e18b..62760a98 100644 --- a/apps/api/pnpm-lock.yaml +++ b/apps/api/pnpm-lock.yaml @@ -56,9 +56,9 @@ importers: bottleneck: specifier: ^2.19.5 version: 2.19.5 - bull: - specifier: ^4.15.0 - version: 4.15.0 + bullmq: + specifier: ^5.11.0 + version: 5.11.0 cacheable-lookup: specifier: ^6.1.0 version: 6.1.0 @@ -161,6 +161,9 @@ importers: stripe: specifier: ^16.1.0 version: 16.1.0 + systeminformation: + specifier: ^5.22.11 + version: 5.22.11 turndown: specifier: ^7.1.3 version: 7.2.0 @@ -201,9 +204,6 @@ importers: '@types/body-parser': specifier: ^1.19.2 version: 1.19.5 - '@types/bull': - specifier: ^4.10.0 - version: 4.10.0 '@types/cors': specifier: ^2.8.13 version: 2.8.17 @@ -1535,10 +1535,6 @@ packages: '@types/body-parser@1.19.5': resolution: {integrity: sha512-fB3Zu92ucau0iQ0JMCFQE7b/dv8Ot07NI3KaZIkIUNXq82k4eBAqUaneXfleGY9JWskeS9y+u0nXMyspcuQrCg==} - '@types/bull@4.10.0': - resolution: {integrity: sha512-RkYW8K2H3J76HT6twmHYbzJ0GtLDDotpLP9ah9gtiA7zfF6peBH1l5fEiK0oeIZ3/642M7Jcb9sPmor8Vf4w6g==} - deprecated: This is a stub types definition. bull provides its own type definitions, so you do not need this installed. 
- '@types/bunyan@1.8.9': resolution: {integrity: sha512-ZqS9JGpBxVOvsawzmVt30sP++gSQMTejCkIAQ3VdadOcRE8izTyW66hufvwLeH+YEGP6Js2AW7Gz+RMyvrEbmw==} @@ -1935,9 +1931,8 @@ packages: buffer@6.0.3: resolution: {integrity: sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==} - bull@4.15.0: - resolution: {integrity: sha512-nOEAfUXwUXtFbRPQP3bWCwpQ/NAerAu2Nym/ucv5C1E+Qh2x6RGdKKsYIfZam4mYncayTynTUN/HLhRgGi2N8w==} - engines: {node: '>=12'} + bullmq@5.11.0: + resolution: {integrity: sha512-qVzyWGZqie3VHaYEgRXhId/j8ebfmj6MExEJyUByMsUJA5pVciVle3hKLer5fyMwtQ8lTMP7GwhXV/NZ+HzlRA==} bytes@3.1.2: resolution: {integrity: sha512-/Nf7TyzTx6S3yRJObOAV7956r8cr2+Oj8AC5dt8wSP3BQAoeX58NoHyCU8P8zGkNXStjTSi6fzO6F0pBdcYbEg==} @@ -2559,10 +2554,6 @@ packages: resolution: {integrity: sha512-pjzuKtY64GYfWizNAJ0fr9VqttZkNiK2iS430LtIHzjBEr6bX8Am2zm4sW4Ro5wjWW5cAlRL1qAMTcXbjNAO2Q==} engines: {node: '>=8.0.0'} - get-port@5.1.1: - resolution: {integrity: sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==} - engines: {node: '>=8'} - get-stream@5.2.0: resolution: {integrity: sha512-nBF+F1rAZVCu/p7rjzgA+Yb4lfYXrpl7a6VmJrU8wF9I1CKvP/QwPNZHnOlwbTkY6dvtFIzFMSyQXbLoTQPRpA==} engines: {node: '>=8'} @@ -3533,6 +3524,9 @@ packages: resolution: {integrity: sha512-dBpDMdxv9Irdq66304OLfEmQ9tbNRFnFTuZiLo+bD+r332bBmMJ8GBLXklIXXgxd3+v9+KUnZaUR5PJMa75Gsg==} engines: {node: '>= 0.4.0'} + node-abort-controller@3.1.1: + resolution: {integrity: sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==} + node-domexception@1.0.0: resolution: {integrity: sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==} engines: {node: '>=10.5.0'} @@ -4258,6 +4252,12 @@ packages: resolution: {integrity: sha512-SzRP5LQ6Ts2G5NyAa/jg16s8e3R7rfdFjizy1zeoecYWw+nGL+YA1xZvW/+iJmidBGSdLkuvdwTYEyJEb+EiUw==} engines: {node: '>=0.2.6'} + systeminformation@5.22.11: + resolution: {integrity: sha512-aLws5yi4KCHTb0BVvbodQY5bY8eW4asMRDTxTW46hqw9lGjACX6TlLdJrkdoHYRB0qs+MekqEq1zG7WDnWE8Ug==} + engines: {node: '>=8.0.0'} + os: [darwin, linux, win32, freebsd, openbsd, netbsd, sunos, android] + hasBin: true + tar-fs@3.0.5: resolution: {integrity: sha512-JOgGAmZyMgbqpLwct7ZV8VzkEB6pxXFBVErLtb+XCOqzc6w1xiWKI9GVd6bwk68EX7eJ4DWmfXVmq8K2ziZTGg==} @@ -4450,10 +4450,6 @@ packages: resolution: {integrity: sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ==} hasBin: true - uuid@8.3.2: - resolution: {integrity: sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==} - hasBin: true - uuid@9.0.1: resolution: {integrity: sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==} hasBin: true @@ -6437,12 +6433,6 @@ snapshots: '@types/connect': 3.4.38 '@types/node': 20.14.1 - '@types/bull@4.10.0': - dependencies: - bull: 4.15.0 - transitivePeerDependencies: - - supports-color - '@types/bunyan@1.8.9': dependencies: '@types/node': 20.14.1 @@ -6913,15 +6903,15 @@ snapshots: base64-js: 1.5.1 ieee754: 1.2.1 - bull@4.15.0: + bullmq@5.11.0: dependencies: cron-parser: 4.9.0 - get-port: 5.1.1 ioredis: 5.4.1 - lodash: 4.17.21 msgpackr: 1.10.2 + node-abort-controller: 3.1.1 semver: 7.6.2 - uuid: 8.3.2 + tslib: 2.6.3 + uuid: 9.0.1 transitivePeerDependencies: - supports-color @@ -7522,8 +7512,6 @@ snapshots: get-package-type@0.1.0: {} - get-port@5.1.1: {} - get-stream@5.2.0: dependencies: pump: 3.0.0 @@ -8605,6 +8593,8 @@ 
snapshots: netmask@2.0.2: {} + node-abort-controller@3.1.1: {} + node-domexception@1.0.0: {} node-ensure@0.0.0: {} @@ -9417,6 +9407,8 @@ snapshots: sylvester@0.0.12: {} + systeminformation@5.22.11: {} + tar-fs@3.0.5: dependencies: pump: 3.0.0 @@ -9589,8 +9581,6 @@ snapshots: uuid@10.0.0: {} - uuid@8.3.2: {} - uuid@9.0.1: {} v8-compile-cache-lib@3.0.1: {} diff --git a/apps/api/src/controllers/admin/queue.ts b/apps/api/src/controllers/admin/queue.ts index cb5f99ed..3f1e9323 100644 --- a/apps/api/src/controllers/admin/queue.ts +++ b/apps/api/src/controllers/admin/queue.ts @@ -1,6 +1,6 @@ import { Request, Response } from "express"; -import { Job } from "bull"; +import { Job } from "bullmq"; import { Logger } from "../../lib/logger"; import { getWebScraperQueue } from "../../services/queue-service"; import { checkAlerts } from "../../services/alerts"; diff --git a/apps/api/src/controllers/crawl-cancel.ts b/apps/api/src/controllers/crawl-cancel.ts index d0c109ec..eb870f0a 100644 --- a/apps/api/src/controllers/crawl-cancel.ts +++ b/apps/api/src/controllers/crawl-cancel.ts @@ -41,7 +41,15 @@ export async function crawlCancelController(req: Request, res: Response) { } const jobState = await job.getState(); - const { partialDocs } = await job.progress(); + let progress = job.progress; + if(typeof progress !== 'object') { + progress = { + partialDocs: [] + } + } + const { + partialDocs = [] + } = progress as { partialDocs: any[] }; if (partialDocs && partialDocs.length > 0 && jobState === "active") { Logger.info("Billing team for partial docs..."); @@ -51,10 +59,11 @@ export async function crawlCancelController(req: Request, res: Response) { } try { - await getWebScraperQueue().client.del(job.lockKey()); - await job.takeLock(); - await job.discard(); - await job.moveToFailed(Error("Job cancelled by user"), true); + // TODO: FIX THIS by doing as a flag on the data? 
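+      // One possible approach (a sketch, not wired up in this patch): mark the
+      // job via BullMQ's job.updateData() and have the worker honor the flag:
+      //   await job.updateData({ ...job.data, cancelled: true });
+      // job.updateData is a real BullMQ Job method; the `cancelled` field and
+      // the worker-side check are assumptions, not part of this change.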
+      // await getWebScraperQueue().client.del(job.lockKey());
+      // await job.takeLock();
+      // await job.discard();
+      // await job.moveToFailed(Error("Job cancelled by user"), true);
     } catch (error) {
       Logger.error(error);
     }
diff --git a/apps/api/src/controllers/crawl-status.ts b/apps/api/src/controllers/crawl-status.ts
index 5aafa433..87f1fca0 100644
--- a/apps/api/src/controllers/crawl-status.ts
+++ b/apps/api/src/controllers/crawl-status.ts
@@ -21,7 +21,23 @@ export async function crawlStatusController(req: Request, res: Response) {
       return res.status(404).json({ error: "Job not found" });
     }
 
-    const { current, current_url, total, current_step, partialDocs } = await job.progress();
+    let progress = job.progress;
+    if (typeof progress !== 'object') {
+      progress = {
+        current: 0,
+        current_url: '',
+        total: 0,
+        current_step: '',
+        partialDocs: []
+      };
+    }
+    const {
+      current = 0,
+      current_url = '',
+      total = 0,
+      current_step = '',
+      partialDocs = []
+    } = progress as { current: number, current_url: string, total: number, current_step: string, partialDocs: any[] };
     let data = job.returnvalue;
     if (process.env.USE_DB_AUTHENTICATION === "true") {
diff --git a/apps/api/src/controllers/crawl.ts b/apps/api/src/controllers/crawl.ts
index 9480c63b..42593d83 100644
--- a/apps/api/src/controllers/crawl.ts
+++ b/apps/api/src/controllers/crawl.ts
@@ -74,7 +74,7 @@ export async function crawlController(req: Request, res: Response) {
       });
 
       const docs = await a.getDocuments(false, (progress) => {
-        job.progress({
+        job.updateProgress({
           current: progress.current,
           total: progress.total,
           current_step: "SCRAPING",
diff --git a/apps/api/src/controllers/scrape.ts b/apps/api/src/controllers/scrape.ts
index 7b4ccfd1..52ebba31 100644
--- a/apps/api/src/controllers/scrape.ts
+++ b/apps/api/src/controllers/scrape.ts
@@ -76,7 +76,7 @@ export async function scrapeHelper(
     }
   }
 
-  wsq.on("global:completed", listener);
+  // wsq.on("global:completed", listener);
 
   const timeoutPromise = new Promise<{ success: boolean; error?: string; returnCode: number }>((_, reject) =>
     setTimeout(() => reject({ success: false, error: "Request timed out. Increase the timeout by passing `timeout` param to the request.", returnCode: 408 }), timeout)
diff --git a/apps/api/src/controllers/status.ts b/apps/api/src/controllers/status.ts
index 3d7fccbb..6437bea0 100644
--- a/apps/api/src/controllers/status.ts
+++ b/apps/api/src/controllers/status.ts
@@ -10,7 +10,24 @@ export async function crawlJobStatusPreviewController(req: Request, res: Respons
       return res.status(404).json({ error: "Job not found" });
     }
 
-    const { current, current_url, total, current_step, partialDocs } = await job.progress();
+    let progress = job.progress;
+    if (typeof progress !== 'object') {
+      progress = {
+        current: 0,
+        current_url: '',
+        total: 0,
+        current_step: '',
+        partialDocs: []
+      };
+    }
+    const {
+      current = 0,
+      current_url = '',
+      total = 0,
+      current_step = '',
+      partialDocs = []
+    } = progress as { current: number, current_url: string, total: number, current_step: string, partialDocs: any[] };
+
     let data = job.returnvalue;
     if (process.env.USE_DB_AUTHENTICATION === "true") {
       const supabaseData = await supabaseGetJobById(req.params.jobId);
@@ -21,7 +38,7 @@ export async function crawlJobStatusPreviewController(req: Request, res: Respons
   }
 
   let jobStatus = await job.getState();
-  if (jobStatus === 'waiting' || jobStatus === 'stuck') {
+  if (jobStatus === 'waiting' || jobStatus === 'delayed' || jobStatus === 'waiting-children' || jobStatus === 'unknown' || jobStatus === 'prioritized') {
     jobStatus = 'active';
   }
diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts
index ebe6ef38..c9bb6589 100644
--- a/apps/api/src/index.ts
+++ b/apps/api/src/index.ts
@@ -181,11 +181,11 @@ if (cluster.isMaster) {
   Logger.info(`Worker ${process.pid} started`);
 }
 
-const wsq = getWebScraperQueue();
+// const wsq = getWebScraperQueue();
 
-wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
-wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
-wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
-wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
-wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
-wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
+// wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
+// wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
+// wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
+// wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
+// wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
+// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
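A note on the progress API used above: Bull's job.progress() was an async method,
while BullMQ's job.progress is a plain property typed number | object. That is why
each controller now normalizes it before destructuring. A shared helper could
remove the duplication -- a sketch only, not part of this patch; the name
normalizeJobProgress and the CrawlProgress interface are assumptions:

    import { Job } from "bullmq";

    interface CrawlProgress {
      current: number;
      current_url: string;
      total: number;
      current_step: string;
      partialDocs: any[];
    }

    // BullMQ reports progress as number | object; coerce it to the crawl shape,
    // falling back to the same defaults the controllers use inline.
    export function normalizeJobProgress(job: Job): CrawlProgress {
      const p =
        typeof job.progress === "object" && job.progress !== null
          ? (job.progress as Partial<CrawlProgress>)
          : {};
      return {
        current: p.current ?? 0,
        current_url: p.current_url ?? "",
        total: p.total ?? 0,
        current_step: p.current_step ?? "",
        partialDocs: p.partialDocs ?? [],
      };
    }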
diff --git a/apps/api/src/lib/scrape-events.ts b/apps/api/src/lib/scrape-events.ts
index ab4ef681..0ae2d77c 100644
--- a/apps/api/src/lib/scrape-events.ts
+++ b/apps/api/src/lib/scrape-events.ts
@@ -1,4 +1,4 @@
-import { Job, JobId } from "bull";
+import { Job } from "bullmq";
 import type { baseScrapers } from "../scraper/WebScraper/single_url";
 import { supabase_service as supabase } from "../services/supabase";
 import { Logger } from "./logger";
@@ -70,7 +70,7 @@ export class ScrapeEvents {
     }
   }
 
-  static async logJobEvent(job: Job | JobId, event: ScrapeQueueEvent["event"]) {
+  static async logJobEvent(job: Job | string, event: ScrapeQueueEvent["event"]) {
     try {
       await this.insert(((job as any).id ? (job as any).id : job) as string, {
         type: "queue",
diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts
index 5e7d2279..8eb95eb2 100644
--- a/apps/api/src/main/runWebScraper.ts
+++ b/apps/api/src/main/runWebScraper.ts
@@ -1,4 +1,4 @@
-import { Job } from "bull";
+import { Job } from "bullmq";
 import {
   CrawlResult,
   WebScraperOptions,
@@ -15,8 +15,10 @@ import { ScrapeEvents } from "../lib/scrape-events";
 
 export async function startWebScraperPipeline({
   job,
+  token,
 }: {
   job: Job;
+  token: string;
 }) {
   let partialDocs: Document[] = [];
   return (await runWebScraper({
@@ -31,17 +33,17 @@ export async function startWebScraperPipeline({
       if (partialDocs.length > 50) {
         partialDocs = partialDocs.slice(-50);
       }
-      job.progress({ ...progress, partialDocs: partialDocs });
+      job.updateProgress({ ...progress, partialDocs: partialDocs });
     }
   },
   onSuccess: (result) => {
     Logger.debug(`🐂 Job completed ${job.id}`);
-    saveJob(job, result);
+    saveJob(job, result, token);
   },
   onError: (error) => {
     Logger.error(`🐂 Job failed ${job.id}`);
     ScrapeEvents.logJobEvent(job, "failed");
-    job.moveToFailed(error);
+    job.moveToFailed(error, token, false);
   },
   team_id: job.data.team_id,
   bull_job_id: job.id.toString(),
@@ -121,7 +123,7 @@ export async function runWebScraper({
   }
 }
 
-const saveJob = async (job: Job, result: any) => {
+const saveJob = async (job: Job, result: any, token: string) => {
   try {
     if (process.env.USE_DB_AUTHENTICATION === "true") {
       const { data, error } = await supabase_service
@@ -131,13 +133,13 @@ const saveJob = async (job: Job, result: any) => {
 
       if (error) throw new Error(error.message);
       try {
-        await job.moveToCompleted(null);
+        await job.moveToCompleted(null, token, false);
       } catch (error) {
         // I think the job won't exist here anymore
       }
     } else {
       try {
-        await job.moveToCompleted(result);
+        await job.moveToCompleted(result, token, false);
       } catch (error) {
         // I think the job won't exist here anymore
       }
diff --git a/apps/api/src/services/alerts/index.ts b/apps/api/src/services/alerts/index.ts
index 88b3c726..0376f4c2 100644
--- a/apps/api/src/services/alerts/index.ts
+++ b/apps/api/src/services/alerts/index.ts
@@ -36,9 +36,8 @@ export async function checkAlerts() {
   const checkWaitingQueue = async () => {
     const webScraperQueue = getWebScraperQueue();
     const waitingJobs = await webScraperQueue.getWaitingCount();
-    const paused = await webScraperQueue.getPausedCount();
 
-    if (waitingJobs !== paused && waitingJobs > Number(process.env.ALERT_NUM_WAITING_JOBS)) {
+    if (waitingJobs > Number(process.env.ALERT_NUM_WAITING_JOBS)) {
       Logger.warn(
         `Alert: Number of waiting jobs is over ${process.env.ALERT_NUM_WAITING_JOBS}. Current waiting jobs: ${waitingJobs}.`
       );
diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts
index d982f32f..028cdd77 100644
--- a/apps/api/src/services/queue-jobs.ts
+++ b/apps/api/src/services/queue-jobs.ts
@@ -1,4 +1,4 @@
-import { Job, Queue } from "bull";
+import { Job, Queue } from "bullmq";
 import {
   getWebScraperQueue,
 } from "./queue-service";
@@ -10,7 +10,7 @@ export async function addWebScraperJob(
   options: any = {},
   jobId: string = uuidv4(),
 ): Promise<Job> {
-  return await getWebScraperQueue().add(webScraperOptions, {
+  return await getWebScraperQueue().add(jobId, webScraperOptions, {
     ...options,
     jobId,
   });
diff --git a/apps/api/src/services/queue-service.ts b/apps/api/src/services/queue-service.ts
index d531c2db..db62c711 100644
--- a/apps/api/src/services/queue-service.ts
+++ b/apps/api/src/services/queue-service.ts
@@ -1,23 +1,41 @@
-import Queue from "bull";
-import { Queue as BullQueue } from "bull";
+import { Queue } from "bullmq";
 import { Logger } from "../lib/logger";
+import IORedis from "ioredis";
 
-let webScraperQueue: BullQueue;
+let webScraperQueue: Queue;
+
+export const redisConnection = new IORedis(process.env.REDIS_URL, {
+  maxRetriesPerRequest: null,
+});
+
+export const webScraperQueueName = "{webscraperQueue}";
 
 export function getWebScraperQueue() {
   if (!webScraperQueue) {
-    webScraperQueue = new Queue("web-scraper", process.env.REDIS_URL, {
-      settings: {
-        lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds,
-        lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
-        stalledInterval: 30 * 1000,
-        maxStalledCount: 10,
-      },
-      defaultJobOptions:{
-        attempts: 5
-      }
-    });
+    webScraperQueue = new Queue(
+      webScraperQueueName,
+      {
+        connection: redisConnection,
+      }
+      // {
+      //   settings: {
+      //     lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds,
+      //     lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
+      //     stalledInterval: 30 * 1000,
+      //     maxStalledCount: 10,
+      //   },
+      //   defaultJobOptions: {
+      //     attempts: 5
+      //   }
+      // }
+    );
     Logger.info("Web scraper queue created");
   }
   return webScraperQueue;
 }
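Two details of the new queue service are easy to miss: BullMQ requires
maxRetriesPerRequest: null on connections used by workers (its blocking commands
cannot be safely retried), and the braces in "{webscraperQueue}" are a Redis
Cluster hash tag, which pins every key BullMQ derives for this queue to a single
hash slot. Any future queue, such as the result queue hinted at in the worker
below, could reuse the same connection -- a sketch; the "{resultQueue}" name is
an assumption:

    import { Queue } from "bullmq";
    import { redisConnection } from "./queue-service";

    // The hash-tagged name keeps all of this queue's Redis keys in one slot.
    export const resultQueue = new Queue("{resultQueue}", {
      connection: redisConnection,
    });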
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index cda690dd..3cd62751 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -1,14 +1,17 @@
 import { CustomError } from "../lib/custom-error";
-import { getWebScraperQueue } from "./queue-service";
+import { getWebScraperQueue, redisConnection, webScraperQueueName } from "./queue-service";
 import "dotenv/config";
 import { logtail } from "./logtail";
 import { startWebScraperPipeline } from "../main/runWebScraper";
 import { callWebhook } from "./webhook";
 import { logJob } from "./logging/log_job";
 import { initSDK } from '@hyperdx/node-opentelemetry';
-import { Job } from "bull";
+import { Job, Worker } from "bullmq";
 import { Logger } from "../lib/logger";
 import { ScrapeEvents } from "../lib/scrape-events";
+import systemMonitor from "./system-monitor";
+import { v4 as uuidv4 } from "uuid";
 
 if (process.env.ENV === 'production') {
   initSDK({
@@ -16,21 +19,115 @@ if (process.env.ENV === 'production') {
     additionalInstrumentations: [],
   });
 }
+
+const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
+
+const workerLockDuration = Number(process.env.WORKER_LOCK_DURATION) || 60000;
+const workerStalledCheckInterval =
+  Number(process.env.WORKER_STALLED_CHECK_INTERVAL) || 30000;
+const jobLockExtendInterval =
+  Number(process.env.JOB_LOCK_EXTEND_INTERVAL) || 15000;
+const jobLockExtensionTime =
+  Number(process.env.JOB_LOCK_EXTENSION_TIME) || 15000;
+
+const cantAcceptConnectionInterval =
+  Number(process.env.CANT_ACCEPT_CONNECTION_INTERVAL) || 2000;
+const connectionMonitorInterval =
+  Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10;
+const gotJobInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 20;
 
 const wsq = getWebScraperQueue();
 
-async function processJob(job: Job, done) {
+const processJobInternal = async (token: string, job: Job) => {
+  // Keep the job's lock alive while it is being processed.
+  const extendLockInterval = setInterval(async () => {
+    await job.extendLock(token, jobLockExtensionTime);
+  }, jobLockExtendInterval);
+
+  try {
+    const result = await processJob(job, token);
+    // await resultQueue.add('resultJob', result, { jobId: job.id });
+    // Completion is currently handled by saveJob() inside the pipeline.
+    Logger.debug(`🐂 Job ${job.id} finished with state: ${await job.getState()}`);
+    // await job.moveToCompleted(result, token, false); // 3rd arg: fetchNext
+  } catch (error) {
+    Logger.error(`🐂 Job ${job.id} failed: ${error}`);
+    await job.moveToFailed(error, token, false);
+  } finally {
+    clearInterval(extendLockInterval);
+  }
+};
+
+let isShuttingDown = false;
+
+process.on("SIGINT", () => {
+  Logger.info("Received SIGINT. Shutting down gracefully...");
+  isShuttingDown = true;
+});
+
+const workerFun = async () => {
+  // Null processor: jobs are fetched manually with getNextJob() below.
+  const worker = new Worker(webScraperQueueName, null, {
+    connection: redisConnection,
+    lockDuration: workerLockDuration, // default 1 minute
+    stalledInterval: workerStalledCheckInterval, // default 30 seconds
+    maxStalledCount: 10,
+  });
+
+  worker.startStalledCheckTimer();
+
+  const monitor = await systemMonitor;
+
+  while (true) {
+    if (isShuttingDown) {
+      Logger.info("No longer accepting new jobs. SIGINT");
+      break;
+    }
+    const token = uuidv4();
+    const canAcceptConnection = await monitor.acceptConnection();
+    if (!canAcceptConnection) {
+      Logger.info("Can't accept connection");
+      await sleep(cantAcceptConnectionInterval);
+      continue;
+    }
+
+    const job = await worker.getNextJob(token);
+    if (job) {
+      processJobInternal(token, job);
+      await sleep(gotJobInterval);
+    } else {
+      await sleep(connectionMonitorInterval);
+    }
+  }
+};
+
+workerFun();
+
+async function processJob(job: Job, token: string) {
   Logger.debug(`🐂 Worker taking job ${job.id}`);
 
   try {
-    job.progress({
+    job.updateProgress({
       current: 1,
       total: 100,
       current_step: "SCRAPING",
       current_url: "",
     });
     const start = Date.now();
-    const { success, message, docs } = await startWebScraperPipeline({ job });
+    const { success, message, docs } = await startWebScraperPipeline({ job, token });
     const end = Date.now();
     const timeTakenInSeconds = (end - start) / 1000;
 
@@ -64,10 +161,11 @@ async function processJob(job: Job, token: string) {
       origin: job.data.origin,
     });
     Logger.debug(`🐂 Job done ${job.id}`);
-    done(null, data);
+    return data;
   } catch (error) {
     Logger.error(`🐂 Job errored ${job.id} - ${error}`);
-    if (await getWebScraperQueue().isPaused(false)) {
+    if (await getWebScraperQueue().isPaused()) {
       Logger.debug("🐂Queue is paused, ignoring");
       return;
     }
@@ -112,18 +210,19 @@ async function processJob(job: Job, token: string) {
       pageOptions: job.data.pageOptions,
       origin: job.data.origin,
     });
-    done(null, data);
+    return data;
   }
 }
 
-wsq.process(
-  Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
-  processJob
-);
-
-wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
-wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
-wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
-wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
-wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
-wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
+// wsq.process(
+//   Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
+//   processJob
+// );
+
+// wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
+// wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
+// wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
+// wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
+// wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
+// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
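The worker above uses BullMQ's manual job processing: a Worker constructed with a
null processor, jobs pulled explicitly with getNextJob(token), and locks extended
by hand. Under this pattern the caller is responsible for settling each job with
the same token; here completion happens inside the pipeline via saveJob(), which
is why the moveToCompleted call in processJobInternal stays commented out. If
settling ever moves out of the pipeline, it would look like this -- a sketch, not
part of this patch:

    // Settle with the token that checked the job out; fetchNext stays false
    // because the loop above fetches jobs itself via getNextJob().
    await job.moveToCompleted(result, token, false);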
diff --git a/apps/api/src/services/system-monitor.ts b/apps/api/src/services/system-monitor.ts
new file mode 100644
index 00000000..c0574912
--- /dev/null
+++ b/apps/api/src/services/system-monitor.ts
@@ -0,0 +1,81 @@
+import si from 'systeminformation';
+import { Mutex } from "async-mutex";
+
+const MAX_CPU = process.env.MAX_CPU ? parseFloat(process.env.MAX_CPU) : 0.8;
+const MAX_RAM = process.env.MAX_RAM ? parseFloat(process.env.MAX_RAM) : 0.8;
+const CACHE_DURATION = process.env.SYS_INFO_MAX_CACHE_DURATION ? parseFloat(process.env.SYS_INFO_MAX_CACHE_DURATION) : 150;
+
+class SystemMonitor {
+  private static instance: SystemMonitor;
+  private static instanceMutex = new Mutex();
+
+  private cpuUsageCache: number | null = null;
+  private memoryUsageCache: number | null = null;
+  private lastCpuCheck: number = 0;
+  private lastMemoryCheck: number = 0;
+
+  private constructor() {}
+
+  public static async getInstance(): Promise<SystemMonitor> {
+    if (SystemMonitor.instance) {
+      return SystemMonitor.instance;
+    }
+
+    await this.instanceMutex.runExclusive(async () => {
+      if (!SystemMonitor.instance) {
+        SystemMonitor.instance = new SystemMonitor();
+      }
+    });
+
+    return SystemMonitor.instance;
+  }
+
+  private async checkMemoryUsage() {
+    const now = Date.now();
+    if (this.memoryUsageCache !== null && (now - this.lastMemoryCheck) < CACHE_DURATION) {
+      return this.memoryUsageCache;
+    }
+
+    const memoryData = await si.mem();
+    const totalMemory = memoryData.total;
+    const availableMemory = memoryData.available;
+    const usedMemory = totalMemory - availableMemory;
+    const usedMemoryPercentage = (usedMemory / totalMemory);
+
+    this.memoryUsageCache = usedMemoryPercentage;
+    this.lastMemoryCheck = now;
+
+    return usedMemoryPercentage;
+  }
+
+  private async checkCpuUsage() {
+    const now = Date.now();
+    if (this.cpuUsageCache !== null && (now - this.lastCpuCheck) < CACHE_DURATION) {
+      return this.cpuUsageCache;
+    }
+
+    const cpuData = await si.currentLoad();
+    const cpuLoad = cpuData.currentLoad / 100;
+
+    this.cpuUsageCache = cpuLoad;
+    this.lastCpuCheck = now;
+
+    return cpuLoad;
+  }
+
+  public async acceptConnection() {
+    const cpuUsage = await this.checkCpuUsage();
+    const memoryUsage = await this.checkMemoryUsage();
+
+    return cpuUsage < MAX_CPU && memoryUsage < MAX_RAM;
+  }
+
+  public clearCache() {
+    this.cpuUsageCache = null;
+    this.memoryUsageCache = null;
+    this.lastCpuCheck = 0;
+    this.lastMemoryCheck = 0;
+  }
+}
+
+export default SystemMonitor.getInstance();
\ No newline at end of file
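For reference, the intended consumption of the new module: system-monitor exports
the getInstance() promise itself, so call sites await the module export once and
then poll acceptConnection(), which returns true only while CPU and memory usage
are below MAX_CPU and MAX_RAM (readings are cached for CACHE_DURATION ms). A
minimal usage sketch, mirroring the worker loop above:

    import systemMonitor from "./system-monitor";

    async function canTakeWork(): Promise<boolean> {
      // The default export is a Promise<SystemMonitor>; awaiting it repeatedly
      // is fine, since it always resolves to the same singleton.
      const monitor = await systemMonitor;
      return monitor.acceptConnection();
    }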