diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index aaad7f52..0aa28cba 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -143,16 +143,21 @@ const workerFun = async ( redisConnection.zremrangebyscore(concurrencyLimiterKey, -Infinity, now); const activeJobsOfTeam = await redisConnection.zrangebyscore(concurrencyLimiterKey, now, Infinity); if (activeJobsOfTeam.length >= concurrencyLimit) { - Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit"); - // Concurrency limit hit + // Nick: removed the log because it was too spammy, tested and confirmed that the job is added back to the queue + // Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit"); + // Concurrency limit hit, throttles the job await redisConnection.zadd(concurrencyLimiterThrottledKey, now + throttledJobTimeoutMs, job.id); + // We move to failed with a specific error await job.moveToFailed(new Error("Concurrency limit hit"), token, false); + // Remove the job from the queue await job.remove(); - let newJobPriority = Math.round((job.opts.priority ?? 10) * 1.01); - // max priority is 200k, limit is 2 million + // Increment the priority of the job exponentially by 5% + let newJobPriority = Math.round((job.opts.priority ?? 10) * 1.05); + // Max priority is 200k, limit is 2 million if(newJobPriority > 200000) { newJobPriority = 200000; } + // Add the job back to the queue with the new priority await queue.add(job.name, { ...job.data, concurrencyLimitHit: true, @@ -165,7 +170,9 @@ const workerFun = async ( await sleep(gotJobInterval); continue; } else { + // If we are not throttled, add the job back to the queue with the new priority await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id); + // Remove the job from the throttled list await redisConnection.zrem(concurrencyLimiterThrottledKey, job.id); } }