From 8571b5a99d1f1d774f2935baaa45428952d3e326 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Fri, 23 May 2025 15:42:15 +0200 Subject: [PATCH] Revert "feat(queue-worker/afterJobDone): improved ccq insert logic" This reverts commit 97c635676d228ed1342cdd1468cb2a1aef4fcfc9. --- apps/api/src/services/queue-worker.ts | 52 ++++++++++++--------------- 1 file changed, 22 insertions(+), 30 deletions(-) diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 2738b8ee..79ab6c69 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -54,7 +54,6 @@ import { scrapeOptions } from "../controllers/v1/types"; import { cleanOldConcurrencyLimitEntries, cleanOldCrawlConcurrencyLimitEntries, - getConcurrencyLimitActiveJobs, pushConcurrencyLimitActiveJob, pushCrawlConcurrencyLimitActiveJob, removeConcurrencyLimitActiveJob, @@ -87,7 +86,6 @@ import http from "http"; import https from "https"; import { cacheableLookup } from "../scraper/scrapeURL/lib/cacheableLookup"; import { robustFetch } from "../scraper/scrapeURL/lib/fetch"; -import { RateLimiterMode } from "../types"; configDotenv(); @@ -807,37 +805,31 @@ const workerFun = async ( } if (job.id && job.data && job.data.team_id) { - const maxConcurrency = (await getACUCTeam(job.data.team_id, false, true, job.data.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2; - await removeConcurrencyLimitActiveJob(job.data.team_id, job.id); - await cleanOldConcurrencyLimitEntries(job.data.team_id); + cleanOldConcurrencyLimitEntries(job.data.team_id); - // Check if we're under the concurrency limit before adding a new job - const currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(job.data.team_id)).length; - const concurrencyLimited = currentActiveConcurrency >= maxConcurrency; + // No need to check if we're under the limit here -- if the current job is finished, + // we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG + const nextJob = await takeConcurrencyLimitedJob(job.data.team_id); + if (nextJob !== null) { + await pushConcurrencyLimitActiveJob( + job.data.team_id, + nextJob.id, + 60 * 1000, + ); // 60s initial timeout - if (!concurrencyLimited) { - const nextJob = await takeConcurrencyLimitedJob(job.data.team_id); - if (nextJob !== null) { - await pushConcurrencyLimitActiveJob( - job.data.team_id, - nextJob.id, - 60 * 1000, - ); // 60s initial timeout - - await queue.add( - nextJob.id, - { - ...nextJob.data, - concurrencyLimitHit: true, - }, - { - ...nextJob.opts, - jobId: nextJob.id, - priority: nextJob.priority, - }, - ); - } + await queue.add( + nextJob.id, + { + ...nextJob.data, + concurrencyLimitHit: true, + }, + { + ...nextJob.opts, + jobId: nextJob.id, + priority: nextJob.priority, + }, + ); } } }