Merge pull request #393 from mendableai/mog/job-stuck-fix

fix(bull): requeue jobs after restart
This commit is contained in:
Nicolas 2024-07-12 10:43:02 -04:00 committed by GitHub
commit c07a1912e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -120,36 +120,9 @@ if (cluster.isMaster) {
return res.status(200).json({ ok: true });
try {
const wsq = getWebScraperQueue();
console.log("Gracefully shutting down...");
await wsq.pause(false, true);
const jobs = await wsq.getActive();
if (jobs.length > 0) {
console.log("Removing", jobs.length, "jobs...");
await Promise.all(jobs.map(async x => {
await wsq.client.del(await x.lockKey());
await x.takeLock();
await x.moveToFailed({ message: "interrupted" });
await x.remove();
}));
console.log("Re-adding", jobs.length, "jobs...");
await wsq.addBulk(jobs.map(x => ({
data: x.data,
opts: {
jobId: x.id,
},
})));
console.log("Done!");
await getWebScraperQueue().pause(false, true);
res.json({ ok: true });
}
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
@ -157,10 +130,44 @@ if (cluster.isMaster) {
});
app.post(`/admin/${process.env.BULL_AUTH_KEY}/unpause`, async (req, res) => {
return res.status(200).json({ ok: true });
try {
const wsq = getWebScraperQueue();
const jobs = await wsq.getActive();
console.log("Requeueing", jobs.length, "jobs...");
if (jobs.length > 0) {
console.log(" Removing", jobs.length, "jobs...");
await Promise.all(jobs.map(async x => {
try {
await wsq.client.del(await x.lockKey());
await x.takeLock();
await x.moveToFailed({ message: "interrupted" });
await x.remove();
} catch (e) {
console.warn("Failed to remove job", x.id, e);
}
}));
console.log(" Re-adding", jobs.length, "jobs...");
await wsq.addBulk(jobs.map(x => ({
data: x.data,
opts: {
jobId: x.id,
},
})));
console.log(" Done!");
}
await getWebScraperQueue().resume(false);
res.json({ ok: true });
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
});
app.get(`/serverHealthCheck`, async (req, res) => {