diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index a4a06dcc..c73e33b4 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -86,6 +86,7 @@ import { cacheableLookup } from "../scraper/scrapeURL/lib/cacheableLookup";
 import { robustFetch } from "../scraper/scrapeURL/lib/fetch";
 import { RateLimiterMode } from "../types";
 import { redisEvictConnection } from "./redis";
+import { generateURLSplits, hashURL, index_supabase_service } from "./index";
 
 configDotenv();
 
@@ -1027,6 +1028,87 @@ async function processKickoffJob(job: Job & { id: string }, token: string) {
     });
   }
 
+  const trimmedURL = new URL(job.data.url);
+  trimmedURL.search = "";
+
+  const urlSplits = generateURLSplits(trimmedURL.href).map(x => hashURL(x));
+
+  const index = sc.crawlerOptions.ignoreSitemap
+    ? []
+    : sc.crawlerOptions.allowBackwardCrawling
+      ? (await index_supabase_service
+          .from("index")
+          .select("resolved_url")
+          .eq("url_split_0_hash", urlSplits[0])
+          .gte("created_at", new Date(Date.now() - 2 * 24 * 60 * 60 * 1000).toISOString())
+          .limit(sc.crawlerOptions.limit ?? 100)).data ?? []
+      : (await index_supabase_service
+          .from("index")
+          .select("resolved_url")
+          .eq("url_split_" + (urlSplits.length - 1) + "_hash", urlSplits[urlSplits.length - 1])
+          .gte("created_at", new Date(Date.now() - 2 * 24 * 60 * 60 * 1000).toISOString())
+          .limit(sc.crawlerOptions.limit ?? 100)).data ?? [];
+
+  const validIndexLinks = crawler.filterLinks(
+    [...new Set(index.map(x => x.resolved_url))].filter(x => crawler.filterURL(x, trimmedURL.href) !== null),
+    sc.crawlerOptions.limit ?? 100,
+    sc.crawlerOptions.maxDepth ?? 10,
+    false,
+  );
+
+  if (validIndexLinks.length > 0) {
+    logger.debug("Using index links of length " + validIndexLinks.length, {
+      indexLinksLength: validIndexLinks.length,
+    });
+
+    let jobPriority = await getJobPriority({
+      team_id: job.data.team_id,
+      basePriority: 21,
+    });
+    logger.debug("Using job priority " + jobPriority, { jobPriority });
+
+    const jobs = validIndexLinks.map((url) => {
+      const uuid = uuidv4();
+      return {
+        name: uuid,
+        data: {
+          url,
+          mode: "single_urls" as const,
+          team_id: job.data.team_id,
+          crawlerOptions: job.data.crawlerOptions,
+          scrapeOptions: job.data.scrapeOptions,
+          internalOptions: sc.internalOptions,
+          origin: job.data.origin,
+          crawl_id: job.data.crawl_id,
+          sitemapped: true,
+          webhook: job.data.webhook,
+          v1: job.data.v1,
+        },
+        opts: {
+          jobId: uuid,
+          priority: 20,
+        },
+      };
+    });
+
+    logger.debug("Locking URLs...");
+    const lockedIds = await lockURLsIndividually(
+      job.data.crawl_id,
+      sc,
+      jobs.map((x) => ({ id: x.opts.jobId, url: x.data.url })),
+    );
+    const lockedJobs = jobs.filter((x) =>
+      lockedIds.find((y) => y.id === x.opts.jobId),
+    );
+    logger.debug("Adding scrape jobs to Redis...");
+    await addCrawlJobs(
+      job.data.crawl_id,
+      lockedJobs.map((x) => x.opts.jobId),
+    );
+    logger.debug("Adding scrape jobs to BullMQ...");
+    await addScrapeJobs(lockedJobs);
+  }
+
   logger.debug("Done queueing jobs!");
 
   await finishCrawlKickoff(job.data.crawl_id);
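
Note on the new kickoff path: before the normal discovery flow, the crawl is now seeded from the `index` table. Rows no older than 48 hours are looked up by a hashed URL-prefix column: `url_split_0_hash` (the site root) when `allowBackwardCrawling` is set, otherwise the hash of the deepest prefix of the query-stripped kickoff URL, capped at the crawl limit. Matching `resolved_url`s are deduplicated, run through the crawler's URL filters, locked individually, registered on the crawl, and enqueued as `single_urls` jobs. The sketch below only illustrates the split/hash scheme those columns appear to assume; `generateURLSplitsSketch` and `hashURLSketch` are hypothetical stand-ins, not the `generateURLSplits`/`hashURL` helpers imported from `./index`.

// Hedged sketch only: hypothetical stand-ins to show the assumed scheme behind
// the url_split_N_hash columns queried above.
import { createHash } from "crypto";

// Assumption: splits are URL prefixes ordered shortest-first, so index 0 is the
// site root and the last entry is the full (query-stripped) kickoff URL.
function generateURLSplitsSketch(href: string): string[] {
  const url = new URL(href);
  const segments = url.pathname.split("/").filter(Boolean);
  const splits: string[] = [];
  for (let i = 0; i <= segments.length; i++) {
    splits.push(url.origin + "/" + segments.slice(0, i).join("/"));
  }
  return splits;
}

// Assumption: a stable content hash of each prefix string.
function hashURLSketch(url: string): string {
  return createHash("sha256").update(url).digest("hex");
}

const trimmed = new URL("https://example.com/docs/guide/intro?page=2");
trimmed.search = "";

const hashes = generateURLSplitsSketch(trimmed.href).map(hashURLSketch);

// allowBackwardCrawling: match anything indexed under the site root (split 0).
console.log("url_split_0_hash =", hashes[0]);
// otherwise: match only URLs indexed under the deepest prefix (last split).
console.log(`url_split_${hashes.length - 1}_hash =`, hashes[hashes.length - 1]);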