diff --git a/apps/api/src/controllers/v0/crawl.ts b/apps/api/src/controllers/v0/crawl.ts
index 3ebee976..fb412147 100644
--- a/apps/api/src/controllers/v0/crawl.ts
+++ b/apps/api/src/controllers/v0/crawl.ts
@@ -193,13 +193,9 @@ export async function crawlController(req: Request, res: Response) {
         id,
         jobs.map((x) => x.opts.jobId)
       );
-      if (Sentry.isInitialized()) {
-        for (const job of jobs) {
-          // add with sentry instrumentation
-          await addScrapeJob(job.data as any, {}, job.opts.jobId);
-        }
-      } else {
-        await getScrapeQueue().addBulk(jobs);
+      for (const job of jobs) {
+        // add with sentry instrumentation
+        await addScrapeJob(job.data as any, {}, job.opts.jobId);
       }
     } else {
       await lockURL(id, sc, url);
@@ -207,7 +203,8 @@ export async function crawlController(req: Request, res: Response) {
       // Not needed, first one should be 15.
       // const jobPriority = await getJobPriority({plan, team_id, basePriority: 10})
 
-      const job = await addScrapeJob(
+      const jobId = uuidv4();
+      await addScrapeJob(
         {
           url,
           mode: "single_urls",
@@ -220,9 +217,10 @@ export async function crawlController(req: Request, res: Response) {
         },
         {
           priority: 15, // prioritize request 0 of crawl jobs same as scrape jobs
-        }
+        },
+        jobId,
       );
-      await addCrawlJob(id, job.id);
+      await addCrawlJob(id, jobId);
     }
 
     res.json({ jobId: id });
diff --git a/apps/api/src/controllers/v0/crawlPreview.ts b/apps/api/src/controllers/v0/crawlPreview.ts
index bceb1df9..e9f6e806 100644
--- a/apps/api/src/controllers/v0/crawlPreview.ts
+++ b/apps/api/src/controllers/v0/crawlPreview.ts
@@ -103,7 +103,8 @@ export async function crawlPreviewController(req: Request, res: Response) {
     if (sitemap !== null) {
       for (const url of sitemap.map(x => x.url)) {
         await lockURL(id, sc, url);
-        const job = await addScrapeJob({
+        const jobId = uuidv4();
+        await addScrapeJob({
           url,
           mode: "single_urls",
           crawlerOptions: crawlerOptions,
@@ -113,12 +114,13 @@ export async function crawlPreviewController(req: Request, res: Response) {
           origin: "website-preview",
           crawl_id: id,
           sitemapped: true,
-        });
-        await addCrawlJob(id, job.id);
+        }, {}, jobId);
+        await addCrawlJob(id, jobId);
       }
     } else {
       await lockURL(id, sc, url);
-      const job = await addScrapeJob({
+      const jobId = uuidv4();
+      await addScrapeJob({
        url,
        mode: "single_urls",
        crawlerOptions: crawlerOptions,
@@ -127,8 +129,8 @@ export async function crawlPreviewController(req: Request, res: Response) {
        pageOptions: pageOptions,
        origin: "website-preview",
        crawl_id: id,
-      });
-      await addCrawlJob(id, job.id);
+      }, {}, jobId);
+      await addCrawlJob(id, jobId);
     }
 
     res.json({ jobId: id });
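Note: the pattern repeated in the two controllers above is that the caller now mints the BullMQ job id itself and threads it through, instead of reading `job.id` off the value returned by `addScrapeJob`. A minimal sketch of that pattern using the identifiers from this diff; the `enqueueCrawlUrl` helper is illustrative and the `crawl-redis` import path is assumed from the existing crawl helpers, not shown in this diff:

```ts
import { v4 as uuidv4 } from "uuid";
import { addScrapeJob } from "../../services/queue-jobs";
import { addCrawlJob, lockURL } from "../../lib/crawl-redis"; // path assumed from the existing crawl helpers

// Hypothetical helper showing the caller-side pattern; `sc` is the stored crawl, `data` the scrape payload.
async function enqueueCrawlUrl(id: string, sc: any, url: string, data: any) {
  await lockURL(id, sc, url);
  const jobId = uuidv4();              // the id is chosen by the caller, not by BullMQ
  await addScrapeJob(data, {}, jobId); // addScrapeJob no longer returns the Job
  await addCrawlJob(id, jobId);        // the crawl tracks the caller-generated id
}
```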
diff --git a/apps/api/src/controllers/v0/scrape.ts b/apps/api/src/controllers/v0/scrape.ts
index f5dbc3d1..5e6b7c6f 100644
--- a/apps/api/src/controllers/v0/scrape.ts
+++ b/apps/api/src/controllers/v0/scrape.ts
@@ -54,7 +54,7 @@ export async function scrapeHelper(
 
   const jobPriority = await getJobPriority({ plan, team_id, basePriority: 10 });
 
-  const job = await addScrapeJob(
+  await addScrapeJob(
     {
       url,
       mode: "single_urls",
@@ -81,7 +81,7 @@ export async function scrapeHelper(
     },
     async (span) => {
       try {
-        doc = (await waitForJob(job.id, timeout))[0];
+        doc = (await waitForJob(jobId, timeout))[0];
       } catch (e) {
         if (e instanceof Error && e.message.startsWith("Job wait")) {
           span.setAttribute("timedOut", true);
@@ -116,10 +116,10 @@ export async function scrapeHelper(
     return err;
   }
 
-  await job.remove();
+  await getScrapeQueue().remove(jobId);
 
   if (!doc) {
-    console.error("!!! PANIC DOC IS", doc, job);
+    console.error("!!! PANIC DOC IS", doc);
     return {
       success: true,
       error: "No page found",
diff --git a/apps/api/src/controllers/v0/search.ts b/apps/api/src/controllers/v0/search.ts
index 3635a4c4..67cff8eb 100644
--- a/apps/api/src/controllers/v0/search.ts
+++ b/apps/api/src/controllers/v0/search.ts
@@ -99,24 +99,19 @@ export async function searchHelper(
     };
   })
 
-  let jobs = [];
-  if (Sentry.isInitialized()) {
-    for (const job of jobDatas) {
-      // add with sentry instrumentation
-      jobs.push(await addScrapeJob(job.data as any, {}, job.opts.jobId, job.opts.priority));
-    }
-  } else {
-    jobs = await getScrapeQueue().addBulk(jobDatas);
-    await getScrapeQueue().addBulk(jobs);
+  // TODO: addScrapeJobs
+  for (const job of jobDatas) {
+    await addScrapeJob(job.data as any, {}, job.opts.jobId, job.opts.priority)
   }
 
-  const docs = (await Promise.all(jobs.map(x => waitForJob(x.id, 60000)))).map(x => x[0]);
+  const docs = (await Promise.all(jobDatas.map(x => waitForJob(x.opts.jobId, 60000)))).map(x => x[0]);
 
   if (docs.length === 0) {
     return { success: true, error: "No search results found", returnCode: 200 };
   }
 
-  await Promise.all(jobs.map(x => x.remove()));
+  const sq = getScrapeQueue();
+  await Promise.all(jobDatas.map(x => sq.remove(x.opts.jobId)));
 
   // make sure doc.content is not empty
   const filteredDocs = docs.filter(
diff --git a/apps/api/src/controllers/v1/batch-scrape.ts b/apps/api/src/controllers/v1/batch-scrape.ts
index cde4bd76..286163df 100644
--- a/apps/api/src/controllers/v1/batch-scrape.ts
+++ b/apps/api/src/controllers/v1/batch-scrape.ts
@@ -17,6 +17,7 @@ import {
 import { logCrawl } from "../../services/logging/crawl_log";
 import { getScrapeQueue } from "../../services/queue-service";
 import { getJobPriority } from "../../lib/job-priority";
+import { addScrapeJobs } from "../../services/queue-jobs";
 
 export async function batchScrapeController(
   req: RequestWithAuth<{}, CrawlResponse, BatchScrapeRequest>,
@@ -58,12 +59,10 @@ export async function batchScrapeController(
   }
 
   const jobs = req.body.urls.map((x) => {
-    const uuid = uuidv4();
     return {
-      name: uuid,
       data: {
         url: x,
-        mode: "single_urls",
+        mode: "single_urls" as const,
         team_id: req.auth.team_id,
         plan: req.auth.plan,
         crawlerOptions: null,
@@ -75,7 +74,7 @@ export async function batchScrapeController(
         v1: true,
       },
       opts: {
-        jobId: uuid,
+        jobId: uuidv4(),
         priority: 20,
       },
     };
@@ -89,7 +88,7 @@ export async function batchScrapeController(
     id,
     jobs.map((x) => x.opts.jobId)
   );
-  await getScrapeQueue().addBulk(jobs);
+  await addScrapeJobs(jobs);
 
   const protocol = process.env.ENV === "local" ? req.protocol : "https";
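Note: since `addScrapeJob` no longer resolves to a `Job` handle, the scrape and search helpers above wait on and clean up jobs purely by id. A minimal sketch of that flow; the `runScrapeJob` wrapper, payload, and timeout values are placeholders:

```ts
import { v4 as uuidv4 } from "uuid";
import { addScrapeJob, waitForJob } from "../../services/queue-jobs";
import { getScrapeQueue } from "../../services/queue-service";

// Hypothetical wrapper around the id-based wait/cleanup pattern used above.
async function runScrapeJob(payload: any, timeoutMs: number) {
  const jobId = uuidv4();
  await addScrapeJob(payload, {}, jobId, 10);
  const doc = (await waitForJob(jobId, timeoutMs))[0]; // waitForJob resolves with the job's result array
  await getScrapeQueue().remove(jobId);                // cleanup by id replaces job.remove()
  return doc;
}
```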
diff --git a/apps/api/src/controllers/v1/crawl.ts b/apps/api/src/controllers/v1/crawl.ts
index 0000b6fe..53cc04e8 100644
--- a/apps/api/src/controllers/v1/crawl.ts
+++ b/apps/api/src/controllers/v1/crawl.ts
@@ -137,7 +137,8 @@ export async function crawlController(
     await getScrapeQueue().addBulk(jobs);
   } else {
     await lockURL(id, sc, req.body.url);
-    const job = await addScrapeJob(
+    const jobId = uuidv4();
+    await addScrapeJob(
       {
         url: req.body.url,
         mode: "single_urls",
@@ -152,9 +153,10 @@ export async function crawlController(
       },
       {
         priority: 15,
-      }
+      },
+      jobId,
     );
-    await addCrawlJob(id, job.id);
+    await addCrawlJob(id, jobId);
   }
 
   if(req.body.webhook) {
diff --git a/apps/api/src/controllers/v1/scrape.ts b/apps/api/src/controllers/v1/scrape.ts
index d0d4c5fc..6fa855f7 100644
--- a/apps/api/src/controllers/v1/scrape.ts
+++ b/apps/api/src/controllers/v1/scrape.ts
@@ -17,6 +17,7 @@ import { addScrapeJob, waitForJob } from "../../services/queue-jobs";
 import { logJob } from "../../services/logging/log_job";
 import { getJobPriority } from "../../lib/job-priority";
 import { PlanType } from "../../types";
+import { getScrapeQueue } from "../../services/queue-service";
 
 export async function scrapeController(
   req: RequestWithAuth<{}, ScrapeResponse, ScrapeRequest>,
@@ -38,7 +39,7 @@ export async function scrapeController(
     basePriority: 10,
   });
 
-  const job = await addScrapeJob(
+  await addScrapeJob(
     {
       url: req.body.url,
       mode: "single_urls",
@@ -59,7 +60,7 @@ export async function scrapeController(
 
   let doc: any | undefined;
   try {
-    doc = (await waitForJob(job.id, timeout + totalWait))[0];
+    doc = (await waitForJob(jobId, timeout + totalWait))[0];
   } catch (e) {
     Logger.error(`Error in scrapeController: ${e}`);
     if (e instanceof Error && e.message.startsWith("Job wait")) {
@@ -79,10 +80,10 @@ export async function scrapeController(
     }
   }
 
-  await job.remove();
+  await getScrapeQueue().remove(jobId);
 
   if (!doc) {
-    console.error("!!! PANIC DOC IS", doc, job);
+    console.error("!!! PANIC DOC IS", doc);
     return res.status(200).json({
       success: true,
       warning: "No page found",
diff --git a/apps/api/src/lib/concurrency-limit.ts b/apps/api/src/lib/concurrency-limit.ts
new file mode 100644
index 00000000..72dc1e45
--- /dev/null
+++ b/apps/api/src/lib/concurrency-limit.ts
@@ -0,0 +1,48 @@
+import { getRateLimiterPoints } from "../services/rate-limiter";
+import { redisConnection } from "../services/queue-service";
+import { RateLimiterMode } from "../types";
+import { JobsOptions } from "bullmq";
+
+const constructKey = (team_id: string) => "concurrency-limiter:" + team_id;
+const constructQueueKey = (team_id: string) => "concurrency-limit-queue:" + team_id;
+const stalledJobTimeoutMs = 2 * 60 * 1000;
+
+export function getConcurrencyLimitMax(plan: string): number {
+  return getRateLimiterPoints(RateLimiterMode.Scrape, undefined, plan);
+}
+
+export async function cleanOldConcurrencyLimitEntries(team_id: string, now: number = Date.now()) {
+  await redisConnection.zremrangebyscore(constructKey(team_id), -Infinity, now);
+}
+
+export async function getConcurrencyLimitActiveJobs(team_id: string, now: number = Date.now()): Promise<string[]> {
+  return await redisConnection.zrangebyscore(constructKey(team_id), now, Infinity);
+}
+
+export async function pushConcurrencyLimitActiveJob(team_id: string, id: string, now: number = Date.now()) {
+  await redisConnection.zadd(constructKey(team_id), now + stalledJobTimeoutMs, id);
+}
+
+export async function removeConcurrencyLimitActiveJob(team_id: string, id: string) {
+  await redisConnection.zrem(constructKey(team_id), id);
+}
+
+export type ConcurrencyLimitedJob = {
+  id: string;
+  data: any;
+  opts: JobsOptions;
+  priority?: number;
+}
+
+export async function takeConcurrencyLimitedJob(team_id: string): Promise<ConcurrencyLimitedJob | null> {
+  const res = await redisConnection.zmpop(1, constructQueueKey(team_id), "MIN");
+  if (res === null || res === undefined) {
+    return null;
+  }
+
+  return JSON.parse(res[1][0][0]);
+}
+
+export async function pushConcurrencyLimitedJob(team_id: string, job: ConcurrencyLimitedJob) {
+  await redisConnection.zadd(constructQueueKey(team_id), job.priority ?? 1, JSON.stringify(job));
+}
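Note: the new module keeps two Redis sorted sets per team: an "active" set (`concurrency-limiter:<team>`) whose scores are expiry timestamps, so stalled entries age out after the 2-minute timeout, and a waiting set (`concurrency-limit-queue:<team>`) whose scores are job priorities, so `takeConcurrencyLimitedJob` pops the lowest-scored job first via ZMPOP MIN. A minimal sketch of how the helpers combine, mirroring the queue-jobs.ts and queue-worker.ts changes below; the `enqueueWithLimit`/`promoteNext` wrappers and the priority value are illustrative:

```ts
// Illustrative wrappers only; the real producer/consumer logic lives in
// queue-jobs.ts (addScrapeJobRaw) and queue-worker.ts (afterJobDone) below.
import {
  cleanOldConcurrencyLimitEntries,
  getConcurrencyLimitActiveJobs,
  getConcurrencyLimitMax,
  pushConcurrencyLimitActiveJob,
  pushConcurrencyLimitedJob,
  takeConcurrencyLimitedJob,
} from "./concurrency-limit"; // path assumes a caller inside apps/api/src/lib

async function enqueueWithLimit(teamId: string, plan: string, jobId: string, data: any) {
  const now = Date.now();
  const limit = getConcurrencyLimitMax(plan);
  await cleanOldConcurrencyLimitEntries(teamId, now);              // drop expired "active" entries
  const active = await getConcurrencyLimitActiveJobs(teamId, now);

  if (active.length >= limit) {
    // Over the cap: park the job in the team's priority-ordered waiting set.
    await pushConcurrencyLimitedJob(teamId, { id: jobId, data, opts: { jobId, priority: 10 }, priority: 10 });
  } else {
    // Under the cap: mark it active (score = now + 2 min stall timeout).
    await pushConcurrencyLimitActiveJob(teamId, jobId, now);
  }
}

// When a job finishes, the worker frees a slot and promotes the best waiting job, if any.
async function promoteNext(teamId: string) {
  return await takeConcurrencyLimitedJob(teamId); // null when nothing is waiting
}
```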
diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts
index 315700a1..e912ef42 100644
--- a/apps/api/src/services/queue-jobs.ts
+++ b/apps/api/src/services/queue-jobs.ts
@@ -1,20 +1,47 @@
-import { Job, Queue } from "bullmq";
+import { Job, JobsOptions } from "bullmq";
 import { getScrapeQueue } from "./queue-service";
 import { v4 as uuidv4 } from "uuid";
 import { WebScraperOptions } from "../types";
 import * as Sentry from "@sentry/node";
+import { cleanOldConcurrencyLimitEntries, getConcurrencyLimitActiveJobs, getConcurrencyLimitMax, pushConcurrencyLimitActiveJob, pushConcurrencyLimitedJob } from "../lib/concurrency-limit";
 
 async function addScrapeJobRaw(
   webScraperOptions: any,
   options: any,
   jobId: string,
   jobPriority: number = 10
-): Promise<Job> {
-  return await getScrapeQueue().add(jobId, webScraperOptions, {
-    ...options,
-    priority: jobPriority,
-    jobId,
-  });
+) {
+  let concurrencyLimited = false;
+
+  if (webScraperOptions && webScraperOptions.team_id && webScraperOptions.plan) {
+    const now = Date.now();
+    const limit = await getConcurrencyLimitMax(webScraperOptions.plan);
+    cleanOldConcurrencyLimitEntries(webScraperOptions.team_id, now);
+    concurrencyLimited = (await getConcurrencyLimitActiveJobs(webScraperOptions.team_id, now)).length >= limit;
+  }
+
+  if (concurrencyLimited) {
+    await pushConcurrencyLimitedJob(webScraperOptions.team_id, {
+      id: jobId,
+      data: webScraperOptions,
+      opts: {
+        ...options,
+        priority: jobPriority,
+        jobId: jobId,
+      },
+      priority: jobPriority,
+    });
+  } else {
+    if (webScraperOptions && webScraperOptions.team_id && webScraperOptions.plan) {
+      await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId);
+    }
+
+    await getScrapeQueue().add(jobId, webScraperOptions, {
+      ...options,
+      priority: jobPriority,
+      jobId,
+    });
+  }
 }
 
 export async function addScrapeJob(
@@ -22,8 +49,7 @@
   options: any = {},
   jobId: string = uuidv4(),
   jobPriority: number = 10
-): Promise<Job> {
-
+) {
   if (Sentry.isInitialized()) {
     const size = JSON.stringify(webScraperOptions).length;
     return await Sentry.startSpan({
@@ -35,7 +61,7 @@
         "messaging.message.body.size": size,
       },
     }, async (span) => {
-      return await addScrapeJobRaw({
+      await addScrapeJobRaw({
        ...webScraperOptions,
        sentry: {
          trace: Sentry.spanToTraceHeader(span),
@@ -45,10 +71,23 @@
      }, options, jobId, jobPriority);
     });
   } else {
-    return await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority);
+    await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority);
   }
 }
 
+export async function addScrapeJobs(
+  jobs: {
+    data: WebScraperOptions,
+    opts: {
+      jobId: string,
+      priority: number,
+    },
+  }[],
+) {
+  // TODO: better
+  await Promise.all(jobs.map(job => addScrapeJob(job.data, job.opts, job.opts.jobId, job.opts.priority)));
+}
+
 export function waitForJob(jobId: string, timeout: number) {
   return new Promise((resolve, reject) => {
     const start = Date.now();
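Note: `addScrapeJobs` is currently a thin fan-out over `addScrapeJob` (hence the TODO); callers hand it the same array shape that batch-scrape.ts builds above. An illustrative call follows; the URLs, team and plan values are placeholders, and the import path depends on where the caller lives:

```ts
import { v4 as uuidv4 } from "uuid";
import { addScrapeJobs } from "../../services/queue-jobs"; // e.g. from a controller under src/controllers

// Hypothetical caller; the real payload carries the full WebScraperOptions shape.
async function enqueueBatch(urls: string[]) {
  const jobs = urls.map((url) => ({
    data: {
      url,
      mode: "single_urls" as const,
      team_id: "team-123", // placeholder
      plan: "standard",    // placeholder
      crawlerOptions: null,
    } as any,
    opts: {
      jobId: uuidv4(),
      priority: 20,
    },
  }));
  await addScrapeJobs(jobs);
}
```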
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index f15aca4e..0e176ddb 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -38,6 +38,7 @@ import { PlanType, RateLimiterMode } from "../types";
 import { getJobs } from "../../src/controllers/v1/crawl-status";
 import { configDotenv } from "dotenv";
 import { getRateLimiterPoints } from "./rate-limiter";
+import { cleanOldConcurrencyLimitEntries, pushConcurrencyLimitActiveJob, removeConcurrencyLimitActiveJob, takeConcurrencyLimitedJob } from "../lib/concurrency-limit";
 configDotenv();
 
 if (process.env.ENV === "production") {
@@ -135,46 +136,27 @@ const workerFun = async (
     const job = await worker.getNextJob(token);
 
     if (job) {
-      const concurrencyLimiterKey = "concurrency-limiter:" + job.data?.team_id;
+      async function afterJobDone(job: Job) {
+        if (job.id && job.data && job.data.team_id && job.data.plan) {
+          await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
+          cleanOldConcurrencyLimitEntries(job.data.team_id);
 
-      if (job.data && job.data.team_id && job.data.plan) {
-        const concurrencyLimiterThrottledKey = "concurrency-limiter:" + job.data.team_id + ":throttled";
-        const concurrencyLimit = getRateLimiterPoints(RateLimiterMode.Scrape, undefined, job.data.plan);
-        const now = Date.now();
-        const stalledJobTimeoutMs = 2 * 60 * 1000;
-        const throttledJobTimeoutMs = 10 * 60 * 1000;
+          // Queue up next job, if it exists
+          // No need to check if we're under the limit here -- if the current job is finished,
+          // we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG
+          const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
+          if (nextJob !== null) {
+            await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id);
 
-        redisConnection.zremrangebyscore(concurrencyLimiterThrottledKey, -Infinity, now);
-        redisConnection.zremrangebyscore(concurrencyLimiterKey, -Infinity, now);
-        const activeJobsOfTeam = await redisConnection.zrangebyscore(concurrencyLimiterKey, now, Infinity);
-        if (activeJobsOfTeam.length >= concurrencyLimit) {
-          // Nick: removed the log because it was too spammy, tested and confirmed that the job is added back to the queue
-          // Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit");
-          // Concurrency limit hit, throttles the job
-          await redisConnection.zadd(concurrencyLimiterThrottledKey, now + throttledJobTimeoutMs, job.id);
-          // We move to failed with a specific error
-          await job.moveToFailed(new Error("Concurrency limit hit"), token, false);
-          // Remove the job from the queue
-          await job.remove();
-          // Increment the priority of the job exponentially by 5%, Note: max bull priority is 2 million
-          const newJobPriority = Math.min(Math.round((job.opts.priority ?? 10) * 1.05), 20000);
-          // Add the job back to the queue with the new priority
-          await queue.add(job.name, {
-            ...job.data,
-            concurrencyLimitHit: true,
-          }, {
-            ...job.opts,
-            jobId: job.id,
-            priority: newJobPriority, // exponential backoff for stuck jobs
-          });
-
-          // await sleep(gotJobInterval);
-          continue;
-        } else {
-          // If we are not throttled, add the job back to the queue with the new priority
-          await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id);
-          // Remove the job from the throttled list
-          await redisConnection.zrem(concurrencyLimiterThrottledKey, job.id);
+            await queue.add(nextJob.id, {
+              ...nextJob.data,
+              concurrencyLimitHit: true,
+            }, {
+              ...nextJob.opts,
+              jobId: nextJob.id,
+              priority: nextJob.priority,
+            });
+          }
         }
       }
 
@@ -212,9 +194,7 @@ const workerFun = async (
       try {
         res = await processJobInternal(token, job);
       } finally {
-        if (job.id && job.data && job.data.team_id) {
-          await redisConnection.zrem(concurrencyLimiterKey, job.id);
-        }
+        await afterJobDone(job)
       }
 
       if (res !== null) {
@@ -239,11 +219,7 @@ const workerFun = async (
         },
         () => {
           processJobInternal(token, job)
-            .finally(() => {
-              if (job.id && job.data && job.data.team_id) {
-                redisConnection.zrem(concurrencyLimiterKey, job.id);
-              }
-            });
+            .finally(() => afterJobDone(job));
         }
       );
     }
@@ -391,7 +367,7 @@ async function processJob(job: Job, token: string) {
           // console.log("base priority: ", job.data.crawl_id ? 20 : 10)
           // console.log("job priority: " , jobPriority, "\n\n\n")
 
-          const newJob = await addScrapeJob(
+          await addScrapeJob(
             {
               url: link,
               mode: "single_urls",
@@ -409,7 +385,7 @@ async function processJob(job: Job, token: string) {
             jobPriority
           );
 
-          await addCrawlJob(job.data.crawl_id, newJob.id);
+          await addCrawlJob(job.data.crawl_id, jobId);
         }
       }
     }
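Note: with this change the worker no longer bounces over-limit jobs through moveToFailed/re-add with escalating priority; finishing a job frees exactly one slot and `afterJobDone` promotes the next waiting job. A condensed, non-diff restatement of that hand-off for readability; `afterJobDoneSketch` is illustrative and `queue` stands for the BullMQ scrape queue used by the worker:

```ts
import { Job, Queue } from "bullmq";
import {
  cleanOldConcurrencyLimitEntries,
  pushConcurrencyLimitActiveJob,
  removeConcurrencyLimitActiveJob,
  takeConcurrencyLimitedJob,
} from "../lib/concurrency-limit";

// Condensed restatement of afterJobDone above, for readability only.
async function afterJobDoneSketch(queue: Queue, job: Job) {
  if (!job.id || !job.data?.team_id || !job.data?.plan) return;

  await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
  await cleanOldConcurrencyLimitEntries(job.data.team_id);

  // Finishing a job frees exactly one slot, so the best waiting job (if any)
  // can be promoted without re-checking the team's limit.
  const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
  if (nextJob !== null) {
    await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id);
    await queue.add(nextJob.id, { ...nextJob.data, concurrencyLimitHit: true }, {
      ...nextJob.opts,
      jobId: nextJob.id,
      priority: nextJob.priority,
    });
  }
}
```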