From 0ea0a5db46c027c699b9a311fd8d985dc7b3a8fe Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 21 Aug 2024 20:54:39 -0300 Subject: [PATCH] Nick: wip --- apps/api/src/controllers/auth.ts | 7 +- apps/api/src/controllers/scrape.ts | 9 ++- apps/api/src/index.ts | 27 +++++++ .../src/lib/__tests__/job-priority.test.ts | 68 ++++++++++++++++ apps/api/src/lib/job-priority.ts | 78 +++++++++++++++++++ apps/api/src/services/queue-jobs.ts | 3 +- apps/api/src/services/queue-worker.ts | 4 +- apps/api/src/types.ts | 15 +++- 8 files changed, 201 insertions(+), 10 deletions(-) create mode 100644 apps/api/src/lib/__tests__/job-priority.test.ts create mode 100644 apps/api/src/lib/job-priority.ts diff --git a/apps/api/src/controllers/auth.ts b/apps/api/src/controllers/auth.ts index 9d46d005..b2741296 100644 --- a/apps/api/src/controllers/auth.ts +++ b/apps/api/src/controllers/auth.ts @@ -3,6 +3,7 @@ import { getRateLimiter } from "../../src/services/rate-limiter"; import { AuthResponse, NotificationType, + PlanType, RateLimiterMode, } from "../../src/types"; import { supabase_service } from "../../src/services/supabase"; @@ -82,7 +83,7 @@ export async function supaAuthenticateUser( team_id?: string; error?: string; status?: number; - plan?: string; + plan?: PlanType; }> { const authHeader = req.headers.authorization; if (!authHeader) { @@ -316,10 +317,10 @@ export async function supaAuthenticateUser( return { success: true, team_id: subscriptionData.team_id, - plan: subscriptionData.plan ?? "", + plan: (subscriptionData.plan ?? 
"") as PlanType, }; } -function getPlanByPriceId(price_id: string) { +function getPlanByPriceId(price_id: string): PlanType { switch (price_id) { case process.env.STRIPE_PRICE_ID_STARTER: return "starter"; diff --git a/apps/api/src/controllers/scrape.ts b/apps/api/src/controllers/scrape.ts index 273b4c56..436ad4fd 100644 --- a/apps/api/src/controllers/scrape.ts +++ b/apps/api/src/controllers/scrape.ts @@ -2,7 +2,7 @@ import { ExtractorOptions, PageOptions } from './../lib/entities'; import { Request, Response } from "express"; import { billTeam, checkTeamCredits } from "../services/billing/credit_billing"; import { authenticateUser } from "./auth"; -import { RateLimiterMode } from "../types"; +import { PlanType, RateLimiterMode } from "../types"; import { logJob } from "../services/logging/log_job"; import { Document } from "../lib/entities"; import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function @@ -12,6 +12,7 @@ import { addScrapeJob } from '../services/queue-jobs'; import { scrapeQueueEvents } from '../services/queue-service'; import { v4 as uuidv4 } from "uuid"; import { Logger } from '../lib/logger'; +import { getJobPriority } from '../lib/job-priority'; export async function scrapeHelper( jobId: string, @@ -21,7 +22,7 @@ export async function scrapeHelper( pageOptions: PageOptions, extractorOptions: ExtractorOptions, timeout: number, - plan?: string + plan?: PlanType ): Promise<{ success: boolean; error?: string; @@ -37,6 +38,8 @@ export async function scrapeHelper( return { success: false, error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.", returnCode: 403 }; } + const jobPriority = await getJobPriority({plan, team_id}) + const job = await addScrapeJob({ url, mode: "single_urls", @@ -45,7 +48,7 @@ export async function scrapeHelper( pageOptions, extractorOptions, origin: req.body.origin ?? 
defaultOrigin, - }, {}, jobId); + }, {}, jobId, jobPriority); let doc; try { diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 6a6437b3..ca16e4b1 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -187,6 +187,33 @@ if (cluster.isMaster) { Logger.info(`Worker ${process.pid} started`); } + + +async function sendScrapeRequests() { + await new Promise(resolve => setTimeout(resolve, 5000)); + const url = 'http://127.0.0.1:3002/v0/scrape'; + const headers = { + 'Authorization': 'Bearer fc-365b09a44b8844d08e0dc98f13e49bca', + 'Content-Type': 'application/json' + }; + const body = JSON.stringify({ + url: 'https://roastmywebsite.ai' + }); + + const requests = Array.from({ length: 20 }, (_, i) => + fetch(url, { + method: 'POST', + headers: headers, + body: body + }).catch(error => { + Logger.error(`Request ${i + 1} encountered an error: ${error.message}`); + }) + ); + + await Promise.all(requests); +} + +sendScrapeRequests(); // const sq = getScrapeQueue(); // sq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting")); diff --git a/apps/api/src/lib/__tests__/job-priority.test.ts b/apps/api/src/lib/__tests__/job-priority.test.ts new file mode 100644 index 00000000..e6f80f05 --- /dev/null +++ b/apps/api/src/lib/__tests__/job-priority.test.ts @@ -0,0 +1,68 @@ +import { getJobPriority, addJobPriority, deleteJobPriority } from "../job-priority"; +import { redisConnection } from "../../services/queue-service"; +import { PlanType } from "../../types"; + +jest.mock("../../services/queue-service", () => ({ + redisConnection: { + sadd: jest.fn(), + srem: jest.fn(), + scard: jest.fn(), + }, +})); + +describe("Job Priority Tests", () => { + afterEach(() => { + jest.clearAllMocks(); + }); + + test("addJobPriority should add job_id to the set", async () => { + const team_id = "team1"; + const job_id = "job1"; + await addJobPriority(team_id, job_id); + expect(redisConnection.sadd).toHaveBeenCalledWith(`limit_team_id:${team_id}`, job_id); + }); + + 
test("deleteFromSet should remove job_id from the set", async () => { + const team_id = "team1"; + const job_id = "job1"; + await deleteJobPriority(team_id, job_id); + expect(redisConnection.srem).toHaveBeenCalledWith(`limit_team_id:${team_id}`, job_id); + }); + + test("getJobPriority should return correct priority based on plan and set length", async () => { + const team_id = "team1"; + const plan: PlanType = "standard"; + (redisConnection.scard as jest.Mock).mockResolvedValue(150); + + const priority = await getJobPriority({ plan, team_id }); + expect(priority).toBe(10); + + (redisConnection.scard as jest.Mock).mockResolvedValue(250); + const priorityExceeded = await getJobPriority({ plan, team_id }); + expect(priorityExceeded).toBe(20); // basePriority + Math.ceil((250 - 200) * 0.2) + }); + + test("getJobPriority should handle different plans correctly", async () => { + const team_id = "team1"; + + (redisConnection.scard as jest.Mock).mockResolvedValue(50); + let plan: PlanType = "hobby"; + let priority = await getJobPriority({ plan, team_id }); + expect(priority).toBe(10); + + (redisConnection.scard as jest.Mock).mockResolvedValue(150); + plan = "hobby"; + priority = await getJobPriority({ plan, team_id }); + expect(priority).toBe(35); // basePriority + Math.ceil((150 - 100) * 0.5) + + (redisConnection.scard as jest.Mock).mockResolvedValue(50); + plan = "free"; + priority = await getJobPriority({ plan, team_id }); + expect(priority).toBe(10); + + (redisConnection.scard as jest.Mock).mockResolvedValue(60); + plan = "free"; + priority = await getJobPriority({ plan, team_id }); + expect(priority).toBe(20); // basePriority + Math.ceil((60 - 50) * 1) + }); +}); diff --git a/apps/api/src/lib/job-priority.ts b/apps/api/src/lib/job-priority.ts new file mode 100644 index 00000000..b418feba --- /dev/null +++ b/apps/api/src/lib/job-priority.ts @@ -0,0 +1,78 @@ +import { redisConnection } from "../../src/services/queue-service"; +import { PlanType } from "../../src/types"; 
+import { Logger } from "./logger";
+
+
+
+const SET_KEY_PREFIX = "limit_team_id:"
+export async function addJobPriority(team_id, job_id) {
+  try {
+    const setKey = SET_KEY_PREFIX + team_id;
+
+    // Add scrape job id to the set
+    await redisConnection.sadd(setKey, job_id);
+  } catch (e) {
+    Logger.error(`Add job priority (sadd) failed: ${team_id}, ${job_id}`);
+  }
+}
+
+export async function deleteJobPriority(team_id, job_id) {
+  try {
+    const setKey = SET_KEY_PREFIX + team_id;
+
+    // remove job_id from the set
+    await redisConnection.srem(setKey, job_id);
+  } catch (e) {
+    Logger.error(`Delete job priority (srem) failed: ${team_id}, ${job_id}`);
+  }
+}
+
+export async function getJobPriority({
+  plan,
+  team_id,
+}: {
+  plan: PlanType;
+  team_id: string;
+}): Promise<number> {
+  const setKey = SET_KEY_PREFIX + team_id;
+
+  // Get the length of the set
+  const setLength = await redisConnection.scard(setKey);
+
+  // Determine the priority based on the plan and set length
+  let basePriority = 10;
+  let planModifier = 1;
+  let bucketLimit = 0;
+
+  switch (plan) {
+    case "standard":
+    case "standardnew":
+      bucketLimit = 100;
+      planModifier = 0.2;
+      break;
+    case "growth":
+    case "growthdouble":
+      bucketLimit = 200;
+      planModifier = 0.2;
+      break;
+    case "hobby":
+      bucketLimit = 50;
+      planModifier = 0.5;
+      break;
+    case "free":
+    default:
+      bucketLimit = 25;
+      planModifier = 1;
+      break;
+  }
+
+  // if the set length is within the bucket limit, just return the base priority
+  if (setLength <= bucketLimit) {
+    return basePriority;
+  } else {
+    // If not, we keep base priority + planModifier
+    return Math.ceil(
+      basePriority + Math.ceil((setLength - bucketLimit) * planModifier)
+    );
+  }
+}
diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts
index 3099da68..be78e558 100644
--- a/apps/api/src/services/queue-jobs.ts
+++ b/apps/api/src/services/queue-jobs.ts
@@ -7,9 +7,10 @@ export async function addScrapeJob(
   webScraperOptions: WebScraperOptions,
   options: any = {},
   jobId: string = uuidv4(),
+  jobPriority: number = 10
 ): Promise<Job> {
   return await getScrapeQueue().add(jobId, webScraperOptions, {
-    priority: webScraperOptions.crawl_id ? 20 : 10,
+    priority: webScraperOptions.crawl_id ? 20 : jobPriority,
     ...options,
     jobId,
   });
 }
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 2086d0a6..f4c720b4 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -21,6 +21,7 @@ import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, ge
 import { StoredCrawl } from "../lib/crawl-redis";
 import { addScrapeJob } from "./queue-jobs";
 import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
+import { addJobPriority, deleteJobPriority } from "../../src/lib/job-priority";
 
 if (process.env.ENV === "production") {
   initSDK({
@@ -50,6 +51,7 @@ const processJobInternal = async (token: string, job: Job) => {
     await job.extendLock(token, jobLockExtensionTime);
   }, jobLockExtendInterval);
 
+  await addJobPriority(job.data.team_id, job.id );
   try {
     const result = await processJob(job, token);
     try{
@@ -62,9 +64,9 @@ const processJobInternal = async (token: string, job: Job) => {
     }
   } catch (error) {
     console.log("Job failed, error:", error);
-    await job.moveToFailed(error, token, false);
 
   } finally {
+    await deleteJobPriority(job.data.team_id, job.id );
     clearInterval(extendLockInterval);
   }
 };
diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts
index b092d310..f2932b5a 100644
--- a/apps/api/src/types.ts
+++ b/apps/api/src/types.ts
@@ -113,7 +113,7 @@ export interface AuthResponse {
   team_id?: string;
   error?: string;
   status?: number;
-  plan?: string;
+  plan?: PlanType;
 }
 
 
@@ -136,4 +136,15 @@ export type ScrapeLog = {
   html?: string;
   ipv4_support?: boolean | null;
   ipv6_support?: boolean | null;
-};
\ No newline at end of file
+};
+
+export type PlanType =
+  | "starter"
+  | "standard"
+  | "scale"
+  | "hobby"
+  | "standardnew"
+  | "growth"
+  | 
"growthdouble" + | "free" + | ""; \ No newline at end of file