Merge pull request #566 from mendableai/nsc/job-priority

Internal Concurrency Limits <> Job Priorities
Nicolas 2024-08-27 17:24:16 -03:00 committed by GitHub
commit ff08d7093e
12 changed files with 300 additions and 18 deletions
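
Summary: this change threads each account's `PlanType` from authentication down to the controllers and the queue, tracks a team's currently running scrape jobs in a Redis set (`limit_team_id:<team_id>`, refreshed to a 60-second TTL on every add), and uses that count to pick a BullMQ priority for new jobs. Teams stay at the base priority until they exceed their plan's bucket limit; past that, priority degrades with the overflow. The snippet below is a standalone sketch of that bucket math for orientation only, not the module itself (the real logic is in `lib/job-priority.ts`, added further down; `computePriority` is a hypothetical helper, and lower numbers mean higher priority in BullMQ):

```ts
// Standalone sketch of the priority model in this PR; not the actual module.
// Lower BullMQ priority numbers run first, so overflowing teams get larger numbers.

type PlanType =
  | "starter" | "standard" | "scale" | "hobby"
  | "standardnew" | "growth" | "growthdouble" | "free" | "";

function computePriority(plan: PlanType, activeJobs: number, basePriority = 10): number {
  // Bucket limits and modifiers mirror the switch statement in lib/job-priority.ts.
  let bucketLimit = 25;
  let planModifier = 1;
  switch (plan) {
    case "free":
      bucketLimit = 25; planModifier = 0.5; break;
    case "hobby":
      bucketLimit = 100; planModifier = 0.3; break;
    case "standard":
    case "standardnew":
      bucketLimit = 200; planModifier = 0.2; break;
    case "growth":
    case "growthdouble":
      bucketLimit = 400; planModifier = 0.1; break;
  }
  // Within the bucket: everyone gets the base priority.
  if (activeJobs <= bucketLimit) return basePriority;
  // Over the bucket: de-prioritise by the overflow, scaled per plan.
  return basePriority + Math.ceil((activeJobs - bucketLimit) * planModifier);
}

// Matches the expectations in the new job-priority tests:
console.log(computePriority("standard", 250)); // 10 + ceil((250 - 200) * 0.2) = 20
console.log(computePriority("free", 60));      // 10 + ceil((60 - 25) * 0.5) = 28
```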

View File

@@ -3,6 +3,7 @@ import { getRateLimiter } from "../../src/services/rate-limiter";
import {
AuthResponse,
NotificationType,
+PlanType,
RateLimiterMode,
} from "../../src/types";
import { supabase_service } from "../../src/services/supabase";
@@ -88,7 +89,7 @@ export async function supaAuthenticateUser(
team_id?: string;
error?: string;
status?: number;
-plan?: string;
+plan?: PlanType;
}> {
const authHeader = req.headers.authorization;
if (!authHeader) {
@@ -327,10 +328,10 @@ export async function supaAuthenticateUser(
return {
success: true,
team_id: subscriptionData.team_id,
-plan: subscriptionData.plan ?? "",
+plan: (subscriptionData.plan ?? "") as PlanType,
};
}
-function getPlanByPriceId(price_id: string) {
+function getPlanByPriceId(price_id: string): PlanType {
switch (price_id) {
case process.env.STRIPE_PRICE_ID_STARTER:
return "starter";

View File

@@ -25,11 +25,12 @@ import {
} from "../../src/lib/crawl-redis";
import { getScrapeQueue } from "../../src/services/queue-service";
import { checkAndUpdateURL } from "../../src/lib/validateUrl";
+import { getJobPriority } from "../../src/lib/job-priority";
import * as Sentry from "@sentry/node";
export async function crawlController(req: Request, res: Response) {
try {
-const { success, team_id, error, status } = await authenticateUser(
+const { success, team_id, error, status, plan } = await authenticateUser(
req,
res,
RateLimiterMode.Crawl
@@ -148,6 +149,7 @@ export async function crawlController(req: Request, res: Response) {
crawlerOptions,
pageOptions,
team_id,
+plan,
createdAt: Date.now(),
};
@@ -163,7 +165,15 @@ export async function crawlController(req: Request, res: Response) {
? null
: await crawler.tryGetSitemap();
if (sitemap !== null && sitemap.length > 0) {
+let jobPriority = 20;
+// If it is over 1000, we need to get the job priority,
+// otherwise we can use the default priority of 20
+if(sitemap.length > 1000){
+// set base to 21
+jobPriority = await getJobPriority({plan, team_id, basePriority: 21})
+}
const jobs = sitemap.map((x) => {
const url = x.url;
const uuid = uuidv4();
@@ -181,7 +191,7 @@ export async function crawlController(req: Request, res: Response) {
},
opts: {
jobId: uuid,
-priority: 20,
+priority: jobPriority,
},
};
});
@@ -204,6 +214,10 @@ export async function crawlController(req: Request, res: Response) {
}
} else {
await lockURL(id, sc, url);
+// Not needed, first one should be 15.
+// const jobPriority = await getJobPriority({plan, team_id, basePriority: 10})
const job = await addScrapeJob(
{
url,

View File

@@ -11,7 +11,7 @@ import * as Sentry from "@sentry/node";
export async function crawlPreviewController(req: Request, res: Response) {
try {
-const { success, error, status } = await authenticateUser(
+const { success, error, status, team_id:a, plan } = await authenticateUser(
req,
res,
RateLimiterMode.Preview
@@ -89,6 +89,7 @@ export async function crawlPreviewController(req: Request, res: Response) {
crawlerOptions,
pageOptions,
team_id,
+plan,
robots,
createdAt: Date.now(),
};

View File

@@ -2,7 +2,7 @@
import { Request, Response } from "express";
import { billTeam, checkTeamCredits } from "../services/billing/credit_billing";
import { authenticateUser } from "./auth";
-import { RateLimiterMode } from "../types";
+import { PlanType, RateLimiterMode } from "../types";
import { logJob } from "../services/logging/log_job";
import { Document } from "../lib/entities";
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function
@@ -12,6 +12,7 @@ import { addScrapeJob } from '../services/queue-jobs';
import { getScrapeQueue } from '../services/queue-service';
import { v4 as uuidv4 } from "uuid";
import { Logger } from '../lib/logger';
+import { getJobPriority } from '../lib/job-priority';
import * as Sentry from "@sentry/node";
export async function scrapeHelper(
@@ -22,7 +23,7 @@ export async function scrapeHelper(
pageOptions: PageOptions,
extractorOptions: ExtractorOptions,
timeout: number,
-plan?: string
+plan?: PlanType
): Promise<{
success: boolean;
error?: string;
@@ -38,6 +39,8 @@ export async function scrapeHelper(
return { success: false, error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.", returnCode: 403 };
}
+const jobPriority = await getJobPriority({plan, team_id, basePriority: 10})
const job = await addScrapeJob({
url,
mode: "single_urls",
@@ -46,7 +49,7 @@ export async function scrapeHelper(
pageOptions,
extractorOptions,
origin: req.body.origin ?? defaultOrigin,
-}, {}, jobId);
+}, {}, jobId, jobPriority);
let doc;

View File

@@ -2,13 +2,14 @@ import { Request, Response } from "express";
import { WebScraperDataProvider } from "../scraper/WebScraper";
import { billTeam, checkTeamCredits } from "../services/billing/credit_billing";
import { authenticateUser } from "./auth";
-import { RateLimiterMode } from "../types";
+import { PlanType, RateLimiterMode } from "../types";
import { logJob } from "../services/logging/log_job";
import { PageOptions, SearchOptions } from "../lib/entities";
import { search } from "../search";
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
import { v4 as uuidv4 } from "uuid";
import { Logger } from "../lib/logger";
+import { getJobPriority } from "../lib/job-priority";
import { getScrapeQueue } from "../services/queue-service";
import * as Sentry from "@sentry/node";
import { addScrapeJob } from "../services/queue-jobs";
@@ -20,6 +21,7 @@ export async function searchHelper(
crawlerOptions: any,
pageOptions: PageOptions,
searchOptions: SearchOptions,
+plan: PlanType
): Promise<{
success: boolean;
error?: string;
@@ -76,6 +78,8 @@ export async function searchHelper(
return { success: true, error: "No search results found", returnCode: 200 };
}
+const jobPriority = await getJobPriority({plan, team_id, basePriority: 20});
// filter out social media links
const jobDatas = res.map(x => {
@@ -92,7 +96,7 @@ export async function searchHelper(
},
opts: {
jobId: uuid,
-priority: 20,
+priority: jobPriority,
}
};
})
@@ -152,7 +156,7 @@ export async function searchHelper(
export async function searchController(req: Request, res: Response) {
try {
// make sure to authenticate user first, Bearer <token>
-const { success, team_id, error, status } = await authenticateUser(
+const { success, team_id, error, status, plan } = await authenticateUser(
req,
res,
RateLimiterMode.Search
@@ -194,6 +198,7 @@ export async function searchController(req: Request, res: Response) {
crawlerOptions,
pageOptions,
searchOptions,
+plan
);
const endTime = new Date().getTime();
const timeTakenInSeconds = (endTime - startTime) / 1000;

View File

@@ -189,6 +189,8 @@ if (cluster.isMaster) {
Logger.info(`Worker ${process.pid} started`);
}
+// const sq = getScrapeQueue();
// sq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));

View File

@@ -0,0 +1,134 @@
import {
getJobPriority,
addJobPriority,
deleteJobPriority,
} from "../job-priority";
import { redisConnection } from "../../services/queue-service";
import { PlanType } from "../../types";
jest.mock("../../services/queue-service", () => ({
redisConnection: {
sadd: jest.fn(),
srem: jest.fn(),
scard: jest.fn(),
expire: jest.fn(),
},
}));
describe("Job Priority Tests", () => {
afterEach(() => {
jest.clearAllMocks();
});
test("addJobPriority should add job_id to the set and set expiration", async () => {
const team_id = "team1";
const job_id = "job1";
await addJobPriority(team_id, job_id);
expect(redisConnection.sadd).toHaveBeenCalledWith(
`limit_team_id:${team_id}`,
job_id
);
expect(redisConnection.expire).toHaveBeenCalledWith(
`limit_team_id:${team_id}`,
60
);
});
test("deleteJobPriority should remove job_id from the set", async () => {
const team_id = "team1";
const job_id = "job1";
await deleteJobPriority(team_id, job_id);
expect(redisConnection.srem).toHaveBeenCalledWith(
`limit_team_id:${team_id}`,
job_id
);
});
test("getJobPriority should return correct priority based on plan and set length", async () => {
const team_id = "team1";
const plan: PlanType = "standard";
(redisConnection.scard as jest.Mock).mockResolvedValue(150);
const priority = await getJobPriority({ plan, team_id });
expect(priority).toBe(10);
(redisConnection.scard as jest.Mock).mockResolvedValue(250);
const priorityExceeded = await getJobPriority({ plan, team_id });
expect(priorityExceeded).toBe(20); // basePriority + Math.ceil((250 - 200) * 0.4)
});
test("getJobPriority should handle different plans correctly", async () => {
const team_id = "team1";
(redisConnection.scard as jest.Mock).mockResolvedValue(50);
let plan: PlanType = "hobby";
let priority = await getJobPriority({ plan, team_id });
expect(priority).toBe(10);
(redisConnection.scard as jest.Mock).mockResolvedValue(150);
plan = "hobby";
priority = await getJobPriority({ plan, team_id });
expect(priority).toBe(25); // basePriority + Math.ceil((150 - 50) * 0.3)
(redisConnection.scard as jest.Mock).mockResolvedValue(25);
plan = "free";
priority = await getJobPriority({ plan, team_id });
expect(priority).toBe(10);
(redisConnection.scard as jest.Mock).mockResolvedValue(60);
plan = "free";
priority = await getJobPriority({ plan, team_id });
expect(priority).toBe(28); // basePriority + Math.ceil((60 - 25) * 0.5)
});
test("addJobPriority should reset expiration time when adding new job", async () => {
const team_id = "team1";
const job_id1 = "job1";
const job_id2 = "job2";
await addJobPriority(team_id, job_id1);
expect(redisConnection.expire).toHaveBeenCalledWith(
`limit_team_id:${team_id}`,
60
);
// Clear the mock calls
(redisConnection.expire as jest.Mock).mockClear();
// Add another job
await addJobPriority(team_id, job_id2);
expect(redisConnection.expire).toHaveBeenCalledWith(
`limit_team_id:${team_id}`,
60
);
});
test("Set should expire after 60 seconds", async () => {
const team_id = "team1";
const job_id = "job1";
jest.useFakeTimers();
await addJobPriority(team_id, job_id);
expect(redisConnection.expire).toHaveBeenCalledWith(
`limit_team_id:${team_id}`,
60
);
// Fast-forward time by 59 seconds
jest.advanceTimersByTime(59000);
// The set should still exist
expect(redisConnection.scard).not.toHaveBeenCalled();
// Fast-forward time by 2 more seconds (total 61 seconds)
jest.advanceTimersByTime(2000);
// Check if the set has been removed (scard should return 0)
(redisConnection.scard as jest.Mock).mockResolvedValue(0);
const setSize = await redisConnection.scard(`limit_team_id:${team_id}`);
expect(setSize).toBe(0);
jest.useRealTimers();
});
});

View File

@@ -6,6 +6,7 @@ export type StoredCrawl = {
crawlerOptions: any;
pageOptions: any;
team_id: string;
+plan: string;
robots?: string;
cancelled?: boolean;
createdAt: number;

View File

@@ -0,0 +1,91 @@
import { redisConnection } from "../../src/services/queue-service";
import { PlanType } from "../../src/types";
import { Logger } from "./logger";
const SET_KEY_PREFIX = "limit_team_id:";
export async function addJobPriority(team_id, job_id) {
try {
const setKey = SET_KEY_PREFIX + team_id;
// Add scrape job id to the set
await redisConnection.sadd(setKey, job_id);
// This approach will reset the expiration time to 60 seconds every time a new job is added to the set.
await redisConnection.expire(setKey, 60);
} catch (e) {
Logger.error(`Add job priority (sadd) failed: ${team_id}, ${job_id}`);
}
}
export async function deleteJobPriority(team_id, job_id) {
try {
const setKey = SET_KEY_PREFIX + team_id;
// remove job_id from the set
await redisConnection.srem(setKey, job_id);
} catch (e) {
Logger.error(`Delete job priority (srem) failed: ${team_id}, ${job_id}`);
}
}
export async function getJobPriority({
plan,
team_id,
basePriority = 10,
}: {
plan: PlanType;
team_id: string;
basePriority?: number;
}): Promise<number> {
try {
const setKey = SET_KEY_PREFIX + team_id;
// Get the length of the set
const setLength = await redisConnection.scard(setKey);
// Determine the priority based on the plan and set length
let planModifier = 1;
let bucketLimit = 0;
switch (plan) {
case "free":
bucketLimit = 25;
planModifier = 0.5;
break;
case "hobby":
bucketLimit = 100;
planModifier = 0.3;
break;
case "standard":
case "standardnew":
bucketLimit = 200;
planModifier = 0.2;
break;
case "growth":
case "growthdouble":
bucketLimit = 400;
planModifier = 0.1;
break;
default:
bucketLimit = 25;
planModifier = 1;
break;
}
// If the set length is within the bucket limit, just return the base priority
if (setLength <= bucketLimit) {
return basePriority;
} else {
// If not, add the overflow scaled by the plan modifier to the base priority
return Math.ceil(
basePriority + Math.ceil((setLength - bucketLimit) * planModifier)
);
}
} catch (e) {
Logger.error(
`Get job priority failed: ${team_id}, ${plan}, ${basePriority}`
);
return basePriority;
}
}
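
For orientation, here is a minimal usage sketch of how these three functions are meant to be combined, mirroring `processJobInternal` in the worker further down. `runWithPriorityTracking` is a hypothetical wrapper, not part of this PR, and the import paths assume a file sitting next to `job-priority.ts`:

```ts
// Hypothetical wrapper illustrating the intended call pattern (not in the PR).
import { addJobPriority, deleteJobPriority, getJobPriority } from "./job-priority";
import { PlanType } from "../types";

async function runWithPriorityTracking(
  team_id: string,
  plan: PlanType,
  job_id: string,
  work: () => Promise<void>
): Promise<void> {
  // Priority that any jobs enqueued for this team right now would receive.
  const priority = await getJobPriority({ plan, team_id, basePriority: 10 });
  console.log(`new jobs for ${team_id} currently get BullMQ priority ${priority}`);

  // Count the job against the team's concurrency bucket while it runs...
  await addJobPriority(team_id, job_id);
  try {
    await work();
  } finally {
    // ...and release it when it finishes or fails, as the worker does.
    await deleteJobPriority(team_id, job_id);
  }
}
```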

View File

@@ -8,10 +8,11 @@ async function addScrapeJobRaw(
webScraperOptions: any,
options: any,
jobId: string,
+jobPriority: number = 10
): Promise<Job> {
return await getScrapeQueue().add(jobId, webScraperOptions, {
...options,
-priority: webScraperOptions.crawl_id ? 20 : 10,
+priority: jobPriority,
jobId,
});
}
@@ -20,7 +21,9 @@ export async function addScrapeJob(
webScraperOptions: WebScraperOptions,
options: any = {},
jobId: string = uuidv4(),
+jobPriority: number = 10
): Promise<Job> {
if (Sentry.isInitialized()) {
const size = JSON.stringify(webScraperOptions).length;
return await Sentry.startSpan({
@@ -39,10 +42,12 @@ export async function addScrapeJob(
baggage: Sentry.spanToBaggageHeader(span),
size,
},
-}, options, jobId);
+}, options, jobId, jobPriority);
});
} else {
-return await addScrapeJobRaw(webScraperOptions, options, jobId);
+return await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority);
}
}

View File

@@ -21,6 +21,8 @@ import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, ge
import { StoredCrawl } from "../lib/crawl-redis";
import { addScrapeJob } from "./queue-jobs";
import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
+import { addJobPriority, deleteJobPriority, getJobPriority } from "../../src/lib/job-priority";
+import { PlanType } from "../types";
if (process.env.ENV === "production") {
initSDK({
@@ -50,6 +52,7 @@ const processJobInternal = async (token: string, job: Job) => {
await job.extendLock(token, jobLockExtensionTime);
}, jobLockExtendInterval);
+await addJobPriority(job.data.team_id, job.id);
let err = null;
try {
const result = await processJob(job, token);
@@ -67,6 +70,7 @@ const processJobInternal = async (token: string, job: Job) => {
err = error;
await job.moveToFailed(error, token, false);
} finally {
+await deleteJobPriority(job.data.team_id, job.id);
clearInterval(extendLockInterval);
}
@@ -249,6 +253,16 @@ async function processJob(job: Job, token: string) {
for (const link of links) {
if (await lockURL(job.data.crawl_id, sc, link)) {
+// This seems to work really well
+const jobPriority = await getJobPriority({plan: sc.plan as PlanType, team_id: sc.team_id, basePriority: job.data.crawl_id ? 20 : 10})
+const jobId = uuidv4();
+// console.log("plan: ", sc.plan);
+// console.log("team_id: ", sc.team_id)
+// console.log("base priority: ", job.data.crawl_id ? 20 : 10)
+// console.log("job priority: ", jobPriority, "\n\n\n")
const newJob = await addScrapeJob({
url: link,
mode: "single_urls",
@@ -257,7 +271,7 @@ async function processJob(job: Job, token: string) {
pageOptions: sc.pageOptions,
origin: job.data.origin,
crawl_id: job.data.crawl_id,
-});
+}, {}, jobId, jobPriority);
await addCrawlJob(job.data.crawl_id, newJob.id);
}

View File

@@ -113,7 +113,7 @@ export interface AuthResponse {
team_id?: string;
error?: string;
status?: number;
-plan?: string;
+plan?: PlanType;
}
@@ -137,3 +137,14 @@ export type ScrapeLog = {
ipv4_support?: boolean | null;
ipv6_support?: boolean | null;
};
+export type PlanType =
+| "starter"
+| "standard"
+| "scale"
+| "hobby"
+| "standardnew"
+| "growth"
+| "growthdouble"
+| "free"
+| "";