Merge pull request #429 from mendableai/mog/fix-job-stuck-2

Fix queue stuck bug via lock settings changes
Nicolas 2024-07-18 12:39:21 -04:00 committed by GitHub
commit 01b5e8fc73
8 changed files with 92 additions and 166 deletions

View File

@@ -8,9 +8,6 @@ primary_region = 'mia'
kill_signal = 'SIGINT'
kill_timeout = '30s'

[deploy]
release_command = 'node dist/src/trigger-shutdown.js https://staging-firecrawl-scraper-js.fly.dev'

[build]

[processes]

View File

@@ -8,9 +8,6 @@ primary_region = 'mia'
kill_signal = 'SIGINT'
kill_timeout = '30s'

[deploy]
release_command = 'node dist/src/trigger-shutdown.js https://api.firecrawl.dev'

[build]

[processes]

View File

@@ -19,8 +19,8 @@
    "mongo-docker": "docker run -d -p 2717:27017 -v ./mongo-data:/data/db --name mongodb mongo:latest",
    "mongo-docker-console": "docker exec -it mongodb mongosh",
    "run-example": "npx ts-node src/example.ts",
    "deploy:fly": "flyctl deploy && node postdeploy.js https://api.firecrawl.dev",
    "deploy:fly:staging": "fly deploy -c fly.staging.toml && node postdeploy.js https://staging-firecrawl-scraper-js.fly.dev"
    "deploy:fly": "flyctl deploy",
    "deploy:fly:staging": "fly deploy -c fly.staging.toml"
  },
  "author": "",
  "license": "ISC",

View File

@@ -1,11 +0,0 @@
require("dotenv").config();

fetch(process.argv[2] + "/admin/" + process.env.BULL_AUTH_KEY + "/unpause", {
  method: "POST"
}).then(async x => {
  console.log(await x.text());
  process.exit(0);
}).catch(e => {
  console.error(e);
  process.exit(1);
});

View File

@@ -119,63 +119,6 @@ if (cluster.isMaster) {
    }
  });

  app.post(`/admin/${process.env.BULL_AUTH_KEY}/shutdown`, async (req, res) => {
    // return res.status(200).json({ ok: true });
    try {
      console.log("Gracefully shutting down...");
      await getWebScraperQueue().pause(false, true);
      res.json({ ok: true });
    } catch (error) {
      console.error(error);
      return res.status(500).json({ error: error.message });
    }
  });

  app.post(`/admin/${process.env.BULL_AUTH_KEY}/unpause`, async (req, res) => {
    try {
      const wsq = getWebScraperQueue();
      const jobs = await wsq.getActive();
      console.log("Requeueing", jobs.length, "jobs...");
      if (jobs.length > 0) {
        console.log(" Removing", jobs.length, "jobs...");
        await Promise.all(
          jobs.map(async (x) => {
            try {
              await wsq.client.del(await x.lockKey());
              await x.takeLock();
              await x.moveToFailed({ message: "interrupted" });
              await x.remove();
            } catch (e) {
              console.warn("Failed to remove job", x.id, e);
            }
          })
        );
        console.log(" Re-adding", jobs.length, "jobs...");
        await wsq.addBulk(
          jobs.map((x) => ({
            data: x.data,
            opts: {
              jobId: x.id,
            },
          }))
        );
        console.log(" Done!");
      }
      await getWebScraperQueue().resume(false);
      res.json({ ok: true });
    } catch (error) {
      console.error(error);
      return res.status(500).json({ error: error.message });
    }
  });

  app.get(`/serverHealthCheck`, async (req, res) => {
    try {
      const webScraperQueue = getWebScraperQueue();
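
The /shutdown and /unpause admin endpoints above are deleted: the unpause handler manually cleared lock keys, failed and removed every active job, then re-added it by id, which is the work Bull's own stalled-job recovery performs once the lock settings in the next file are in place. A minimal sketch, not part of this commit, of keeping similar visibility through Bull's event listeners (the import path is an assumption based on this repo's layout):

import { getWebScraperQueue } from "./services/queue-service";

const wsq = getWebScraperQueue();

// Bull marks a job as stalled when its lock expires without being renewed,
// and re-queues it automatically (up to maxStalledCount times).
wsq.on("stalled", (job) => {
  console.warn("Job stalled, Bull will retry it:", job.id);
});

// A job that stalls more than maxStalledCount times ends up failed instead.
wsq.on("failed", (job, err) => {
  console.error("Job failed:", job.id, err.message);
});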

View File

@@ -7,8 +7,10 @@ export function getWebScraperQueue() {
  if (!webScraperQueue) {
    webScraperQueue = new Queue("web-scraper", process.env.REDIS_URL, {
      settings: {
        lockDuration: 2 * 60 * 60 * 1000, // 2 hours in milliseconds
        lockRenewTime: 30 * 60 * 1000, // 30 minutes in milliseconds
        lockDuration: 2 * 60 * 1000, // 2 minutes in milliseconds
        lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
        stalledInterval: 30 * 1000,
        maxStalledCount: 10,
      },
    });
    console.log("Web scraper queue created");

View File

@@ -6,6 +6,7 @@ import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook";
import { logJob } from "./logging/log_job";
import { initSDK } from '@hyperdx/node-opentelemetry';
import { Job } from "bull";

if(process.env.ENV === 'production') {
  initSDK({
@@ -16,9 +17,8 @@ if(process.env.ENV === 'production') {

const wsq = getWebScraperQueue();

wsq.process(
  Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
  async function (job, done) {
async function processJob(job: Job, done) {
  console.log("taking job", job.id);
  try {
    job.progress({
      current: 1,
@@ -58,9 +58,12 @@ wsq.process(
      pageOptions: job.data.pageOptions,
      origin: job.data.origin,
    });
    console.log("job done", job.id);
    done(null, data);
  } catch (error) {
    console.log("job errored", job.id, error);
    if (await getWebScraperQueue().isPaused(false)) {
      console.log("queue is paused, ignoring");
      return;
    }
@@ -105,4 +108,8 @@ wsq.process(
    done(null, data);
  }
}

wsq.process(
  Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
  processJob
);
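
The worker change is mostly a refactor: the inline processor becomes a named processJob function registered once at the bottom, and on error the handler now checks whether the queue was paused and, if so, returns without completing the job. A condensed sketch of the resulting shape (the DoneCallback type, the queue-service import path, and the behavior described in the comments are my reading of Bull's callback-style API, not part of the diff):

import { Job, DoneCallback } from "bull";
import { getWebScraperQueue } from "./queue-service";

const wsq = getWebScraperQueue();

async function processJob(job: Job, done: DoneCallback) {
  try {
    // ... run the scrape, report progress, log the job ...
    done(null, { success: true });
  } catch (error) {
    if (await wsq.isPaused(false)) {
      // Skip done() on purpose: the job stays active, so once this worker is
      // gone its lock expires and Bull's stalled handling can re-queue it.
      return;
    }
    done(error as Error);
  }
}

wsq.process(
  Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
  processJob
);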

View File

@@ -1,9 +0,0 @@
fetch(process.argv[2] + "/admin/" + process.env.BULL_AUTH_KEY + "/shutdown", {
  method: "POST"
}).then(async x => {
  console.log(await x.text());
  process.exit(0);
}).catch(e => {
  console.error(e);
  process.exit(1);
});