From 58e587d99e8e1134699181a060835f1a372c4d64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adem=C3=ADlson=20F=2E=20Tonato?= Date: Mon, 31 Mar 2025 13:27:36 +0100 Subject: [PATCH 1/2] feat(queue-jobs): update notification logic for concurrency limits and add parameter (jsdocs) to batchScrapeUrls --- apps/api/src/services/queue-jobs.ts | 15 ++++++++------- apps/js-sdk/firecrawl/src/index.ts | 1 + 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 1ce2211c..d2601d30 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -1,6 +1,6 @@ import { getScrapeQueue } from "./queue-service"; import { v4 as uuidv4 } from "uuid"; -import { PlanType, WebScraperOptions } from "../types"; +import { NotificationType, PlanType, WebScraperOptions } from "../types"; import * as Sentry from "@sentry/node"; import { cleanOldConcurrencyLimitEntries, @@ -11,6 +11,7 @@ import { } from "../lib/concurrency-limit"; import { logger } from "../lib/logger"; import { getConcurrencyLimitMax } from "./rate-limiter"; +import { sendNotificationWithCustomDays } from './notification/email_notification'; async function _addScrapeJobToConcurrencyQueue( webScraperOptions: any, @@ -80,9 +81,9 @@ async function addScrapeJobRaw( // No need to 2x as if there are more than the max concurrency in the concurrency queue, it is already 2x if(concurrencyQueueJobs > maxConcurrency) { logger.info("Concurrency limited 2x (single) - ", "Concurrency queue jobs: ", concurrencyQueueJobs, "Max concurrency: ", maxConcurrency, "Team ID: ", webScraperOptions.team_id); - // sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 10, false).catch((error) => { - // logger.error("Error sending notification (concurrency limit reached): ", error); - // }); + sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => { + logger.error("Error sending notification (concurrency limit reached): ", error); + }); } webScraperOptions.concurrencyLimited = true; @@ -171,9 +172,9 @@ export async function addScrapeJobs( // equals 2x the max concurrency if(addToCQ.length > maxConcurrency) { logger.info("Concurrency limited 2x (multiple) - ", "Concurrency queue jobs: ", addToCQ.length, "Max concurrency: ", maxConcurrency, "Team ID: ", jobs[0].data.team_id); - // sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 10, false).catch((error) => { - // logger.error("Error sending notification (concurrency limit reached): ", error); - // }); + sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => { + logger.error("Error sending notification (concurrency limit reached): ", error); + }); } await Promise.all( diff --git a/apps/js-sdk/firecrawl/src/index.ts b/apps/js-sdk/firecrawl/src/index.ts index 11fb8d74..41d13da0 100644 --- a/apps/js-sdk/firecrawl/src/index.ts +++ b/apps/js-sdk/firecrawl/src/index.ts @@ -922,6 +922,7 @@ export default class FirecrawlApp { * @param pollInterval - Time in seconds for job status checks. * @param idempotencyKey - Optional idempotency key for the request. * @param webhook - Optional webhook for the batch scrape. + * @param ignoreInvalidURLs - Optional flag to ignore invalid URLs. * @returns The response from the crawl operation. */ async batchScrapeUrls( From 74684645521fec0ea0a26655715d02b87bc85661 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adem=C3=ADlson=20F=2E=20Tonato?= Date: Tue, 1 Apr 2025 19:50:26 +0100 Subject: [PATCH 2/2] feat(queue-jobs): implement conditional notification for concurrency limits based on team subscription status --- .../notification/notification-check.ts | 9 +++++ apps/api/src/services/queue-jobs.ts | 21 +++++++--- .../services/subscription/enterprise-check.ts | 39 +++++++++++++++++++ 3 files changed, 63 insertions(+), 6 deletions(-) create mode 100644 apps/api/src/services/notification/notification-check.ts create mode 100644 apps/api/src/services/subscription/enterprise-check.ts diff --git a/apps/api/src/services/notification/notification-check.ts b/apps/api/src/services/notification/notification-check.ts new file mode 100644 index 00000000..86f3eada --- /dev/null +++ b/apps/api/src/services/notification/notification-check.ts @@ -0,0 +1,9 @@ +import { isEnterpriseTeamCreatedAfterRateLimitChange } from "../subscription/enterprise-check"; + +export async function shouldSendConcurrencyLimitNotification( + team_id: string, +): Promise { + const isEnterprise = + await isEnterpriseTeamCreatedAfterRateLimitChange(team_id); + return !isEnterprise; +} diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index d2601d30..535f51b5 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -12,6 +12,7 @@ import { import { logger } from "../lib/logger"; import { getConcurrencyLimitMax } from "./rate-limiter"; import { sendNotificationWithCustomDays } from './notification/email_notification'; +import { shouldSendConcurrencyLimitNotification } from './notification/notification-check'; async function _addScrapeJobToConcurrencyQueue( webScraperOptions: any, @@ -81,9 +82,13 @@ async function addScrapeJobRaw( // No need to 2x as if there are more than the max concurrency in the concurrency queue, it is already 2x if(concurrencyQueueJobs > maxConcurrency) { logger.info("Concurrency limited 2x (single) - ", "Concurrency queue jobs: ", concurrencyQueueJobs, "Max concurrency: ", maxConcurrency, "Team ID: ", webScraperOptions.team_id); - sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => { - logger.error("Error sending notification (concurrency limit reached): ", error); - }); + + const shouldSendNotification = await shouldSendConcurrencyLimitNotification(webScraperOptions.team_id); + if (shouldSendNotification) { + sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => { + logger.error("Error sending notification (concurrency limit reached): ", error); + }); + } } webScraperOptions.concurrencyLimited = true; @@ -172,9 +177,13 @@ export async function addScrapeJobs( // equals 2x the max concurrency if(addToCQ.length > maxConcurrency) { logger.info("Concurrency limited 2x (multiple) - ", "Concurrency queue jobs: ", addToCQ.length, "Max concurrency: ", maxConcurrency, "Team ID: ", jobs[0].data.team_id); - sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => { - logger.error("Error sending notification (concurrency limit reached): ", error); - }); + + const shouldSendNotification = await shouldSendConcurrencyLimitNotification(jobs[0].data.team_id); + if (shouldSendNotification) { + sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => { + logger.error("Error sending notification (concurrency limit reached): ", error); + }); + } } await Promise.all( diff --git a/apps/api/src/services/subscription/enterprise-check.ts b/apps/api/src/services/subscription/enterprise-check.ts new file mode 100644 index 00000000..eb966a8d --- /dev/null +++ b/apps/api/src/services/subscription/enterprise-check.ts @@ -0,0 +1,39 @@ +import { supabase_service } from "../supabase"; + +interface SubscriptionResponse { + prices: { + products: { + is_enterprise: boolean; + }; + }; +} + +const RATE_LIMIT_CHANGE_NOTIFICATION_START_DATE = new Date("2025-03-12"); + +export async function isEnterpriseTeamCreatedAfterRateLimitChange( + team_id: string, +): Promise { + const { data, error } = (await supabase_service + .from("subscriptions") + .select("prices(products(is_enterprise))") + .eq("status", "active") + .eq("team_id", team_id) + .gt( + "created", + RATE_LIMIT_CHANGE_NOTIFICATION_START_DATE.toISOString(), + )) as { + data: SubscriptionResponse[] | null; + error: any; + }; + + if (error || !data) { + // If there's an error or no subscription found, assume non-enterprise + return false; + } + + const isEnterprise = data.find( + (sub) => sub.prices?.products?.is_enterprise === true, + ); + + return !!isEnterprise; +}