mirror of
https://git.mirrors.martin98.com/https://github.com/mendableai/firecrawl
synced 2025-08-12 21:18:59 +08:00
feat(queue-jobs): implement conditional notification for concurrency limits based on team subscription status
This commit is contained in:
parent
58e587d99e
commit
7468464552
9
apps/api/src/services/notification/notification-check.ts
Normal file
9
apps/api/src/services/notification/notification-check.ts
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
import { isEnterpriseTeamCreatedAfterRateLimitChange } from "../subscription/enterprise-check";
|
||||||
|
|
||||||
|
export async function shouldSendConcurrencyLimitNotification(
|
||||||
|
team_id: string,
|
||||||
|
): Promise<boolean> {
|
||||||
|
const isEnterprise =
|
||||||
|
await isEnterpriseTeamCreatedAfterRateLimitChange(team_id);
|
||||||
|
return !isEnterprise;
|
||||||
|
}
|
@ -12,6 +12,7 @@ import {
|
|||||||
import { logger } from "../lib/logger";
|
import { logger } from "../lib/logger";
|
||||||
import { getConcurrencyLimitMax } from "./rate-limiter";
|
import { getConcurrencyLimitMax } from "./rate-limiter";
|
||||||
import { sendNotificationWithCustomDays } from './notification/email_notification';
|
import { sendNotificationWithCustomDays } from './notification/email_notification';
|
||||||
|
import { shouldSendConcurrencyLimitNotification } from './notification/notification-check';
|
||||||
|
|
||||||
async function _addScrapeJobToConcurrencyQueue(
|
async function _addScrapeJobToConcurrencyQueue(
|
||||||
webScraperOptions: any,
|
webScraperOptions: any,
|
||||||
@ -81,9 +82,13 @@ async function addScrapeJobRaw(
|
|||||||
// No need to 2x as if there are more than the max concurrency in the concurrency queue, it is already 2x
|
// No need to 2x as if there are more than the max concurrency in the concurrency queue, it is already 2x
|
||||||
if(concurrencyQueueJobs > maxConcurrency) {
|
if(concurrencyQueueJobs > maxConcurrency) {
|
||||||
logger.info("Concurrency limited 2x (single) - ", "Concurrency queue jobs: ", concurrencyQueueJobs, "Max concurrency: ", maxConcurrency, "Team ID: ", webScraperOptions.team_id);
|
logger.info("Concurrency limited 2x (single) - ", "Concurrency queue jobs: ", concurrencyQueueJobs, "Max concurrency: ", maxConcurrency, "Team ID: ", webScraperOptions.team_id);
|
||||||
sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
|
|
||||||
logger.error("Error sending notification (concurrency limit reached): ", error);
|
const shouldSendNotification = await shouldSendConcurrencyLimitNotification(webScraperOptions.team_id);
|
||||||
});
|
if (shouldSendNotification) {
|
||||||
|
sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
|
||||||
|
logger.error("Error sending notification (concurrency limit reached): ", error);
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
webScraperOptions.concurrencyLimited = true;
|
webScraperOptions.concurrencyLimited = true;
|
||||||
@ -172,9 +177,13 @@ export async function addScrapeJobs(
|
|||||||
// equals 2x the max concurrency
|
// equals 2x the max concurrency
|
||||||
if(addToCQ.length > maxConcurrency) {
|
if(addToCQ.length > maxConcurrency) {
|
||||||
logger.info("Concurrency limited 2x (multiple) - ", "Concurrency queue jobs: ", addToCQ.length, "Max concurrency: ", maxConcurrency, "Team ID: ", jobs[0].data.team_id);
|
logger.info("Concurrency limited 2x (multiple) - ", "Concurrency queue jobs: ", addToCQ.length, "Max concurrency: ", maxConcurrency, "Team ID: ", jobs[0].data.team_id);
|
||||||
sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
|
|
||||||
logger.error("Error sending notification (concurrency limit reached): ", error);
|
const shouldSendNotification = await shouldSendConcurrencyLimitNotification(jobs[0].data.team_id);
|
||||||
});
|
if (shouldSendNotification) {
|
||||||
|
sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
|
||||||
|
logger.error("Error sending notification (concurrency limit reached): ", error);
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
await Promise.all(
|
await Promise.all(
|
||||||
|
39
apps/api/src/services/subscription/enterprise-check.ts
Normal file
39
apps/api/src/services/subscription/enterprise-check.ts
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
import { supabase_service } from "../supabase";
|
||||||
|
|
||||||
|
interface SubscriptionResponse {
|
||||||
|
prices: {
|
||||||
|
products: {
|
||||||
|
is_enterprise: boolean;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const RATE_LIMIT_CHANGE_NOTIFICATION_START_DATE = new Date("2025-03-12");
|
||||||
|
|
||||||
|
export async function isEnterpriseTeamCreatedAfterRateLimitChange(
|
||||||
|
team_id: string,
|
||||||
|
): Promise<boolean> {
|
||||||
|
const { data, error } = (await supabase_service
|
||||||
|
.from("subscriptions")
|
||||||
|
.select("prices(products(is_enterprise))")
|
||||||
|
.eq("status", "active")
|
||||||
|
.eq("team_id", team_id)
|
||||||
|
.gt(
|
||||||
|
"created",
|
||||||
|
RATE_LIMIT_CHANGE_NOTIFICATION_START_DATE.toISOString(),
|
||||||
|
)) as {
|
||||||
|
data: SubscriptionResponse[] | null;
|
||||||
|
error: any;
|
||||||
|
};
|
||||||
|
|
||||||
|
if (error || !data) {
|
||||||
|
// If there's an error or no subscription found, assume non-enterprise
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const isEnterprise = data.find(
|
||||||
|
(sub) => sub.prices?.products?.is_enterprise === true,
|
||||||
|
);
|
||||||
|
|
||||||
|
return !!isEnterprise;
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user