Merge pull request #1409 from mendableai/feat/crawl-scrape-limit-notification

feat(queue-jobs): add function to determine job type and update notification logic for concurrency limits
commit b57d5f2c4d
Ademílson Tonato 2025-04-03 18:29:00 +01:00 committed by GitHub


@@ -14,6 +14,17 @@ import { getConcurrencyLimitMax } from "./rate-limiter";
import { sendNotificationWithCustomDays } from './notification/email_notification';
import { shouldSendConcurrencyLimitNotification } from './notification/notification-check';
/**
* Checks if a job is a crawl or batch scrape based on its options
* @param options The job options containing crawlerOptions and crawl_id
* @returns true if the job is either a crawl or batch scrape
*/
function isCrawlOrBatchScrape(options: { crawlerOptions?: any; crawl_id?: string }): boolean {
// If crawlerOptions exists, it's a crawl
// If crawl_id exists but no crawlerOptions, it's a batch scrape
return !!options.crawlerOptions || !!options.crawl_id;
}
async function _addScrapeJobToConcurrencyQueue(
webScraperOptions: any,
options: any,
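
For reference, here is how the new helper classifies the three job shapes it distinguishes. The option objects below are illustrative only, not taken from the codebase:

// Illustrative calls against the isCrawlOrBatchScrape helper added above;
// the option values are made up for this example.
isCrawlOrBatchScrape({ crawlerOptions: { limit: 10 } });  // true  -> crawl
isCrawlOrBatchScrape({ crawl_id: "example-crawl-id" });   // true  -> batch scrape (crawl_id, no crawlerOptions)
isCrawlOrBatchScrape({});                                 // false -> single scrape, so the limit notification may still be sent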
@@ -83,11 +94,14 @@ async function addScrapeJobRaw(
if(concurrencyQueueJobs > maxConcurrency) {
logger.info("Concurrency limited 2x (single) - ", "Concurrency queue jobs: ", concurrencyQueueJobs, "Max concurrency: ", maxConcurrency, "Team ID: ", webScraperOptions.team_id);
const shouldSendNotification = await shouldSendConcurrencyLimitNotification(webScraperOptions.team_id);
if (shouldSendNotification) {
sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
logger.error("Error sending notification (concurrency limit reached): ", error);
});
// Only send notification if it's not a crawl or batch scrape
if (!isCrawlOrBatchScrape(webScraperOptions)) {
const shouldSendNotification = await shouldSendConcurrencyLimitNotification(webScraperOptions.team_id);
if (shouldSendNotification) {
sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
logger.error("Error sending notification (concurrency limit reached): ", error);
});
}
}
}
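
Note that the notification send above is fire-and-forget: the promise is not awaited, and the attached .catch keeps a failed email from rejecting the job-add path. A generic sketch of that pattern (names are illustrative, not from the codebase):

// Generic fire-and-forget helper: log failures instead of letting them propagate.
function fireAndForget(task: Promise<unknown>, onError: (error: unknown) => void): void {
  // The caller intentionally does not await; errors are reported but never thrown.
  task.catch(onError);
}

// e.g. fireAndForget(sendEmail(), (error) => logger.error("Error sending notification: ", error));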
@@ -178,11 +192,14 @@ export async function addScrapeJobs(
if(addToCQ.length > maxConcurrency) {
logger.info("Concurrency limited 2x (multiple) - ", "Concurrency queue jobs: ", addToCQ.length, "Max concurrency: ", maxConcurrency, "Team ID: ", jobs[0].data.team_id);
const shouldSendNotification = await shouldSendConcurrencyLimitNotification(jobs[0].data.team_id);
if (shouldSendNotification) {
sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
logger.error("Error sending notification (concurrency limit reached): ", error);
});
// Only send notification if it's not a crawl or batch scrape
if (!isCrawlOrBatchScrape(jobs[0].data)) {
const shouldSendNotification = await shouldSendConcurrencyLimitNotification(jobs[0].data.team_id);
if (shouldSendNotification) {
sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
logger.error("Error sending notification (concurrency limit reached): ", error);
});
}
}
}
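
Since the same gate now appears in both addScrapeJobRaw and addScrapeJobs, one possible follow-up (not part of this commit) would be to hoist it into a shared helper, roughly:

// Hypothetical refactor sketch, not in this PR. It reuses the file's existing imports
// (shouldSendConcurrencyLimitNotification, sendNotificationWithCustomDays, NotificationType)
// and the isCrawlOrBatchScrape helper introduced by this commit.
async function maybeNotifyConcurrencyLimit(
  teamId: string,
  jobOptions: { crawlerOptions?: any; crawl_id?: string },
  logger: { error: (...args: any[]) => void },
): Promise<void> {
  // Crawls and batch scrapes are expected to queue past the limit, so skip the email.
  if (isCrawlOrBatchScrape(jobOptions)) return;
  if (await shouldSendConcurrencyLimitNotification(teamId)) {
    sendNotificationWithCustomDays(teamId, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false)
      .catch((error) => logger.error("Error sending notification (concurrency limit reached): ", error));
  }
}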