From 0ea0a5db46c027c699b9a311fd8d985dc7b3a8fe Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 21 Aug 2024 20:54:39 -0300 Subject: [PATCH 01/11] Nick: wip --- apps/api/src/controllers/auth.ts | 7 +- apps/api/src/controllers/scrape.ts | 9 ++- apps/api/src/index.ts | 27 +++++++ .../src/lib/__tests__/job-priority.test.ts | 68 ++++++++++++++++ apps/api/src/lib/job-priority.ts | 78 +++++++++++++++++++ apps/api/src/services/queue-jobs.ts | 3 +- apps/api/src/services/queue-worker.ts | 4 +- apps/api/src/types.ts | 15 +++- 8 files changed, 201 insertions(+), 10 deletions(-) create mode 100644 apps/api/src/lib/__tests__/job-priority.test.ts create mode 100644 apps/api/src/lib/job-priority.ts diff --git a/apps/api/src/controllers/auth.ts b/apps/api/src/controllers/auth.ts index 9d46d005..b2741296 100644 --- a/apps/api/src/controllers/auth.ts +++ b/apps/api/src/controllers/auth.ts @@ -3,6 +3,7 @@ import { getRateLimiter } from "../../src/services/rate-limiter"; import { AuthResponse, NotificationType, + PlanType, RateLimiterMode, } from "../../src/types"; import { supabase_service } from "../../src/services/supabase"; @@ -82,7 +83,7 @@ export async function supaAuthenticateUser( team_id?: string; error?: string; status?: number; - plan?: string; + plan?: PlanType; }> { const authHeader = req.headers.authorization; if (!authHeader) { @@ -316,10 +317,10 @@ export async function supaAuthenticateUser( return { success: true, team_id: subscriptionData.team_id, - plan: subscriptionData.plan ?? "", + plan: (subscriptionData.plan ?? "") as PlanType, }; } -function getPlanByPriceId(price_id: string) { +function getPlanByPriceId(price_id: string): PlanType { switch (price_id) { case process.env.STRIPE_PRICE_ID_STARTER: return "starter"; diff --git a/apps/api/src/controllers/scrape.ts b/apps/api/src/controllers/scrape.ts index 273b4c56..436ad4fd 100644 --- a/apps/api/src/controllers/scrape.ts +++ b/apps/api/src/controllers/scrape.ts @@ -2,7 +2,7 @@ import { ExtractorOptions, PageOptions } from './../lib/entities'; import { Request, Response } from "express"; import { billTeam, checkTeamCredits } from "../services/billing/credit_billing"; import { authenticateUser } from "./auth"; -import { RateLimiterMode } from "../types"; +import { PlanType, RateLimiterMode } from "../types"; import { logJob } from "../services/logging/log_job"; import { Document } from "../lib/entities"; import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function @@ -12,6 +12,7 @@ import { addScrapeJob } from '../services/queue-jobs'; import { scrapeQueueEvents } from '../services/queue-service'; import { v4 as uuidv4 } from "uuid"; import { Logger } from '../lib/logger'; +import { getJobPriority } from '../lib/job-priority'; export async function scrapeHelper( jobId: string, @@ -21,7 +22,7 @@ export async function scrapeHelper( pageOptions: PageOptions, extractorOptions: ExtractorOptions, timeout: number, - plan?: string + plan?: PlanType ): Promise<{ success: boolean; error?: string; @@ -37,6 +38,8 @@ export async function scrapeHelper( return { success: false, error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.", returnCode: 403 }; } + const jobPriority = await getJobPriority({plan, team_id}) + const job = await addScrapeJob({ url, mode: "single_urls", @@ -45,7 +48,7 @@ export async function scrapeHelper( pageOptions, extractorOptions, origin: req.body.origin ?? defaultOrigin, - }, {}, jobId); + }, {}, jobId, jobPriority); let doc; try { diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 6a6437b3..ca16e4b1 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -187,6 +187,33 @@ if (cluster.isMaster) { Logger.info(`Worker ${process.pid} started`); } + + +async function sendScrapeRequests() { + await new Promise(resolve => setTimeout(resolve, 5000)); + const url = 'http://127.0.0.1:3002/v0/scrape'; + const headers = { + 'Authorization': 'Bearer fc-365b09a44b8844d08e0dc98f13e49bca', + 'Content-Type': 'application/json' + }; + const body = JSON.stringify({ + url: 'https://roastmywebsite.ai' + }); + + const requests = Array.from({ length: 20 }, (_, i) => + fetch(url, { + method: 'POST', + headers: headers, + body: body + }).catch(error => { + Logger.error(`Request ${i + 1} encountered an error: ${error.message}`); + }) + ); + + await Promise.all(requests); +} + +sendScrapeRequests(); // const sq = getScrapeQueue(); // sq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting")); diff --git a/apps/api/src/lib/__tests__/job-priority.test.ts b/apps/api/src/lib/__tests__/job-priority.test.ts new file mode 100644 index 00000000..e6f80f05 --- /dev/null +++ b/apps/api/src/lib/__tests__/job-priority.test.ts @@ -0,0 +1,68 @@ +import { getJobPriority, addJobPriority, deleteJobPriority } from "../job-priority"; +import { redisConnection } from "../../services/queue-service"; +import { PlanType } from "../../types"; + +jest.mock("../../services/queue-service", () => ({ + redisConnection: { + sadd: jest.fn(), + srem: jest.fn(), + scard: jest.fn(), + }, +})); + +describe("Job Priority Tests", () => { + afterEach(() => { + jest.clearAllMocks(); + }); + + test("addJobPriority should add job_id to the set", async () => { + const team_id = "team1"; + const job_id = "job1"; + await addJobPriority(team_id, job_id); + expect(redisConnection.sadd).toHaveBeenCalledWith(`limit_team_id:${team_id}`, job_id); + }); + + test("deleteFromSet should remove job_id from the set", async () => { + const team_id = "team1"; + const job_id = "job1"; + await deleteJobPriority(team_id, job_id); + expect(redisConnection.srem).toHaveBeenCalledWith(`limit_team_id:${team_id}`, job_id); + }); + + test("getJobPriority should return correct priority based on plan and set length", async () => { + const team_id = "team1"; + const plan: PlanType = "standard"; + (redisConnection.scard as jest.Mock).mockResolvedValue(150); + + const priority = await getJobPriority({ plan, team_id }); + expect(priority).toBe(10); + + (redisConnection.scard as jest.Mock).mockResolvedValue(250); + const priorityExceeded = await getJobPriority({ plan, team_id }); + expect(priorityExceeded).toBe(20); // basePriority + Math.ceil((250 - 200) * 0.2) + }); + + test("getJobPriority should handle different plans correctly", async () => { + const team_id = "team1"; + + (redisConnection.scard as jest.Mock).mockResolvedValue(50); + let plan: PlanType = "hobby"; + let priority = await getJobPriority({ plan, team_id }); + expect(priority).toBe(10); + + (redisConnection.scard as jest.Mock).mockResolvedValue(150); + plan = "hobby"; + priority = await getJobPriority({ plan, team_id }); + expect(priority).toBe(35); // basePriority + Math.ceil((150 - 100) * 0.5) + + (redisConnection.scard as jest.Mock).mockResolvedValue(50); + plan = "free"; + priority = await getJobPriority({ plan, team_id }); + expect(priority).toBe(10); + + (redisConnection.scard as jest.Mock).mockResolvedValue(60); + plan = "free"; + priority = await getJobPriority({ plan, team_id }); + expect(priority).toBe(20); // basePriority + Math.ceil((60 - 50) * 1) + }); +}); diff --git a/apps/api/src/lib/job-priority.ts b/apps/api/src/lib/job-priority.ts new file mode 100644 index 00000000..b418feba --- /dev/null +++ b/apps/api/src/lib/job-priority.ts @@ -0,0 +1,78 @@ +import { redisConnection } from "../../src/services/queue-service"; +import { PlanType } from "../../src/types"; +import { Logger } from "./logger"; + + + +const SET_KEY_PREFIX = "limit_team_id:" +export async function addJobPriority(team_id, job_id) { + try { + const setKey = SET_KEY_PREFIX + team_id; + + // Add scrape job id to the set + await redisConnection.sadd(setKey, job_id); + } catch (e) { + Logger.error(`Add job priority (sadd) failed: ${team_id}, ${job_id}`); + } +} + +export async function deleteJobPriority(team_id, job_id) { + try { + const setKey = SET_KEY_PREFIX + team_id; + + // remove job_id from the set + await redisConnection.srem(setKey, job_id); + } catch (e) { + Logger.error(`Delete job priority (srem) failed: ${team_id}, ${job_id}`); + } +} + +export async function getJobPriority({ + plan, + team_id, +}: { + plan: PlanType; + team_id: string; +}): Promise { + const setKey = SET_KEY_PREFIX + team_id; + + // Get the length of the set + const setLength = await redisConnection.scard(setKey); + + // Determine the priority based on the plan and set length + let basePriority = 10; + let planModifier = 1; + let bucketLimit = 0; + + switch (plan) { + case "standard": + case "standardnew": + bucketLimit = 100; + planModifier = 0.2; + break; + case "growth": + case "growthdouble": + bucketLimit = 200; + planModifier = 0.2; + break; + case "hobby": + bucketLimit = 50; + planModifier = 0.5; + break; + case "free": + default: + bucketLimit = 25; + planModifier = 1; + break; + } + + // if length set is smaller than set, just return base priority + if (setLength <= bucketLimit) { + return basePriority; + } else { + // If not, we keep base priority + planModifier + return Math.ceil( + basePriority + Math.ceil((setLength - bucketLimit) * planModifier) + ); + } +} diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 3099da68..be78e558 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -7,9 +7,10 @@ export async function addScrapeJob( webScraperOptions: WebScraperOptions, options: any = {}, jobId: string = uuidv4(), + jobPriority: number = 10 ): Promise { return await getScrapeQueue().add(jobId, webScraperOptions, { - priority: webScraperOptions.crawl_id ? 20 : 10, + priority: webScraperOptions.crawl_id ? 20 : jobPriority, ...options, jobId, }); diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 2086d0a6..f4c720b4 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -21,6 +21,7 @@ import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, ge import { StoredCrawl } from "../lib/crawl-redis"; import { addScrapeJob } from "./queue-jobs"; import { supabaseGetJobById } from "../../src/lib/supabase-jobs"; +import { addJobPriority, deleteJobPriority } from "../../src/lib/job-priority"; if (process.env.ENV === "production") { initSDK({ @@ -50,6 +51,7 @@ const processJobInternal = async (token: string, job: Job) => { await job.extendLock(token, jobLockExtensionTime); }, jobLockExtendInterval); + await addJobPriority(job.data.team_id, job.id ); try { const result = await processJob(job, token); try{ @@ -62,9 +64,9 @@ const processJobInternal = async (token: string, job: Job) => { } } catch (error) { console.log("Job failed, error:", error); - await job.moveToFailed(error, token, false); } finally { + await deleteJobPriority(job.data.team_id, job.id ); clearInterval(extendLockInterval); } }; diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts index b092d310..f2932b5a 100644 --- a/apps/api/src/types.ts +++ b/apps/api/src/types.ts @@ -113,7 +113,7 @@ export interface AuthResponse { team_id?: string; error?: string; status?: number; - plan?: string; + plan?: PlanType; } @@ -136,4 +136,15 @@ export type ScrapeLog = { html?: string; ipv4_support?: boolean | null; ipv6_support?: boolean | null; -}; \ No newline at end of file +}; + +export type PlanType = + | "starter" + | "standard" + | "scale" + | "hobby" + | "standardnew" + | "growth" + | "growthdouble" + | "free" + | ""; \ No newline at end of file From c7bfe4ffe83325703359026906b9c478e05a9e60 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 21 Aug 2024 22:20:40 -0300 Subject: [PATCH 02/11] Nick: --- apps/api/src/controllers/crawl.ts | 8 +++++++- apps/api/src/controllers/crawlPreview.ts | 3 ++- apps/api/src/controllers/scrape.ts | 2 +- apps/api/src/index.ts | 2 +- apps/api/src/lib/crawl-redis.ts | 1 + apps/api/src/lib/job-priority.ts | 18 ++++++++++++------ apps/api/src/services/queue-jobs.ts | 2 +- apps/api/src/services/queue-worker.ts | 14 ++++++++++++-- 8 files changed, 37 insertions(+), 13 deletions(-) diff --git a/apps/api/src/controllers/crawl.ts b/apps/api/src/controllers/crawl.ts index 1dfe758f..4f3442d4 100644 --- a/apps/api/src/controllers/crawl.ts +++ b/apps/api/src/controllers/crawl.ts @@ -25,10 +25,11 @@ import { } from "../../src/lib/crawl-redis"; import { getScrapeQueue } from "../../src/services/queue-service"; import { checkAndUpdateURL } from "../../src/lib/validateUrl"; +import { getJobPriority } from "../../src/lib/job-priority"; export async function crawlController(req: Request, res: Response) { try { - const { success, team_id, error, status } = await authenticateUser( + const { success, team_id, error, status, plan } = await authenticateUser( req, res, RateLimiterMode.Crawl @@ -126,6 +127,7 @@ export async function crawlController(req: Request, res: Response) { crawlerOptions, pageOptions, team_id, + plan, createdAt: Date.now(), }; @@ -175,6 +177,10 @@ export async function crawlController(req: Request, res: Response) { await getScrapeQueue().addBulk(jobs); } else { await lockURL(id, sc, url); + + // Not needed, first one should be 15. + // const jobPriority = await getJobPriority({plan, team_id, basePriority: 10}) + const job = await addScrapeJob( { url, diff --git a/apps/api/src/controllers/crawlPreview.ts b/apps/api/src/controllers/crawlPreview.ts index cc10dc8e..20eae731 100644 --- a/apps/api/src/controllers/crawlPreview.ts +++ b/apps/api/src/controllers/crawlPreview.ts @@ -10,7 +10,7 @@ import { checkAndUpdateURL } from "../../src/lib/validateUrl"; export async function crawlPreviewController(req: Request, res: Response) { try { - const { success, error, status } = await authenticateUser( + const { success, error, status, team_id:a, plan } = await authenticateUser( req, res, RateLimiterMode.Preview @@ -88,6 +88,7 @@ export async function crawlPreviewController(req: Request, res: Response) { crawlerOptions, pageOptions, team_id, + plan, robots, createdAt: Date.now(), }; diff --git a/apps/api/src/controllers/scrape.ts b/apps/api/src/controllers/scrape.ts index 436ad4fd..bb991060 100644 --- a/apps/api/src/controllers/scrape.ts +++ b/apps/api/src/controllers/scrape.ts @@ -38,7 +38,7 @@ export async function scrapeHelper( return { success: false, error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.", returnCode: 403 }; } - const jobPriority = await getJobPriority({plan, team_id}) + const jobPriority = await getJobPriority({plan, team_id, basePriority: 10}) const job = await addScrapeJob({ url, diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index ca16e4b1..df9dcf0b 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -213,7 +213,7 @@ async function sendScrapeRequests() { await Promise.all(requests); } -sendScrapeRequests(); +// sendScrapeRequests(); // const sq = getScrapeQueue(); // sq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting")); diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts index 9e8a0cf6..b8c1c151 100644 --- a/apps/api/src/lib/crawl-redis.ts +++ b/apps/api/src/lib/crawl-redis.ts @@ -6,6 +6,7 @@ export type StoredCrawl = { crawlerOptions: any; pageOptions: any; team_id: string; + plan: string; robots?: string; cancelled?: boolean; createdAt: number; diff --git a/apps/api/src/lib/job-priority.ts b/apps/api/src/lib/job-priority.ts index b418feba..fdea28b9 100644 --- a/apps/api/src/lib/job-priority.ts +++ b/apps/api/src/lib/job-priority.ts @@ -30,9 +30,11 @@ export async function deleteJobPriority(team_id, job_id) { export async function getJobPriority({ plan, team_id, + basePriority = 10 }: { plan: PlanType; team_id: string; + basePriority: number; }): Promise { const setKey = SET_KEY_PREFIX + team_id; @@ -40,11 +42,18 @@ export async function getJobPriority({ const setLength = await redisConnection.scard(setKey); // Determine the priority based on the plan and set length - let basePriority = 10; let planModifier = 1; let bucketLimit = 0; switch (plan) { + case "free": + bucketLimit = 25; + planModifier = 1; + break; + case "hobby": + bucketLimit = 50; + planModifier = 0.5; + break; case "standard": case "standardnew": bucketLimit = 100; @@ -55,11 +64,8 @@ export async function getJobPriority({ bucketLimit = 200; planModifier = 0.2; break; - case "hobby": - bucketLimit = 50; - planModifier = 0.5; - break; - case "free": + + default: bucketLimit = 25; planModifier = 1; diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index be78e558..080d52dc 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -10,7 +10,7 @@ export async function addScrapeJob( jobPriority: number = 10 ): Promise { return await getScrapeQueue().add(jobId, webScraperOptions, { - priority: webScraperOptions.crawl_id ? 20 : jobPriority, + priority: jobPriority, ...options, jobId, }); diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index f4c720b4..955b7fbb 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -21,7 +21,8 @@ import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, ge import { StoredCrawl } from "../lib/crawl-redis"; import { addScrapeJob } from "./queue-jobs"; import { supabaseGetJobById } from "../../src/lib/supabase-jobs"; -import { addJobPriority, deleteJobPriority } from "../../src/lib/job-priority"; +import { addJobPriority, deleteJobPriority, getJobPriority } from "../../src/lib/job-priority"; +import { PlanType } from "../types"; if (process.env.ENV === "production") { initSDK({ @@ -216,6 +217,15 @@ async function processJob(job: Job, token: string) { for (const link of links) { if (await lockURL(job.data.crawl_id, sc, link)) { + + const jobPriority = await getJobPriority({plan:sc.plan as PlanType, team_id: sc.team_id, basePriority: job.data.crawl_id ? 20 : 10}) + const jobId = uuidv4(); + + console.log("plan: ", sc.plan); + console.log("team_id: ", sc.team_id) + console.log("base priority: ", job.data.crawl_id ? 20 : 10) + console.log("job priority: " , jobPriority, "\n\n\n") + const newJob = await addScrapeJob({ url: link, mode: "single_urls", @@ -224,7 +234,7 @@ async function processJob(job: Job, token: string) { pageOptions: sc.pageOptions, origin: job.data.origin, crawl_id: job.data.crawl_id, - }); + }, {}, jobId, jobPriority); await addCrawlJob(job.data.crawl_id, newJob.id); } From 477c3257dcb17a1284bf880938ed7019de3ccd18 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 21 Aug 2024 22:53:33 -0300 Subject: [PATCH 03/11] Nick: --- apps/api/src/controllers/crawl.ts | 11 ++++++++++- apps/api/src/controllers/search.ts | 11 ++++++++--- apps/api/src/services/queue-jobs.ts | 2 ++ apps/api/src/services/queue-worker.ts | 9 +++++---- 4 files changed, 25 insertions(+), 8 deletions(-) diff --git a/apps/api/src/controllers/crawl.ts b/apps/api/src/controllers/crawl.ts index 4f3442d4..cfac8f56 100644 --- a/apps/api/src/controllers/crawl.ts +++ b/apps/api/src/controllers/crawl.ts @@ -144,6 +144,15 @@ export async function crawlController(req: Request, res: Response) { : await crawler.tryGetSitemap(); if (sitemap !== null) { + + let jobPriority = 20; + // If it is over 1000, we need to get the job priority, + // otherwise we can use the default priority of 20 + if(sitemap.length > 1000){ + // set base to 21 + jobPriority = await getJobPriority({plan, team_id, basePriority: 21}) + } + const jobs = sitemap.map((x) => { const url = x.url; const uuid = uuidv4(); @@ -161,7 +170,7 @@ export async function crawlController(req: Request, res: Response) { }, opts: { jobId: uuid, - priority: 20, + priority: jobPriority, }, }; }); diff --git a/apps/api/src/controllers/search.ts b/apps/api/src/controllers/search.ts index 759c7805..570f755f 100644 --- a/apps/api/src/controllers/search.ts +++ b/apps/api/src/controllers/search.ts @@ -2,7 +2,7 @@ import { Request, Response } from "express"; import { WebScraperDataProvider } from "../scraper/WebScraper"; import { billTeam, checkTeamCredits } from "../services/billing/credit_billing"; import { authenticateUser } from "./auth"; -import { RateLimiterMode } from "../types"; +import { PlanType, RateLimiterMode } from "../types"; import { logJob } from "../services/logging/log_job"; import { PageOptions, SearchOptions } from "../lib/entities"; import { search } from "../search"; @@ -10,6 +10,7 @@ import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; import { v4 as uuidv4 } from "uuid"; import { Logger } from "../lib/logger"; import { getScrapeQueue, scrapeQueueEvents } from "../services/queue-service"; +import { getJobPriority } from "../lib/job-priority"; export async function searchHelper( jobId: string, @@ -18,6 +19,7 @@ export async function searchHelper( crawlerOptions: any, pageOptions: PageOptions, searchOptions: SearchOptions, + plan: PlanType ): Promise<{ success: boolean; error?: string; @@ -74,6 +76,8 @@ export async function searchHelper( return { success: true, error: "No search results found", returnCode: 200 }; } + const jobPriority = await getJobPriority({plan, team_id, basePriority: 20}); + // filter out social media links const jobDatas = res.map(x => { @@ -90,7 +94,7 @@ export async function searchHelper( }, opts: { jobId: uuid, - priority: 20, + priority: jobPriority, } }; }) @@ -124,7 +128,7 @@ export async function searchHelper( export async function searchController(req: Request, res: Response) { try { // make sure to authenticate user first, Bearer - const { success, team_id, error, status } = await authenticateUser( + const { success, team_id, error, status, plan } = await authenticateUser( req, res, RateLimiterMode.Search @@ -165,6 +169,7 @@ export async function searchController(req: Request, res: Response) { crawlerOptions, pageOptions, searchOptions, + plan ); const endTime = new Date().getTime(); const timeTakenInSeconds = (endTime - startTime) / 1000; diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 080d52dc..eb7e4727 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -16,3 +16,5 @@ export async function addScrapeJob( }); } + + diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 955b7fbb..1a61e02c 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -218,13 +218,14 @@ async function processJob(job: Job, token: string) { for (const link of links) { if (await lockURL(job.data.crawl_id, sc, link)) { + // This seems to work really welel const jobPriority = await getJobPriority({plan:sc.plan as PlanType, team_id: sc.team_id, basePriority: job.data.crawl_id ? 20 : 10}) const jobId = uuidv4(); - console.log("plan: ", sc.plan); - console.log("team_id: ", sc.team_id) - console.log("base priority: ", job.data.crawl_id ? 20 : 10) - console.log("job priority: " , jobPriority, "\n\n\n") + // console.log("plan: ", sc.plan); + // console.log("team_id: ", sc.team_id) + // console.log("base priority: ", job.data.crawl_id ? 20 : 10) + // console.log("job priority: " , jobPriority, "\n\n\n") const newJob = await addScrapeJob({ url: link, From 53ca70462019bd1a131036bdaf6c2e930cabe2d3 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 21 Aug 2024 22:55:39 -0300 Subject: [PATCH 04/11] Update index.ts --- apps/api/src/index.ts | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index df9dcf0b..2b68e0f1 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -189,31 +189,6 @@ if (cluster.isMaster) { -async function sendScrapeRequests() { - await new Promise(resolve => setTimeout(resolve, 5000)); - const url = 'http://127.0.0.1:3002/v0/scrape'; - const headers = { - 'Authorization': 'Bearer fc-365b09a44b8844d08e0dc98f13e49bca', - 'Content-Type': 'application/json' - }; - const body = JSON.stringify({ - url: 'https://roastmywebsite.ai' - }); - - const requests = Array.from({ length: 20 }, (_, i) => - fetch(url, { - method: 'POST', - headers: headers, - body: body - }).catch(error => { - Logger.error(`Request ${i + 1} encountered an error: ${error.message}`); - }) - ); - - await Promise.all(requests); -} - -// sendScrapeRequests(); // const sq = getScrapeQueue(); // sq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting")); From 06b70a47e002f6ae4bf044782d09ba25eb8c2c9b Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 27 Aug 2024 15:04:10 -0300 Subject: [PATCH 05/11] Update job-priority.ts --- apps/api/src/lib/job-priority.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/api/src/lib/job-priority.ts b/apps/api/src/lib/job-priority.ts index fdea28b9..00bf9fa4 100644 --- a/apps/api/src/lib/job-priority.ts +++ b/apps/api/src/lib/job-priority.ts @@ -11,6 +11,8 @@ export async function addJobPriority(team_id, job_id) { // Add scrape job id to the set await redisConnection.sadd(setKey, job_id); + await redisConnection.expire(setKey, 60); + } catch (e) { Logger.error(`Add job priority (sadd) failed: ${team_id}, ${job_id}`); } From 8b53285a939e891d534e0a6fa7959350460e4f82 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 27 Aug 2024 15:06:13 -0300 Subject: [PATCH 06/11] Update job-priority.ts --- apps/api/src/lib/job-priority.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/api/src/lib/job-priority.ts b/apps/api/src/lib/job-priority.ts index 00bf9fa4..d108938e 100644 --- a/apps/api/src/lib/job-priority.ts +++ b/apps/api/src/lib/job-priority.ts @@ -11,6 +11,8 @@ export async function addJobPriority(team_id, job_id) { // Add scrape job id to the set await redisConnection.sadd(setKey, job_id); + + // This approach will reset the expiration time to 60 seconds every time a new job is added to the set. await redisConnection.expire(setKey, 60); } catch (e) { From c009013ff669bd2e6d574fb05a2c36685a9f4630 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 27 Aug 2024 15:26:43 -0300 Subject: [PATCH 07/11] Nick: expire tests --- .../src/lib/__tests__/job-priority.test.ts | 86 ++++++++++++++++--- apps/api/src/lib/job-priority.ts | 18 ++-- 2 files changed, 83 insertions(+), 21 deletions(-) diff --git a/apps/api/src/lib/__tests__/job-priority.test.ts b/apps/api/src/lib/__tests__/job-priority.test.ts index e6f80f05..5a30f92e 100644 --- a/apps/api/src/lib/__tests__/job-priority.test.ts +++ b/apps/api/src/lib/__tests__/job-priority.test.ts @@ -1,4 +1,8 @@ -import { getJobPriority, addJobPriority, deleteJobPriority } from "../job-priority"; +import { + getJobPriority, + addJobPriority, + deleteJobPriority, +} from "../job-priority"; import { redisConnection } from "../../services/queue-service"; import { PlanType } from "../../types"; @@ -7,6 +11,7 @@ jest.mock("../../services/queue-service", () => ({ sadd: jest.fn(), srem: jest.fn(), scard: jest.fn(), + expire: jest.fn(), }, })); @@ -15,18 +20,28 @@ describe("Job Priority Tests", () => { jest.clearAllMocks(); }); - test("addJobPriority should add job_id to the set", async () => { + test("addJobPriority should add job_id to the set and set expiration", async () => { const team_id = "team1"; const job_id = "job1"; await addJobPriority(team_id, job_id); - expect(redisConnection.sadd).toHaveBeenCalledWith(`limit_team_id:${team_id}`, job_id); + expect(redisConnection.sadd).toHaveBeenCalledWith( + `limit_team_id:${team_id}`, + job_id + ); + expect(redisConnection.expire).toHaveBeenCalledWith( + `limit_team_id:${team_id}`, + 60 + ); }); - test("deleteFromSet should remove job_id from the set", async () => { + test("deleteJobPriority should remove job_id from the set", async () => { const team_id = "team1"; const job_id = "job1"; await deleteJobPriority(team_id, job_id); - expect(redisConnection.srem).toHaveBeenCalledWith(`limit_team_id:${team_id}`, job_id); + expect(redisConnection.srem).toHaveBeenCalledWith( + `limit_team_id:${team_id}`, + job_id + ); }); test("getJobPriority should return correct priority based on plan and set length", async () => { @@ -35,11 +50,11 @@ describe("Job Priority Tests", () => { (redisConnection.scard as jest.Mock).mockResolvedValue(150); const priority = await getJobPriority({ plan, team_id }); - expect(priority).toBe(10); + expect(priority).toBe(20); (redisConnection.scard as jest.Mock).mockResolvedValue(250); const priorityExceeded = await getJobPriority({ plan, team_id }); - expect(priorityExceeded).toBe(20); // basePriority + Math.ceil((250 - 200) * 0.2) + expect(priorityExceeded).toBe(40); // basePriority + Math.ceil((250 - 200) * 0.2) }); test("getJobPriority should handle different plans correctly", async () => { @@ -53,9 +68,9 @@ describe("Job Priority Tests", () => { (redisConnection.scard as jest.Mock).mockResolvedValue(150); plan = "hobby"; priority = await getJobPriority({ plan, team_id }); - expect(priority).toBe(35); // basePriority + Math.ceil((150 - 100) * 0.5) + expect(priority).toBe(40); // basePriority + Math.ceil((150 - 50) * 0.3) - (redisConnection.scard as jest.Mock).mockResolvedValue(50); + (redisConnection.scard as jest.Mock).mockResolvedValue(25); plan = "free"; priority = await getJobPriority({ plan, team_id }); expect(priority).toBe(10); @@ -63,6 +78,57 @@ describe("Job Priority Tests", () => { (redisConnection.scard as jest.Mock).mockResolvedValue(60); plan = "free"; priority = await getJobPriority({ plan, team_id }); - expect(priority).toBe(20); // basePriority + Math.ceil((60 - 50) * 1) + expect(priority).toBe(28); // basePriority + Math.ceil((60 - 25) * 0.5) + }); + + test("addJobPriority should reset expiration time when adding new job", async () => { + const team_id = "team1"; + const job_id1 = "job1"; + const job_id2 = "job2"; + + await addJobPriority(team_id, job_id1); + expect(redisConnection.expire).toHaveBeenCalledWith( + `limit_team_id:${team_id}`, + 60 + ); + + // Clear the mock calls + (redisConnection.expire as jest.Mock).mockClear(); + + // Add another job + await addJobPriority(team_id, job_id2); + expect(redisConnection.expire).toHaveBeenCalledWith( + `limit_team_id:${team_id}`, + 60 + ); + }); + + test("Set should expire after 60 seconds", async () => { + const team_id = "team1"; + const job_id = "job1"; + + jest.useFakeTimers(); + + await addJobPriority(team_id, job_id); + expect(redisConnection.expire).toHaveBeenCalledWith( + `limit_team_id:${team_id}`, + 60 + ); + + // Fast-forward time by 59 seconds + jest.advanceTimersByTime(59000); + + // The set should still exist + expect(redisConnection.scard).not.toHaveBeenCalled(); + + // Fast-forward time by 2 more seconds (total 61 seconds) + jest.advanceTimersByTime(2000); + + // Check if the set has been removed (scard should return 0) + (redisConnection.scard as jest.Mock).mockResolvedValue(0); + const setSize = await redisConnection.scard(`limit_team_id:${team_id}`); + expect(setSize).toBe(0); + + jest.useRealTimers(); }); }); diff --git a/apps/api/src/lib/job-priority.ts b/apps/api/src/lib/job-priority.ts index d108938e..361c608d 100644 --- a/apps/api/src/lib/job-priority.ts +++ b/apps/api/src/lib/job-priority.ts @@ -2,19 +2,16 @@ import { redisConnection } from "../../src/services/queue-service"; import { PlanType } from "../../src/types"; import { Logger } from "./logger"; - - -const SET_KEY_PREFIX = "limit_team_id:" +const SET_KEY_PREFIX = "limit_team_id:"; export async function addJobPriority(team_id, job_id) { try { const setKey = SET_KEY_PREFIX + team_id; // Add scrape job id to the set await redisConnection.sadd(setKey, job_id); - + // This approach will reset the expiration time to 60 seconds every time a new job is added to the set. await redisConnection.expire(setKey, 60); - } catch (e) { Logger.error(`Add job priority (sadd) failed: ${team_id}, ${job_id}`); } @@ -34,11 +31,11 @@ export async function deleteJobPriority(team_id, job_id) { export async function getJobPriority({ plan, team_id, - basePriority = 10 + basePriority = 10, }: { plan: PlanType; team_id: string; - basePriority: number; + basePriority?: number; }): Promise { const setKey = SET_KEY_PREFIX + team_id; @@ -52,11 +49,11 @@ export async function getJobPriority({ switch (plan) { case "free": bucketLimit = 25; - planModifier = 1; + planModifier = 0.5; break; case "hobby": bucketLimit = 50; - planModifier = 0.5; + planModifier = 0.3; break; case "standard": case "standardnew": @@ -68,8 +65,7 @@ export async function getJobPriority({ bucketLimit = 200; planModifier = 0.2; break; - - + default: bucketLimit = 25; planModifier = 1; From a0f9a81ee6f5558e4fed865ef17c77504eec3231 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 27 Aug 2024 15:36:46 -0300 Subject: [PATCH 08/11] Nick: --- apps/api/src/services/rate-limiter.test.ts | 18 ++++++++++++------ apps/api/src/services/rate-limiter.ts | 2 +- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/apps/api/src/services/rate-limiter.test.ts b/apps/api/src/services/rate-limiter.test.ts index c49c85d9..e1c81e08 100644 --- a/apps/api/src/services/rate-limiter.test.ts +++ b/apps/api/src/services/rate-limiter.test.ts @@ -65,7 +65,7 @@ describe("Rate Limiter Service", () => { "test-prefix:someToken", "standard" ); - expect(limiter2.points).toBe(50); + expect(limiter2.points).toBe(100); const limiter3 = getRateLimiter( "search" as RateLimiterMode, @@ -188,14 +188,13 @@ describe("Rate Limiter Service", () => { "test-prefix:someTokenXY", "hobby" ); - // expect hobby to have 100 points - expect(limiter.points).toBe(10); + expect(limiter.points).toBe(20); const consumePoints = 5; const res = await limiter.consume("test-prefix:someTokenXY", consumePoints); expect(res.consumedPoints).toBe(5); - expect(res.remainingPoints).toBe(5); + expect(res.remainingPoints).toBe(15); }); it("should return the correct rate limiter for 'crawl' mode", () => { @@ -227,7 +226,7 @@ describe("Rate Limiter Service", () => { "test-prefix:someToken", "free" ); - expect(limiter.points).toBe(5); + expect(limiter.points).toBe(10); const limiter2 = getRateLimiter( "scrape" as RateLimiterMode, @@ -241,7 +240,14 @@ describe("Rate Limiter Service", () => { "test-prefix:someToken", "standard" ); - expect(limiter3.points).toBe(50); + expect(limiter3.points).toBe(100); + + const limiter4 = getRateLimiter( + "scrape" as RateLimiterMode, + "test-prefix:someToken", + "growth" + ); + expect(limiter4.points).toBe(1000); }); it("should return the correct rate limiter for 'search' mode", () => { diff --git a/apps/api/src/services/rate-limiter.ts b/apps/api/src/services/rate-limiter.ts index 8e4e9fc9..8852f71c 100644 --- a/apps/api/src/services/rate-limiter.ts +++ b/apps/api/src/services/rate-limiter.ts @@ -115,7 +115,7 @@ export function getRateLimiter( return testSuiteRateLimiter; } - if(teamId === process.env.DEV_B_TEAM_ID) { + if (teamId && teamId === process.env.DEV_B_TEAM_ID) { return devBRateLimiter; } From f0dfcd6a4949fbd033e07362f34f0a176a7e3633 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 27 Aug 2024 16:58:28 -0300 Subject: [PATCH 09/11] Update job-priority.ts --- apps/api/src/lib/job-priority.ts | 81 +++++++++++++++++--------------- 1 file changed, 44 insertions(+), 37 deletions(-) diff --git a/apps/api/src/lib/job-priority.ts b/apps/api/src/lib/job-priority.ts index 361c608d..c4744d7b 100644 --- a/apps/api/src/lib/job-priority.ts +++ b/apps/api/src/lib/job-priority.ts @@ -37,48 +37,55 @@ export async function getJobPriority({ team_id: string; basePriority?: number; }): Promise { - const setKey = SET_KEY_PREFIX + team_id; + try { + const setKey = SET_KEY_PREFIX + team_id; - // Get the length of the set - const setLength = await redisConnection.scard(setKey); + // Get the length of the set + const setLength = await redisConnection.scard(setKey); - // Determine the priority based on the plan and set length - let planModifier = 1; - let bucketLimit = 0; + // Determine the priority based on the plan and set length + let planModifier = 1; + let bucketLimit = 0; - switch (plan) { - case "free": - bucketLimit = 25; - planModifier = 0.5; - break; - case "hobby": - bucketLimit = 50; - planModifier = 0.3; - break; - case "standard": - case "standardnew": - bucketLimit = 100; - planModifier = 0.2; - break; - case "growth": - case "growthdouble": - bucketLimit = 200; - planModifier = 0.2; - break; + switch (plan) { + case "free": + bucketLimit = 25; + planModifier = 0.5; + break; + case "hobby": + bucketLimit = 50; + planModifier = 0.3; + break; + case "standard": + case "standardnew": + bucketLimit = 100; + planModifier = 0.2; + break; + case "growth": + case "growthdouble": + bucketLimit = 200; + planModifier = 0.2; + break; - default: - bucketLimit = 25; - planModifier = 1; - break; - } + default: + bucketLimit = 25; + planModifier = 1; + break; + } - // if length set is smaller than set, just return base priority - if (setLength <= bucketLimit) { - return basePriority; - } else { - // If not, we keep base priority + planModifier - return Math.ceil( - basePriority + Math.ceil((setLength - bucketLimit) * planModifier) + // if length set is smaller than set, just return base priority + if (setLength <= bucketLimit) { + return basePriority; + } else { + // If not, we keep base priority + planModifier + return Math.ceil( + basePriority + Math.ceil((setLength - bucketLimit) * planModifier) + ); + } + } catch (e) { + Logger.error( + `Get job priority failed: ${team_id}, ${plan}, ${basePriority}` ); + return basePriority; } } From ca34f1203b41062c574266fb2f0400c372e1d616 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 27 Aug 2024 17:03:46 -0300 Subject: [PATCH 10/11] Nick: bucket limit increase --- apps/api/src/lib/__tests__/job-priority.test.ts | 6 +++--- apps/api/src/lib/job-priority.ts | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/api/src/lib/__tests__/job-priority.test.ts b/apps/api/src/lib/__tests__/job-priority.test.ts index 5a30f92e..82477379 100644 --- a/apps/api/src/lib/__tests__/job-priority.test.ts +++ b/apps/api/src/lib/__tests__/job-priority.test.ts @@ -50,11 +50,11 @@ describe("Job Priority Tests", () => { (redisConnection.scard as jest.Mock).mockResolvedValue(150); const priority = await getJobPriority({ plan, team_id }); - expect(priority).toBe(20); + expect(priority).toBe(10); (redisConnection.scard as jest.Mock).mockResolvedValue(250); const priorityExceeded = await getJobPriority({ plan, team_id }); - expect(priorityExceeded).toBe(40); // basePriority + Math.ceil((250 - 200) * 0.2) + expect(priorityExceeded).toBe(20); // basePriority + Math.ceil((250 - 200) * 0.4) }); test("getJobPriority should handle different plans correctly", async () => { @@ -68,7 +68,7 @@ describe("Job Priority Tests", () => { (redisConnection.scard as jest.Mock).mockResolvedValue(150); plan = "hobby"; priority = await getJobPriority({ plan, team_id }); - expect(priority).toBe(40); // basePriority + Math.ceil((150 - 50) * 0.3) + expect(priority).toBe(25); // basePriority + Math.ceil((150 - 50) * 0.3) (redisConnection.scard as jest.Mock).mockResolvedValue(25); plan = "free"; diff --git a/apps/api/src/lib/job-priority.ts b/apps/api/src/lib/job-priority.ts index c4744d7b..1d311cfb 100644 --- a/apps/api/src/lib/job-priority.ts +++ b/apps/api/src/lib/job-priority.ts @@ -53,17 +53,17 @@ export async function getJobPriority({ planModifier = 0.5; break; case "hobby": - bucketLimit = 50; + bucketLimit = 100; planModifier = 0.3; break; case "standard": case "standardnew": - bucketLimit = 100; + bucketLimit = 200; planModifier = 0.2; break; case "growth": case "growthdouble": - bucketLimit = 200; + bucketLimit = 400; planModifier = 0.2; break; From e7d283c4efd719c44816ebe338d6a4408ed13876 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 27 Aug 2024 17:04:04 -0300 Subject: [PATCH 11/11] Update job-priority.ts --- apps/api/src/lib/job-priority.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/api/src/lib/job-priority.ts b/apps/api/src/lib/job-priority.ts index 1d311cfb..bb6158f9 100644 --- a/apps/api/src/lib/job-priority.ts +++ b/apps/api/src/lib/job-priority.ts @@ -64,7 +64,7 @@ export async function getJobPriority({ case "growth": case "growthdouble": bucketLimit = 400; - planModifier = 0.2; + planModifier = 0.1; break; default: