diff --git a/apps/api/src/controllers/auth.ts b/apps/api/src/controllers/auth.ts
index 151733c0..1796acc2 100644
--- a/apps/api/src/controllers/auth.ts
+++ b/apps/api/src/controllers/auth.ts
@@ -3,6 +3,7 @@ import { getRateLimiter } from "../../src/services/rate-limiter";
 import {
   AuthResponse,
   NotificationType,
+  PlanType,
   RateLimiterMode,
 } from "../../src/types";
 import { supabase_service } from "../../src/services/supabase";
@@ -88,7 +89,7 @@ export async function supaAuthenticateUser(
   team_id?: string;
   error?: string;
   status?: number;
-  plan?: string;
+  plan?: PlanType;
 }> {
   const authHeader = req.headers.authorization;
   if (!authHeader) {
@@ -327,10 +328,10 @@ export async function supaAuthenticateUser(
   return {
     success: true,
     team_id: subscriptionData.team_id,
-    plan: subscriptionData.plan ?? "",
+    plan: (subscriptionData.plan ?? "") as PlanType,
   };
 }

-function getPlanByPriceId(price_id: string) {
+function getPlanByPriceId(price_id: string): PlanType {
   switch (price_id) {
     case process.env.STRIPE_PRICE_ID_STARTER:
       return "starter";
diff --git a/apps/api/src/controllers/crawl.ts b/apps/api/src/controllers/crawl.ts
index c5f440e2..d2123d82 100644
--- a/apps/api/src/controllers/crawl.ts
+++ b/apps/api/src/controllers/crawl.ts
@@ -25,11 +25,12 @@ import {
 } from "../../src/lib/crawl-redis";
 import { getScrapeQueue } from "../../src/services/queue-service";
 import { checkAndUpdateURL } from "../../src/lib/validateUrl";
+import { getJobPriority } from "../../src/lib/job-priority";
 import * as Sentry from "@sentry/node";

 export async function crawlController(req: Request, res: Response) {
   try {
-    const { success, team_id, error, status } = await authenticateUser(
+    const { success, team_id, error, status, plan } = await authenticateUser(
       req,
       res,
       RateLimiterMode.Crawl
@@ -148,6 +149,7 @@ export async function crawlController(req: Request, res: Response) {
       crawlerOptions,
       pageOptions,
       team_id,
+      plan,
       createdAt: Date.now(),
     };

@@ -163,7 +165,15 @@
         ? null
         : await crawler.tryGetSitemap();

+
     if (sitemap !== null && sitemap.length > 0) {
+      let jobPriority = 20;
+      // If the sitemap has more than 1000 URLs, compute a team-aware priority;
+      // otherwise we can use the default priority of 20.
+      if (sitemap.length > 1000) {
+        // use a base of 21 so these jobs queue just behind smaller crawls
+        jobPriority = await getJobPriority({ plan, team_id, basePriority: 21 });
+      }
       const jobs = sitemap.map((x) => {
         const url = x.url;
         const uuid = uuidv4();
@@ -181,7 +191,7 @@
         },
         opts: {
           jobId: uuid,
-          priority: 20,
+          priority: jobPriority,
         },
       };
     });
@@ -204,6 +214,10 @@
       }
     } else {
       await lockURL(id, sc, url);
+
+      // Not needed here: the first job of a crawl just takes the default priority.
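+      // (Links discovered while crawling get team-aware priorities later, in queue-worker.ts.)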
+      // const jobPriority = await getJobPriority({plan, team_id, basePriority: 10})
+
       const job = await addScrapeJob(
         {
           url,
diff --git a/apps/api/src/controllers/crawlPreview.ts b/apps/api/src/controllers/crawlPreview.ts
index 59b54458..3e43f07f 100644
--- a/apps/api/src/controllers/crawlPreview.ts
+++ b/apps/api/src/controllers/crawlPreview.ts
@@ -11,7 +11,7 @@ import * as Sentry from "@sentry/node";

 export async function crawlPreviewController(req: Request, res: Response) {
   try {
-    const { success, error, status } = await authenticateUser(
+    const { success, error, status, team_id: _team_id, plan } = await authenticateUser(
       req,
       res,
       RateLimiterMode.Preview
@@ -89,6 +89,7 @@ export async function crawlPreviewController(req: Request, res: Response) {
       crawlerOptions,
       pageOptions,
       team_id,
+      plan,
       robots,
       createdAt: Date.now(),
     };
diff --git a/apps/api/src/controllers/scrape.ts b/apps/api/src/controllers/scrape.ts
index 0b4df13c..1ff9a426 100644
--- a/apps/api/src/controllers/scrape.ts
+++ b/apps/api/src/controllers/scrape.ts
@@ -2,7 +2,7 @@ import { Request, Response } from "express";
 import { billTeam, checkTeamCredits } from "../services/billing/credit_billing";
 import { authenticateUser } from "./auth";
-import { RateLimiterMode } from "../types";
+import { PlanType, RateLimiterMode } from "../types";
 import { logJob } from "../services/logging/log_job";
 import { Document } from "../lib/entities";
 import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function
@@ -12,6 +12,7 @@ import { addScrapeJob } from '../services/queue-jobs';
 import { getScrapeQueue } from '../services/queue-service';
 import { v4 as uuidv4 } from "uuid";
 import { Logger } from '../lib/logger';
+import { getJobPriority } from '../lib/job-priority';
 import * as Sentry from "@sentry/node";

 export async function scrapeHelper(
@@ -22,7 +23,7 @@ export async function scrapeHelper(
   pageOptions: PageOptions,
   extractorOptions: ExtractorOptions,
   timeout: number,
-  plan?: string
+  plan?: PlanType
 ): Promise<{
   success: boolean;
   error?: string;
@@ -38,6 +39,8 @@ export async function scrapeHelper(
     return { success: false, error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.", returnCode: 403 };
   }

+  const jobPriority = await getJobPriority({ plan, team_id, basePriority: 10 });
+
   const job = await addScrapeJob({
     url,
     mode: "single_urls",
@@ -46,7 +49,7 @@ export async function scrapeHelper(
     pageOptions,
     extractorOptions,
     origin: req.body.origin ?? defaultOrigin,
-  }, {}, jobId);
+  }, {}, jobId, jobPriority);

   let doc;
diff --git a/apps/api/src/controllers/search.ts b/apps/api/src/controllers/search.ts
index aeb044d8..92efe1df 100644
--- a/apps/api/src/controllers/search.ts
+++ b/apps/api/src/controllers/search.ts
@@ -2,13 +2,14 @@ import { Request, Response } from "express";
 import { WebScraperDataProvider } from "../scraper/WebScraper";
 import { billTeam, checkTeamCredits } from "../services/billing/credit_billing";
 import { authenticateUser } from "./auth";
-import { RateLimiterMode } from "../types";
+import { PlanType, RateLimiterMode } from "../types";
 import { logJob } from "../services/logging/log_job";
 import { PageOptions, SearchOptions } from "../lib/entities";
 import { search } from "../search";
 import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
 import { v4 as uuidv4 } from "uuid";
 import { Logger } from "../lib/logger";
+import { getJobPriority } from "../lib/job-priority";
 import { getScrapeQueue } from "../services/queue-service";
 import * as Sentry from "@sentry/node";
 import { addScrapeJob } from "../services/queue-jobs";
@@ -20,6 +21,7 @@ export async function searchHelper(
   crawlerOptions: any,
   pageOptions: PageOptions,
   searchOptions: SearchOptions,
+  plan: PlanType
 ): Promise<{
   success: boolean;
   error?: string;
@@ -76,6 +78,8 @@ export async function searchHelper(
     return { success: true, error: "No search results found", returnCode: 200 };
   }

+  const jobPriority = await getJobPriority({ plan, team_id, basePriority: 20 });
+
   // filter out social media links

   const jobDatas = res.map(x => {
@@ -92,7 +96,7 @@
       },
       opts: {
         jobId: uuid,
-        priority: 20,
+        priority: jobPriority,
       }
     };
   })
@@ -152,7 +156,7 @@
 export async function searchController(req: Request, res: Response) {
   try {
     // make sure to authenticate user first, Bearer <token>
-    const { success, team_id, error, status } = await authenticateUser(
+    const { success, team_id, error, status, plan } = await authenticateUser(
       req,
       res,
       RateLimiterMode.Search
@@ -194,6 +198,7 @@
       crawlerOptions,
       pageOptions,
       searchOptions,
+      plan
     );
     const endTime = new Date().getTime();
     const timeTakenInSeconds = (endTime - startTime) / 1000;
diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts
index 0674a46f..4d096894 100644
--- a/apps/api/src/index.ts
+++ b/apps/api/src/index.ts
@@ -189,6 +189,8 @@ if (cluster.isMaster) {
   Logger.info(`Worker ${process.pid} started`);
 }

+
+
 // const sq = getScrapeQueue();

 // sq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
diff --git a/apps/api/src/lib/__tests__/job-priority.test.ts b/apps/api/src/lib/__tests__/job-priority.test.ts
new file mode 100644
index 00000000..82477379
--- /dev/null
+++ b/apps/api/src/lib/__tests__/job-priority.test.ts
@@ -0,0 +1,134 @@
+import {
+  getJobPriority,
+  addJobPriority,
+  deleteJobPriority,
+} from "../job-priority";
+import { redisConnection } from "../../services/queue-service";
+import { PlanType } from "../../types";
+
+jest.mock("../../services/queue-service", () => ({
+  redisConnection: {
+    sadd: jest.fn(),
+    srem: jest.fn(),
+    scard: jest.fn(),
+    expire: jest.fn(),
+  },
+}));
+
+describe("Job Priority Tests", () => {
+  afterEach(() => {
+    jest.clearAllMocks();
+  });
+
+  test("addJobPriority should add job_id to the set and set expiration", async () => {
+    const team_id = "team1";
+    const job_id = "job1";
+    await addJobPriority(team_id, job_id);
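+    // (sadd registers the job in the team's set; expire refreshes its 60-second window)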
+    expect(redisConnection.sadd).toHaveBeenCalledWith(
+      `limit_team_id:${team_id}`,
+      job_id
+    );
+    expect(redisConnection.expire).toHaveBeenCalledWith(
+      `limit_team_id:${team_id}`,
+      60
+    );
+  });
+
+  test("deleteJobPriority should remove job_id from the set", async () => {
+    const team_id = "team1";
+    const job_id = "job1";
+    await deleteJobPriority(team_id, job_id);
+    expect(redisConnection.srem).toHaveBeenCalledWith(
+      `limit_team_id:${team_id}`,
+      job_id
+    );
+  });
+
+  test("getJobPriority should return correct priority based on plan and set length", async () => {
+    const team_id = "team1";
+    const plan: PlanType = "standard";
+    (redisConnection.scard as jest.Mock).mockResolvedValue(150);
+
+    const priority = await getJobPriority({ plan, team_id });
+    expect(priority).toBe(10);
+
+    (redisConnection.scard as jest.Mock).mockResolvedValue(250);
+    const priorityExceeded = await getJobPriority({ plan, team_id });
+    expect(priorityExceeded).toBe(20); // basePriority + Math.ceil((250 - 200) * 0.2)
+  });
+
+  test("getJobPriority should handle different plans correctly", async () => {
+    const team_id = "team1";
+
+    (redisConnection.scard as jest.Mock).mockResolvedValue(50);
+    let plan: PlanType = "hobby";
+    let priority = await getJobPriority({ plan, team_id });
+    expect(priority).toBe(10);
+
+    (redisConnection.scard as jest.Mock).mockResolvedValue(150);
+    plan = "hobby";
+    priority = await getJobPriority({ plan, team_id });
+    expect(priority).toBe(25); // basePriority + Math.ceil((150 - 100) * 0.3)
+
+    (redisConnection.scard as jest.Mock).mockResolvedValue(25);
+    plan = "free";
+    priority = await getJobPriority({ plan, team_id });
+    expect(priority).toBe(10);
+
+    (redisConnection.scard as jest.Mock).mockResolvedValue(60);
+    plan = "free";
+    priority = await getJobPriority({ plan, team_id });
+    expect(priority).toBe(28); // basePriority + Math.ceil((60 - 25) * 0.5)
+  });
+
+  test("addJobPriority should reset expiration time when adding new job", async () => {
+    const team_id = "team1";
+    const job_id1 = "job1";
+    const job_id2 = "job2";
+
+    await addJobPriority(team_id, job_id1);
+    expect(redisConnection.expire).toHaveBeenCalledWith(
+      `limit_team_id:${team_id}`,
+      60
+    );
+
+    // Clear the mock calls
+    (redisConnection.expire as jest.Mock).mockClear();
+
+    // Add another job
+    await addJobPriority(team_id, job_id2);
+    expect(redisConnection.expire).toHaveBeenCalledWith(
+      `limit_team_id:${team_id}`,
+      60
+    );
+  });
+
+  test("Set should expire after 60 seconds", async () => {
+    const team_id = "team1";
+    const job_id = "job1";
+
+    jest.useFakeTimers();
+
+    await addJobPriority(team_id, job_id);
+    expect(redisConnection.expire).toHaveBeenCalledWith(
+      `limit_team_id:${team_id}`,
+      60
+    );
+
+    // Fast-forward time by 59 seconds
+    jest.advanceTimersByTime(59000);
+
+    // The set should still exist
+    expect(redisConnection.scard).not.toHaveBeenCalled();
+
+    // Fast-forward time by 2 more seconds (total 61 seconds)
+    jest.advanceTimersByTime(2000);
+
+    // With redis mocked, simulate the expired set by having scard return 0
+    (redisConnection.scard as jest.Mock).mockResolvedValue(0);
+    const setSize = await redisConnection.scard(`limit_team_id:${team_id}`);
+    expect(setSize).toBe(0);
+
+    jest.useRealTimers();
+  });
+});
diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts
index 9e8a0cf6..b8c1c151 100644
--- a/apps/api/src/lib/crawl-redis.ts
+++ b/apps/api/src/lib/crawl-redis.ts
@@ -6,6 +6,7 @@ export type StoredCrawl = {
   crawlerOptions: any;
   pageOptions: any;
   team_id: string;
+  plan: string;
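+  // (stored with the crawl so queue-worker can pass the team's plan to getJobPriority)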
   robots?: string;
   cancelled?: boolean;
   createdAt: number;
diff --git a/apps/api/src/lib/job-priority.ts b/apps/api/src/lib/job-priority.ts
new file mode 100644
index 00000000..bb6158f9
--- /dev/null
+++ b/apps/api/src/lib/job-priority.ts
@@ -0,0 +1,91 @@
+import { redisConnection } from "../../src/services/queue-service";
+import { PlanType } from "../../src/types";
+import { Logger } from "./logger";
+
+const SET_KEY_PREFIX = "limit_team_id:";
+export async function addJobPriority(team_id: string, job_id: string) {
+  try {
+    const setKey = SET_KEY_PREFIX + team_id;
+
+    // Add the scrape job id to the team's set
+    await redisConnection.sadd(setKey, job_id);
+
+    // This resets the expiration to 60 seconds every time a new job is added to the set
+    await redisConnection.expire(setKey, 60);
+  } catch (e) {
+    Logger.error(`Add job priority (sadd) failed: ${team_id}, ${job_id}`);
+  }
+}
+
+export async function deleteJobPriority(team_id: string, job_id: string) {
+  try {
+    const setKey = SET_KEY_PREFIX + team_id;
+
+    // Remove the job id from the team's set
+    await redisConnection.srem(setKey, job_id);
+  } catch (e) {
+    Logger.error(`Delete job priority (srem) failed: ${team_id}, ${job_id}`);
+  }
+}
+
+export async function getJobPriority({
+  plan,
+  team_id,
+  basePriority = 10,
+}: {
+  plan: PlanType;
+  team_id: string;
+  basePriority?: number;
+}): Promise<number> {
+  try {
+    const setKey = SET_KEY_PREFIX + team_id;
+
+    // Get the number of the team's currently active jobs
+    const setLength = await redisConnection.scard(setKey);
+
+    // Determine the priority based on the plan and set length
+    let planModifier = 1;
+    let bucketLimit = 0;
+
+    switch (plan) {
+      case "free":
+        bucketLimit = 25;
+        planModifier = 0.5;
+        break;
+      case "hobby":
+        bucketLimit = 100;
+        planModifier = 0.3;
+        break;
+      case "standard":
+      case "standardnew":
+        bucketLimit = 200;
+        planModifier = 0.2;
+        break;
+      case "growth":
+      case "growthdouble":
+        bucketLimit = 400;
+        planModifier = 0.1;
+        break;
+
+      default:
+        bucketLimit = 25;
+        planModifier = 1;
+        break;
+    }
+
+    // If the set length is within the plan's bucket limit, just return the base priority
+    if (setLength <= bucketLimit) {
+      return basePriority;
+    } else {
+      // Otherwise, add planModifier for each job over the limit,
+      // e.g. a "standard" team with 250 active jobs: 10 + ceil((250 - 200) * 0.2) = 20
+      return Math.ceil(
+        basePriority + Math.ceil((setLength - bucketLimit) * planModifier)
+      );
+    }
+  } catch (e) {
+    Logger.error(
+      `Get job priority failed: ${team_id}, ${plan}, ${basePriority}`
+    );
+    return basePriority;
+  }
+}
diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts
index 888cdefc..740f48a2 100644
--- a/apps/api/src/services/queue-jobs.ts
+++ b/apps/api/src/services/queue-jobs.ts
@@ -8,10 +8,11 @@ async function addScrapeJobRaw(
   webScraperOptions: any,
   options: any,
   jobId: string,
+  jobPriority: number = 10
 ): Promise<Job> {
   return await getScrapeQueue().add(jobId, webScraperOptions, {
     ...options,
-    priority: webScraperOptions.crawl_id ? 20 : 10,
+    priority: jobPriority,
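+    // (BullMQ runs lower numbers first, so busier teams drift toward the back of the queue)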
     jobId,
   });
 }
@@ -20,7 +21,9 @@ export async function addScrapeJob(
   webScraperOptions: WebScraperOptions,
   options: any = {},
   jobId: string = uuidv4(),
+  jobPriority: number = 10
 ): Promise<Job> {
+
   if (Sentry.isInitialized()) {
     const size = JSON.stringify(webScraperOptions).length;
     return await Sentry.startSpan({
@@ -39,10 +42,12 @@ export async function addScrapeJob(
           baggage: Sentry.spanToBaggageHeader(span),
           size,
         },
-      }, options, jobId);
+      }, options, jobId, jobPriority);
     });
   } else {
-    return await addScrapeJobRaw(webScraperOptions, options, jobId);
+    return await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority);
   }
 }
+
+
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index a7d20383..ca1a4cbd 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -21,6 +21,8 @@ import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, ge
 import { StoredCrawl } from "../lib/crawl-redis";
 import { addScrapeJob } from "./queue-jobs";
 import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
+import { addJobPriority, deleteJobPriority, getJobPriority } from "../../src/lib/job-priority";
+import { PlanType } from "../types";

 if (process.env.ENV === "production") {
   initSDK({
@@ -50,6 +52,7 @@ const processJobInternal = async (token: string, job: Job) => {
     await job.extendLock(token, jobLockExtensionTime);
   }, jobLockExtendInterval);

+  await addJobPriority(job.data.team_id, job.id);
   let err = null;
   try {
     const result = await processJob(job, token);
@@ -67,6 +70,7 @@ const processJobInternal = async (token: string, job: Job) => {
     err = error;
     await job.moveToFailed(error, token, false);
   } finally {
+    await deleteJobPriority(job.data.team_id, job.id);
     clearInterval(extendLockInterval);
   }

@@ -249,6 +253,16 @@ async function processJob(job: Job, token: string) {

         for (const link of links) {
           if (await lockURL(job.data.crawl_id, sc, link)) {
+            // Links discovered mid-crawl get a base priority of 20 and are
+            // deprioritized further as the team's active-job bucket fills up.
+            const jobPriority = await getJobPriority({ plan: sc.plan as PlanType, team_id: sc.team_id, basePriority: job.data.crawl_id ? 20 : 10 });
+            const jobId = uuidv4();
+
             const newJob = await addScrapeJob({
               url: link,
               mode: "single_urls",
@@ -257,7 +271,7 @@ async function processJob(job: Job, token: string) {
               pageOptions: sc.pageOptions,
               origin: job.data.origin,
               crawl_id: job.data.crawl_id,
-            });
+            }, {}, jobId, jobPriority);

             await addCrawlJob(job.data.crawl_id, newJob.id);
           }
diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts
index b092d310..f2932b5a 100644
--- a/apps/api/src/types.ts
+++ b/apps/api/src/types.ts
@@ -113,7 +113,7 @@ export interface AuthResponse {
   team_id?: string;
   error?: string;
   status?: number;
-  plan?: string;
+  plan?: PlanType;
 }

@@ -136,4 +136,15 @@ export type ScrapeLog = {
   html?: string;
   ipv4_support?: boolean | null;
   ipv6_support?: boolean | null;
-};
\ No newline at end of file
+};
+
+export type PlanType =
+  | "starter"
+  | "standard"
+  | "scale"
+  | "hobby"
+  | "standardnew"
+  | "growth"
+  | "growthdouble"
+  | "free"
+  | "";
\ No newline at end of file