feat(queue-worker/crawl): solidify redirect behaviour

parent ce460a3a56
commit d9e017e5e2
@@ -41,6 +41,12 @@ import { getRateLimiterPoints } from "./rate-limiter";
 import { cleanOldConcurrencyLimitEntries, pushConcurrencyLimitActiveJob, removeConcurrencyLimitActiveJob, takeConcurrencyLimitedJob } from "../lib/concurrency-limit";
 configDotenv();
 
+class RacedRedirectError extends Error {
+  constructor() {
+    super("Raced redirect error")
+  }
+}
+
 const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
 
 const workerLockDuration = Number(process.env.WORKER_LOCK_DURATION) || 60000;
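The new RacedRedirectError is a sentinel: it carries no state beyond its message and exists only so the worker's catch block (rewritten later in this commit) can recognize a raced redirect by type and downgrade it from a reported failure to a warning. A minimal sketch of the pattern; the runJob wrapper is illustrative, not worker code:

class RacedRedirectError extends Error {
  constructor() {
    super("Raced redirect error");
  }
}

// Illustrative wrapper: an expected race is swallowed with a warning,
// while any other error still propagates as a real job failure.
async function runJob(work: () => Promise<void>): Promise<void> {
  try {
    await work();
  } catch (error) {
    if (error instanceof RacedRedirectError) {
      console.warn("redirect raced, silently failing");
      return;
    }
    throw error;
  }
}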
@@ -434,8 +440,17 @@ async function processJob(job: Job & { id: string }, token: string) {
         // Remove the old URL from visited unique due to checking for limit
         // Do not remove from :visited otherwise it will keep crawling the original URL (sourceURL)
         await redisConnection.srem("crawl:" + job.data.crawl_id + ":visited_unique", normalizeURL(doc.metadata.sourceURL, sc));
+
+        const p1 = generateURLPermutations(normalizeURL(doc.metadata.url, sc));
+        const p2 = generateURLPermutations(normalizeURL(doc.metadata.sourceURL, sc));
+
+        // In crawls, we should only crawl a redirected page once, no matter how many times it is redirected to, or if it's been discovered by the crawler before.
+        // This can prevent flakiness with race conditions.
         // Lock the new URL
-        await lockURL(job.data.crawl_id, sc, doc.metadata.url);
+        const lockRes = await lockURL(job.data.crawl_id, sc, doc.metadata.url);
+        if (job.data.crawlerOptions !== null && !lockRes && JSON.stringify(p1) !== JSON.stringify(p2)) {
+          throw new RacedRedirectError();
+        }
       }
 
       logger.debug("Logging job to DB...");
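The race this hunk guards against: two queued jobs whose source URLs redirect to the same destination. Both follow the redirect, but only the first lockURL call on the destination wins; the loser now throws RacedRedirectError instead of crawling the page a second time. The permutation comparison (p1 vs. p2) appears to exempt the case where the "redirect" target is just a normalized variant of the source URL itself. A rough sketch of the locking idea, assuming lockURL is backed by a Redis set where SADD reports whether the member was newly added; firecrawl's actual implementation may differ:

import { createClient } from "redis";

class RacedRedirectError extends Error {
  constructor() {
    super("Raced redirect error");
  }
}

const redis = createClient();
const ready = redis.connect();

// First caller to add the URL to the crawl's lock set wins: SADD returns 1
// for a newly added member and 0 if it was already present.
async function lockURL(crawlId: string, url: string): Promise<boolean> {
  await ready;
  return (await redis.sAdd(`crawl:${crawlId}:lock`, url)) === 1;
}

// Two jobs redirecting to the same destination race here; the loser bails
// out with the sentinel error rather than producing a duplicate page.
async function handleRedirect(crawlId: string, destination: string): Promise<void> {
  if (!(await lockURL(crawlId, destination))) {
    throw new RacedRedirectError();
  }
  // ...proceed to process the destination page exactly once...
}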
@@ -455,7 +470,7 @@ async function processJob(job: Job & { id: string }, token: string) {
       }, true);
 
       logger.debug("Declaring job as done...");
-      await addCrawlJobDone(job.data.crawl_id, job.id);
+      await addCrawlJobDone(job.data.crawl_id, job.id, true);
 
       if (job.data.crawlerOptions !== null) {
         if (!sc.cancelled) {
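addCrawlJobDone now takes an explicit third argument: true on this success path, false on the failure path in the final hunk below. The diff shows only the call sites, so the shape below is an inference; the parameter name success and the in-memory bookkeeping are assumptions, not firecrawl code:

// Hypothetical sketch: every finished job is marked done so the crawl can
// complete, but successes are tracked separately for success-dependent
// bookkeeping.
const finishedJobs = new Map<string, Set<string>>();
const successfulJobs = new Map<string, Set<string>>();

function bucket(map: Map<string, Set<string>>, key: string): Set<string> {
  let set = map.get(key);
  if (!set) map.set(key, (set = new Set()));
  return set;
}

async function addCrawlJobDone(
  crawlId: string,
  jobId: string,
  success: boolean, // assumed name; the diff only shows true/false at call sites
): Promise<void> {
  bucket(finishedJobs, crawlId).add(jobId); // always marked done, even on failure
  if (success) {
    bucket(successfulJobs, crawlId).add(jobId);
  }
}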
@@ -520,7 +535,11 @@ async function processJob(job: Job & { id: string }, token: string) {
   } catch (error) {
     const isEarlyTimeout = error instanceof Error && error.message === "timeout";
 
-    if (!isEarlyTimeout) {
+    if (isEarlyTimeout) {
+      logger.error(`🐂 Job timed out ${job.id}`);
+    } else if (error instanceof RacedRedirectError) {
+      logger.warn(`🐂 Job got redirect raced ${job.id}, silently failing`);
+    } else {
       logger.error(`🐂 Job errored ${job.id} - ${error}`, { error });
 
       Sentry.captureException(error, {
@@ -537,8 +556,6 @@ async function processJob(job: Job & { id: string }, token: string) {
       if (error.stack) {
         logger.error(error.stack);
       }
-    } else {
-      logger.error(`🐂 Job timed out ${job.id}`);
     }
 
     const data = {
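The previous two hunks work together: the old !isEarlyTimeout check is inverted and the trailing else branch deleted, turning the catch block into a three-way triage. Stitched together for readability, with the unchanged Sentry payload elided:

class RacedRedirectError extends Error {} // stand-in; see the first hunk

function triageJobError(error: unknown, jobId: string): void {
  const isEarlyTimeout = error instanceof Error && error.message === "timeout";

  if (isEarlyTimeout) {
    console.error(`🐂 Job timed out ${jobId}`);
  } else if (error instanceof RacedRedirectError) {
    // Expected under concurrency: warn and fail silently, no Sentry report.
    console.warn(`🐂 Job got redirect raced ${jobId}, silently failing`);
  } else {
    console.error(`🐂 Job errored ${jobId} - ${error}`, { error });
    // Sentry.captureException(error, { ... }); // payload unchanged, elided
    if (error instanceof Error && error.stack) {
      console.error(error.stack);
    }
  }
}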
@@ -573,7 +590,7 @@ async function processJob(job: Job & { id: string }, token: string) {
     const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
 
     logger.debug("Declaring job as done...");
-    await addCrawlJobDone(job.data.crawl_id, job.id);
+    await addCrawlJobDone(job.data.crawl_id, job.id, false);
 
     logger.debug("Logging job to DB...");
     await logJob({