diff --git a/apps/api/Dockerfile b/apps/api/Dockerfile index b32dab53..a7be4fe0 100644 --- a/apps/api/Dockerfile +++ b/apps/api/Dockerfile @@ -31,6 +31,3 @@ COPY --from=build /app /app # Start the server by default, this can be overwritten at runtime EXPOSE 8080 ENV PUPPETEER_EXECUTABLE_PATH="/usr/bin/chromium" -CMD [ "pnpm", "run", "start:production" ] -CMD [ "pnpm", "run", "worker:production" ] - diff --git a/apps/api/_ORIGIN_fly.toml b/apps/api/_ORIGIN_fly.toml index da145cdb..481290f0 100644 --- a/apps/api/_ORIGIN_fly.toml +++ b/apps/api/_ORIGIN_fly.toml @@ -8,11 +8,14 @@ primary_region = 'mia' kill_signal = 'SIGINT' kill_timeout = '30s' +[deploy] + release_command = 'node dist/src/trigger-shutdown.js https://api.firecrawl.dev' + [build] [processes] - app = 'npm run start:production' - worker = 'npm run worker:production' + app = 'node dist/src/index.js' + worker = 'node dist/src/services/queue-worker.js' [http_service] internal_port = 8080 diff --git a/apps/api/fly.toml b/apps/api/fly.toml index 6d87f5c0..09fa135e 100644 --- a/apps/api/fly.toml +++ b/apps/api/fly.toml @@ -8,11 +8,14 @@ primary_region = 'mia' kill_signal = 'SIGINT' kill_timeout = '30s' +[deploy] + release_command = 'node dist/src/trigger-shutdown.js https://staging-firecrawl-scraper-js.fly.dev' + [build] [processes] - app = 'npm run start:production' - worker = 'npm run worker:production' + app = 'node dist/src/index.js' + worker = 'node dist/src/services/queue-worker.js' [http_service] internal_port = 8080 diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 4d44814c..3091870a 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -115,6 +115,44 @@ if (cluster.isMaster) { } }); + app.post(`/admin/${process.env.BULL_AUTH_KEY}/shutdown`, async (req, res) => { + try { + const wsq = getWebScraperQueue(); + + console.log("Gracefully shutting down..."); + + await wsq.pause(false, true); + + const jobs = await wsq.getActive(); + + if (jobs.length > 0) { + console.log("Removing", jobs.length, "jobs..."); + + await Promise.all(jobs.map(async x => { + await wsq.client.del(await x.lockKey()); + await x.takeLock(); + await x.moveToFailed({ message: "interrupted" }); + await x.remove(); + })); + + console.log("Re-adding", jobs.length, "jobs..."); + await wsq.addBulk(jobs.map(x => ({ + data: x.data, + opts: { + jobId: x.id, + }, + }))); + + console.log("Done!"); + + res.json({ ok: true }); + } + } catch (error) { + console.error(error); + return res.status(500).json({ error: error.message }); + } + }); + app.get(`/serverHealthCheck`, async (req, res) => { try { const webScraperQueue = getWebScraperQueue(); @@ -235,4 +273,11 @@ if (cluster.isMaster) { }); console.log(`Worker ${process.pid} started`); + + (async () => { + const wsq = getWebScraperQueue(); + if (await wsq.isPaused(false)) { + await wsq.resume(false); + } + })(); } diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 90d0dac5..24343487 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -5,19 +5,20 @@ import { logtail } from "./logtail"; import { startWebScraperPipeline } from "../main/runWebScraper"; import { callWebhook } from "./webhook"; import { logJob } from "./logging/log_job"; -// import { initSDK } from '@hyperdx/node-opentelemetry'; +import { initSDK } from '@hyperdx/node-opentelemetry'; -// if(process.env.ENV === 'production') { -// initSDK({ consoleCapture: true, additionalInstrumentations: []}); -// } +if(process.env.ENV === 'production') { + initSDK({ + consoleCapture: true, + additionalInstrumentations: [], + }); +} const wsq = getWebScraperQueue(); -const myJobs = []; wsq.process( Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)), async function (job, done) { - myJobs.push(job.id); try { job.progress({ current: 1, @@ -59,6 +60,10 @@ wsq.process( }); done(null, data); } catch (error) { + if (await getWebScraperQueue().isPaused(false)) { + return; + } + if (error instanceof CustomError) { // Here we handle the error, then save the failed job console.error(error.message); // or any other error handling @@ -99,36 +104,5 @@ wsq.process( }); done(null, data); } - myJobs.splice(myJobs.indexOf(job.id), 1); } ); - -let shuttingDown = false; - -process.on("SIGINT", async () => { - if (shuttingDown) return; - shuttingDown = true; - - console.log("Gracefully shutting down..."); - - await wsq.pause(true, true); - - if (myJobs.length > 0) { - const jobs = await Promise.all(myJobs.map(x => wsq.getJob(x))); - console.log("Removing", jobs.length, "jobs..."); - await Promise.all(jobs.map(async x => { - await x.moveToFailed({ message: "interrupted" }); - await x.remove(); - })); - console.log("Re-adding", jobs.length, "jobs..."); - await wsq.addBulk(jobs.map(x => ({ - data: x.data, - opts: { - jobId: x.id, - }, - }))); - console.log("Done!"); - } - - process.exit(0); -}); diff --git a/apps/api/src/trigger-shutdown.ts b/apps/api/src/trigger-shutdown.ts new file mode 100644 index 00000000..5b36f81f --- /dev/null +++ b/apps/api/src/trigger-shutdown.ts @@ -0,0 +1,9 @@ +fetch(process.argv[2] + "/admin/" + process.env.BULL_AUTH_KEY + "/shutdown", { + method: "POST" +}).then(async x => { + console.log(await x.text()); + process.exit(0); +}).catch(e => { + console.error(e); + process.exit(1); +});