Update queue-worker.ts

This commit is contained in:
Nicolas 2024-10-01 15:38:24 -03:00
parent 37299fc035
commit c0541cc990

View File

@ -143,16 +143,21 @@ const workerFun = async (
redisConnection.zremrangebyscore(concurrencyLimiterKey, -Infinity, now); redisConnection.zremrangebyscore(concurrencyLimiterKey, -Infinity, now);
const activeJobsOfTeam = await redisConnection.zrangebyscore(concurrencyLimiterKey, now, Infinity); const activeJobsOfTeam = await redisConnection.zrangebyscore(concurrencyLimiterKey, now, Infinity);
if (activeJobsOfTeam.length >= concurrencyLimit) { if (activeJobsOfTeam.length >= concurrencyLimit) {
Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit"); // Nick: removed the log because it was too spammy, tested and confirmed that the job is added back to the queue
// Concurrency limit hit // Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit");
// Concurrency limit hit, throttles the job
await redisConnection.zadd(concurrencyLimiterThrottledKey, now + throttledJobTimeoutMs, job.id); await redisConnection.zadd(concurrencyLimiterThrottledKey, now + throttledJobTimeoutMs, job.id);
// We move to failed with a specific error
await job.moveToFailed(new Error("Concurrency limit hit"), token, false); await job.moveToFailed(new Error("Concurrency limit hit"), token, false);
// Remove the job from the queue
await job.remove(); await job.remove();
let newJobPriority = Math.round((job.opts.priority ?? 10) * 1.01); // Increment the priority of the job exponentially by 5%
// max priority is 200k, limit is 2 million let newJobPriority = Math.round((job.opts.priority ?? 10) * 1.05);
// Max priority is 200k, limit is 2 million
if(newJobPriority > 200000) { if(newJobPriority > 200000) {
newJobPriority = 200000; newJobPriority = 200000;
} }
// Add the job back to the queue with the new priority
await queue.add(job.name, { await queue.add(job.name, {
...job.data, ...job.data,
concurrencyLimitHit: true, concurrencyLimitHit: true,
@ -165,7 +170,9 @@ const workerFun = async (
await sleep(gotJobInterval); await sleep(gotJobInterval);
continue; continue;
} else { } else {
// If we are not throttled, add the job back to the queue with the new priority
await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id); await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id);
// Remove the job from the throttled list
await redisConnection.zrem(concurrencyLimiterThrottledKey, job.id); await redisConnection.zrem(concurrencyLimiterThrottledKey, job.id);
} }
} }