diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts
index 47c97ea4..4d44814c 100644
--- a/apps/api/src/index.ts
+++ b/apps/api/src/index.ts
@@ -9,8 +9,6 @@ import { v0Router } from "./routes/v0";
 import cluster from "cluster";
 import os from "os";
 import { Job } from "bull";
-import { supabase_service } from "./services/supabase";
-import { logJob } from "./services/logging/log_job";
 
 const { createBullBoard } = require("@bull-board/api");
 const { BullAdapter } = require("@bull-board/api/bullAdapter");
@@ -22,39 +20,6 @@ console.log(`Number of CPUs: ${numCPUs} available`);
 if (cluster.isMaster) {
   console.log(`Master ${process.pid} is running`);
 
-  // (async () => {
-  //   if (process.env.USE_DB_AUTHENTICATION) {
-  //     const wsq = getWebScraperQueue();
-  //     const { error, data } = await supabase_service
-  //       .from("firecrawl_jobs")
-  //       .select()
-  //       .eq("retry", true);
-
-  //     if (error) throw new Error(error.message);
-
-  //     await wsq.addBulk(data.map(x => ({
-  //       data: {
-  //         url: x.url,
-  //         mode: x.mode,
-  //         crawlerOptions: x.crawler_options,
-  //         team_id: x.team_id,
-  //         pageOptions: x.page_options,
-  //         origin: x.origin,
-  //       },
-  //       opts: {
-  //         jobId: x.job_id,
-  //       }
-  //     })))
-
-  //     if (data.length > 0) {
-  //       await supabase_service
-  //         .from("firecrawl_jobs")
-  //         .delete()
-  //         .in("id", data.map(x => x.id));
-  //     }
-  //   }
-  // })();
-
   // Fork workers.
   for (let i = 0; i < numCPUs; i++) {
     cluster.fork();
@@ -271,57 +236,3 @@ if (cluster.isMaster) {
 
   console.log(`Worker ${process.pid} started`);
 }
-
-const onExit = async () => {
-  console.log("Shutting down gracefully...");
-
-  if (cluster.workers) {
-    for (const worker of Object.keys(cluster.workers || {})) {
-      cluster.workers[worker].process.kill();
-    }
-  }
-
-  if (process.env.USE_DB_AUTHENTICATION) {
-    const wsq = getWebScraperQueue();
-    const activeJobCount = await wsq.getActiveCount();
-    console.log("Updating", activeJobCount, "in-progress jobs");
-
-    const activeJobs = (await Promise.all(new Array(Math.ceil(activeJobCount / 10)).fill(0).map((_, i) => {
-      return wsq.getActive(i, i+10)
-    }))).flat(1);
-
-    for (const job of activeJobs) {
-      console.log(job.id);
-      try {
-        await logJob({
-          job_id: job.id as string,
-          success: false,
-          message: "Interrupted, retrying",
-          num_docs: 0,
-          docs: [],
-          time_taken: 0,
-          team_id: job.data.team_id,
-          mode: "crawl",
-          url: job.data.url,
-          crawlerOptions: job.data.crawlerOptions,
-          pageOptions: job.data.pageOptions,
-          origin: job.data.origin,
-          retry: true,
-        });
-
-        await wsq.client.del(await job.lockKey());
-        await job.takeLock();
-        await job.moveToFailed({ message: "interrupted" });
-        await job.remove();
-      } catch (error) {
-        console.error("Failed to update job status:", error);
-      }
-    }
-  }
-
-  console.log("Bye!");
-  process.exit();
-};
-
-process.on("SIGINT", onExit);
-process.on("SIGTERM", onExit);
\ No newline at end of file
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 8721691c..d6801856 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -11,10 +11,13 @@ import { logJob } from "./logging/log_job";
 //   initSDK({ consoleCapture: true, additionalInstrumentations: []});
 // }
 
-getWebScraperQueue().process(
+const wsq = getWebScraperQueue();
+const myJobs = [];
+
+wsq.process(
   Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
   async function (job, done) {
-
+    myJobs.push(job.id);
     try {
       job.progress({
         current: 1,
@@ -96,5 +99,33 @@ getWebScraperQueue().process(
       });
       done(null, data);
     }
+    myJobs.splice(myJobs.indexOf(job.id), 1);
   }
 );
+
+process.on("SIGINT", async () => {
+  console.log("Gracefully shutting down...");
+
+  await wsq.pause(true, true);
+
+  if (myJobs.length > 0) {
+    const jobs = await Promise.all(myJobs.map(x => wsq.getJob(x)));
+    console.log("Removing", jobs.length, "jobs...");
+    await Promise.all(jobs.map(async x => {
+      // await wsq.client.del(await x.lockKey());
+      // await x.takeLock();
+      await x.moveToFailed({ message: "interrupted" });
+      await x.remove();
+    }));
+    console.log("Re-adding", jobs.length, "jobs...");
+    await wsq.addBulk(jobs.map(x => ({
+      data: x.data,
+      opts: {
+        jobId: x.id,
+      },
+    }))));
+    console.log("Done!");
+  }
+
+  process.exit(0);
+});