feat(queue-worker/afterJobDone): improved ccq insert logic (#1595)

This commit is contained in:
Gergő Móricz 2025-05-23 16:50:14 +02:00 committed by GitHub
parent a7894a2714
commit 3df687e4db
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -54,6 +54,7 @@ import { scrapeOptions } from "../controllers/v1/types";
import {
cleanOldConcurrencyLimitEntries,
cleanOldCrawlConcurrencyLimitEntries,
getConcurrencyLimitActiveJobs,
pushConcurrencyLimitActiveJob,
pushCrawlConcurrencyLimitActiveJob,
removeConcurrencyLimitActiveJob,
@ -86,6 +87,7 @@ import http from "http";
import https from "https";
import { cacheableLookup } from "../scraper/scrapeURL/lib/cacheableLookup";
import { robustFetch } from "../scraper/scrapeURL/lib/fetch";
import { RateLimiterMode } from "../types";
configDotenv();
@ -805,11 +807,16 @@ const workerFun = async (
}
if (job.id && job.data && job.data.team_id) {
await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
cleanOldConcurrencyLimitEntries(job.data.team_id);
const maxConcurrency = (await getACUCTeam(job.data.team_id, false, true, job.data.is_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
// No need to check if we're under the limit here -- if the current job is finished,
// we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG
await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
await cleanOldConcurrencyLimitEntries(job.data.team_id);
// Check if we're under the concurrency limit before adding a new job
const currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(job.data.team_id)).length;
const concurrencyLimited = currentActiveConcurrency >= maxConcurrency;
if (!concurrencyLimited) {
const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
if (nextJob !== null) {
await pushConcurrencyLimitActiveJob(
@ -833,6 +840,7 @@ const workerFun = async (
}
}
}
}
if (job.data && job.data.sentry && Sentry.isInitialized()) {
Sentry.continueTrace(