diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 527bfc83..74e954cd 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -41,6 +41,12 @@ import { getRateLimiterPoints } from "./rate-limiter";
 import { cleanOldConcurrencyLimitEntries, pushConcurrencyLimitActiveJob, removeConcurrencyLimitActiveJob, takeConcurrencyLimitedJob } from "../lib/concurrency-limit";
 configDotenv();
 
+class RacedRedirectError extends Error {
+  constructor() {
+    super("Raced redirect error")
+  }
+}
+
 const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
 
 const workerLockDuration = Number(process.env.WORKER_LOCK_DURATION) || 60000;
@@ -434,8 +440,17 @@ async function processJob(job: Job & { id: string }, token: string) {
           // Remove the old URL from visited unique due to checking for limit
           // Do not remove from :visited otherwise it will keep crawling the original URL (sourceURL)
           await redisConnection.srem("crawl:" + job.data.crawl_id + ":visited_unique", normalizeURL(doc.metadata.sourceURL, sc));
+
+          const p1 = generateURLPermutations(normalizeURL(doc.metadata.url, sc));
+          const p2 = generateURLPermutations(normalizeURL(doc.metadata.sourceURL, sc));
+
+          // In crawls, we should only crawl a redirected page once, no matter how many times it is redirected to, or if it's been discovered by the crawler before.
+          // This can prevent flakiness with race conditions.
           // Lock the new URL
-          await lockURL(job.data.crawl_id, sc, doc.metadata.url);
+          const lockRes = await lockURL(job.data.crawl_id, sc, doc.metadata.url);
+          if (job.data.crawlerOptions !== null && !lockRes && JSON.stringify(p1) !== JSON.stringify(p2)) {
+            throw new RacedRedirectError();
+          }
         }
 
         logger.debug("Logging job to DB...");
@@ -455,7 +470,7 @@ async function processJob(job: Job & { id: string }, token: string) {
         }, true);
 
         logger.debug("Declaring job as done...");
-        await addCrawlJobDone(job.data.crawl_id, job.id);
+        await addCrawlJobDone(job.data.crawl_id, job.id, true);
 
         if (job.data.crawlerOptions !== null) {
           if (!sc.cancelled) {
@@ -520,7 +535,11 @@ async function processJob(job: Job & { id: string }, token: string) {
   } catch (error) {
     const isEarlyTimeout = error instanceof Error && error.message === "timeout";
 
-    if (!isEarlyTimeout) {
+    if (isEarlyTimeout) {
+      logger.error(`🐂 Job timed out ${job.id}`);
+    } else if (error instanceof RacedRedirectError) {
+      logger.warn(`🐂 Job got redirect raced ${job.id}, silently failing`);
+    } else {
       logger.error(`🐂 Job errored ${job.id} - ${error}`, { error });
 
       Sentry.captureException(error, {
@@ -537,8 +556,6 @@ async function processJob(job: Job & { id: string }, token: string) {
       if (error.stack) {
         logger.error(error.stack);
       }
-    } else {
-      logger.error(`🐂 Job timed out ${job.id}`);
     }
 
     const data = {
@@ -573,7 +590,7 @@ async function processJob(job: Job & { id: string }, token: string) {
     const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
 
     logger.debug("Declaring job as done...");
-    await addCrawlJobDone(job.data.crawl_id, job.id);
+    await addCrawlJobDone(job.data.crawl_id, job.id, false);
 
     logger.debug("Logging job to DB...");
     await logJob({
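
For context on the second hunk: a redirect is only treated as "raced" when the lock on the redirect target fails *and* the target is genuinely a different page from the source (the permutation arrays differ). Below is a minimal, self-contained sketch of that decision. `handleRedirect`, the in-memory `visited` set, and the simplified `generateURLPermutations` are hypothetical stand-ins invented for illustration; in the real codebase `lockURL` is Redis-backed and `generateURLPermutations` handles more URL variants, but the diff itself confirms that a falsy lock result means the URL was already locked for this crawl.

```ts
// Sketch of the raced-redirect guard introduced above.
// The lock and permutation helpers are simplified stand-ins, not the real APIs.

class RacedRedirectError extends Error {
  constructor() {
    super("Raced redirect error");
  }
}

// Simplified permutation helper: returns a deterministically ordered list of
// equivalent forms of a URL (the real one covers more variants).
function generateURLPermutations(url: string): string[] {
  const u = new URL(url);
  const host = u.hostname.replace(/^www\./, "");
  return [`https://${host}${u.pathname}`, `https://www.${host}${u.pathname}`].sort();
}

// In-memory stand-in for the Redis-backed lockURL: returns false if the URL
// was already locked for this crawl, true if we just acquired the lock.
const visited = new Set<string>();
function lockURL(crawlId: string, url: string): boolean {
  const key = `${crawlId}:${url}`;
  if (visited.has(key)) return false;
  visited.add(key);
  return true;
}

function handleRedirect(crawlId: string, sourceURL: string, finalURL: string, isCrawl: boolean) {
  const p1 = generateURLPermutations(finalURL);
  const p2 = generateURLPermutations(sourceURL);

  const lockRes = lockURL(crawlId, finalURL);
  // Only throw when this is a crawl, the lock was lost, and the redirect
  // target is actually a different page than the source.
  if (isCrawl && !lockRes && JSON.stringify(p1) !== JSON.stringify(p2)) {
    throw new RacedRedirectError();
  }
}

// Two jobs redirecting to the same target: the second loses the lock race
// and is dropped silently, so the redirected page is only crawled once.
handleRedirect("crawl-1", "https://example.com/a", "https://example.com/final", true);
try {
  handleRedirect("crawl-1", "https://example.com/b", "https://example.com/final", true);
} catch (e) {
  if (e instanceof RacedRedirectError) {
    console.log("second redirect raced, failing silently");
  }
}
```

The `JSON.stringify(p1) !== JSON.stringify(p2)` comparison reads as a cheap deep-equality check on the two permutation arrays (which works because they are deterministically ordered): when source and target normalize to the same page, a failed lock is presumably expected rather than evidence of a race, so no error is raised.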