Merge pull request #1398 from mendableai/refactor/email-concurrency-limit-reached

feat(queue-jobs): update notification logic for concurrency limits and add parameter (jsdocs) to batchScrapeUrls
Ademílson Tonato 2025-04-02 11:18:18 +01:00 committed by GitHub
commit 73a297d6c8
4 changed files with 66 additions and 7 deletions


@@ -0,0 +1,9 @@
+import { isEnterpriseTeamCreatedAfterRateLimitChange } from "../subscription/enterprise-check";
+
+export async function shouldSendConcurrencyLimitNotification(
+  team_id: string,
+): Promise<boolean> {
+  const isEnterprise =
+    await isEnterpriseTeamCreatedAfterRateLimitChange(team_id);
+  return !isEnterprise;
+}
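
A minimal usage sketch of how this check gates the concurrency-limit email on the queue path; it mirrors the queue-jobs change below, and team_id, logger, and the notification helpers are assumed to come from that file:

  // Gate the email behind the enterprise check, then send it fire-and-forget
  // so a notification failure never blocks enqueueing the job.
  const shouldSendNotification = await shouldSendConcurrencyLimitNotification(team_id);
  if (shouldSendNotification) {
    sendNotificationWithCustomDays(team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
      logger.error("Error sending notification (concurrency limit reached): ", error);
    });
  }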


@@ -1,6 +1,6 @@
 import { getScrapeQueue } from "./queue-service";
 import { v4 as uuidv4 } from "uuid";
-import { PlanType, WebScraperOptions } from "../types";
+import { NotificationType, PlanType, WebScraperOptions } from "../types";
 import * as Sentry from "@sentry/node";
 import {
   cleanOldConcurrencyLimitEntries,
@@ -11,6 +11,8 @@ import {
 } from "../lib/concurrency-limit";
 import { logger } from "../lib/logger";
 import { getConcurrencyLimitMax } from "./rate-limiter";
+import { sendNotificationWithCustomDays } from './notification/email_notification';
+import { shouldSendConcurrencyLimitNotification } from './notification/notification-check';
 
 async function _addScrapeJobToConcurrencyQueue(
   webScraperOptions: any,
@@ -80,9 +82,13 @@ async function addScrapeJobRaw(
   // No need to 2x as if there are more than the max concurrency in the concurrency queue, it is already 2x
   if(concurrencyQueueJobs > maxConcurrency) {
     logger.info("Concurrency limited 2x (single) - ", "Concurrency queue jobs: ", concurrencyQueueJobs, "Max concurrency: ", maxConcurrency, "Team ID: ", webScraperOptions.team_id);
-    // sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 10, false).catch((error) => {
-    //   logger.error("Error sending notification (concurrency limit reached): ", error);
-    // });
+
+    const shouldSendNotification = await shouldSendConcurrencyLimitNotification(webScraperOptions.team_id);
+    if (shouldSendNotification) {
+      sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
+        logger.error("Error sending notification (concurrency limit reached): ", error);
+      });
+    }
   }
 
   webScraperOptions.concurrencyLimited = true;
@@ -171,9 +177,13 @@ export async function addScrapeJobs(
   // equals 2x the max concurrency
   if(addToCQ.length > maxConcurrency) {
     logger.info("Concurrency limited 2x (multiple) - ", "Concurrency queue jobs: ", addToCQ.length, "Max concurrency: ", maxConcurrency, "Team ID: ", jobs[0].data.team_id);
-    // sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 10, false).catch((error) => {
-    //   logger.error("Error sending notification (concurrency limit reached): ", error);
-    // });
+
+    const shouldSendNotification = await shouldSendConcurrencyLimitNotification(jobs[0].data.team_id);
+    if (shouldSendNotification) {
+      sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
+        logger.error("Error sending notification (concurrency limit reached): ", error);
+      });
+    }
   }
 
   await Promise.all(


@@ -0,0 +1,39 @@
+import { supabase_service } from "../supabase";
+
+interface SubscriptionResponse {
+  prices: {
+    products: {
+      is_enterprise: boolean;
+    };
+  };
+}
+
+const RATE_LIMIT_CHANGE_NOTIFICATION_START_DATE = new Date("2025-03-12");
+
+export async function isEnterpriseTeamCreatedAfterRateLimitChange(
+  team_id: string,
+): Promise<boolean> {
+  const { data, error } = (await supabase_service
+    .from("subscriptions")
+    .select("prices(products(is_enterprise))")
+    .eq("status", "active")
+    .eq("team_id", team_id)
+    .gt(
+      "created",
+      RATE_LIMIT_CHANGE_NOTIFICATION_START_DATE.toISOString(),
+    )) as {
+    data: SubscriptionResponse[] | null;
+    error: any;
+  };
+
+  if (error || !data) {
+    // If there's an error or no subscription found, assume non-enterprise
+    return false;
+  }
+
+  const isEnterprise = data.find(
+    (sub) => sub.prices?.products?.is_enterprise === true,
+  );
+
+  return !!isEnterprise;
+}
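
For reference, a result row matching the nested select above would look roughly like this under the SubscriptionResponse shape declared in this file (a sketch only; the exact Supabase payload for embedded relations is assumed, not confirmed by the diff):

  // Hypothetical `data` value for a team whose active subscription was created
  // after 2025-03-12 and is on an enterprise product; `find` matches the entry
  // and the function resolves to true.
  const data: SubscriptionResponse[] = [
    { prices: { products: { is_enterprise: true } } },
  ];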


@@ -922,6 +922,7 @@ export default class FirecrawlApp {
    * @param pollInterval - Time in seconds for job status checks.
    * @param idempotencyKey - Optional idempotency key for the request.
    * @param webhook - Optional webhook for the batch scrape.
+   * @param ignoreInvalidURLs - Optional flag to ignore invalid URLs.
    * @returns The response from the crawl operation.
    */
   async batchScrapeUrls(
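
A hedged usage sketch for the new flag. The first two arguments (URL list and scrape params), the params shape, and the package import are assumptions based on the rest of the SDK rather than this diff; the trailing parameter order follows the JSDoc above.

  import FirecrawlApp from "@mendable/firecrawl-js"; // package name assumed

  async function example() {
    const app = new FirecrawlApp({ apiKey: "fc-YOUR-API-KEY" });

    const result = await app.batchScrapeUrls(
      ["https://firecrawl.dev", "not a valid url"], // URL list (assumed first parameter)
      { formats: ["markdown"] },                    // scrape params (assumed shape)
      2,                                            // pollInterval in seconds
      undefined,                                    // idempotencyKey
      undefined,                                    // webhook
      true,                                         // ignoreInvalidURLs: skip bad entries instead of failing the batch
    );

    console.log(result);
  }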