From 10957b748b4d51ca5b57abb4ba802044f8aea8b0 Mon Sep 17 00:00:00 2001 From: Gergo Moricz Date: Fri, 12 Jul 2024 13:55:53 +0200 Subject: [PATCH 1/2] fix(bull): requeue jobs after restart --- apps/api/src/index.ts | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index dc89c096..c2496eac 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -117,17 +117,26 @@ if (cluster.isMaster) { }); app.post(`/admin/${process.env.BULL_AUTH_KEY}/shutdown`, async (req, res) => { + try { + console.log("Gracefully shutting down..."); + await getWebScraperQueue().pause(false, true); + res.json({ ok: true }); + } catch (error) { + console.error(error); + return res.status(500).json({ error: error.message }); + } + }); + + app.post(`/admin/${process.env.BULL_AUTH_KEY}/unpause`, async (req, res) => { try { const wsq = getWebScraperQueue(); - console.log("Gracefully shutting down..."); - - await wsq.pause(false, true); - const jobs = await wsq.getActive(); - + + console.log("Requeueing", jobs.length, "jobs..."); + if (jobs.length > 0) { - console.log("Removing", jobs.length, "jobs..."); + console.log(" Removing", jobs.length, "jobs..."); await Promise.all(jobs.map(async x => { await wsq.client.del(await x.lockKey()); @@ -136,7 +145,7 @@ if (cluster.isMaster) { await x.remove(); })); - console.log("Re-adding", jobs.length, "jobs..."); + console.log(" Re-adding", jobs.length, "jobs..."); await wsq.addBulk(jobs.map(x => ({ data: x.data, opts: { @@ -144,21 +153,17 @@ if (cluster.isMaster) { }, }))); - console.log("Done!"); - - res.json({ ok: true }); + console.log(" Done!"); } + + await getWebScraperQueue().resume(false); + res.json({ ok: true }); } catch (error) { console.error(error); return res.status(500).json({ error: error.message }); } }); - app.post(`/admin/${process.env.BULL_AUTH_KEY}/unpause`, async (req, res) => { - await getWebScraperQueue().resume(false); - res.json({ ok: true }); - }); - app.get(`/serverHealthCheck`, async (req, res) => { try { const webScraperQueue = getWebScraperQueue(); From 0d3e09e798ad9918a611ab0341d8f72de270cd4b Mon Sep 17 00:00:00 2001 From: Gergo Moricz Date: Fri, 12 Jul 2024 16:35:34 +0200 Subject: [PATCH 2/2] fix: try-catch job removal --- apps/api/src/index.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 818cc5d0..060f6754 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -141,10 +141,14 @@ if (cluster.isMaster) { console.log(" Removing", jobs.length, "jobs..."); await Promise.all(jobs.map(async x => { - await wsq.client.del(await x.lockKey()); - await x.takeLock(); - await x.moveToFailed({ message: "interrupted" }); - await x.remove(); + try { + await wsq.client.del(await x.lockKey()); + await x.takeLock(); + await x.moveToFailed({ message: "interrupted" }); + await x.remove(); + } catch (e) { + console.warn("Failed to remove job", x.id, e); + } })); console.log(" Re-adding", jobs.length, "jobs...");