fix(queue-jobs): fix concurrency limit
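
Previously, the crawl controller pushed sitemap jobs straight into BullMQ via
getScrapeQueue().addBulk, bypassing the per-team concurrency limit that
addScrapeJobRaw already enforces for single jobs. The controller now goes
through addScrapeJobs, which computes how many jobs fit under the team's plan
limit (limit minus currently active jobs), adds that many directly to BullMQ,
and parks the remainder in the concurrency-limited queue. The two push paths
are factored out of addScrapeJobRaw into _addScrapeJobToBullMQ and
_addScrapeJobToConcurrencyQueue so the bulk path can reuse them.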

Gergő Móricz 2024-12-15 23:54:52 +01:00
parent 72d6a8179e
commit 2de659d810
2 changed files with 126 additions and 32 deletions

View File

@@ -18,7 +18,7 @@ import {
 } from "../../lib/crawl-redis";
 import { logCrawl } from "../../services/logging/crawl_log";
 import { getScrapeQueue } from "../../services/queue-service";
-import { addScrapeJob } from "../../services/queue-jobs";
+import { addScrapeJob, addScrapeJobs } from "../../services/queue-jobs";
 import { logger as _logger } from "../../lib/logger";
 import { getJobPriority } from "../../lib/job-priority";
 import { callWebhook } from "../../services/webhook";
@@ -139,9 +139,9 @@ export async function crawlController(
       name: uuid,
       data: {
         url,
-        mode: "single_urls",
+        mode: "single_urls" as const,
         team_id: req.auth.team_id,
-        plan: req.auth.plan,
+        plan: req.auth.plan!,
         crawlerOptions,
         scrapeOptions,
         internalOptions: sc.internalOptions,
@@ -170,7 +170,7 @@
       jobs.map((x) => x.opts.jobId),
     );
     logger.debug("Adding scrape jobs to BullMQ...");
-    await getScrapeQueue().addBulk(jobs);
+    await addScrapeJobs(jobs);
   } else {
     logger.debug("Sitemap not found or ignored.", {
       ignoreSitemap: sc.crawlerOptions.ignoreSitemap,

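The controller half of the change is mechanical: the direct
getScrapeQueue().addBulk(jobs) call becomes addScrapeJobs(jobs), and the
`as const` / `plan!` annotations appear to satisfy the stricter parameter
types addScrapeJobs declares. For reference, each element of jobs built above
plausibly has the shape below; the opts fields are inferred from the later
reads of x.opts.jobId and job.opts.priority and are not fully visible in this
diff:

// Hypothetical reconstruction of one jobs[] entry; fields not shown
// in the hunk above are assumptions, not part of this commit.
const job = {
  name: uuid,
  data: {
    url,
    mode: "single_urls" as const,
    team_id: req.auth.team_id,
    plan: req.auth.plan!,
    crawlerOptions,
    scrapeOptions,
    internalOptions: sc.internalOptions,
  },
  opts: {
    jobId: uuid,            // read as x.opts.jobId above
    priority: jobPriority,  // assumed name; read as job.opts.priority in addScrapeJobs
  },
};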
View File

@@ -11,11 +11,50 @@ import {
   pushConcurrencyLimitedJob,
 } from "../lib/concurrency-limit";
 
+async function _addScrapeJobToConcurrencyQueue(
+  webScraperOptions: any,
+  options: any,
+  jobId: string,
+  jobPriority: number,
+) {
+  await pushConcurrencyLimitedJob(webScraperOptions.team_id, {
+    id: jobId,
+    data: webScraperOptions,
+    opts: {
+      ...options,
+      priority: jobPriority,
+      jobId: jobId,
+    },
+    priority: jobPriority,
+  });
+}
+
+async function _addScrapeJobToBullMQ(
+  webScraperOptions: any,
+  options: any,
+  jobId: string,
+  jobPriority: number,
+) {
+  if (
+    webScraperOptions &&
+    webScraperOptions.team_id &&
+    webScraperOptions.plan
+  ) {
+    await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId);
+  }
+
+  await getScrapeQueue().add(jobId, webScraperOptions, {
+    ...options,
+    priority: jobPriority,
+    jobId,
+  });
+}
+
 async function addScrapeJobRaw(
   webScraperOptions: any,
   options: any,
   jobId: string,
-  jobPriority: number = 10,
+  jobPriority: number,
 ) {
   let concurrencyLimited = false;
@@ -33,30 +72,9 @@
   }
 
   if (concurrencyLimited) {
-    await pushConcurrencyLimitedJob(webScraperOptions.team_id, {
-      id: jobId,
-      data: webScraperOptions,
-      opts: {
-        ...options,
-        priority: jobPriority,
-        jobId: jobId,
-      },
-      priority: jobPriority,
-    });
+    await _addScrapeJobToConcurrencyQueue(webScraperOptions, options, jobId, jobPriority);
   } else {
-    if (
-      webScraperOptions &&
-      webScraperOptions.team_id &&
-      webScraperOptions.plan
-    ) {
-      await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId);
-    }
-
-    await getScrapeQueue().add(jobId, webScraperOptions, {
-      ...options,
-      priority: jobPriority,
-      jobId,
-    });
+    await _addScrapeJobToBullMQ(webScraperOptions, options, jobId, jobPriority);
   }
 }
@@ -109,11 +127,87 @@ export async function addScrapeJobs(
   }[],
 ) {
   if (jobs.length === 0) return true;
 
+  // TODO: better
+  let countCanBeDirectlyAdded = Infinity;
+
+  if (
+    jobs[0].data &&
+    jobs[0].data.team_id &&
+    jobs[0].data.plan
+  ) {
+    const now = Date.now();
+    const limit = await getConcurrencyLimitMax(jobs[0].data.plan);
+    console.log("CC limit", limit);
+    await cleanOldConcurrencyLimitEntries(jobs[0].data.team_id, now);
+
+    // Remaining headroom: plan limit minus the team's currently active jobs.
+    countCanBeDirectlyAdded = Math.max(
+      limit - (await getConcurrencyLimitActiveJobs(jobs[0].data.team_id, now)).length,
+      0,
+    );
+  }
+
+  const addToBull = jobs.slice(0, countCanBeDirectlyAdded);
+  const addToCQ = jobs.slice(countCanBeDirectlyAdded);
+
   await Promise.all(
-    jobs.map((job) =>
-      addScrapeJob(job.data, job.opts, job.opts.jobId, job.opts.priority),
-    ),
+    addToBull.map(async (job) => {
+      const size = JSON.stringify(job.data).length;
+      return await Sentry.startSpan(
+        {
+          name: "Add scrape job",
+          op: "queue.publish",
+          attributes: {
+            "messaging.message.id": job.opts.jobId,
+            "messaging.destination.name": getScrapeQueue().name,
+            "messaging.message.body.size": size,
+          },
+        },
+        async (span) => {
+          await _addScrapeJobToBullMQ(
+            {
+              ...job.data,
+              sentry: {
+                trace: Sentry.spanToTraceHeader(span),
+                baggage: Sentry.spanToBaggageHeader(span),
+                size,
+              },
+            },
+            job.opts,
+            job.opts.jobId,
+            job.opts.priority,
+          );
+        },
+      );
+    }),
   );
+
+  await Promise.all(
+    addToCQ.map(async (job) => {
+      const size = JSON.stringify(job.data).length;
+      return await Sentry.startSpan(
+        {
+          name: "Add scrape job",
+          op: "queue.publish",
+          attributes: {
+            "messaging.message.id": job.opts.jobId,
+            "messaging.destination.name": getScrapeQueue().name,
+            "messaging.message.body.size": size,
+          },
+        },
+        async (span) => {
+          await _addScrapeJobToConcurrencyQueue(
+            {
+              ...job.data,
+              sentry: {
+                trace: Sentry.spanToTraceHeader(span),
+                baggage: Sentry.spanToBaggageHeader(span),
+                size,
+              },
+            },
+            job.opts,
+            job.opts.jobId,
+            job.opts.priority,
+          );
+        },
+      );
+    }),
+  );
 }
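
The diff leans on helpers from ../lib/concurrency-limit that it does not
show. A minimal sketch of what that module plausibly looks like, assuming an
ioredis client and per-team Redis sorted sets; the key names, staleness
window, and plan limits here are all assumptions, not part of this commit:

import Redis from "ioredis";

const redis = new Redis(process.env.REDIS_URL ?? "redis://localhost:6379");

// How long an active-job entry counts against the limit before it is
// considered stale (assumed value).
const STALE_JOB_MS = 2 * 60 * 1000;

export async function getConcurrencyLimitMax(plan: string): Promise<number> {
  // Assumed plan-to-limit mapping; the real values live outside this diff.
  const limits: Record<string, number> = { free: 2, hobby: 5, standard: 10 };
  return limits[plan] ?? 10;
}

// Active jobs: a per-team sorted set scored by expiry timestamp.
export async function pushConcurrencyLimitActiveJob(teamId: string, jobId: string) {
  await redis.zadd("concurrency-limit:" + teamId, Date.now() + STALE_JOB_MS, jobId);
}

export async function getConcurrencyLimitActiveJobs(teamId: string, now: number): Promise<string[]> {
  // Entries whose expiry score is still in the future count as active.
  return await redis.zrangebyscore("concurrency-limit:" + teamId, now, "+inf");
}

export async function cleanOldConcurrencyLimitEntries(teamId: string, now: number) {
  // Drop entries whose expiry score has already passed.
  await redis.zremrangebyscore("concurrency-limit:" + teamId, "-inf", now);
}

// Waiting jobs: a second per-team sorted set, scored by priority, holding the
// serialized job so a worker can re-enqueue it when capacity frees up.
export async function pushConcurrencyLimitedJob(
  teamId: string,
  job: { id: string; data: any; opts: any; priority: number },
) {
  await redis.zadd("concurrency-queue:" + teamId, job.priority, JSON.stringify(job));
}

With these pieces, the admission math in addScrapeJobs is straightforward: a
team whose plan allows 10 concurrent jobs and that has 7 unexpired active
entries gets countCanBeDirectlyAdded = max(10 - 7, 0) = 3, so the first three
jobs go straight to BullMQ (each also marked active via
pushConcurrencyLimitActiveJob) and the remaining jobs wait in the
concurrency-limited queue.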