From c7bfe4ffe83325703359026906b9c478e05a9e60 Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Wed, 21 Aug 2024 22:20:40 -0300
Subject: [PATCH] Nick:

---
 apps/api/src/controllers/crawl.ts        |  8 +++++++-
 apps/api/src/controllers/crawlPreview.ts |  3 ++-
 apps/api/src/controllers/scrape.ts       |  2 +-
 apps/api/src/index.ts                    |  2 +-
 apps/api/src/lib/crawl-redis.ts          |  1 +
 apps/api/src/lib/job-priority.ts         | 18 ++++++++++++------
 apps/api/src/services/queue-jobs.ts      |  2 +-
 apps/api/src/services/queue-worker.ts    | 14 ++++++++++++--
 8 files changed, 37 insertions(+), 13 deletions(-)

diff --git a/apps/api/src/controllers/crawl.ts b/apps/api/src/controllers/crawl.ts
index 1dfe758f..4f3442d4 100644
--- a/apps/api/src/controllers/crawl.ts
+++ b/apps/api/src/controllers/crawl.ts
@@ -25,10 +25,11 @@ import {
 } from "../../src/lib/crawl-redis";
 import { getScrapeQueue } from "../../src/services/queue-service";
 import { checkAndUpdateURL } from "../../src/lib/validateUrl";
+import { getJobPriority } from "../../src/lib/job-priority";
 
 export async function crawlController(req: Request, res: Response) {
   try {
-    const { success, team_id, error, status } = await authenticateUser(
+    const { success, team_id, error, status, plan } = await authenticateUser(
       req,
       res,
       RateLimiterMode.Crawl
@@ -126,6 +127,7 @@ export async function crawlController(req: Request, res: Response) {
       crawlerOptions,
       pageOptions,
       team_id,
+      plan,
       createdAt: Date.now(),
     };
 
@@ -175,6 +177,10 @@ export async function crawlController(req: Request, res: Response) {
       await getScrapeQueue().addBulk(jobs);
     } else {
       await lockURL(id, sc, url);
+
+      // Not needed, first one should be 15.
+      // const jobPriority = await getJobPriority({plan, team_id, basePriority: 10})
+
       const job = await addScrapeJob(
         {
           url,
diff --git a/apps/api/src/controllers/crawlPreview.ts b/apps/api/src/controllers/crawlPreview.ts
index cc10dc8e..20eae731 100644
--- a/apps/api/src/controllers/crawlPreview.ts
+++ b/apps/api/src/controllers/crawlPreview.ts
@@ -10,7 +10,7 @@ import { checkAndUpdateURL } from "../../src/lib/validateUrl";
 
 export async function crawlPreviewController(req: Request, res: Response) {
   try {
-    const { success, error, status } = await authenticateUser(
+    const { success, error, status, team_id:a, plan } = await authenticateUser(
       req,
       res,
       RateLimiterMode.Preview
@@ -88,6 +88,7 @@ export async function crawlPreviewController(req: Request, res: Response) {
       crawlerOptions,
       pageOptions,
       team_id,
+      plan,
       robots,
       createdAt: Date.now(),
     };
diff --git a/apps/api/src/controllers/scrape.ts b/apps/api/src/controllers/scrape.ts
index 436ad4fd..bb991060 100644
--- a/apps/api/src/controllers/scrape.ts
+++ b/apps/api/src/controllers/scrape.ts
@@ -38,7 +38,7 @@ export async function scrapeHelper(
     return { success: false, error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.", returnCode: 403 };
   }
 
-  const jobPriority = await getJobPriority({plan, team_id})
+  const jobPriority = await getJobPriority({plan, team_id, basePriority: 10})
 
   const job = await addScrapeJob({
     url,
diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts
index ca16e4b1..df9dcf0b 100644
--- a/apps/api/src/index.ts
+++ b/apps/api/src/index.ts
@@ -213,7 +213,7 @@ async function sendScrapeRequests() {
   await Promise.all(requests);
 }
 
-sendScrapeRequests();
+// sendScrapeRequests();
 
 // const sq = getScrapeQueue();
 // sq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts
index 9e8a0cf6..b8c1c151 100644
--- a/apps/api/src/lib/crawl-redis.ts
+++ b/apps/api/src/lib/crawl-redis.ts
@@ -6,6 +6,7 @@ export type StoredCrawl = {
   crawlerOptions: any;
   pageOptions: any;
   team_id: string;
+  plan: string;
   robots?: string;
   cancelled?: boolean;
   createdAt: number;
diff --git a/apps/api/src/lib/job-priority.ts b/apps/api/src/lib/job-priority.ts
index b418feba..fdea28b9 100644
--- a/apps/api/src/lib/job-priority.ts
+++ b/apps/api/src/lib/job-priority.ts
@@ -30,9 +30,11 @@ export async function deleteJobPriority(team_id, job_id) {
 export async function getJobPriority({
   plan,
   team_id,
+  basePriority = 10
 }: {
   plan: PlanType;
   team_id: string;
+  basePriority: number;
 }): Promise<number> {
   const setKey = SET_KEY_PREFIX + team_id;
 
@@ -40,11 +42,18 @@ export async function getJobPriority({
   const setLength = await redisConnection.scard(setKey);
 
   // Determine the priority based on the plan and set length
-  let basePriority = 10;
   let planModifier = 1;
   let bucketLimit = 0;
 
   switch (plan) {
+    case "free":
+      bucketLimit = 25;
+      planModifier = 1;
+      break;
+    case "hobby":
+      bucketLimit = 50;
+      planModifier = 0.5;
+      break;
     case "standard":
     case "standardnew":
       bucketLimit = 100;
@@ -55,11 +64,8 @@ export async function getJobPriority({
       bucketLimit = 200;
       planModifier = 0.2;
       break;
-    case "hobby":
-      bucketLimit = 50;
-      planModifier = 0.5;
-      break;
-    case "free":
+
+    default:
       bucketLimit = 25;
       planModifier = 1;
       break;
diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts
index be78e558..080d52dc 100644
--- a/apps/api/src/services/queue-jobs.ts
+++ b/apps/api/src/services/queue-jobs.ts
@@ -10,7 +10,7 @@ export async function addScrapeJob(
   jobPriority: number = 10
 ): Promise<Job> {
   return await getScrapeQueue().add(jobId, webScraperOptions, {
-    priority: webScraperOptions.crawl_id ? 20 : jobPriority,
+    priority: jobPriority,
     ...options,
     jobId,
   });
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index f4c720b4..955b7fbb 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -21,7 +21,8 @@ import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, ge
 import { StoredCrawl } from "../lib/crawl-redis";
 import { addScrapeJob } from "./queue-jobs";
 import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
-import { addJobPriority, deleteJobPriority } from "../../src/lib/job-priority";
+import { addJobPriority, deleteJobPriority, getJobPriority } from "../../src/lib/job-priority";
+import { PlanType } from "../types";
 
 if (process.env.ENV === "production") {
   initSDK({
@@ -216,6 +217,15 @@ async function processJob(job: Job, token: string) {
 
       for (const link of links) {
         if (await lockURL(job.data.crawl_id, sc, link)) {
+
+          const jobPriority = await getJobPriority({plan:sc.plan as PlanType, team_id: sc.team_id, basePriority: job.data.crawl_id ? 20 : 10})
+          const jobId = uuidv4();
+
+          console.log("plan: ", sc.plan);
+          console.log("team_id: ", sc.team_id)
+          console.log("base priority: ", job.data.crawl_id ? 20 : 10)
+          console.log("job priority: " , jobPriority, "\n\n\n")
+
           const newJob = await addScrapeJob({
             url: link,
             mode: "single_urls",
@@ -224,7 +234,7 @@ async function processJob(job: Job, token: string) {
             pageOptions: sc.pageOptions,
            origin: job.data.origin,
             crawl_id: job.data.crawl_id,
-          });
+          }, {}, jobId, jobPriority);
 
           await addCrawlJob(job.data.crawl_id, newJob.id);
         }
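---
Notes (editorial sketches; none of the code below is part of the patch):

The patch replaces the hard-coded `let basePriority = 10;` with a caller-supplied parameter (direct scrapes pass 10, crawl sub-jobs 20) and gives the "free" and "hobby" plans explicit buckets, with a `default` arm catching unknown plans. The code that actually combines setLength, bucketLimit, planModifier, and basePriority lies outside the diff context. One plausible shape, assuming priority degrades linearly once a team's active-job set outgrows its plan bucket (queues like BullMQ treat a smaller priority value as more urgent):

    // Assumed combining logic; only the bucket/modifier setup appears in the hunks.
    function combinePriority(
      setLength: number,    // SCARD of the team's active-job set
      bucketLimit: number,  // jobs allowed at full urgency for the plan
      planModifier: number, // how steeply urgency degrades past the limit
      basePriority: number  // 10 for direct scrapes, 20 for crawl sub-jobs
    ): number {
      if (setLength <= bucketLimit) {
        return basePriority; // within quota: keep the base value
      }
      // Over quota: every extra active job raises the value (lowers urgency),
      // scaled more gently on higher plans (0.2 for growth vs 1 for free).
      return Math.ceil(basePriority + (setLength - bucketLimit) * planModifier);
    }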
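getJobPriority reads team load with `redisConnection.scard(setKey)`, so the addJobPriority/deleteJobPriority pair the worker imports presumably maintains that set around job execution. A minimal sketch of that bookkeeping; the key prefix, client setup, and absence of TTL handling here are assumptions:

    import Redis from "ioredis";

    const redisConnection = new Redis(process.env.REDIS_URL ?? "redis://localhost:6379");
    const SET_KEY_PREFIX = "job_priority:"; // hypothetical value

    // Register a job while it runs so concurrent load counts against the team.
    export async function addJobPriority(team_id: string, job_id: string) {
      await redisConnection.sadd(SET_KEY_PREFIX + team_id, job_id);
    }

    // Remove it when the job settles so the team's urgency recovers.
    export async function deleteJobPriority(team_id: string, job_id: string) {
      await redisConnection.srem(SET_KEY_PREFIX + team_id, job_id);
    }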
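The queue-jobs.ts change only matters if the queue honors numeric priorities. The `getScrapeQueue().add(jobId, webScraperOptions, { priority, ...options, jobId })` shape matches BullMQ's Queue#add, where, among prioritized jobs, a smaller value is served first. A standalone demo of that ordering, with the queue name and Redis connection as placeholders:

    import { Queue, Worker } from "bullmq";

    const connection = { host: "localhost", port: 6379 };

    async function demo() {
      const queue = new Queue("priority-demo", { connection });
      // Enqueued first with the larger (less urgent) value, like a crawl sub-job.
      await queue.add("crawl-sub-job", {}, { priority: 20 });
      // Enqueued second with the smaller value, like a direct scrape; it runs first.
      await queue.add("direct-scrape", {}, { priority: 10 });

      new Worker(
        "priority-demo",
        async (job) => console.log(`processing ${job.name} (priority ${job.opts.priority})`),
        { connection }
      );
    }

    demo().catch(console.error);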