Add crawl delay functionality with per-crawl concurrency limiting (FIR-249) (#1413)

* feat: Add crawl delay functionality with per-crawl concurrency limiting (FIR-249)

Co-Authored-By: mogery@sideguide.dev <mogery@sideguide.dev>

* fix: Skip crawl delay in test environment to fix CI tests

Co-Authored-By: mogery@sideguide.dev <mogery@sideguide.dev>

* refactor: Use crawlerOptions.delay instead of separate fields

Co-Authored-By: mogery@sideguide.dev <mogery@sideguide.dev>

* refactor: Rename crawlDelay to delay in type definitions for uniformity

Co-Authored-By: mogery@sideguide.dev <mogery@sideguide.dev>

* refactor: Fix crawl concurrency implementation based on PR feedback

Co-Authored-By: mogery@sideguide.dev <mogery@sideguide.dev>

* refactor: Simplify if/else structure in queue-jobs.ts based on PR feedback

Co-Authored-By: mogery@sideguide.dev <mogery@sideguide.dev>

* human fixes

* test: Add tests for crawl delay functionality

Co-Authored-By: mogery@sideguide.dev <mogery@sideguide.dev>

* test: Move crawl delay tests to existing crawl.test.ts file

Co-Authored-By: mogery@sideguide.dev <mogery@sideguide.dev>

* fix: Ensure sitemapped URLs are added to crawl concurrency queue and update crawl status endpoint

Co-Authored-By: mogery@sideguide.dev <mogery@sideguide.dev>

* dbg

* fix: Ensure jobs with crawl delay are properly added to BullMQ

Co-Authored-By: mogery@sideguide.dev <mogery@sideguide.dev>

* fix: Remove duplicate job addition to BullMQ for jobs with crawl delay

Co-Authored-By: mogery@sideguide.dev <mogery@sideguide.dev>

* fixes

* warning for devin

* test: Simplify crawl delay test as requested in PR feedback

Co-Authored-By: mogery@sideguide.dev <mogery@sideguide.dev>

* bump delay test timeout

* fix operation order

* bump further???

* fix: broken on self-host

* Update apps/api/src/services/queue-jobs.ts

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: import

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: mogery@sideguide.dev <mogery@sideguide.dev>
Co-authored-by: Gergő Móricz <mo.geryy@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
devin-ai-integration[bot] 2025-05-02 17:20:57 +02:00 committed by GitHub
parent 510171cabe
commit 411ecdf04b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 227 additions and 23 deletions

View File

@ -1,4 +1,5 @@
import { crawl } from "./lib";
import { describe, it, expect } from "@jest/globals";
describe("Crawl tests", () => {
it.concurrent("works", async () => {
@ -37,6 +38,14 @@ describe("Crawl tests", () => {
}
}, 120000);
it.concurrent("delay parameter works", async () => {
await crawl({
url: "https://firecrawl.dev",
limit: 3,
delay: 5,
});
}, 300000);
// TEMP: Flaky
// it.concurrent("discovers URLs properly when origin is not included", async () => {
// const res = await crawl({
@ -63,7 +72,6 @@ describe("Crawl tests", () => {
// maxDiscoveryDepth: 1,
// limit: 10,
// });
// expect(res.success).toBe(true);
// if (res.success) {
// expect(res.data.length).toBeGreaterThan(1);

View File

@ -22,7 +22,7 @@ import { configDotenv } from "dotenv";
import type { Job, JobState, Queue } from "bullmq";
import { logger } from "../../lib/logger";
import { supabase_rr_service, supabase_service } from "../../services/supabase";
import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
import { getConcurrencyLimitedJobs, getCrawlConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
import { getJobFromGCS } from "../../lib/gcs-jobs";
configDotenv();
@ -157,7 +157,9 @@ export async function crawlStatusController(
),
);
const throttledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id);
const teamThrottledJobsSet = await getConcurrencyLimitedJobs(req.auth.team_id);
const crawlThrottledJobsSet = sc.crawlerOptions?.delay ? await getCrawlConcurrencyLimitedJobs(req.params.jobId) : new Set();
const throttledJobsSet = new Set([...teamThrottledJobsSet, ...crawlThrottledJobsSet]);
const validJobStatuses: [string, JobState | "unknown"][] = [];
const validJobIDs: string[] = [];

View File

@ -93,6 +93,10 @@ export async function crawlController(
try {
sc.robots = await crawler.getRobotsTxt(scrapeOptions.skipTlsVerification);
const robotsCrawlDelay = crawler.getRobotsCrawlDelay();
if (robotsCrawlDelay !== null && !sc.crawlerOptions.delay) {
sc.crawlerOptions.delay = robotsCrawlDelay;
}
} catch (e) {
logger.debug("Failed to get robots.txt (this is probably fine!)", {
error: e,

View File

@ -588,6 +588,7 @@ const crawlerOptions = z
deduplicateSimilarURLs: z.boolean().default(true),
ignoreQueryParameters: z.boolean().default(false),
regexOnFullURL: z.boolean().default(false),
delay: z.number().positive().optional(),
})
.strict(strictMessage);
@ -986,6 +987,7 @@ export function toLegacyCrawlerOptions(x: CrawlerOptions) {
regexOnFullURL: x.regexOnFullURL,
maxDiscoveryDepth: x.maxDiscoveryDepth,
currentDiscoveryDepth: 0,
delay: x.delay,
};
}
@ -1008,6 +1010,7 @@ export function fromLegacyCrawlerOptions(x: any, teamId: string): {
ignoreQueryParameters: x.ignoreQueryParameters,
regexOnFullURL: x.regexOnFullURL,
maxDiscoveryDepth: x.maxDiscoveryDepth,
delay: x.delay,
}),
internalOptions: {
v0CrawlOnlyUrls: x.returnOnlyUrls,

View File

@ -5,6 +5,9 @@ const constructKey = (team_id: string) => "concurrency-limiter:" + team_id;
const constructQueueKey = (team_id: string) =>
"concurrency-limit-queue:" + team_id;
const constructCrawlKey = (crawl_id: string) => "crawl-concurrency-limiter:" + crawl_id;
const constructCrawlQueueKey = (crawl_id: string) => "crawl-concurrency-limit-queue:" + crawl_id;
export async function cleanOldConcurrencyLimitEntries(
team_id: string,
now: number = Date.now(),
@ -82,3 +85,73 @@ export async function getConcurrencyQueueJobsCount(team_id: string): Promise<num
const count = await redisConnection.zcard(constructQueueKey(team_id));
return count;
}
export async function cleanOldCrawlConcurrencyLimitEntries(
crawl_id: string,
now: number = Date.now(),
) {
await redisConnection.zremrangebyscore(constructCrawlKey(crawl_id), -Infinity, now);
}
export async function getCrawlConcurrencyLimitActiveJobs(
crawl_id: string,
now: number = Date.now(),
): Promise<string[]> {
return await redisConnection.zrangebyscore(
constructCrawlKey(crawl_id),
now,
Infinity,
);
}
export async function pushCrawlConcurrencyLimitActiveJob(
crawl_id: string,
id: string,
timeout: number,
now: number = Date.now(),
) {
await redisConnection.zadd(
constructCrawlKey(crawl_id),
now + timeout,
id,
);
}
export async function removeCrawlConcurrencyLimitActiveJob(
crawl_id: string,
id: string,
) {
await redisConnection.zrem(constructCrawlKey(crawl_id), id);
}
export async function takeCrawlConcurrencyLimitedJob(
crawl_id: string,
): Promise<ConcurrencyLimitedJob | null> {
const res = await redisConnection.zmpop(1, constructCrawlQueueKey(crawl_id), "MIN");
if (res === null || res === undefined) {
return null;
}
return JSON.parse(res[1][0][0]);
}
export async function pushCrawlConcurrencyLimitedJob(
crawl_id: string,
job: ConcurrencyLimitedJob,
) {
await redisConnection.zadd(
constructCrawlQueueKey(crawl_id),
job.priority ?? 1,
JSON.stringify(job),
);
}
export async function getCrawlConcurrencyLimitedJobs(
crawl_id: string,
) {
return new Set((await redisConnection.zrange(constructCrawlQueueKey(crawl_id), 0, -1)).map(x => JSON.parse(x).id));
}
export async function getCrawlConcurrencyQueueJobsCount(crawl_id: string): Promise<number> {
const count = await redisConnection.zcard(constructCrawlQueueKey(crawl_id));
return count;
}

View File

@ -23,6 +23,7 @@ export class WebCrawler {
private limit: number;
private robotsTxtUrl: string;
public robots: Robot;
private robotsCrawlDelay: number | null = null;
private generateImgAltText: boolean;
private allowBackwardCrawling: boolean;
private allowExternalContentLinks: boolean;
@ -243,6 +244,12 @@ export class WebCrawler {
public importRobotsTxt(txt: string) {
this.robots = robotsParser(this.robotsTxtUrl, txt);
const delay = this.robots.getCrawlDelay("FireCrawlAgent") || this.robots.getCrawlDelay("FirecrawlAgent");
this.robotsCrawlDelay = delay !== undefined ? delay : null;
}
public getRobotsCrawlDelay(): number | null {
return this.robotsCrawlDelay;
}
public async tryGetSitemap(
@ -268,7 +275,7 @@ export class WebCrawler {
const _urlsHandler = async (urls: string[]) => {
if (fromMap && onlySitemap) {
return urlsHandler(urls);
return await urlsHandler(urls);
} else {
let filteredLinks = this.filterLinks(
[...new Set(urls)].filter(x => this.filterURL(x, this.initialUrl) !== null),
@ -295,7 +302,7 @@ export class WebCrawler {
"NX",
);
if (uniqueURLs.length > 0) {
return urlsHandler(uniqueURLs);
return await urlsHandler(uniqueURLs);
}
}
};

View File

@ -4,10 +4,14 @@ import { NotificationType, RateLimiterMode, WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node";
import {
cleanOldConcurrencyLimitEntries,
cleanOldCrawlConcurrencyLimitEntries,
getConcurrencyLimitActiveJobs,
getConcurrencyQueueJobsCount,
getCrawlConcurrencyQueueJobsCount,
pushConcurrencyLimitActiveJob,
pushConcurrencyLimitedJob,
pushCrawlConcurrencyLimitActiveJob,
pushCrawlConcurrencyLimitedJob,
} from "../lib/concurrency-limit";
import { logger } from "../lib/logger";
import { sendNotificationWithCustomDays } from './notification/email_notification';
@ -45,6 +49,25 @@ async function _addScrapeJobToConcurrencyQueue(
});
}
async function _addCrawlScrapeJobToConcurrencyQueue(
webScraperOptions: any,
options: any,
jobId: string,
jobPriority: number,
) {
await pushCrawlConcurrencyLimitedJob(webScraperOptions.crawl_id, {
id: jobId,
data: webScraperOptions,
opts: {
...options,
priority: jobPriority,
jobId: jobId,
},
priority: jobPriority,
});
// NEVER ADD THESE TO BULLMQ!!! THEY ARE ADDED IN QUEUE-WORKER!!! SHOOOOO!!! - mogery
}
export async function _addScrapeJobToBullMQ(
webScraperOptions: any,
options: any,
@ -55,8 +78,12 @@ export async function _addScrapeJobToBullMQ(
webScraperOptions &&
webScraperOptions.team_id
) {
if (webScraperOptions.crawl_id && webScraperOptions.crawlerOptions?.delay) {
await pushCrawlConcurrencyLimitActiveJob(webScraperOptions.crawl_id, jobId, 60 * 1000);
} else {
await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId, 60 * 1000); // 60s default timeout
}
}
await getScrapeQueue().add(jobId, webScraperOptions, {
...options,
@ -71,6 +98,18 @@ async function addScrapeJobRaw(
jobId: string,
jobPriority: number,
) {
const hasCrawlDelay = webScraperOptions.crawl_id && webScraperOptions.crawlerOptions?.delay;
if (hasCrawlDelay) {
await _addCrawlScrapeJobToConcurrencyQueue(
webScraperOptions,
options,
jobId,
jobPriority
);
return;
}
let concurrencyLimited = false;
let currentActiveConcurrency = 0;
let maxConcurrency = 0;
@ -167,16 +206,19 @@ export async function addScrapeJobs(
) {
if (jobs.length === 0) return true;
const addToCCQ = jobs.filter(job => job.data.crawlerOptions?.delay);
const dontAddToCCQ = jobs.filter(job => !job.data.crawlerOptions?.delay);
let countCanBeDirectlyAdded = Infinity;
let currentActiveConcurrency = 0;
let maxConcurrency = 0;
if (jobs[0].data && jobs[0].data.team_id) {
if (dontAddToCCQ[0] && dontAddToCCQ[0].data && dontAddToCCQ[0].data.team_id) {
const now = Date.now();
maxConcurrency = (await getACUCTeam(jobs[0].data.team_id, false, true, jobs[0].data.from_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
cleanOldConcurrencyLimitEntries(jobs[0].data.team_id, now);
maxConcurrency = (await getACUCTeam(dontAddToCCQ[0].data.team_id, false, true, dontAddToCCQ[0].data.from_extract ? RateLimiterMode.Extract : RateLimiterMode.Crawl))?.concurrency ?? 2;
cleanOldConcurrencyLimitEntries(dontAddToCCQ[0].data.team_id, now);
currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(jobs[0].data.team_id, now)).length;
currentActiveConcurrency = (await getConcurrencyLimitActiveJobs(dontAddToCCQ[0].data.team_id, now)).length;
countCanBeDirectlyAdded = Math.max(
maxConcurrency - currentActiveConcurrency,
@ -184,24 +226,25 @@ export async function addScrapeJobs(
);
}
const addToBull = jobs.slice(0, countCanBeDirectlyAdded);
const addToCQ = jobs.slice(countCanBeDirectlyAdded);
const addToBull = dontAddToCCQ.slice(0, countCanBeDirectlyAdded);
const addToCQ = dontAddToCCQ.slice(countCanBeDirectlyAdded);
// equals 2x the max concurrency
if(addToCQ.length > maxConcurrency) {
logger.info("Concurrency limited 2x (multiple) - ", "Concurrency queue jobs: ", addToCQ.length, "Max concurrency: ", maxConcurrency, "Team ID: ", jobs[0].data.team_id);
logger.info(`Concurrency limited 2x (multiple) - Concurrency queue jobs: ${addToCQ.length} Max concurrency: ${maxConcurrency} Team ID: ${jobs[0].data.team_id}`);
// Only send notification if it's not a crawl or batch scrape
const shouldSendNotification = await shouldSendConcurrencyLimitNotification(jobs[0].data.team_id);
if (!isCrawlOrBatchScrape(dontAddToCCQ[0].data)) {
const shouldSendNotification = await shouldSendConcurrencyLimitNotification(dontAddToCCQ[0].data.team_id);
if (shouldSendNotification) {
sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
sendNotificationWithCustomDays(dontAddToCCQ[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
logger.error("Error sending notification (concurrency limit reached): ", error);
});
}
}
}
await Promise.all(
addToBull.map(async (job) => {
addToCCQ.map(async (job) => {
const size = JSON.stringify(job.data).length;
return await Sentry.startSpan(
{
@ -214,7 +257,7 @@ export async function addScrapeJobs(
},
},
async (span) => {
await _addScrapeJobToBullMQ(
await _addCrawlScrapeJobToConcurrencyQueue(
{
...job.data,
sentry: {
@ -246,7 +289,41 @@ export async function addScrapeJobs(
},
},
async (span) => {
const jobData = {
...job.data,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
};
await _addScrapeJobToConcurrencyQueue(
jobData,
job.opts,
job.opts.jobId,
job.opts.priority,
);
},
);
}),
);
await Promise.all(
addToBull.map(async (job) => {
const size = JSON.stringify(job.data).length;
return await Sentry.startSpan(
{
name: "Add scrape job",
op: "queue.publish",
attributes: {
"messaging.message.id": job.opts.jobId,
"messaging.destination.name": getScrapeQueue().name,
"messaging.message.body.size": size,
},
},
async (span) => {
await _addScrapeJobToBullMQ(
{
...job.data,
sentry: {

View File

@ -53,9 +53,13 @@ import { configDotenv } from "dotenv";
import { scrapeOptions } from "../controllers/v1/types";
import {
cleanOldConcurrencyLimitEntries,
cleanOldCrawlConcurrencyLimitEntries,
pushConcurrencyLimitActiveJob,
pushCrawlConcurrencyLimitActiveJob,
removeConcurrencyLimitActiveJob,
removeCrawlConcurrencyLimitActiveJob,
takeConcurrencyLimitedJob,
takeCrawlConcurrencyLimitedJob,
} from "../lib/concurrency-limit";
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings";
@ -728,11 +732,37 @@ const workerFun = async (
runningJobs.delete(job.id);
}
if (job.id && job.data.crawl_id && job.data.crawlerOptions?.delay) {
await removeCrawlConcurrencyLimitActiveJob(job.data.crawl_id, job.id);
cleanOldCrawlConcurrencyLimitEntries(job.data.crawl_id);
const delayInSeconds = job.data.crawlerOptions.delay;
const delayInMs = delayInSeconds * 1000;
await new Promise(resolve => setTimeout(resolve, delayInMs));
const nextCrawlJob = await takeCrawlConcurrencyLimitedJob(job.data.crawl_id);
if (nextCrawlJob !== null) {
await pushCrawlConcurrencyLimitActiveJob(job.data.crawl_id, nextCrawlJob.id, 60 * 1000);
await queue.add(
nextCrawlJob.id,
{
...nextCrawlJob.data,
},
{
...nextCrawlJob.opts,
jobId: nextCrawlJob.id,
priority: nextCrawlJob.priority,
},
);
}
}
if (job.id && job.data && job.data.team_id) {
await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
cleanOldConcurrencyLimitEntries(job.data.team_id);
// Queue up next job, if it exists
// No need to check if we're under the limit here -- if the current job is finished,
// we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG
const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);