From 2de659d81050903014709f22a6cc7170625e47c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Sun, 15 Dec 2024 23:54:52 +0100 Subject: [PATCH] fix(queue-jobs): fix concurrency limit --- apps/api/src/controllers/v1/crawl.ts | 8 +- apps/api/src/services/queue-jobs.ts | 150 ++++++++++++++++++++++----- 2 files changed, 126 insertions(+), 32 deletions(-) diff --git a/apps/api/src/controllers/v1/crawl.ts b/apps/api/src/controllers/v1/crawl.ts index 1fb470f9..c2e3369f 100644 --- a/apps/api/src/controllers/v1/crawl.ts +++ b/apps/api/src/controllers/v1/crawl.ts @@ -18,7 +18,7 @@ import { } from "../../lib/crawl-redis"; import { logCrawl } from "../../services/logging/crawl_log"; import { getScrapeQueue } from "../../services/queue-service"; -import { addScrapeJob } from "../../services/queue-jobs"; +import { addScrapeJob, addScrapeJobs } from "../../services/queue-jobs"; import { logger as _logger } from "../../lib/logger"; import { getJobPriority } from "../../lib/job-priority"; import { callWebhook } from "../../services/webhook"; @@ -139,9 +139,9 @@ export async function crawlController( name: uuid, data: { url, - mode: "single_urls", + mode: "single_urls" as const, team_id: req.auth.team_id, - plan: req.auth.plan, + plan: req.auth.plan!, crawlerOptions, scrapeOptions, internalOptions: sc.internalOptions, @@ -170,7 +170,7 @@ export async function crawlController( jobs.map((x) => x.opts.jobId), ); logger.debug("Adding scrape jobs to BullMQ..."); - await getScrapeQueue().addBulk(jobs); + await addScrapeJobs(jobs); } else { logger.debug("Sitemap not found or ignored.", { ignoreSitemap: sc.crawlerOptions.ignoreSitemap, diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index ee9e6177..6ce48a81 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -11,11 +11,50 @@ import { pushConcurrencyLimitedJob, } from "../lib/concurrency-limit"; +async function 
_addScrapeJobToConcurrencyQueue( + webScraperOptions: any, + options: any, + jobId: string, + jobPriority: number, +) { + await pushConcurrencyLimitedJob(webScraperOptions.team_id, { + id: jobId, + data: webScraperOptions, + opts: { + ...options, + priority: jobPriority, + jobId: jobId, + }, + priority: jobPriority, + }); +} + +async function _addScrapeJobToBullMQ( + webScraperOptions: any, + options: any, + jobId: string, + jobPriority: number, +) { + if ( + webScraperOptions && + webScraperOptions.team_id && + webScraperOptions.plan + ) { + await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId); + } + + await getScrapeQueue().add(jobId, webScraperOptions, { + ...options, + priority: jobPriority, + jobId, + }); +} + async function addScrapeJobRaw( webScraperOptions: any, options: any, jobId: string, - jobPriority: number = 10, + jobPriority: number, ) { let concurrencyLimited = false; @@ -33,30 +72,9 @@ async function addScrapeJobRaw( } if (concurrencyLimited) { - await pushConcurrencyLimitedJob(webScraperOptions.team_id, { - id: jobId, - data: webScraperOptions, - opts: { - ...options, - priority: jobPriority, - jobId: jobId, - }, - priority: jobPriority, - }); + await _addScrapeJobToConcurrencyQueue(webScraperOptions, options, jobId, jobPriority); } else { - if ( - webScraperOptions && - webScraperOptions.team_id && - webScraperOptions.plan - ) { - await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId); - } - - await getScrapeQueue().add(jobId, webScraperOptions, { - ...options, - priority: jobPriority, - jobId, - }); + await _addScrapeJobToBullMQ(webScraperOptions, options, jobId, jobPriority); } } @@ -109,11 +127,87 @@ export async function addScrapeJobs( }[], ) { if (jobs.length === 0) return true; - // TODO: better + + let countCanBeDirectlyAdded = Infinity; + + if ( + jobs[0].data && + jobs[0].data.team_id && + jobs[0].data.plan + ) { + const now = Date.now(); + const limit = await getConcurrencyLimitMax(jobs[0].data.plan); + 
console.log("CC limit", limit);
+    await cleanOldConcurrencyLimitEntries(jobs[0].data.team_id, now);
+
+    countCanBeDirectlyAdded = Math.max(limit - (await getConcurrencyLimitActiveJobs(jobs[0].data.team_id, now)).length, 0);
+  }
+
+  const addToBull = jobs.slice(0, countCanBeDirectlyAdded);
+  const addToCQ = jobs.slice(countCanBeDirectlyAdded);
+
   await Promise.all(
-    jobs.map((job) =>
-      addScrapeJob(job.data, job.opts, job.opts.jobId, job.opts.priority),
-    ),
+    addToBull.map(async (job) => {
+      const size = JSON.stringify(job.data).length;
+      return await Sentry.startSpan(
+        {
+          name: "Add scrape job",
+          op: "queue.publish",
+          attributes: {
+            "messaging.message.id": job.opts.jobId,
+            "messaging.destination.name": getScrapeQueue().name,
+            "messaging.message.body.size": size,
+          },
+        },
+        async (span) => {
+          await _addScrapeJobToBullMQ(
+            {
+              ...job.data,
+              sentry: {
+                trace: Sentry.spanToTraceHeader(span),
+                baggage: Sentry.spanToBaggageHeader(span),
+                size,
+              },
+            },
+            job.opts,
+            job.opts.jobId,
+            job.opts.priority,
+          );
+        },
+      );
+    }),
+  );
+
+  await Promise.all(
+    addToCQ.map(async (job) => {
+      const size = JSON.stringify(job.data).length;
+      return await Sentry.startSpan(
+        {
+          name: "Add scrape job",
+          op: "queue.publish",
+          attributes: {
+            "messaging.message.id": job.opts.jobId,
+            "messaging.destination.name": getScrapeQueue().name,
+            "messaging.message.body.size": size,
+          },
+        },
+        async (span) => {
+          await _addScrapeJobToConcurrencyQueue(
+            {
+              ...job.data,
+              sentry: {
+                trace: Sentry.spanToTraceHeader(span),
+                baggage: Sentry.spanToBaggageHeader(span),
+                size,
+              },
+            },
+            job.opts,
+            job.opts.jobId,
+            job.opts.priority,
+          );
+        },
+      );
+    }),
   );
 }