diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 7a698772..746d4b97 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -63,8 +63,11 @@ export function waitForJob(jobId: string, timeout: number) { resolve((await getScrapeQueue().getJob(jobId)).returnvalue); } else if (state === "failed") { // console.log("failed", (await getScrapeQueue().getJob(jobId)).failedReason); - clearInterval(int); - reject((await getScrapeQueue().getJob(jobId)).failedReason); + const job = await getScrapeQueue().getJob(jobId); + if (job.failedReason !== "Concurrency limit hit") { + clearInterval(int); + reject(job.failedReason); + } } } }, 500); diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 71bd6366..ed13cab7 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -132,7 +132,7 @@ const workerFun = async ( const concurrencyLimiterKey = "concurrency-limiter:" + job.data?.team_id; if (job.data && job.data.team_id) { - const concurrencyLimit = 100; // TODO: determine based on price id + const concurrencyLimit = 10; // TODO: determine based on price id const now = Date.now(); const stalledJobTimeoutMs = 2 * 60 * 1000; @@ -142,11 +142,15 @@ const workerFun = async ( Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit"); // Concurrency limit hit await job.moveToFailed(new Error("Concurrency limit hit"), token, false); + await job.remove(); await queue.add(job.name, job.data, { ...job.opts, jobId: job.id, priority: Math.round((job.opts.priority ?? 10) * 1.25), // exponential backoff for stuck jobs - }) + }); + + await sleep(gotJobInterval); + continue; } else { await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id); }