From 426151c9c97394b3e752a75b85adddb3f8dda0fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adem=C3=ADlson=20F=2E=20Tonato?= Date: Thu, 3 Apr 2025 17:02:51 +0100 Subject: [PATCH] feat(queue-jobs): add function to determine job type and update notification logic for concurrency limits --- apps/api/src/services/queue-jobs.ts | 37 +++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 535f51b5..1b4530b9 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -14,6 +14,17 @@ import { getConcurrencyLimitMax } from "./rate-limiter"; import { sendNotificationWithCustomDays } from './notification/email_notification'; import { shouldSendConcurrencyLimitNotification } from './notification/notification-check'; +/** + * Checks if a job is a crawl or batch scrape based on its options + * @param options The job options containing crawlerOptions and crawl_id + * @returns true if the job is either a crawl or batch scrape + */ +function isCrawlOrBatchScrape(options: { crawlerOptions?: any; crawl_id?: string }): boolean { + // If crawlerOptions exists, it's a crawl + // If crawl_id exists but no crawlerOptions, it's a batch scrape + return !!options.crawlerOptions || !!options.crawl_id; +} + async function _addScrapeJobToConcurrencyQueue( webScraperOptions: any, options: any, @@ -83,11 +94,14 @@ async function addScrapeJobRaw( if(concurrencyQueueJobs > maxConcurrency) { logger.info("Concurrency limited 2x (single) - ", "Concurrency queue jobs: ", concurrencyQueueJobs, "Max concurrency: ", maxConcurrency, "Team ID: ", webScraperOptions.team_id); - const shouldSendNotification = await shouldSendConcurrencyLimitNotification(webScraperOptions.team_id); - if (shouldSendNotification) { - sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => { - logger.error("Error sending notification (concurrency limit reached): ", error); - }); + // Only send notification if it's not a crawl or batch scrape + if (!isCrawlOrBatchScrape(webScraperOptions)) { + const shouldSendNotification = await shouldSendConcurrencyLimitNotification(webScraperOptions.team_id); + if (shouldSendNotification) { + sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => { + logger.error("Error sending notification (concurrency limit reached): ", error); + }); + } } } @@ -178,11 +192,14 @@ export async function addScrapeJobs( if(addToCQ.length > maxConcurrency) { logger.info("Concurrency limited 2x (multiple) - ", "Concurrency queue jobs: ", addToCQ.length, "Max concurrency: ", maxConcurrency, "Team ID: ", jobs[0].data.team_id); - const shouldSendNotification = await shouldSendConcurrencyLimitNotification(jobs[0].data.team_id); - if (shouldSendNotification) { - sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => { - logger.error("Error sending notification (concurrency limit reached): ", error); - }); + // Only send notification if it's not a crawl or batch scrape + if (!isCrawlOrBatchScrape(jobs[0].data)) { + const shouldSendNotification = await shouldSendConcurrencyLimitNotification(jobs[0].data.team_id); + if (shouldSendNotification) { + sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => { + logger.error("Error sending notification (concurrency limit reached): ", error); + }); + } } }