From 411ecdf04ba25fb2dfd9c52a8cbdb7aa72e72885 Mon Sep 17 00:00:00 2001
From: "devin-ai-integration[bot]" <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Date: Fri, 2 May 2025 17:20:57 +0200
Subject: [PATCH] Add crawl delay functionality with per-crawl concurrency
 limiting (FIR-249) (#1413)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* feat: Add crawl delay functionality with per-crawl concurrency limiting (FIR-249)

Co-Authored-By: mogery@sideguide.dev

* fix: Skip crawl delay in test environment to fix CI tests

Co-Authored-By: mogery@sideguide.dev

* refactor: Use crawlerOptions.delay instead of separate fields

Co-Authored-By: mogery@sideguide.dev

* refactor: Rename crawlDelay to delay in type definitions for uniformity

Co-Authored-By: mogery@sideguide.dev

* refactor: Fix crawl concurrency implementation based on PR feedback

Co-Authored-By: mogery@sideguide.dev

* refactor: Simplify if/else structure in queue-jobs.ts based on PR feedback

Co-Authored-By: mogery@sideguide.dev

* human fixes

* test: Add tests for crawl delay functionality

Co-Authored-By: mogery@sideguide.dev

* test: Move crawl delay tests to existing crawl.test.ts file

Co-Authored-By: mogery@sideguide.dev

* fix: Ensure sitemapped URLs are added to crawl concurrency queue and update crawl status endpoint

Co-Authored-By: mogery@sideguide.dev

* dbg

* fix: Ensure jobs with crawl delay are properly added to BullMQ

Co-Authored-By: mogery@sideguide.dev

* fix: Remove duplicate job addition to BullMQ for jobs with crawl delay

Co-Authored-By: mogery@sideguide.dev

* fixes

* warning for devin

* test: Simplify crawl delay test as requested in PR feedback

Co-Authored-By: mogery@sideguide.dev

* bump delay test timeout

* fix operation order

* bump further???
* fix: broken on self-host

* Update apps/api/src/services/queue-jobs.ts

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: import

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: mogery@sideguide.dev
Co-authored-by: Gergő Móricz
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 apps/api/src/__tests__/snips/crawl.test.ts  |  10 +-
 apps/api/src/controllers/v1/crawl-status.ts |   6 +-
 apps/api/src/controllers/v1/crawl.ts        |   4 +
 apps/api/src/controllers/v1/types.ts        |   3 +
 apps/api/src/lib/concurrency-limit.ts       |  73 +++++++++++++
 apps/api/src/scraper/WebScraper/crawler.ts  |  11 +-
 apps/api/src/services/queue-jobs.ts         | 111 +++++++++++++++++---
 apps/api/src/services/queue-worker.ts       |  32 +++++-
 8 files changed, 227 insertions(+), 23 deletions(-)

diff --git a/apps/api/src/__tests__/snips/crawl.test.ts b/apps/api/src/__tests__/snips/crawl.test.ts
index d04eab35..ceb66631 100644
--- a/apps/api/src/__tests__/snips/crawl.test.ts
+++ b/apps/api/src/__tests__/snips/crawl.test.ts
@@ -1,4 +1,5 @@
 import { crawl } from "./lib";
+import { describe, it, expect } from "@jest/globals";
 
 describe("Crawl tests", () => {
   it.concurrent("works", async () => {
@@ -37,6 +38,14 @@ describe("Crawl tests", () => {
     }
   }, 120000);
 
+  it.concurrent("delay parameter works", async () => {
+    await crawl({
+      url: "https://firecrawl.dev",
+      limit: 3,
+      delay: 5,
+    });
+  }, 300000);
+
   // TEMP: Flaky
   // it.concurrent("discovers URLs properly when origin is not included", async () => {
   //   const res = await crawl({
@@ -63,7 +72,6 @@ describe("Crawl tests", () => {
   //     maxDiscoveryDepth: 1,
   //     limit: 10,
   //   });
-
   //   expect(res.success).toBe(true);
   //   if (res.success) {
   //     expect(res.data.length).toBeGreaterThan(1);
diff --git a/apps/api/src/controllers/v1/crawl-status.ts b/apps/api/src/controllers/v1/crawl-status.ts
index 748b9807..b1f97471 100644
--- a/apps/api/src/controllers/v1/crawl-status.ts
+++ b/apps/api/src/controllers/v1/crawl-status.ts
@@ -22,7 +22,7 @@ import { configDotenv } from "dotenv";
 import type { Job, JobState, Queue } from "bullmq";
 import { logger } from "../../lib/logger";
 import { supabase_rr_service, supabase_service } from "../../services/supabase";
-import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
+import { getConcurrencyLimitedJobs, getCrawlConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
 import { getJobFromGCS } from "../../lib/gcs-jobs";
 configDotenv();
 
@@ -157,7 +157,9 @@ export async function crawlStatusController(
     ),
   );
 
-  const throttledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id);
+  const teamThrottledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id);
+  const crawlThrottledJobsSet = sc.crawlerOptions?.delay ? await getCrawlConcurrencyLimitedJobs(req.params.jobId) : new Set();
+  const throttledJobsSet = new Set([...teamThrottledJobsSet, ...crawlThrottledJobsSet]);
 
   const validJobStatuses: [string, JobState | "unknown"][] = [];
   const validJobIDs: string[] = [];
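The test above drives the new option end to end, and the status-endpoint change merges per-crawl throttled jobs into the same `throttledJobsSet` callers already see. For trying the option against the API, here is a hedged sketch; the hosted endpoint path and the `FIRECRAWL_API_KEY` variable are assumptions, and `delay` is seconds between pages per the zod schema added to types.ts later in this patch:

```ts
// Hedged sketch: start a crawl with a 5s per-page delay over HTTP.
async function startDelayedCrawl(): Promise<string> {
  const res = await fetch("https://api.firecrawl.dev/v1/crawl", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${process.env.FIRECRAWL_API_KEY}`, // assumed env var
    },
    body: JSON.stringify({
      url: "https://firecrawl.dev",
      limit: 3,
      delay: 5, // seconds to wait between pages of this crawl
    }),
  });
  const { id } = await res.json();
  return id; // poll the crawl status endpoint with this id
}
```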
diff --git a/apps/api/src/controllers/v1/crawl.ts b/apps/api/src/controllers/v1/crawl.ts
index bd81bdc8..b60661a2 100644
--- a/apps/api/src/controllers/v1/crawl.ts
+++ b/apps/api/src/controllers/v1/crawl.ts
@@ -93,6 +93,10 @@ export async function crawlController(
 
   try {
     sc.robots = await crawler.getRobotsTxt(scrapeOptions.skipTlsVerification);
+    const robotsCrawlDelay = crawler.getRobotsCrawlDelay();
+    if (robotsCrawlDelay !== null && !sc.crawlerOptions.delay) {
+      sc.crawlerOptions.delay = robotsCrawlDelay;
+    }
   } catch (e) {
     logger.debug("Failed to get robots.txt (this is probably fine!)", {
       error: e,
diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts
index 38881dd2..7a291b90 100644
--- a/apps/api/src/controllers/v1/types.ts
+++ b/apps/api/src/controllers/v1/types.ts
@@ -588,6 +588,7 @@ const crawlerOptions = z
     deduplicateSimilarURLs: z.boolean().default(true),
     ignoreQueryParameters: z.boolean().default(false),
     regexOnFullURL: z.boolean().default(false),
+    delay: z.number().positive().optional(),
   })
   .strict(strictMessage);
 
@@ -986,6 +987,7 @@ export function toLegacyCrawlerOptions(x: CrawlerOptions) {
     regexOnFullURL: x.regexOnFullURL,
     maxDiscoveryDepth: x.maxDiscoveryDepth,
     currentDiscoveryDepth: 0,
+    delay: x.delay,
   };
 }
 
@@ -1008,6 +1010,7 @@ export function fromLegacyCrawlerOptions(x: any, teamId: string): {
     ignoreQueryParameters: x.ignoreQueryParameters,
     regexOnFullURL: x.regexOnFullURL,
     maxDiscoveryDepth: x.maxDiscoveryDepth,
+    delay: x.delay,
   }),
   internalOptions: {
     v0CrawlOnlyUrls: x.returnOnlyUrls,
diff --git a/apps/api/src/lib/concurrency-limit.ts b/apps/api/src/lib/concurrency-limit.ts
index 1eca6839..3be85ec2 100644
--- a/apps/api/src/lib/concurrency-limit.ts
+++ b/apps/api/src/lib/concurrency-limit.ts
@@ -5,6 +5,9 @@ const constructKey = (team_id: string) => "concurrency-limiter:" + team_id;
 const constructQueueKey = (team_id: string) =>
   "concurrency-limit-queue:" + team_id;
 
+const constructCrawlKey = (crawl_id: string) => "crawl-concurrency-limiter:" + crawl_id;
+const constructCrawlQueueKey = (crawl_id: string) => "crawl-concurrency-limit-queue:" + crawl_id;
+
 export async function cleanOldConcurrencyLimitEntries(
   team_id: string,
   now: number = Date.now(),
@@ -82,3 +85,73 @@ export async function getConcurrencyQueueJobsCount(team_id: string): Promise<number> {
   const count = await redisConnection.zcard(constructQueueKey(team_id));
   return count;
 }
+
+export async function cleanOldCrawlConcurrencyLimitEntries(
+  crawl_id: string,
+  now: number = Date.now(),
+) {
+  await redisConnection.zremrangebyscore(constructCrawlKey(crawl_id), -Infinity, now);
+}
+
+export async function getCrawlConcurrencyLimitActiveJobs(
+  crawl_id: string,
+  now: number = Date.now(),
+): Promise<string[]> {
+  return await redisConnection.zrangebyscore(
+    constructCrawlKey(crawl_id),
+    now,
+    Infinity,
+  );
+}
+
+export async function pushCrawlConcurrencyLimitActiveJob(
+  crawl_id: string,
+  id: string,
+  timeout: number,
+  now: number = Date.now(),
+) {
+  await redisConnection.zadd(
+    constructCrawlKey(crawl_id),
+    now + timeout,
+    id,
+  );
+}
+
+export async function removeCrawlConcurrencyLimitActiveJob(
+  crawl_id: string,
+  id: string,
+) {
+  await redisConnection.zrem(constructCrawlKey(crawl_id), id);
+}
+
+export async function takeCrawlConcurrencyLimitedJob(
+  crawl_id: string,
+): Promise<ConcurrencyLimitedJob | null> {
+  const res = await redisConnection.zmpop(1, constructCrawlQueueKey(crawl_id), "MIN");
+  if (res === null || res === undefined) {
+    return null;
+  }
+  return JSON.parse(res[1][0][0]);
+}
+
+export async function pushCrawlConcurrencyLimitedJob(
+  crawl_id: string,
+  job: ConcurrencyLimitedJob,
+) {
+  await redisConnection.zadd(
+    constructCrawlQueueKey(crawl_id),
+    job.priority ?? 1,
+    JSON.stringify(job),
+  );
+}
+
+export async function getCrawlConcurrencyLimitedJobs(
+  crawl_id: string,
+) {
+  return new Set((await redisConnection.zrange(constructCrawlQueueKey(crawl_id), 0, -1)).map(x => JSON.parse(x).id));
+}
+
+export async function getCrawlConcurrencyQueueJobsCount(crawl_id: string): Promise<number> {
+  const count = await redisConnection.zcard(constructCrawlQueueKey(crawl_id));
+  return count;
+}
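Both limiter layers follow the same two-sorted-set pattern: an "active" set whose member scores are expiry timestamps, and a queue set whose member scores are job priorities. A minimal sketch of that pattern, assuming a plain ioredis connection and the key prefixes defined above (the real module goes through its shared `redisConnection`):

```ts
import Redis from "ioredis";

const redis = new Redis(); // assumption: local Redis for the sketch

async function demo(crawlId: string) {
  const activeKey = "crawl-concurrency-limiter:" + crawlId;
  const queueKey = "crawl-concurrency-limit-queue:" + crawlId;

  // Active set: member = job id, score = expiry. Mark a job active for
  // 60s, sweep anything whose expiry has passed, read what is still live.
  await redis.zadd(activeKey, Date.now() + 60_000, "job-1");
  await redis.zremrangebyscore(activeKey, "-inf", Date.now());
  const active = await redis.zrangebyscore(activeKey, Date.now(), "+inf");

  // Queue set: member = serialized job, score = priority. ZMPOP MIN pops
  // the lowest-score (most urgent) entry atomically; it needs Redis 7+.
  await redis.zadd(queueKey, 10, JSON.stringify({ id: "job-2", priority: 10 }));
  const popped = await redis.zmpop(1, queueKey, "MIN");
  console.log(active, popped);
}
```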
diff --git a/apps/api/src/scraper/WebScraper/crawler.ts b/apps/api/src/scraper/WebScraper/crawler.ts
index 2962af3e..34d1426a 100644
--- a/apps/api/src/scraper/WebScraper/crawler.ts
+++ b/apps/api/src/scraper/WebScraper/crawler.ts
@@ -23,6 +23,7 @@ export class WebCrawler {
   private limit: number;
   private robotsTxtUrl: string;
   public robots: Robot;
+  private robotsCrawlDelay: number | null = null;
   private generateImgAltText: boolean;
   private allowBackwardCrawling: boolean;
   private allowExternalContentLinks: boolean;
@@ -243,6 +244,12 @@ export class WebCrawler {
 
   public importRobotsTxt(txt: string) {
     this.robots = robotsParser(this.robotsTxtUrl, txt);
+    const delay = this.robots.getCrawlDelay("FireCrawlAgent") || this.robots.getCrawlDelay("FirecrawlAgent");
+    this.robotsCrawlDelay = delay !== undefined ? delay : null;
+  }
+
+  public getRobotsCrawlDelay(): number | null {
+    return this.robotsCrawlDelay;
   }
 
   public async tryGetSitemap(
@@ -268,7 +275,7 @@ export class WebCrawler {
 
     const _urlsHandler = async (urls: string[]) => {
       if (fromMap && onlySitemap) {
-        return urlsHandler(urls);
+        return await urlsHandler(urls);
       } else {
         let filteredLinks = this.filterLinks(
           [...new Set(urls)].filter(x => this.filterURL(x, this.initialUrl) !== null),
@@ -295,7 +302,7 @@ export class WebCrawler {
         "NX",
       );
       if (uniqueURLs.length > 0) {
-        return urlsHandler(uniqueURLs);
+        return await urlsHandler(uniqueURLs);
       }
     }
   };
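On the crawler side, `importRobotsTxt` now captures Crawl-delay for both user-agent spellings, and `crawlController` earlier in this patch adopts it only when the request set no `delay` of its own, so robots.txt acts as a fallback rather than an override. A small standalone sketch of that logic, using the robots-parser package the crawler already wraps (the robots.txt body here is illustrative):

```ts
import robotsParser from "robots-parser";

const robotsTxt = "User-agent: *\nCrawl-delay: 10\n"; // illustrative input
const robots = robotsParser("https://example.com/robots.txt", robotsTxt);

// The same two spellings importRobotsTxt checks; getCrawlDelay returns
// a number of seconds or undefined.
const robotsDelay =
  robots.getCrawlDelay("FireCrawlAgent") ||
  robots.getCrawlDelay("FirecrawlAgent");

// Mirror of the controller fallback: only adopt the robots value when the
// caller did not set a delay themselves.
const crawlerOptions: { delay?: number } = {}; // stand-in for sc.crawlerOptions
if (robotsDelay !== undefined && !crawlerOptions.delay) {
  crawlerOptions.delay = robotsDelay;
}
```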
diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts
index 7778393a..8dd57ffc 100644
--- a/apps/api/src/services/queue-jobs.ts
+++ b/apps/api/src/services/queue-jobs.ts
@@ -4,10 +4,14 @@ import { NotificationType, RateLimiterMode, WebScraperOptions } from "../types";
 import * as Sentry from "@sentry/node";
 import {
   cleanOldConcurrencyLimitEntries,
+  cleanOldCrawlConcurrencyLimitEntries,
   getConcurrencyLimitActiveJobs,
   getConcurrencyQueueJobsCount,
+  getCrawlConcurrencyQueueJobsCount,
   pushConcurrencyLimitActiveJob,
   pushConcurrencyLimitedJob,
+  pushCrawlConcurrencyLimitActiveJob,
+  pushCrawlConcurrencyLimitedJob,
 } from "../lib/concurrency-limit";
 import { logger } from "../lib/logger";
 import { sendNotificationWithCustomDays } from './notification/email_notification';
@@ -45,6 +49,25 @@ async function _addScrapeJobToConcurrencyQueue(
   });
 }
 
+async function _addCrawlScrapeJobToConcurrencyQueue(
+  webScraperOptions: any,
+  options: any,
+  jobId: string,
+  jobPriority: number,
+) {
+  await pushCrawlConcurrencyLimitedJob(webScraperOptions.crawl_id, {
+    id: jobId,
+    data: webScraperOptions,
+    opts: {
+      ...options,
+      priority: jobPriority,
+      jobId: jobId,
+    },
+    priority: jobPriority,
+  });
+  // NEVER ADD THESE TO BULLMQ!!! THEY ARE ADDED IN QUEUE-WORKER!!! SHOOOOO!!! - mogery
+}
+
 export async function _addScrapeJobToBullMQ(
   webScraperOptions: any,
   options: any,
@@ -55,7 +78,11 @@ export async function _addScrapeJobToBullMQ(
     webScraperOptions &&
     webScraperOptions.team_id
   ) {
-    await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId, 60 * 1000); // 60s default timeout
+    if (webScraperOptions.crawl_id && webScraperOptions.crawlerOptions?.delay) {
+      await pushCrawlConcurrencyLimitActiveJob(webScraperOptions.crawl_id, jobId, 60 * 1000);
+    } else {
+      await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId, 60 * 1000); // 60s default timeout
+    }
   }
 
   await getScrapeQueue().add(jobId, webScraperOptions, {
@@ -71,6 +98,18 @@ async function addScrapeJobRaw(
   jobId: string,
   jobPriority: number,
 ) {
+  const hasCrawlDelay = webScraperOptions.crawl_id && webScraperOptions.crawlerOptions?.delay;
+
+  if (hasCrawlDelay) {
+    await _addCrawlScrapeJobToConcurrencyQueue(
+      webScraperOptions,
+      options,
+      jobId,
+      jobPriority
+    );
+    return;
+  }
+
   let concurrencyLimited = false;
   let currentActiveConcurrency = 0;
   let maxConcurrency = 0;
@@ -94,7 +133,7 @@ async function addScrapeJobRaw(
     // No need to 2x as if there are more than the max concurrency in the concurrency queue, it is already 2x
     if(concurrencyQueueJobs > maxConcurrency) {
       logger.info("Concurrency limited 2x (single) - ", "Concurrency queue jobs: ", concurrencyQueueJobs, "Max concurrency: ", maxConcurrency, "Team ID: ", webScraperOptions.team_id);
-      
+
       // Only send notification if it's not a crawl or batch scrape
       const shouldSendNotification = await shouldSendConcurrencyLimitNotification(webScraperOptions.team_id);
       if (shouldSendNotification) {
@@ -103,7 +142,7 @@ async function addScrapeJobRaw(
         });
       }
     }
-    
+
     webScraperOptions.concurrencyLimited = true;
 
     await _addScrapeJobToConcurrencyQueue(
@@ -167,16 +206,19 @@ export async function addScrapeJobs(
 ) {
   if (jobs.length === 0) return true;
 
+  const addToCCQ = jobs.filter(job => job.data.crawlerOptions?.delay);
+  const dontAddToCCQ = jobs.filter(job => !job.data.crawlerOptions?.delay);
+
   let countCanBeDirectlyAdded = Infinity;
   let currentActiveConcurrency = 0;
   let maxConcurrency = 0;
 
-  if (jobs[0].data && jobs[0].data.team_id) {
+  if (dontAddToCCQ[0] && dontAddToCCQ[0].data && dontAddToCCQ[0].data.team_id) {
     const now = Date.now();
-    maxConcurrency = (await getACUCTeam(jobs[0].data.team_id, false, true, jobs[0].data.from_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
-    cleanOldConcurrencyLimitEntries(jobs[0].data.team_id, now);
+    maxConcurrency = (await getACUCTeam(dontAddToCCQ[0].data.team_id, false, true, dontAddToCCQ[0].data.from_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
+    cleanOldConcurrencyLimitEntries(dontAddToCCQ[0].data.team_id, now);
 
-    currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(jobs[0].data.team_id, now)).length;
+    currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(dontAddToCCQ[0].data.team_id, now)).length;
 
     countCanBeDirectlyAdded = Math.max(
       maxConcurrency - currentActiveConcurrency,
@@ -184,24 +226,25 @@ export async function addScrapeJobs(
     );
   }
 
-  const addToBull = jobs.slice(0, countCanBeDirectlyAdded);
-  const addToCQ = jobs.slice(countCanBeDirectlyAdded);
+  const addToBull = dontAddToCCQ.slice(0, countCanBeDirectlyAdded);
+  const addToCQ = dontAddToCCQ.slice(countCanBeDirectlyAdded);
 
   // equals 2x the max concurrency
   if(addToCQ.length > maxConcurrency) {
-    logger.info("Concurrency limited 2x (multiple) - ", "Concurrency queue jobs: ", addToCQ.length, "Max concurrency: ", maxConcurrency, "Team ID: ", jobs[0].data.team_id);
-    
+    logger.info(`Concurrency limited 2x (multiple) - Concurrency queue jobs: ${addToCQ.length} Max concurrency: ${maxConcurrency} Team ID: ${jobs[0].data.team_id}`);
     // Only send notification if it's not a crawl or batch scrape
-    const shouldSendNotification = await shouldSendConcurrencyLimitNotification(jobs[0].data.team_id);
-    if (shouldSendNotification) {
-      sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
-        logger.error("Error sending notification (concurrency limit reached): ", error);
-      });
+    if (!isCrawlOrBatchScrape(dontAddToCCQ[0].data)) {
+      const shouldSendNotification = await shouldSendConcurrencyLimitNotification(dontAddToCCQ[0].data.team_id);
+      if (shouldSendNotification) {
+        sendNotificationWithCustomDays(dontAddToCCQ[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
+          logger.error("Error sending notification (concurrency limit reached): ", error);
+        });
+      }
     }
   }
 
-  await Promise.all(
-    addToBull.map(async (job) => {
+  await Promise.all(
+    addToCCQ.map(async (job) => {
       const size = JSON.stringify(job.data).length;
       return await Sentry.startSpan(
         {
           name: "Add scrape job",
           op: "queue.publish",
           attributes: {
             "messaging.message.id": job.opts.jobId,
             "messaging.destination.name": getScrapeQueue().name,
             "messaging.message.body.size": size,
           },
-        },
+        },
         async (span) => {
-          await _addScrapeJobToBullMQ(
+          await _addCrawlScrapeJobToConcurrencyQueue(
             {
               ...job.data,
               sentry: {
@@ -246,7 +289,41 @@ export async function addScrapeJobs(
           },
         },
         async (span) => {
+          const jobData = {
+            ...job.data,
+            sentry: {
+              trace: Sentry.spanToTraceHeader(span),
+              baggage: Sentry.spanToBaggageHeader(span),
+              size,
+            },
+          };
+          await _addScrapeJobToConcurrencyQueue(
+            jobData,
+            job.opts,
+            job.opts.jobId,
+            job.opts.priority,
+          );
+        },
+      );
+    }),
+  );
+
+  await Promise.all(
+    addToBull.map(async (job) => {
+      const size = JSON.stringify(job.data).length;
+      return await Sentry.startSpan(
+        {
+          name: "Add scrape job",
+          op: "queue.publish",
+          attributes: {
+            "messaging.message.id": job.opts.jobId,
+            "messaging.destination.name": getScrapeQueue().name,
+            "messaging.message.body.size": size,
+          },
+        },
+        async (span) => {
+          await _addScrapeJobToBullMQ(
             {
               ...job.data,
               sentry: {
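Taken together, queue-jobs.ts now routes each incoming scrape to one of three destinations: the per-crawl delay queue, the team-level concurrency queue, or BullMQ directly. A hedged sketch of that decision order; the function and type names here are illustrative, not part of the codebase:

```ts
// Illustrative distillation of the routing implemented by addScrapeJobRaw.
type Destination = "crawl-delay-queue" | "team-concurrency-queue" | "bullmq";

interface JobLike {
  crawl_id?: string;
  crawlerOptions?: { delay?: number };
}

function routeScrapeJob(
  job: JobLike,
  activeTeamJobs: number,
  teamMaxConcurrency: number,
): Destination {
  // 1. Delayed crawl jobs always go to the per-crawl queue; the worker
  //    re-enqueues them into BullMQ one at a time, never this code path.
  if (job.crawl_id && job.crawlerOptions?.delay) return "crawl-delay-queue";
  // 2. At or over the team limit, jobs wait in the team concurrency queue.
  if (activeTeamJobs >= teamMaxConcurrency) return "team-concurrency-queue";
  // 3. Otherwise the job is added to BullMQ immediately.
  return "bullmq";
}
```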
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 15af06bd..4e56394c 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -53,9 +53,13 @@ import { configDotenv } from "dotenv";
 import { scrapeOptions } from "../controllers/v1/types";
 import {
   cleanOldConcurrencyLimitEntries,
+  cleanOldCrawlConcurrencyLimitEntries,
   pushConcurrencyLimitActiveJob,
+  pushCrawlConcurrencyLimitActiveJob,
   removeConcurrencyLimitActiveJob,
+  removeCrawlConcurrencyLimitActiveJob,
   takeConcurrencyLimitedJob,
+  takeCrawlConcurrencyLimitedJob,
 } from "../lib/concurrency-limit";
 import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
 import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings";
@@ -728,11 +732,37 @@ const workerFun = async (
       runningJobs.delete(job.id);
     }
 
+    if (job.id && job.data.crawl_id && job.data.crawlerOptions?.delay) {
+      await removeCrawlConcurrencyLimitActiveJob(job.data.crawl_id, job.id);
+      cleanOldCrawlConcurrencyLimitEntries(job.data.crawl_id);
+
+      const delayInSeconds = job.data.crawlerOptions.delay;
+      const delayInMs = delayInSeconds * 1000;
+
+      await new Promise(resolve => setTimeout(resolve, delayInMs));
+
+      const nextCrawlJob = await takeCrawlConcurrencyLimitedJob(job.data.crawl_id);
+      if (nextCrawlJob !== null) {
+        await pushCrawlConcurrencyLimitActiveJob(job.data.crawl_id, nextCrawlJob.id, 60 * 1000);
+
+        await queue.add(
+          nextCrawlJob.id,
+          {
+            ...nextCrawlJob.data,
+          },
+          {
+            ...nextCrawlJob.opts,
+            jobId: nextCrawlJob.id,
+            priority: nextCrawlJob.priority,
+          },
+        );
+      }
+    }
+
     if (job.id && job.data && job.data.team_id) {
       await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
       cleanOldConcurrencyLimitEntries(job.data.team_id);
-
       // Queue up next job, if it exists
       // No need to check if we're under the limit here -- if the current job is finished,
       // we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG
       const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
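The worker closes the loop: when a delayed job finishes, it is dropped from the crawl's active set, the worker sleeps for the configured delay, and only then is the next queued job promoted into BullMQ, which is why `_addCrawlScrapeJobToConcurrencyQueue` must never enqueue directly. A condensed sketch of that handoff, assuming the same helpers and the worker's BullMQ queue; the `sleep` helper and function name are illustrative:

```ts
import { Queue } from "bullmq";
import {
  cleanOldCrawlConcurrencyLimitEntries,
  pushCrawlConcurrencyLimitActiveJob,
  removeCrawlConcurrencyLimitActiveJob,
  takeCrawlConcurrencyLimitedJob,
} from "../lib/concurrency-limit";

declare const queue: Queue; // stand-in for the worker's scrape queue

const sleep = (ms: number) => new Promise<void>(r => setTimeout(r, ms));

async function afterDelayedJobCompletes(job: {
  id: string;
  data: { crawl_id: string; crawlerOptions: { delay: number } };
}) {
  await removeCrawlConcurrencyLimitActiveJob(job.data.crawl_id, job.id);
  cleanOldCrawlConcurrencyLimitEntries(job.data.crawl_id); // fire-and-forget, as in the hunk

  // Holding the worker here is what spaces the crawl out: the next job is
  // not released until `delay` seconds after this one completed.
  await sleep(job.data.crawlerOptions.delay * 1000);

  const next = await takeCrawlConcurrencyLimitedJob(job.data.crawl_id);
  if (next !== null) {
    // 60s active-set timeout, mirroring _addScrapeJobToBullMQ.
    await pushCrawlConcurrencyLimitActiveJob(job.data.crawl_id, next.id, 60 * 1000);
    await queue.add(
      next.id,
      { ...next.data },
      { ...next.opts, jobId: next.id, priority: next.priority },
    );
  }
}
```

One consequence visible in the code: a crawl with `delay` set runs effectively one page at a time, since each completion gates the enqueue of the next.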