feat: fix iteration 3 (actually works)

This commit is contained in:
Gergo Moricz 2024-07-11 23:14:15 +02:00
parent 9cd7d79b64
commit 09bca05b20
6 changed files with 75 additions and 44 deletions

View File

@ -31,6 +31,3 @@ COPY --from=build /app /app
# Start the server by default, this can be overwritten at runtime # Start the server by default, this can be overwritten at runtime
EXPOSE 8080 EXPOSE 8080
ENV PUPPETEER_EXECUTABLE_PATH="/usr/bin/chromium" ENV PUPPETEER_EXECUTABLE_PATH="/usr/bin/chromium"
CMD [ "pnpm", "run", "start:production" ]
CMD [ "pnpm", "run", "worker:production" ]

View File

@ -8,11 +8,14 @@ primary_region = 'mia'
kill_signal = 'SIGINT' kill_signal = 'SIGINT'
kill_timeout = '30s' kill_timeout = '30s'
[deploy]
release_command = 'node dist/src/trigger-shutdown.js https://api.firecrawl.dev'
[build] [build]
[processes] [processes]
app = 'npm run start:production' app = 'node dist/src/index.js'
worker = 'npm run worker:production' worker = 'node dist/src/services/queue-worker.js'
[http_service] [http_service]
internal_port = 8080 internal_port = 8080

View File

@ -8,11 +8,14 @@ primary_region = 'mia'
kill_signal = 'SIGINT' kill_signal = 'SIGINT'
kill_timeout = '30s' kill_timeout = '30s'
[deploy]
release_command = 'node dist/src/trigger-shutdown.js https://staging-firecrawl-scraper-js.fly.dev'
[build] [build]
[processes] [processes]
app = 'npm run start:production' app = 'node dist/src/index.js'
worker = 'npm run worker:production' worker = 'node dist/src/services/queue-worker.js'
[http_service] [http_service]
internal_port = 8080 internal_port = 8080

View File

@ -115,6 +115,44 @@ if (cluster.isMaster) {
} }
}); });
/**
 * POST /admin/:BULL_AUTH_KEY/shutdown
 *
 * Gracefully drains the web-scraper queue: pauses it, force-fails and
 * removes every active job, then re-enqueues those jobs under their
 * original ids so a fresh worker can pick them up after redeploy.
 * Responds { ok: true } on success, 500 with the error message on failure.
 */
app.post(`/admin/${process.env.BULL_AUTH_KEY}/shutdown`, async (req, res) => {
  try {
    const wsq = getWebScraperQueue();
    console.log("Gracefully shutting down...");
    await wsq.pause(false, true);

    const jobs = await wsq.getActive();
    if (jobs.length > 0) {
      console.log("Removing", jobs.length, "jobs...");
      await Promise.all(
        jobs.map(async (job) => {
          // Delete the worker's lock key, then take the lock ourselves so
          // Bull lets us move the job to failed and remove it.
          await wsq.client.del(await job.lockKey());
          await job.takeLock();
          await job.moveToFailed({ message: "interrupted" });
          await job.remove();
        })
      );

      console.log("Re-adding", jobs.length, "jobs...");
      await wsq.addBulk(
        jobs.map((job) => ({
          data: job.data,
          opts: {
            // Reuse the original id so callers polling the job still find it.
            jobId: job.id,
          },
        }))
      );
      console.log("Done!");
    }

    // BUG FIX: previously res.json was only sent inside the jobs.length > 0
    // branch, so a shutdown with no active jobs left the request hanging
    // until the client timed out. Always acknowledge success.
    res.json({ ok: true });
  } catch (error) {
    console.error(error);
    return res.status(500).json({ error: error.message });
  }
});
app.get(`/serverHealthCheck`, async (req, res) => { app.get(`/serverHealthCheck`, async (req, res) => {
try { try {
const webScraperQueue = getWebScraperQueue(); const webScraperQueue = getWebScraperQueue();
@ -235,4 +273,11 @@ if (cluster.isMaster) {
}); });
console.log(`Worker ${process.pid} started`); console.log(`Worker ${process.pid} started`);
// On worker startup, resume the queue if a previous graceful shutdown
// left it paused (the admin shutdown endpoint pauses it and exits).
(async () => {
  const wsq = getWebScraperQueue();
  if (await wsq.isPaused(false)) {
    await wsq.resume(false);
  }
})().catch((error) => {
  // BUG FIX: this floating async IIFE had no rejection handler, so any
  // Redis/queue error here surfaced as an unhandled promise rejection
  // (fatal on newer Node versions). Log it instead of crashing startup.
  console.error("Failed to resume queue on startup:", error);
});
} }

View File

@ -5,19 +5,20 @@ import { logtail } from "./logtail";
import { startWebScraperPipeline } from "../main/runWebScraper"; import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook"; import { callWebhook } from "./webhook";
import { logJob } from "./logging/log_job"; import { logJob } from "./logging/log_job";
// import { initSDK } from '@hyperdx/node-opentelemetry'; import { initSDK } from '@hyperdx/node-opentelemetry';
// if(process.env.ENV === 'production') { if(process.env.ENV === 'production') {
// initSDK({ consoleCapture: true, additionalInstrumentations: []}); initSDK({
// } consoleCapture: true,
additionalInstrumentations: [],
});
}
const wsq = getWebScraperQueue(); const wsq = getWebScraperQueue();
const myJobs = [];
wsq.process( wsq.process(
Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)), Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
async function (job, done) { async function (job, done) {
myJobs.push(job.id);
try { try {
job.progress({ job.progress({
current: 1, current: 1,
@ -59,6 +60,10 @@ wsq.process(
}); });
done(null, data); done(null, data);
} catch (error) { } catch (error) {
if (await getWebScraperQueue().isPaused(false)) {
return;
}
if (error instanceof CustomError) { if (error instanceof CustomError) {
// Here we handle the error, then save the failed job // Here we handle the error, then save the failed job
console.error(error.message); // or any other error handling console.error(error.message); // or any other error handling
@ -99,36 +104,5 @@ wsq.process(
}); });
done(null, data); done(null, data);
} }
myJobs.splice(myJobs.indexOf(job.id), 1);
} }
); );
// Guard so a repeated Ctrl-C does not run the drain logic twice.
let shuttingDown = false;
// Graceful shutdown: pause the queue, fail/remove this worker's in-flight
// jobs, re-enqueue them under the same ids, then exit.
process.on("SIGINT", async () => {
if (shuttingDown) return;
shuttingDown = true;
console.log("Gracefully shutting down...");
// pause(true, ...) pauses locally for this worker only — TODO confirm
// against the Bull API for this version.
await wsq.pause(true, true);
if (myJobs.length > 0) {
// myJobs holds ids of jobs this process is currently working on.
// NOTE(review): wsq.getJob(id) may return null if the job has already
// completed or been removed; the calls below would then throw — verify.
const jobs = await Promise.all(myJobs.map(x => wsq.getJob(x)));
console.log("Removing", jobs.length, "jobs...");
await Promise.all(jobs.map(async x => {
// NOTE(review): unlike the admin shutdown endpoint, no lock is released
// or taken here before moveToFailed — presumably relies on this worker
// already holding the job lock; confirm.
await x.moveToFailed({ message: "interrupted" });
await x.remove();
}));
console.log("Re-adding", jobs.length, "jobs...");
// Re-add with original ids so external pollers still find the jobs.
await wsq.addBulk(jobs.map(x => ({
data: x.data,
opts: {
jobId: x.id,
},
})));
console.log("Done!");
}
process.exit(0);
});

View File

@ -0,0 +1,9 @@
// Triggers a graceful shutdown of a running Firecrawl instance by POSTing
// to its admin shutdown endpoint.
//
// Usage: node trigger-shutdown.js <base-url>
// (run as a Fly.io release_command so old workers drain before a deploy).
fetch(`${process.argv[2]}/admin/${process.env.BULL_AUTH_KEY}/shutdown`, {
  method: "POST",
}).then(async (response) => {
  console.log(await response.text());
  // BUG FIX: previously any HTTP response — including 401 (bad auth key)
  // or 500 — exited 0, so a failed drain went unnoticed during a release.
  // Exit non-zero on non-2xx so the release command fails loudly.
  process.exit(response.ok ? 0 : 1);
}).catch((error) => {
  console.error(error);
  process.exit(1);
});