Revert "feat(queue-worker/afterJobDone): improved ccq insert logic"

This reverts commit 97c635676d228ed1342cdd1468cb2a1aef4fcfc9.
This commit is contained in:
Gergő Móricz 2025-05-23 15:42:15 +02:00
parent 97c635676d
commit 8571b5a99d

View File

@ -54,7 +54,6 @@ import { scrapeOptions } from "../controllers/v1/types";
import { import {
cleanOldConcurrencyLimitEntries, cleanOldConcurrencyLimitEntries,
cleanOldCrawlConcurrencyLimitEntries, cleanOldCrawlConcurrencyLimitEntries,
getConcurrencyLimitActiveJobs,
pushConcurrencyLimitActiveJob, pushConcurrencyLimitActiveJob,
pushCrawlConcurrencyLimitActiveJob, pushCrawlConcurrencyLimitActiveJob,
removeConcurrencyLimitActiveJob, removeConcurrencyLimitActiveJob,
@ -87,7 +86,6 @@ import http from "http";
import https from "https"; import https from "https";
import { cacheableLookup } from "../scraper/scrapeURL/lib/cacheableLookup"; import { cacheableLookup } from "../scraper/scrapeURL/lib/cacheableLookup";
import { robustFetch } from "../scraper/scrapeURL/lib/fetch"; import { robustFetch } from "../scraper/scrapeURL/lib/fetch";
import { RateLimiterMode } from "../types";
configDotenv(); configDotenv();
@ -807,37 +805,31 @@ const workerFun = async (
} }
if (job.id && job.data && job.data.team_id) { if (job.id && job.data && job.data.team_id) {
const maxConcurrency = (await getACUCTeam(job.data.team_id, false, true, job.data.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
await removeConcurrencyLimitActiveJob(job.data.team_id, job.id); await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
await cleanOldConcurrencyLimitEntries(job.data.team_id); cleanOldConcurrencyLimitEntries(job.data.team_id);
// Check if we're under the concurrency limit before adding a new job // No need to check if we're under the limit here -- if the current job is finished,
const currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(job.data.team_id)).length; // we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG
const concurrencyLimited = currentActiveConcurrency >= maxConcurrency; const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
if (nextJob !== null) {
await pushConcurrencyLimitActiveJob(
job.data.team_id,
nextJob.id,
60 * 1000,
); // 60s initial timeout
if (!concurrencyLimited) { await queue.add(
const nextJob = await takeConcurrencyLimitedJob(job.data.team_id); nextJob.id,
if (nextJob !== null) { {
await pushConcurrencyLimitActiveJob( ...nextJob.data,
job.data.team_id, concurrencyLimitHit: true,
nextJob.id, },
60 * 1000, {
); // 60s initial timeout ...nextJob.opts,
jobId: nextJob.id,
await queue.add( priority: nextJob.priority,
nextJob.id, },
{ );
...nextJob.data,
concurrencyLimitHit: true,
},
{
...nextJob.opts,
jobId: nextJob.id,
priority: nextJob.priority,
},
);
}
} }
} }
} }