diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index fe1ccb69..060f6754 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -120,36 +120,9 @@ if (cluster.isMaster) { return res.status(200).json({ ok: true }); try { - const wsq = getWebScraperQueue(); - console.log("Gracefully shutting down..."); - - await wsq.pause(false, true); - - const jobs = await wsq.getActive(); - - if (jobs.length > 0) { - console.log("Removing", jobs.length, "jobs..."); - - await Promise.all(jobs.map(async x => { - await wsq.client.del(await x.lockKey()); - await x.takeLock(); - await x.moveToFailed({ message: "interrupted" }); - await x.remove(); - })); - - console.log("Re-adding", jobs.length, "jobs..."); - await wsq.addBulk(jobs.map(x => ({ - data: x.data, - opts: { - jobId: x.id, - }, - }))); - - console.log("Done!"); - - res.json({ ok: true }); - } + await getWebScraperQueue().pause(false, true); + res.json({ ok: true }); } catch (error) { console.error(error); return res.status(500).json({ error: error.message }); @@ -157,10 +130,44 @@ if (cluster.isMaster) { }); app.post(`/admin/${process.env.BULL_AUTH_KEY}/unpause`, async (req, res) => { - return res.status(200).json({ ok: true }); + try { + const wsq = getWebScraperQueue(); - await getWebScraperQueue().resume(false); - res.json({ ok: true }); + const jobs = await wsq.getActive(); + + console.log("Requeueing", jobs.length, "jobs..."); + + if (jobs.length > 0) { + console.log(" Removing", jobs.length, "jobs..."); + + await Promise.all(jobs.map(async x => { + try { + await wsq.client.del(await x.lockKey()); + await x.takeLock(); + await x.moveToFailed({ message: "interrupted" }); + await x.remove(); + } catch (e) { + console.warn("Failed to remove job", x.id, e); + } + })); + + console.log(" Re-adding", jobs.length, "jobs..."); + await wsq.addBulk(jobs.map(x => ({ + data: x.data, + opts: { + jobId: x.id, + }, + }))); + + console.log(" Done!"); + } + + await getWebScraperQueue().resume(false); + res.json({ ok: true }); + } catch (error) { + console.error(error); + return res.status(500).json({ error: error.message }); + } }); app.get(`/serverHealthCheck`, async (req, res) => {