From 97c635676d228ed1342cdd1468cb2a1aef4fcfc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Fri, 23 May 2025 15:41:57 +0200 Subject: [PATCH] feat(queue-worker/afterJobDone): improved ccq insert logic --- apps/api/src/services/queue-worker.ts | 52 +++++++++++++++------------ 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 79ab6c69..2738b8ee 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -54,6 +54,7 @@ import { scrapeOptions } from "../controllers/v1/types"; import { cleanOldConcurrencyLimitEntries, cleanOldCrawlConcurrencyLimitEntries, + getConcurrencyLimitActiveJobs, pushConcurrencyLimitActiveJob, pushCrawlConcurrencyLimitActiveJob, removeConcurrencyLimitActiveJob, @@ -86,6 +87,7 @@ import http from "http"; import https from "https"; import { cacheableLookup } from "../scraper/scrapeURL/lib/cacheableLookup"; import { robustFetch } from "../scraper/scrapeURL/lib/fetch"; +import { RateLimiterMode } from "../types"; configDotenv(); @@ -805,31 +807,37 @@ const workerFun = async ( } if (job.id && job.data && job.data.team_id) { + const maxConcurrency = (await getACUCTeam(job.data.team_id, false, true, job.data.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2; + await removeConcurrencyLimitActiveJob(job.data.team_id, job.id); - cleanOldConcurrencyLimitEntries(job.data.team_id); + await cleanOldConcurrencyLimitEntries(job.data.team_id); - // No need to check if we're under the limit here -- if the current job is finished, - // we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG - const nextJob = await takeConcurrencyLimitedJob(job.data.team_id); - if (nextJob !== null) { - await pushConcurrencyLimitActiveJob( - job.data.team_id, - nextJob.id, - 60 * 1000, - ); // 60s initial timeout + // Check if we're under the concurrency limit before adding a new job + const currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(job.data.team_id)).length; + const concurrencyLimited = currentActiveConcurrency >= maxConcurrency; - await queue.add( - nextJob.id, - { - ...nextJob.data, - concurrencyLimitHit: true, - }, - { - ...nextJob.opts, - jobId: nextJob.id, - priority: nextJob.priority, - }, - ); + if (!concurrencyLimited) { + const nextJob = await takeConcurrencyLimitedJob(job.data.team_id); + if (nextJob !== null) { + await pushConcurrencyLimitActiveJob( + job.data.team_id, + nextJob.id, + 60 * 1000, + ); // 60s initial timeout + + await queue.add( + nextJob.id, + { + ...nextJob.data, + concurrencyLimitHit: true, + }, + { + ...nextJob.opts, + jobId: nextJob.id, + priority: nextJob.priority, + }, + ); + } } } }