(feat/conc) Move fully to a concurrency limit system (#1045)

* Nick: conc limits init

* Nick: test suite plans

* fix(email_notification): move expiry check to redis

* fix(email_notification): add db check in case redis resets

* Update rate-limiter.ts

* Update queue-jobs.ts

* Create concurrency-limit.test.ts

* Update concurrency-limit.test.ts

* Create queue-concurrency-integration.test.ts

* Update queue-concurrency-integration.test.ts

---------

Co-authored-by: Móricz Gergő <mo.geryy@gmail.com>
This commit is contained in:
Nicolas 2025-01-30 13:47:29 -03:00 committed by GitHub
parent e2c3932dd2
commit 7c0b3ad098
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 764 additions and 24 deletions

View File

@ -0,0 +1,239 @@
import { redisConnection } from "../services/queue-service";
import {
cleanOldConcurrencyLimitEntries,
getConcurrencyLimitActiveJobs,
pushConcurrencyLimitActiveJob,
removeConcurrencyLimitActiveJob,
takeConcurrencyLimitedJob,
pushConcurrencyLimitedJob,
getConcurrencyQueueJobsCount,
ConcurrencyLimitedJob,
} from "../lib/concurrency-limit";
import { CONCURRENCY_LIMIT, getConcurrencyLimitMax } from "../services/rate-limiter";
import { PlanType } from "../types";
// Mock Redis client
// Replace the real Redis connection with jest.fn stubs for every sorted-set
// command the concurrency limiter uses, so no Redis instance is required.
// (All names are created inside the factory, which is safe under jest.mock
// hoisting.)
jest.mock("../services/queue-service", () => {
  const sortedSetCommands = [
    "zremrangebyscore",
    "zrangebyscore",
    "zadd",
    "zrem",
    "zmpop",
    "zcard",
  ];
  return {
    redisConnection: Object.fromEntries(
      sortedSetCommands.map((command) => [command, jest.fn()]),
    ),
  };
});
describe("Concurrency Limit", () => {
const mockTeamId = "test-team-id";
const mockJobId = "test-job-id";
const mockNow = 1000000;
beforeEach(() => {
jest.clearAllMocks();
});
describe("cleanOldConcurrencyLimitEntries", () => {
it("should remove entries older than current timestamp", async () => {
await cleanOldConcurrencyLimitEntries(mockTeamId, mockNow);
expect(redisConnection.zremrangebyscore).toHaveBeenCalledWith(
"concurrency-limiter:test-team-id",
-Infinity,
mockNow
);
});
});
describe("getConcurrencyLimitActiveJobs", () => {
it("should return active jobs after given timestamp", async () => {
const mockActiveJobs = ["job1", "job2"];
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValue(mockActiveJobs);
const result = await getConcurrencyLimitActiveJobs(mockTeamId, mockNow);
expect(result).toEqual(mockActiveJobs);
expect(redisConnection.zrangebyscore).toHaveBeenCalledWith(
"concurrency-limiter:test-team-id",
mockNow,
Infinity
);
});
it("should return empty array when no active jobs", async () => {
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([]);
const result = await getConcurrencyLimitActiveJobs(mockTeamId, mockNow);
expect(result).toEqual([]);
});
});
describe("pushConcurrencyLimitActiveJob", () => {
it("should add job with expiration timestamp", async () => {
await pushConcurrencyLimitActiveJob(mockTeamId, mockJobId, mockNow);
expect(redisConnection.zadd).toHaveBeenCalledWith(
"concurrency-limiter:test-team-id",
mockNow + 2 * 60 * 1000, // stalledJobTimeoutMs
mockJobId
);
});
});
describe("removeConcurrencyLimitActiveJob", () => {
it("should remove job from active jobs", async () => {
await removeConcurrencyLimitActiveJob(mockTeamId, mockJobId);
expect(redisConnection.zrem).toHaveBeenCalledWith(
"concurrency-limiter:test-team-id",
mockJobId
);
});
});
describe("Queue Operations", () => {
const mockJob: ConcurrencyLimitedJob = {
id: mockJobId,
data: { test: "data" },
opts: {},
priority: 1,
};
describe("takeConcurrencyLimitedJob", () => {
it("should return null when queue is empty", async () => {
(redisConnection.zmpop as jest.Mock).mockResolvedValue(null);
const result = await takeConcurrencyLimitedJob(mockTeamId);
expect(result).toBeNull();
});
it("should return and remove the highest priority job", async () => {
(redisConnection.zmpop as jest.Mock).mockResolvedValue([
"key",
[[JSON.stringify(mockJob)]],
]);
const result = await takeConcurrencyLimitedJob(mockTeamId);
expect(result).toEqual(mockJob);
expect(redisConnection.zmpop).toHaveBeenCalledWith(
1,
"concurrency-limit-queue:test-team-id",
"MIN"
);
});
});
describe("pushConcurrencyLimitedJob", () => {
it("should add job to queue with priority", async () => {
await pushConcurrencyLimitedJob(mockTeamId, mockJob);
expect(redisConnection.zadd).toHaveBeenCalledWith(
"concurrency-limit-queue:test-team-id",
mockJob.priority,
JSON.stringify(mockJob)
);
});
it("should use default priority 1 when not specified", async () => {
const jobWithoutPriority = { ...mockJob };
delete jobWithoutPriority.priority;
await pushConcurrencyLimitedJob(mockTeamId, jobWithoutPriority);
expect(redisConnection.zadd).toHaveBeenCalledWith(
"concurrency-limit-queue:test-team-id",
1,
JSON.stringify(jobWithoutPriority)
);
});
});
describe("getConcurrencyQueueJobsCount", () => {
it("should return the number of jobs in queue", async () => {
const mockCount = 5;
(redisConnection.zcard as jest.Mock).mockResolvedValue(mockCount);
const result = await getConcurrencyQueueJobsCount(mockTeamId);
expect(result).toBe(mockCount);
expect(redisConnection.zcard).toHaveBeenCalledWith(
"concurrency-limit-queue:test-team-id"
);
});
it("should return 0 for empty queue", async () => {
(redisConnection.zcard as jest.Mock).mockResolvedValue(0);
const result = await getConcurrencyQueueJobsCount(mockTeamId);
expect(result).toBe(0);
});
});
});
describe("getConcurrencyLimitMax", () => {
it("should return correct limit for free plan", () => {
const result = getConcurrencyLimitMax("free");
expect(result).toBe(2);
});
it("should return correct limit for standard plan", () => {
const result = getConcurrencyLimitMax("standard");
expect(result).toBe(CONCURRENCY_LIMIT.standard);
});
it("should return correct limit for scale plan", () => {
const result = getConcurrencyLimitMax("scale");
expect(result).toBe(CONCURRENCY_LIMIT.scale);
});
it("should return default limit for unknown plan", () => {
const result = getConcurrencyLimitMax("unknown" as PlanType);
expect(result).toBe(10);
});
it("should handle special team IDs", () => {
process.env.DEV_B_TEAM_ID = "dev-b-team";
const result = getConcurrencyLimitMax("free", "dev-b-team");
expect(result).toBe(120);
});
});
describe("Integration Scenarios", () => {
it("should handle complete job lifecycle", async () => {
const mockJob: ConcurrencyLimitedJob = {
id: "lifecycle-test",
data: { test: "lifecycle" },
opts: {},
};
// Push job to queue
await pushConcurrencyLimitedJob(mockTeamId, mockJob);
expect(redisConnection.zadd).toHaveBeenCalled();
// Take job from queue
(redisConnection.zmpop as jest.Mock).mockResolvedValue([
"key",
[[JSON.stringify(mockJob)]],
]);
const takenJob = await takeConcurrencyLimitedJob(mockTeamId);
expect(takenJob).toEqual(mockJob);
// Add to active jobs
await pushConcurrencyLimitActiveJob(mockTeamId, mockJob.id, mockNow);
expect(redisConnection.zadd).toHaveBeenCalled();
// Verify active jobs
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([mockJob.id]);
const activeJobs = await getConcurrencyLimitActiveJobs(mockTeamId, mockNow);
expect(activeJobs).toContain(mockJob.id);
// Remove from active jobs
await removeConcurrencyLimitActiveJob(mockTeamId, mockJob.id);
expect(redisConnection.zrem).toHaveBeenCalled();
});
});
});

View File

@ -0,0 +1,272 @@
import { redisConnection } from "../services/queue-service";
import { addScrapeJob, addScrapeJobs } from "../services/queue-jobs";
import {
cleanOldConcurrencyLimitEntries,
pushConcurrencyLimitActiveJob,
takeConcurrencyLimitedJob,
removeConcurrencyLimitActiveJob,
} from "../lib/concurrency-limit";
import { getConcurrencyLimitMax } from "../services/rate-limiter";
import { WebScraperOptions, PlanType } from "../types";
// Mock all the dependencies
// Shared spy for the BullMQ queue's `add` method. The `mock` name prefix is
// required: jest hoists jest.mock() factories above this declaration, and
// only allows factories to reference outer bindings named mock*.
const mockAdd = jest.fn();
// Stub the queue service: fake Redis sorted-set commands for the concurrency
// limiter, and a scrape queue whose `add` is the mockAdd spy above.
jest.mock("../services/queue-service", () => ({
  redisConnection: {
    zremrangebyscore: jest.fn(),
    zrangebyscore: jest.fn(),
    zadd: jest.fn(),
    zrem: jest.fn(),
    zmpop: jest.fn(),
    zcard: jest.fn(),
    smembers: jest.fn(),
  },
  getScrapeQueue: jest.fn(() => ({
    add: mockAdd,
  })),
}));
// Make uuid.v4 deterministic so queued-job payloads can be matched by id.
jest.mock("uuid", () => ({
  v4: jest.fn(() => "mock-uuid"),
}));
describe("Queue Concurrency Integration", () => {
const mockTeamId = "test-team-id";
const mockPlan = "standard" as PlanType;
const mockNow = Date.now();
const defaultScrapeOptions = {
formats: ["markdown"] as (
| "markdown"
| "html"
| "rawHtml"
| "links"
| "screenshot"
| "screenshot@fullPage"
| "extract"
| "json"
)[],
onlyMainContent: true,
waitFor: 0,
mobile: false,
parsePDF: false,
timeout: 30000,
extract: {
mode: "llm" as const,
systemPrompt: "test",
schema: {},
},
extractOptions: { mode: "llm" as const, systemPrompt: "test" },
proxy: false,
proxyCountry: "us",
proxyOptions: {},
javascript: true,
headers: {},
cookies: [],
blockResources: true,
skipTlsVerification: false,
removeBase64Images: true,
fastMode: false,
blockAds: true,
};
beforeEach(() => {
jest.clearAllMocks();
jest.spyOn(Date, "now").mockImplementation(() => mockNow);
});
describe("Single Job Addition", () => {
const mockWebScraperOptions: WebScraperOptions = {
url: "https://test.com",
mode: "single_urls",
team_id: mockTeamId,
plan: mockPlan,
scrapeOptions: defaultScrapeOptions,
crawlerOptions: null,
};
it("should add job directly to BullMQ when under concurrency limit", async () => {
// Mock current active jobs to be under limit
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([]);
await addScrapeJob(mockWebScraperOptions);
// Should have checked concurrency
expect(redisConnection.zrangebyscore).toHaveBeenCalled();
// Should have added to BullMQ
expect(mockAdd).toHaveBeenCalled();
// Should have added to active jobs
expect(redisConnection.zadd).toHaveBeenCalledWith(
expect.stringContaining("concurrency-limiter"),
expect.any(Number),
expect.any(String),
);
});
it("should add job to concurrency queue when at concurrency limit", async () => {
// Mock current active jobs to be at limit
const maxConcurrency = getConcurrencyLimitMax(mockPlan);
const activeJobs = Array(maxConcurrency).fill("active-job");
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValue(
activeJobs,
);
await addScrapeJob(mockWebScraperOptions);
// Should have checked concurrency
expect(redisConnection.zrangebyscore).toHaveBeenCalled();
// Should NOT have added to BullMQ
expect(mockAdd).not.toHaveBeenCalled();
// Should have added to concurrency queue
expect(redisConnection.zadd).toHaveBeenCalledWith(
expect.stringContaining("concurrency-limit-queue"),
expect.any(Number),
expect.stringContaining("mock-uuid"),
);
});
});
describe("Batch Job Addition", () => {
const createMockJobs = (count: number) =>
Array(count)
.fill(null)
.map((_, i) => ({
data: {
url: `https://test${i}.com`,
mode: "single_urls",
team_id: mockTeamId,
plan: mockPlan,
scrapeOptions: defaultScrapeOptions,
} as WebScraperOptions,
opts: {
jobId: `job-${i}`,
priority: 1,
},
}));
it("should handle batch jobs respecting concurrency limits", async () => {
const maxConcurrency = getConcurrencyLimitMax(mockPlan);
const totalJobs = maxConcurrency + 5; // Some jobs should go to queue
const mockJobs = createMockJobs(totalJobs);
// Mock current active jobs to be empty
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValue([]);
await addScrapeJobs(mockJobs);
// Should have added maxConcurrency jobs to BullMQ
expect(mockAdd).toHaveBeenCalledTimes(maxConcurrency);
// Should have added remaining jobs to concurrency queue
expect(redisConnection.zadd).toHaveBeenCalledWith(
expect.stringContaining("concurrency-limit-queue"),
expect.any(Number),
expect.any(String),
);
});
it("should handle empty job array", async () => {
const result = await addScrapeJobs([]);
expect(result).toBe(true);
expect(mockAdd).not.toHaveBeenCalled();
expect(redisConnection.zadd).not.toHaveBeenCalled();
});
});
describe("Queue Worker Integration", () => {
it("should process next queued job when active job completes", async () => {
const mockJob = {
id: "test-job",
data: {
team_id: mockTeamId,
plan: mockPlan,
},
};
// Mock a queued job
const queuedJob = {
id: "queued-job",
data: { test: "data" },
opts: {},
};
(redisConnection.zmpop as jest.Mock).mockResolvedValueOnce([
"key",
[[JSON.stringify(queuedJob)]],
]);
// Simulate job completion in worker
await removeConcurrencyLimitActiveJob(mockTeamId, mockJob.id);
await cleanOldConcurrencyLimitEntries(mockTeamId);
const nextJob = await takeConcurrencyLimitedJob(mockTeamId);
// Should have taken next job from queue
expect(nextJob).toEqual(queuedJob);
// Should have added new job to active jobs
await pushConcurrencyLimitActiveJob(mockTeamId, nextJob!.id);
expect(redisConnection.zadd).toHaveBeenCalledWith(
expect.stringContaining("concurrency-limiter"),
expect.any(Number),
nextJob!.id,
);
});
it("should handle job failure and cleanup", async () => {
const mockJob = {
id: "failing-job",
data: {
team_id: mockTeamId,
plan: mockPlan,
},
};
// Add job to active jobs
await pushConcurrencyLimitActiveJob(mockTeamId, mockJob.id);
// Simulate job failure and cleanup
await removeConcurrencyLimitActiveJob(mockTeamId, mockJob.id);
await cleanOldConcurrencyLimitEntries(mockTeamId);
// Verify job was removed from active jobs
expect(redisConnection.zrem).toHaveBeenCalledWith(
expect.stringContaining("concurrency-limiter"),
mockJob.id,
);
});
});
describe("Edge Cases", () => {
it("should handle stalled jobs cleanup", async () => {
const stalledTime = mockNow - 3 * 60 * 1000; // 3 minutes ago
// Mock stalled jobs in Redis
(redisConnection.zrangebyscore as jest.Mock).mockResolvedValueOnce([
"stalled-job",
]);
await cleanOldConcurrencyLimitEntries(mockTeamId, mockNow);
// Should have cleaned up stalled jobs
expect(redisConnection.zremrangebyscore).toHaveBeenCalledWith(
expect.stringContaining("concurrency-limiter"),
-Infinity,
mockNow,
);
});
it("should handle race conditions in job queue processing", async () => {
// Mock a race condition where job is taken by another worker
(redisConnection.zmpop as jest.Mock).mockResolvedValueOnce(null);
const nextJob = await takeConcurrencyLimitedJob(mockTeamId);
// Should handle gracefully when no job is available
expect(nextJob).toBeNull();
});
});
});

View File

@ -1,5 +1,5 @@
import { parseApi } from "../lib/parseApi"; import { parseApi } from "../lib/parseApi";
import { getRateLimiter } from "../services/rate-limiter"; import { getRateLimiter, isTestSuiteToken } from "../services/rate-limiter";
import { import {
AuthResponse, AuthResponse,
NotificationType, NotificationType,
@ -345,6 +345,16 @@ export async function supaAuthenticateUser(
// return { success: false, error: "Unauthorized: Invalid token", status: 401 }; // return { success: false, error: "Unauthorized: Invalid token", status: 401 };
} }
if (token && isTestSuiteToken(token)) {
return {
success: true,
team_id: teamId ?? undefined,
// Now we have a test suite plan
plan: "testSuite",
chunk
};
}
return { return {
success: true, success: true,
team_id: teamId ?? undefined, team_id: teamId ?? undefined,

View File

@ -4,9 +4,11 @@ import {
ConcurrencyCheckResponse, ConcurrencyCheckResponse,
RequestWithAuth, RequestWithAuth,
} from "./types"; } from "./types";
import { RateLimiterMode } from "../../types"; import { RateLimiterMode, PlanType } from "../../types";
import { Response } from "express"; import { Response } from "express";
import { redisConnection } from "../../services/queue-service"; import { redisConnection } from "../../services/queue-service";
import { getConcurrencyLimitMax } from "../../services/rate-limiter";
// Basically just middleware and error wrapping // Basically just middleware and error wrapping
export async function concurrencyCheckController( export async function concurrencyCheckController(
req: RequestWithAuth<ConcurrencyCheckParams, undefined, undefined>, req: RequestWithAuth<ConcurrencyCheckParams, undefined, undefined>,
@ -19,7 +21,15 @@ export async function concurrencyCheckController(
now, now,
Infinity, Infinity,
); );
return res
.status(200) const maxConcurrency = getConcurrencyLimitMax(
.json({ success: true, concurrency: activeJobsOfTeam.length }); req.auth.plan as PlanType,
req.auth.team_id,
);
return res.status(200).json({
success: true,
concurrency: activeJobsOfTeam.length,
maxConcurrency: maxConcurrency,
});
} }

View File

@ -602,6 +602,7 @@ export type ConcurrencyCheckResponse =
| { | {
success: true; success: true;
concurrency: number; concurrency: number;
maxConcurrency: number;
}; };
export type CrawlStatusResponse = export type CrawlStatusResponse =

View File

@ -262,4 +262,4 @@ logger.info(`Worker ${process.pid} started`);
// sq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused")); // sq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
// sq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed")); // sq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
// sq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed")); // sq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
// //

View File

@ -1,6 +1,6 @@
import { getRateLimiterPoints } from "../services/rate-limiter"; import { CONCURRENCY_LIMIT } from "../services/rate-limiter";
import { redisConnection } from "../services/queue-service"; import { redisConnection } from "../services/queue-service";
import { RateLimiterMode } from "../types"; import { PlanType } from "../types";
import { JobsOptions } from "bullmq"; import { JobsOptions } from "bullmq";
const constructKey = (team_id: string) => "concurrency-limiter:" + team_id; const constructKey = (team_id: string) => "concurrency-limiter:" + team_id;
@ -8,10 +8,6 @@ const constructQueueKey = (team_id: string) =>
"concurrency-limit-queue:" + team_id; "concurrency-limit-queue:" + team_id;
const stalledJobTimeoutMs = 2 * 60 * 1000; const stalledJobTimeoutMs = 2 * 60 * 1000;
export function getConcurrencyLimitMax(plan: string): number {
if (plan === "growth") return 100;
return getRateLimiterPoints(RateLimiterMode.Scrape, undefined, plan);
}
export async function cleanOldConcurrencyLimitEntries( export async function cleanOldConcurrencyLimitEntries(
team_id: string, team_id: string,
@ -78,3 +74,9 @@ export async function pushConcurrencyLimitedJob(
JSON.stringify(job), JSON.stringify(job),
); );
} }
/**
 * Number of jobs currently waiting in a team's concurrency-limited queue
 * (cardinality of the team's queue sorted set).
 */
export async function getConcurrencyQueueJobsCount(team_id: string): Promise<number> {
  return await redisConnection.zcard(constructQueueKey(team_id));
}

View File

@ -52,6 +52,10 @@ export async function getJobPriority({
let bucketLimit = 0; let bucketLimit = 0;
switch (plan) { switch (plan) {
case "testSuite":
bucketLimit = 1000;
planModifier = 0.25;
break;
case "free": case "free":
bucketLimit = 25; bucketLimit = 25;
planModifier = 0.5; planModifier = 0.5;

View File

@ -7,6 +7,7 @@ import { sendSlackWebhook } from "../alerts/slack";
import { getNotificationString } from "./notification_string"; import { getNotificationString } from "./notification_string";
import { AuthCreditUsageChunk } from "../../controllers/v1/types"; import { AuthCreditUsageChunk } from "../../controllers/v1/types";
import { redlock } from "../redlock"; import { redlock } from "../redlock";
import { redisConnection } from "../queue-service";
const emailTemplates: Record< const emailTemplates: Record<
NotificationType, NotificationType,
@ -33,6 +34,14 @@ const emailTemplates: Record<
subject: "Auto recharge failed - Firecrawl", subject: "Auto recharge failed - Firecrawl",
html: "Hey there,<br/><p>Your auto recharge failed. Please try again manually. If the issue persists, please reach out to us at <a href='mailto:help@firecrawl.com'>help@firecrawl.com</a></p><br/>Thanks,<br/>Firecrawl Team<br/>", html: "Hey there,<br/><p>Your auto recharge failed. Please try again manually. If the issue persists, please reach out to us at <a href='mailto:help@firecrawl.com'>help@firecrawl.com</a></p><br/>Thanks,<br/>Firecrawl Team<br/>",
}, },
[NotificationType.CONCURRENCY_LIMIT_REACHED]: {
subject: "You could be scraping faster - Firecrawl",
html: `Hey there,
<br/>
<p>We've improved our system by transitioning to concurrency limits, allowing faster scraping by default and eliminating* the often rate limit errors.</p>
<p>You're hitting the concurrency limit for your plan quite often, which means Firecrawl can't scrape as fast as it could. But don't worry, it is not failing your requests and you are still getting your results.</p>
<p>This is just to let you know that you could be scraping more pages faster. Consider upgrading your plan at <a href='https://firecrawl.dev/pricing'>firecrawl.dev/pricing</a>.</p><br/>Thanks,<br/>Firecrawl Team<br/>`,
},
}; };
export async function sendNotification( export async function sendNotification(
@ -183,3 +192,104 @@ export async function sendNotificationInternal(
}, },
); );
} }
/**
 * Send a notification email to every user of a team, at most once per
 * `daysBetweenEmails` days (unless `bypassRecentChecks` is true).
 *
 * Dedupe strategy: a TTL'd Redis key is the fast path, and the
 * `user_notifications` table is consulted as a fallback in case Redis was
 * reset. On any failure the Redis key is deleted so a later call can retry.
 */
export async function sendNotificationWithCustomDays(
  team_id: string,
  notificationType: NotificationType,
  daysBetweenEmails: number,
  bypassRecentChecks: boolean = false,
) {
  return withAuth(async (
    team_id: string,
    notificationType: NotificationType,
    daysBetweenEmails: number,
    bypassRecentChecks: boolean,
  ) => {
    const redisKey = "notification_sent:" + notificationType + ":" + team_id;

    // Fast path: Redis remembers a recent send.
    const didSendRecentNotification = (await redisConnection.get(redisKey)) !== null;

    if (didSendRecentNotification && !bypassRecentChecks) {
      logger.debug(`Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`);
      return { success: true };
    }

    // Claim the send slot up front so concurrent callers do not double-send.
    await redisConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60);

    const now = new Date();
    const pastDate = new Date(now.getTime() - daysBetweenEmails * 24 * 60 * 60 * 1000);

    // Secondary check against the database in case Redis was reset.
    const { data: recentNotifications, error: recentNotificationsError } = await supabase_service
      .from("user_notifications")
      .select("*")
      .eq("team_id", team_id)
      .eq("notification_type", notificationType)
      .gte("sent_date", pastDate.toISOString());

    if (recentNotificationsError) {
      logger.debug(`Error fetching recent notifications: ${recentNotificationsError}`);
      await redisConnection.del(redisKey); // free up redis, let it try again
      return { success: false };
    }

    if (recentNotifications.length > 0 && !bypassRecentChecks) {
      logger.debug(`Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`);
      // Refresh the TTL so the Redis fast path covers the full window again.
      await redisConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60);
      return { success: true };
    }

    // Structured logger instead of console.log, consistent with the rest of
    // this module.
    logger.info(
      `Sending notification for team_id: ${team_id} and notificationType: ${notificationType}`,
    );

    // get the emails from the user with the team_id
    const { data: emails, error: emailsError } = await supabase_service
      .from("users")
      .select("email")
      .eq("team_id", team_id);

    if (emailsError) {
      logger.debug(`Error fetching emails: ${emailsError}`);
      await redisConnection.del(redisKey); // free up redis, let it try again
      return { success: false };
    }

    for (const email of emails) {
      await sendEmailNotification(email.email, notificationType);
    }

    // Record the send so the DB fallback check works after a Redis reset.
    const { error: insertError } = await supabase_service
      .from("user_notifications")
      .insert([
        {
          team_id: team_id,
          notification_type: notificationType,
          sent_date: new Date().toISOString(),
          timestamp: new Date().toISOString(),
        },
      ]);

    if (process.env.SLACK_ADMIN_WEBHOOK_URL && emails.length > 0) {
      // Fire-and-forget admin ping; failures are only logged.
      sendSlackWebhook(
        `${getNotificationString(notificationType)}: Team ${team_id}, with email ${emails[0].email}.`,
        false,
        process.env.SLACK_ADMIN_WEBHOOK_URL,
      ).catch((error) => {
        logger.debug(`Error sending slack notification: ${error}`);
      });
    }

    if (insertError) {
      logger.debug(`Error inserting notification record: ${insertError}`);
      await redisConnection.del(redisKey); // free up redis, let it try again
      return { success: false };
    }

    return { success: true };
  }, undefined)(
    team_id,
    notificationType,
    daysBetweenEmails,
    bypassRecentChecks,
  );
}

View File

@ -1,15 +1,18 @@
import { Job, JobsOptions } from "bullmq"; import { Job, JobsOptions } from "bullmq";
import { getScrapeQueue } from "./queue-service"; import { getScrapeQueue } from "./queue-service";
import { v4 as uuidv4 } from "uuid"; import { v4 as uuidv4 } from "uuid";
import { WebScraperOptions } from "../types"; import { NotificationType, PlanType, WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node"; import * as Sentry from "@sentry/node";
import { import {
cleanOldConcurrencyLimitEntries, cleanOldConcurrencyLimitEntries,
getConcurrencyLimitActiveJobs, getConcurrencyLimitActiveJobs,
getConcurrencyLimitMax, getConcurrencyQueueJobsCount,
pushConcurrencyLimitActiveJob, pushConcurrencyLimitActiveJob,
pushConcurrencyLimitedJob, pushConcurrencyLimitedJob,
} from "../lib/concurrency-limit"; } from "../lib/concurrency-limit";
import { logger } from "../lib/logger";
import { getConcurrencyLimitMax } from "./rate-limiter";
import { sendNotificationWithCustomDays } from "./notification/email_notification";
async function _addScrapeJobToConcurrencyQueue( async function _addScrapeJobToConcurrencyQueue(
webScraperOptions: any, webScraperOptions: any,
@ -57,21 +60,34 @@ async function addScrapeJobRaw(
jobPriority: number, jobPriority: number,
) { ) {
let concurrencyLimited = false; let concurrencyLimited = false;
let currentActiveConcurrency = 0;
let maxConcurrency = 0;
console.log("Concurrency check: ", webScraperOptions.team_id);
if ( if (
webScraperOptions && webScraperOptions &&
webScraperOptions.team_id && webScraperOptions.team_id
webScraperOptions.plan
) { ) {
const now = Date.now(); const now = Date.now();
const limit = await getConcurrencyLimitMax(webScraperOptions.plan); maxConcurrency = getConcurrencyLimitMax(webScraperOptions.plan ?? "free", webScraperOptions.team_id);
cleanOldConcurrencyLimitEntries(webScraperOptions.team_id, now); cleanOldConcurrencyLimitEntries(webScraperOptions.team_id, now);
concurrencyLimited = currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(webScraperOptions.team_id, now)).length;
(await getConcurrencyLimitActiveJobs(webScraperOptions.team_id, now)) concurrencyLimited = currentActiveConcurrency >= maxConcurrency;
.length >= limit;
} }
const concurrencyQueueJobs = await getConcurrencyQueueJobsCount(webScraperOptions.team_id);
if (concurrencyLimited) { if (concurrencyLimited) {
// Detect if they hit their concurrent limit
// If above by 2x, send them an email
// No need to 2x as if there are more than the max concurrency in the concurrency queue, it is already 2x
if(concurrencyQueueJobs > maxConcurrency) {
logger.info("Concurrency limited 2x (single) - ", "Concurrency queue jobs: ", concurrencyQueueJobs, "Max concurrency: ", maxConcurrency);
// sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 10, false).catch((error) => {
// logger.error("Error sending notification (concurrency limit reached): ", error);
// });
}
await _addScrapeJobToConcurrencyQueue( await _addScrapeJobToConcurrencyQueue(
webScraperOptions, webScraperOptions,
options, options,
@ -134,15 +150,18 @@ export async function addScrapeJobs(
if (jobs.length === 0) return true; if (jobs.length === 0) return true;
let countCanBeDirectlyAdded = Infinity; let countCanBeDirectlyAdded = Infinity;
let currentActiveConcurrency = 0;
let maxConcurrency = 0;
if (jobs[0].data && jobs[0].data.team_id && jobs[0].data.plan) { if (jobs[0].data && jobs[0].data.team_id && jobs[0].data.plan) {
const now = Date.now(); const now = Date.now();
const limit = await getConcurrencyLimitMax(jobs[0].data.plan); maxConcurrency = getConcurrencyLimitMax(jobs[0].data.plan as PlanType, jobs[0].data.team_id);
cleanOldConcurrencyLimitEntries(jobs[0].data.team_id, now); cleanOldConcurrencyLimitEntries(jobs[0].data.team_id, now);
currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(jobs[0].data.team_id, now)).length;
countCanBeDirectlyAdded = Math.max( countCanBeDirectlyAdded = Math.max(
limit - maxConcurrency - currentActiveConcurrency,
(await getConcurrencyLimitActiveJobs(jobs[0].data.team_id, now)).length,
0, 0,
); );
} }
@ -150,6 +169,14 @@ export async function addScrapeJobs(
const addToBull = jobs.slice(0, countCanBeDirectlyAdded); const addToBull = jobs.slice(0, countCanBeDirectlyAdded);
const addToCQ = jobs.slice(countCanBeDirectlyAdded); const addToCQ = jobs.slice(countCanBeDirectlyAdded);
// equals 2x the max concurrency
if(addToCQ.length > maxConcurrency) {
logger.info("Concurrency limited 2x (multiple) - ", "Concurrency queue jobs: ", addToCQ.length, "Max concurrency: ", maxConcurrency);
// sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 10, false).catch((error) => {
// logger.error("Error sending notification (concurrency limit reached): ", error);
// });
}
await Promise.all( await Promise.all(
addToBull.map(async (job) => { addToBull.map(async (job) => {
const size = JSON.stringify(job.data).length; const size = JSON.stringify(job.data).length;

View File

@ -1,7 +1,30 @@
import { RateLimiterRedis } from "rate-limiter-flexible"; import { RateLimiterRedis } from "rate-limiter-flexible";
import { RateLimiterMode } from "../../src/types"; import { PlanType, RateLimiterMode } from "../../src/types";
import Redis from "ioredis"; import Redis from "ioredis";
// Maximum number of concurrently-running scrape jobs allowed per plan.
// `Omit<Record<PlanType, number>, "">` requires one entry for every PlanType
// key except the empty string — presumably PlanType includes ""; confirm in
// types.ts.
export const CONCURRENCY_LIMIT: Omit<Record<PlanType, number>, ""> = {
  free: 2,
  hobby: 4,
  starter: 10,
  standard: 10,
  standardNew: 10,
  standardnew: 10,
  scale: 100,
  growth: 100,
  growthdouble: 100,
  etier2c: 300,
  etier1a: 200,
  etier2a: 300,
  etierscale1: 150,
  testSuite: 200,
  devB: 120,
  etier2d: 250,
  manual: 200,
  extract_starter: 20,
  extract_explorer: 100,
  extract_pro: 200
};
const RATE_LIMITS = { const RATE_LIMITS = {
crawl: { crawl: {
default: 3, default: 3,
@ -266,3 +289,38 @@ export function getRateLimiter(
getRateLimiterPoints(mode, token, plan, teamId), getRateLimiterPoints(mode, token, plan, teamId),
); );
} }
/**
 * Resolve the maximum concurrency for a team.
 *
 * Team-ID overrides (env-configured special teams and the manual allow-list)
 * take precedence over the plan table; unknown plans fall back to 10.
 */
export function getConcurrencyLimitMax(
  plan: PlanType,
  teamId?: string,
): number {
  // Moved this to auth check, plan will come as testSuite if token is present
  // if (token && testSuiteTokens.some((testToken) => token.includes(testToken))) {
  //   return CONCURRENCY_LIMIT.testSuite;
  // }
  if (teamId && teamId === process.env.DEV_B_TEAM_ID) {
    return CONCURRENCY_LIMIT.devB;
  }
  if (teamId && teamId === process.env.ETIER1A_TEAM_ID) {
    return CONCURRENCY_LIMIT.etier1a;
  }
  if (teamId && teamId === process.env.ETIER2A_TEAM_ID) {
    return CONCURRENCY_LIMIT.etier2a;
  }
  if (teamId && teamId === process.env.ETIER2D_TEAM_ID) {
    // Fix: this branch previously returned the etier2a limit (copy-paste);
    // ETIER2D teams get the etier2d limit.
    return CONCURRENCY_LIMIT.etier2d;
  }
  if (teamId && manual.includes(teamId)) {
    return CONCURRENCY_LIMIT.manual;
  }
  return CONCURRENCY_LIMIT[plan] ?? 10;
}
/**
 * True when the given API token contains any of the configured test-suite
 * token fragments.
 */
export function isTestSuiteToken(token: string): boolean {
  for (const testToken of testSuiteTokens) {
    if (token.includes(testToken)) {
      return true;
    }
  }
  return false;
}

View File

@ -158,6 +158,7 @@ export enum NotificationType {
RATE_LIMIT_REACHED = "rateLimitReached", RATE_LIMIT_REACHED = "rateLimitReached",
AUTO_RECHARGE_SUCCESS = "autoRechargeSuccess", AUTO_RECHARGE_SUCCESS = "autoRechargeSuccess",
AUTO_RECHARGE_FAILED = "autoRechargeFailed", AUTO_RECHARGE_FAILED = "autoRechargeFailed",
CONCURRENCY_LIMIT_REACHED = "concurrencyLimitReached",
} }
export type ScrapeLog = { export type ScrapeLog = {
@ -181,12 +182,18 @@ export type PlanType =
| "scale" | "scale"
| "hobby" | "hobby"
| "standardnew" | "standardnew"
| "standardNew"
| "growth" | "growth"
| "growthdouble" | "growthdouble"
| "etier2c" | "etier2c"
| "etier1a" | "etier1a"
| "etierscale1" | "etierscale1"
| "etier2a"
| "free" | "free"
| "testSuite"
| "devB"
| "etier2d"
| "manual"
| "extract_starter" | "extract_starter"
| "extract_explorer" | "extract_explorer"
| "extract_pro" | "extract_pro"