fix(queue-worker, queue-jobs): logic fixes

This commit is contained in:
Gergő Móricz 2024-09-26 20:39:19 +02:00
parent d2881927c1
commit dec4171937
2 changed files with 11 additions and 4 deletions

View File

@ -63,8 +63,11 @@ export function waitForJob(jobId: string, timeout: number) {
resolve((await getScrapeQueue().getJob(jobId)).returnvalue);
} else if (state === "failed") {
// console.log("failed", (await getScrapeQueue().getJob(jobId)).failedReason);
clearInterval(int);
reject((await getScrapeQueue().getJob(jobId)).failedReason);
const job = await getScrapeQueue().getJob(jobId);
if (job.failedReason !== "Concurrency limit hit") {
clearInterval(int);
reject(job.failedReason);
}
}
}
}, 500);

View File

@ -132,7 +132,7 @@ const workerFun = async (
const concurrencyLimiterKey = "concurrency-limiter:" + job.data?.team_id;
if (job.data && job.data.team_id) {
const concurrencyLimit = 100; // TODO: determine based on price id
const concurrencyLimit = 10; // TODO: determine based on price id
const now = Date.now();
const stalledJobTimeoutMs = 2 * 60 * 1000;
@ -142,11 +142,15 @@ const workerFun = async (
Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit");
// Concurrency limit hit
await job.moveToFailed(new Error("Concurrency limit hit"), token, false);
await job.remove();
await queue.add(job.name, job.data, {
...job.opts,
jobId: job.id,
priority: Math.round((job.opts.priority ?? 10) * 1.25), // exponential backoff for stuck jobs
})
});
await sleep(gotJobInterval);
continue;
} else {
await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id);
}