Nick: wip

Nicolas 2024-08-21 20:54:39 -03:00
parent 79f5d49d3f
commit 0ea0a5db46
8 changed files with 201 additions and 10 deletions

View File

@@ -3,6 +3,7 @@ import { getRateLimiter } from "../../src/services/rate-limiter";
import {
AuthResponse,
NotificationType,
PlanType,
RateLimiterMode,
} from "../../src/types";
import { supabase_service } from "../../src/services/supabase";
@@ -82,7 +83,7 @@ export async function supaAuthenticateUser(
team_id?: string;
error?: string;
status?: number;
plan?: string;
plan?: PlanType;
}> {
const authHeader = req.headers.authorization;
if (!authHeader) {
@@ -316,10 +317,10 @@ export async function supaAuthenticateUser(
return {
success: true,
team_id: subscriptionData.team_id,
plan: subscriptionData.plan ?? "",
plan: (subscriptionData.plan ?? "") as PlanType,
};
}
function getPlanByPriceId(price_id: string) {
function getPlanByPriceId(price_id: string): PlanType {
switch (price_id) {
case process.env.STRIPE_PRICE_ID_STARTER:
return "starter";

View File

@@ -2,7 +2,7 @@ import { ExtractorOptions, PageOptions } from './../lib/entities';
import { Request, Response } from "express";
import { billTeam, checkTeamCredits } from "../services/billing/credit_billing";
import { authenticateUser } from "./auth";
import { RateLimiterMode } from "../types";
import { PlanType, RateLimiterMode } from "../types";
import { logJob } from "../services/logging/log_job";
import { Document } from "../lib/entities";
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function
@@ -12,6 +12,7 @@ import { addScrapeJob } from '../services/queue-jobs';
import { scrapeQueueEvents } from '../services/queue-service';
import { v4 as uuidv4 } from "uuid";
import { Logger } from '../lib/logger';
import { getJobPriority } from '../lib/job-priority';
export async function scrapeHelper(
jobId: string,
@@ -21,7 +22,7 @@ export async function scrapeHelper(
pageOptions: PageOptions,
extractorOptions: ExtractorOptions,
timeout: number,
plan?: string
plan?: PlanType
): Promise<{
success: boolean;
error?: string;
@@ -37,6 +38,8 @@ export async function scrapeHelper(
return { success: false, error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.", returnCode: 403 };
}
const jobPriority = await getJobPriority({ plan, team_id });
const job = await addScrapeJob({
url,
mode: "single_urls",
@@ -45,7 +48,7 @@
pageOptions,
extractorOptions,
origin: req.body.origin ?? defaultOrigin,
}, {}, jobId);
}, {}, jobId, jobPriority);
let doc;
try {

View File

@@ -187,6 +187,33 @@ if (cluster.isMaster) {
Logger.info(`Worker ${process.pid} started`);
}
// Temporary WIP load test: fires 20 concurrent scrape requests at the local API.
async function sendScrapeRequests() {
// Give the server a moment to come up before sending traffic.
await new Promise(resolve => setTimeout(resolve, 5000));
const url = 'http://127.0.0.1:3002/v0/scrape';
const headers = {
'Authorization': 'Bearer fc-365b09a44b8844d08e0dc98f13e49bca',
'Content-Type': 'application/json'
};
const body = JSON.stringify({
url: 'https://roastmywebsite.ai'
});
const requests = Array.from({ length: 20 }, (_, i) =>
fetch(url, {
method: 'POST',
headers: headers,
body: body
}).catch(error => {
Logger.error(`Request ${i + 1} encountered an error: ${error.message}`);
})
);
await Promise.all(requests);
}
sendScrapeRequests();
// const sq = getScrapeQueue();
// sq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));

View File

@@ -0,0 +1,68 @@
import { getJobPriority, addJobPriority, deleteJobPriority } from "../job-priority";
import { redisConnection } from "../../services/queue-service";
import { PlanType } from "../../types";
jest.mock("../../services/queue-service", () => ({
redisConnection: {
sadd: jest.fn(),
srem: jest.fn(),
scard: jest.fn(),
},
}));
describe("Job Priority Tests", () => {
afterEach(() => {
jest.clearAllMocks();
});
test("addJobPriority should add job_id to the set", async () => {
const team_id = "team1";
const job_id = "job1";
await addJobPriority(team_id, job_id);
expect(redisConnection.sadd).toHaveBeenCalledWith(`limit_team_id:${team_id}`, job_id);
});
test("deleteFromSet should remove job_id from the set", async () => {
const team_id = "team1";
const job_id = "job1";
await deleteJobPriority(team_id, job_id);
expect(redisConnection.srem).toHaveBeenCalledWith(`limit_team_id:${team_id}`, job_id);
});
test("getJobPriority should return correct priority based on plan and set length", async () => {
const team_id = "team1";
const plan: PlanType = "standard";
(redisConnection.scard as jest.Mock).mockResolvedValue(150);
const priority = await getJobPriority({ plan, team_id });
expect(priority).toBe(10);
(redisConnection.scard as jest.Mock).mockResolvedValue(250);
const priorityExceeded = await getJobPriority({ plan, team_id });
expect(priorityExceeded).toBe(20); // basePriority + Math.ceil((250 - 200) * 0.2)
});
test("getJobPriority should handle different plans correctly", async () => {
const team_id = "team1";
(redisConnection.scard as jest.Mock).mockResolvedValue(50);
let plan: PlanType = "hobby";
let priority = await getJobPriority({ plan, team_id });
expect(priority).toBe(10);
(redisConnection.scard as jest.Mock).mockResolvedValue(150);
plan = "hobby";
priority = await getJobPriority({ plan, team_id });
expect(priority).toBe(35); // basePriority + Math.ceil((150 - 100) * 0.5)
(redisConnection.scard as jest.Mock).mockResolvedValue(50);
plan = "free";
priority = await getJobPriority({ plan, team_id });
expect(priority).toBe(10);
(redisConnection.scard as jest.Mock).mockResolvedValue(60);
plan = "free";
priority = await getJobPriority({ plan, team_id });
expect(priority).toBe(20); // basePriority + Math.ceil((60 - 50) * 1)
});
});

View File

@@ -0,0 +1,78 @@
import { redisConnection } from "../../src/services/queue-service";
import { PlanType } from "../../src/types";
import { Logger } from "./logger";
const SET_KEY_PREFIX = "limit_team_id:";
export async function addJobPriority(team_id: string, job_id: string) {
try {
const setKey = SET_KEY_PREFIX + team_id;
// Add scrape job id to the set
await redisConnection.sadd(setKey, job_id);
} catch (e) {
Logger.error(`Add job priority (sadd) failed: ${team_id}, ${job_id}`);
}
}
export async function deleteJobPriority(team_id: string, job_id: string) {
try {
const setKey = SET_KEY_PREFIX + team_id;
// Remove job_id from the set
await redisConnection.srem(setKey, job_id);
} catch (e) {
Logger.error(`Delete job priority (srem) failed: ${team_id}, ${job_id}`);
}
}
export async function getJobPriority({
plan,
team_id,
}: {
plan: PlanType;
team_id: string;
}): Promise<number> {
const setKey = SET_KEY_PREFIX + team_id;
// Get the length of the set
const setLength = await redisConnection.scard(setKey);
// Determine the priority based on the plan and set length
let basePriority = 10;
let planModifier = 1;
let bucketLimit = 0;
switch (plan) {
case "standard":
case "standardnew":
bucketLimit = 200;
planModifier = 0.2;
break;
case "growth":
case "growthdouble":
bucketLimit = 200;
planModifier = 0.2;
break;
case "hobby":
bucketLimit = 100;
planModifier = 0.5;
break;
case "free":
default:
bucketLimit = 50;
planModifier = 1;
break;
}
// If the set length is within the bucket limit, just return the base priority
if (setLength <= bucketLimit) {
return basePriority;
} else {
// Otherwise, raise the number (i.e. lower the priority) by the overflow scaled by the plan modifier
return basePriority + Math.ceil((setLength - bucketLimit) * planModifier);
}
}
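
For context, a minimal usage sketch (not part of this commit) of how the three helpers compose around a unit of work; the team and job ids are illustrative:

import { addJobPriority, deleteJobPriority, getJobPriority } from "./job-priority";

// Illustrative only: track a job while it is in flight, compute its queue
// priority from the team's current load, and always clean up afterwards.
async function withTrackedJob() {
  const team_id = "team-123"; // hypothetical team
  const job_id = "job-abc"; // hypothetical job
  await addJobPriority(team_id, job_id);
  try {
    // With a "standard" plan and at most 200 jobs in the set, this
    // resolves to the base priority of 10; beyond that it grows.
    const priority = await getJobPriority({ plan: "standard", team_id });
    console.log(`queue priority: ${priority}`);
  } finally {
    await deleteJobPriority(team_id, job_id);
  }
}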

View File

@@ -7,9 +7,10 @@ export async function addScrapeJob(
webScraperOptions: WebScraperOptions,
options: any = {},
jobId: string = uuidv4(),
jobPriority: number = 10
): Promise<Job> {
return await getScrapeQueue().add(jobId, webScraperOptions, {
priority: webScraperOptions.crawl_id ? 20 : 10,
priority: webScraperOptions.crawl_id ? 20 : jobPriority,
...options,
jobId,
});
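
In BullMQ, a smaller priority number is scheduled sooner, so crawl sub-jobs keep their fixed priority of 20 while standalone scrapes now carry the per-team value computed upstream. A hedged sketch of a call site, assuming the import paths below and abbreviating the WebScraperOptions shape:

import { v4 as uuidv4 } from "uuid";
import { addScrapeJob } from "./queue-jobs";
import { getJobPriority } from "../lib/job-priority";

// Illustrative only: enqueue a single-URL scrape with a computed priority.
async function enqueueExample() {
  const jobId = uuidv4();
  const jobPriority = await getJobPriority({ plan: "hobby", team_id: "team-123" });
  await addScrapeJob(
    // Options abbreviated for illustration; the real WebScraperOptions has more fields.
    { url: "https://example.com", mode: "single_urls", team_id: "team-123" } as any,
    {},
    jobId,
    jobPriority
  );
}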

View File

@@ -21,6 +21,7 @@ import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, ge
import { StoredCrawl } from "../lib/crawl-redis";
import { addScrapeJob } from "./queue-jobs";
import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
import { addJobPriority, deleteJobPriority } from "../../src/lib/job-priority";
if (process.env.ENV === "production") {
initSDK({
@@ -50,6 +51,7 @@ const processJobInternal = async (token: string, job: Job) => {
await job.extendLock(token, jobLockExtensionTime);
}, jobLockExtendInterval);
await addJobPriority(job.data.team_id, job.id);
try {
const result = await processJob(job, token);
try {
@@ -62,9 +64,9 @@
}
} catch (error) {
console.log("Job failed, error:", error);
await job.moveToFailed(error, token, false);
} finally {
await deleteJobPriority(job.data.team_id, job.id);
clearInterval(extendLockInterval);
}
};
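
Taken together, addJobPriority/deleteJobPriority turn the Redis set into a live count of each team's in-flight jobs, and getJobPriority degrades a team's queue position once that count passes its plan's bucket limit. Rough numbers for a "free" team, using the limits defined in job-priority.ts (lower numbers are scheduled first):

// 10 jobs in flight  -> priority 10 (base)
// 50 jobs in flight  -> priority 10 (exactly at the bucket limit)
// 60 jobs in flight  -> priority 20 (10 + ceil((60 - 50) * 1))
// 100 jobs in flight -> priority 60 (10 + ceil((100 - 50) * 1))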

View File

@@ -113,7 +113,7 @@ export interface AuthResponse {
team_id?: string;
error?: string;
status?: number;
plan?: string;
plan?: PlanType;
}
@@ -137,3 +137,14 @@ export type ScrapeLog = {
ipv4_support?: boolean | null;
ipv6_support?: boolean | null;
};
export type PlanType =
| "starter"
| "standard"
| "scale"
| "hobby"
| "standardnew"
| "growth"
| "growthdouble"
| "free"
| "";