From 86e136beca5d09ba8d6d74f8ea3c02a8580f53f4 Mon Sep 17 00:00:00 2001 From: Gergo Moricz Date: Tue, 13 Aug 2024 20:51:43 +0200 Subject: [PATCH] feat: crawl to scrape conversion --- apps/api/src/controllers/crawl-cancel.ts | 30 +---- apps/api/src/controllers/crawl-status.ts | 67 +++++----- apps/api/src/controllers/crawl.ts | 117 ++++++++++++------ apps/api/src/lib/crawl-redis.ts | 69 +++++++++++ apps/api/src/scraper/WebScraper/crawler.ts | 112 +++++++++++------ .../api/src/scraper/WebScraper/utils/utils.ts | 4 +- apps/api/src/services/queue-worker.ts | 71 ++++++++--- apps/api/src/types.ts | 3 + 8 files changed, 313 insertions(+), 160 deletions(-) create mode 100644 apps/api/src/lib/crawl-redis.ts diff --git a/apps/api/src/controllers/crawl-cancel.ts b/apps/api/src/controllers/crawl-cancel.ts index 86a4c5b4..ed2c4166 100644 --- a/apps/api/src/controllers/crawl-cancel.ts +++ b/apps/api/src/controllers/crawl-cancel.ts @@ -1,10 +1,9 @@ import { Request, Response } from "express"; import { authenticateUser } from "./auth"; import { RateLimiterMode } from "../../src/types"; -import { getWebScraperQueue } from "../../src/services/queue-service"; import { supabase_service } from "../../src/services/supabase"; -import { billTeam } from "../../src/services/billing/credit_billing"; import { Logger } from "../../src/lib/logger"; +import { getCrawl, saveCrawl } from "../../src/lib/crawl-redis"; export async function crawlCancelController(req: Request, res: Response) { try { @@ -18,8 +17,9 @@ export async function crawlCancelController(req: Request, res: Response) { if (!success) { return res.status(status).json({ error }); } - const job = await getWebScraperQueue().getJob(req.params.jobId); - if (!job) { + + const sc = await getCrawl(req.params.jobId); + if (!sc) { return res.status(404).json({ error: "Job not found" }); } @@ -39,27 +39,9 @@ export async function crawlCancelController(req: Request, res: Response) { } } - const jobState = await job.getState(); - let progress = job.progress; - if(typeof progress !== 'object') { - progress = { - partialDocs: [] - } - } - const { - partialDocs = [] - } = progress as { partialDocs: any[] }; - - if (partialDocs && partialDocs.length > 0 && jobState === "active") { - Logger.info("Billing team for partial docs..."); - // Note: the credits that we will bill them here might be lower than the actual - // due to promises that are not yet resolved - await billTeam(team_id, partialDocs.length); - } - try { - await (await getWebScraperQueue().client).set("cancelled:" + job.id, "true", "EX", 60 * 60); - await job.discard(); + sc.cancelled = true; + await saveCrawl(req.params.jobId, sc); } catch (error) { Logger.error(error); } diff --git a/apps/api/src/controllers/crawl-status.ts b/apps/api/src/controllers/crawl-status.ts index bc6de593..b82a6088 100644 --- a/apps/api/src/controllers/crawl-status.ts +++ b/apps/api/src/controllers/crawl-status.ts @@ -1,10 +1,9 @@ import { Request, Response } from "express"; import { authenticateUser } from "./auth"; import { RateLimiterMode } from "../../src/types"; -import { addWebScraperJob } from "../../src/services/queue-jobs"; -import { getWebScraperQueue } from "../../src/services/queue-service"; -import { supabaseGetJobById } from "../../src/lib/supabase-jobs"; +import { getScrapeQueue } from "../../src/services/queue-service"; import { Logger } from "../../src/lib/logger"; +import { getCrawl, getCrawlJobs } from "../../src/lib/crawl-redis"; export async function crawlStatusController(req: Request, res: Response) { try { @@ 
-16,51 +15,41 @@ export async function crawlStatusController(req: Request, res: Response) { if (!success) { return res.status(status).json({ error }); } - const job = await getWebScraperQueue().getJob(req.params.jobId); - if (!job) { + // const job = await getWebScraperQueue().getJob(req.params.jobId); + // if (!job) { + // return res.status(404).json({ error: "Job not found" }); + // } + + // const isCancelled = await (await getWebScraperQueue().client).exists("cancelled:" + req.params.jobId); + + const sc = await getCrawl(req.params.jobId); + if (!sc) { return res.status(404).json({ error: "Job not found" }); } - const isCancelled = await (await getWebScraperQueue().client).exists("cancelled:" + req.params.jobId); + const jobIDs = await getCrawlJobs(req.params.jobId); - let progress = job.progress; - if(typeof progress !== 'object') { - progress = { - current: 0, - current_url: '', - total: 0, - current_step: '', - partialDocs: [] - } - } - const { - current = 0, - current_url = '', - total = 0, - current_step = '', - partialDocs = [] - } = progress as { current: number, current_url: string, total: number, current_step: string, partialDocs: any[] }; + // let data = job.returnvalue; + // if (process.env.USE_DB_AUTHENTICATION === "true") { + // const supabaseData = await supabaseGetJobById(req.params.jobId); - let data = job.returnvalue; - if (process.env.USE_DB_AUTHENTICATION === "true") { - const supabaseData = await supabaseGetJobById(req.params.jobId); + // if (supabaseData) { + // data = supabaseData.docs; + // } + // } - if (supabaseData) { - data = supabaseData.docs; - } - } + const jobs = await Promise.all(jobIDs.map(x => getScrapeQueue().getJob(x))); + const jobStatuses = await Promise.all(jobs.map(x => x.getState())); + const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "active"; - const jobStatus = await job.getState(); + const data = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue); res.json({ - status: isCancelled ? "failed" : jobStatus, - // progress: job.progress(), - current, - current_url, - current_step, - total, - data: data && !isCancelled ? data : null, - partial_data: jobStatus == 'completed' && !isCancelled ? [] : partialDocs, + status: jobStatus, + current: jobStatuses.filter(x => x === "completed" || x === "failed").length, + total: jobs.length, + data: jobStatus === "completed" ? data : null, + partial_data: jobStatus === "completed" ? 
[] : data.filter(x => x !== null), }); } catch (error) { Logger.error(error); diff --git a/apps/api/src/controllers/crawl.ts b/apps/api/src/controllers/crawl.ts index 42593d83..a6971a22 100644 --- a/apps/api/src/controllers/crawl.ts +++ b/apps/api/src/controllers/crawl.ts @@ -4,7 +4,7 @@ import { billTeam } from "../../src/services/billing/credit_billing"; import { checkTeamCredits } from "../../src/services/billing/credit_billing"; import { authenticateUser } from "./auth"; import { RateLimiterMode } from "../../src/types"; -import { addWebScraperJob } from "../../src/services/queue-jobs"; +import { addScrapeJob, addWebScraperJob } from "../../src/services/queue-jobs"; import { isUrlBlocked } from "../../src/scraper/WebScraper/utils/blocklist"; import { logCrawl } from "../../src/services/logging/crawl_log"; import { validateIdempotencyKey } from "../../src/services/idempotency/validate"; @@ -12,6 +12,7 @@ import { createIdempotencyKey } from "../../src/services/idempotency/create"; import { defaultCrawlPageOptions, defaultCrawlerOptions, defaultOrigin } from "../../src/lib/default-values"; import { v4 as uuidv4 } from "uuid"; import { Logger } from "../../src/lib/logger"; +import { addCrawlJob, crawlToCrawler, lockURL, saveCrawl, StoredCrawl } from "../../src/lib/crawl-redis"; export async function crawlController(req: Request, res: Response) { try { @@ -62,47 +63,89 @@ export async function crawlController(req: Request, res: Response) { const crawlerOptions = { ...defaultCrawlerOptions, ...req.body.crawlerOptions }; const pageOptions = { ...defaultCrawlPageOptions, ...req.body.pageOptions }; - if (mode === "single_urls" && !url.includes(",")) { // NOTE: do we need this? - try { - const a = new WebScraperDataProvider(); - await a.setOptions({ - jobId: uuidv4(), - mode: "single_urls", - urls: [url], - crawlerOptions: { ...crawlerOptions, returnOnlyUrls: true }, - pageOptions: pageOptions, - }); + // if (mode === "single_urls" && !url.includes(",")) { // NOTE: do we need this? + // try { + // const a = new WebScraperDataProvider(); + // await a.setOptions({ + // jobId: uuidv4(), + // mode: "single_urls", + // urls: [url], + // crawlerOptions: { ...crawlerOptions, returnOnlyUrls: true }, + // pageOptions: pageOptions, + // }); - const docs = await a.getDocuments(false, (progress) => { - job.updateProgress({ - current: progress.current, - total: progress.total, - current_step: "SCRAPING", - current_url: progress.currentDocumentUrl, - }); + // const docs = await a.getDocuments(false, (progress) => { + // job.updateProgress({ + // current: progress.current, + // total: progress.total, + // current_step: "SCRAPING", + // current_url: progress.currentDocumentUrl, + // }); + // }); + // return res.json({ + // success: true, + // documents: docs, + // }); + // } catch (error) { + // Logger.error(error); + // return res.status(500).json({ error: error.message }); + // } + // } + + const id = uuidv4(); + + await logCrawl(id, team_id); + + let robots; + + try { + robots = await this.getRobotsTxt(); + } catch (_) {} + + const sc: StoredCrawl = { + originUrl: url, + crawlerOptions, + pageOptions, + team_id, + robots, + }; + + await saveCrawl(id, sc); + + const crawler = crawlToCrawler(id, sc); + + const sitemap = sc.crawlerOptions?.ignoreSitemap ? 
null : await crawler.tryGetSitemap(); + + if (sitemap !== null) { + for (const url of sitemap.map(x => x.url)) { + await lockURL(id, sc, url); + const job = await addScrapeJob({ + url, + mode: "single_urls", + crawlerOptions: crawlerOptions, + team_id: team_id, + pageOptions: pageOptions, + origin: req.body.origin ?? defaultOrigin, + crawl_id: id, + sitemapped: true, }); - return res.json({ - success: true, - documents: docs, - }); - } catch (error) { - Logger.error(error); - return res.status(500).json({ error: error.message }); + await addCrawlJob(id, job.id); } + } else { + await lockURL(id, sc, url); + const job = await addScrapeJob({ + url, + mode: "single_urls", + crawlerOptions: crawlerOptions, + team_id: team_id, + pageOptions: pageOptions, + origin: req.body.origin ?? defaultOrigin, + crawl_id: id, + }); + await addCrawlJob(id, job.id); } - const job = await addWebScraperJob({ - url: url, - mode: mode ?? "crawl", // fix for single urls not working - crawlerOptions: crawlerOptions, - team_id: team_id, - pageOptions: pageOptions, - origin: req.body.origin ?? defaultOrigin, - }); - - await logCrawl(job.id.toString(), team_id); - - res.json({ jobId: job.id }); + res.json({ jobId: id }); } catch (error) { Logger.error(error); return res.status(500).json({ error: error.message }); diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts new file mode 100644 index 00000000..f77b66a5 --- /dev/null +++ b/apps/api/src/lib/crawl-redis.ts @@ -0,0 +1,69 @@ +import { WebCrawler } from "../scraper/WebScraper/crawler"; +import { redisConnection } from "../services/queue-service"; + +export type StoredCrawl = { + originUrl: string; + crawlerOptions: any; + pageOptions: any; + team_id: string; + robots?: string; + cancelled?: boolean; +}; + +export async function saveCrawl(id: string, crawl: StoredCrawl) { + await redisConnection.set("crawl:" + id, JSON.stringify(crawl)); + await redisConnection.expire("crawl:" + id, 24 * 60 * 60, "NX"); +} + +export async function getCrawl(id: string): Promise { + const x = await redisConnection.get("crawl:" + id); + + if (x === null) { + return null; + } + + return JSON.parse(x); +} + +export async function addCrawlJob(id: string, job_id: string) { + await redisConnection.sadd("crawl:" + id + ":jobs", job_id); + await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60, "NX"); +} + +export async function getCrawlJobs(id: string): Promise { + return await redisConnection.smembers("crawl:" + id + ":jobs"); +} + +export async function lockURL(id: string, sc: StoredCrawl, url: string): Promise { + if (typeof sc.crawlerOptions?.limit === "number") { + if (await redisConnection.scard("crawl:" + id + ":visited") >= sc.crawlerOptions.limit) { + return false; + } + } + const res = (await redisConnection.sadd("crawl:" + id + ":visited", url)) !== 0 + await redisConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60, "NX"); + return res; +} + +export function crawlToCrawler(id: string, sc: StoredCrawl): WebCrawler { + const crawler = new WebCrawler({ + jobId: id, + initialUrl: sc.originUrl, + includes: sc.crawlerOptions?.includes ?? [], + excludes: sc.crawlerOptions?.excludes ?? [], + maxCrawledLinks: sc.crawlerOptions?.maxCrawledLinks ?? 1000, + maxCrawledDepth: sc.crawlerOptions?.maxDepth ?? 10, + limit: sc.crawlerOptions?.limit ?? 10000, + generateImgAltText: sc.crawlerOptions?.generateImgAltText ?? false, + allowBackwardCrawling: sc.crawlerOptions?.allowBackwardCrawling ?? 
false, + allowExternalContentLinks: sc.crawlerOptions?.allowExternalContentLinks ?? false, + }); + + if (sc.robots !== undefined) { + try { + crawler.importRobotsTxt(sc.robots); + } catch (_) {} + } + + return crawler; +} diff --git a/apps/api/src/scraper/WebScraper/crawler.ts b/apps/api/src/scraper/WebScraper/crawler.ts index fc0eee3e..79e4bf18 100644 --- a/apps/api/src/scraper/WebScraper/crawler.ts +++ b/apps/api/src/scraper/WebScraper/crawler.ts @@ -1,4 +1,4 @@ -import axios from "axios"; +import axios, { AxiosError } from "axios"; import cheerio, { load } from "cheerio"; import { URL } from "url"; import { getLinksFromSitemap } from "./sitemap"; @@ -22,7 +22,7 @@ export class WebCrawler { private crawledUrls: Map = new Map(); private limit: number; private robotsTxtUrl: string; - private robots: any; + public robots: any; private generateImgAltText: boolean; private allowBackwardCrawling: boolean; private allowExternalContentLinks: boolean; @@ -66,7 +66,7 @@ export class WebCrawler { this.allowExternalContentLinks = allowExternalContentLinks ?? false; } - private filterLinks(sitemapLinks: string[], limit: number, maxDepth: number): string[] { + public filterLinks(sitemapLinks: string[], limit: number, maxDepth: number): string[] { return sitemapLinks .filter((link) => { const url = new URL(link.trim(), this.baseUrl); @@ -130,6 +130,25 @@ export class WebCrawler { .slice(0, limit); } + public async getRobotsTxt(): Promise { + const response = await axios.get(this.robotsTxtUrl, { timeout: axiosTimeout }); + return response.data; + } + + public importRobotsTxt(txt: string) { + this.robots = robotsParser(this.robotsTxtUrl, txt); + } + + public async tryGetSitemap(): Promise<{ url: string; html: string; }[] | null> { + Logger.debug(`Fetching sitemap links from ${this.initialUrl}`); + const sitemapLinks = await this.tryFetchSitemapLinks(this.initialUrl); + if (sitemapLinks.length > 0) { + let filteredLinks = this.filterLinks(sitemapLinks, this.limit, this.maxCrawledDepth); + return filteredLinks.map(link => ({ url: link, html: "" })); + } + return null; + } + public async start( inProgress?: (progress: Progress) => void, pageOptions?: PageOptions, @@ -142,19 +161,17 @@ export class WebCrawler { Logger.debug(`Crawler starting with ${this.initialUrl}`); // Fetch and parse robots.txt try { - const response = await axios.get(this.robotsTxtUrl, { timeout: axiosTimeout }); - this.robots = robotsParser(this.robotsTxtUrl, response.data); + const txt = await this.getRobotsTxt(); + this.importRobotsTxt(txt); Logger.debug(`Crawler robots.txt fetched with ${this.robotsTxtUrl}`); } catch (error) { Logger.debug(`Failed to fetch robots.txt from ${this.robotsTxtUrl}`); } if (!crawlerOptions?.ignoreSitemap){ - Logger.debug(`Fetching sitemap links from ${this.initialUrl}`); - const sitemapLinks = await this.tryFetchSitemapLinks(this.initialUrl); - if (sitemapLinks.length > 0) { - let filteredLinks = this.filterLinks(sitemapLinks, limit, maxDepth); - return filteredLinks.map(link => ({ url: link, html: "" })); + const sm = await this.tryGetSitemap(); + if (sm !== null) { + return sm; } } @@ -241,6 +258,37 @@ export class WebCrawler { return Array.from(this.crawledUrls.entries()).map(([url, html]) => ({ url, html })); } + public filterURL(href: string, url: string): string | null { + let fullUrl = href; + if (!href.startsWith("http")) { + fullUrl = new URL(href, this.baseUrl).toString(); + } + const urlObj = new URL(fullUrl); + const path = urlObj.pathname; + + if (this.isInternalLink(fullUrl)) { // INTERNAL 
LINKS + if (this.isInternalLink(fullUrl) && + this.noSections(fullUrl) && + !this.matchesExcludes(path) && + this.isRobotsAllowed(fullUrl) + ) { + return fullUrl; + } + } else { // EXTERNAL LINKS + if ( + this.isInternalLink(url) && + this.allowExternalContentLinks && + !this.isSocialMediaOrEmail(fullUrl) && + !this.matchesExcludes(fullUrl, true) && + !this.isExternalMainPage(fullUrl) + ) { + return fullUrl; + } + } + + return null; + } + async crawl(url: string, pageOptions: PageOptions): Promise<{url: string, html: string, pageStatusCode?: number, pageError?: string}[]> { if (this.visited.has(url) || !this.robots.isAllowed(url, "FireCrawlAgent")) { return []; @@ -287,31 +335,9 @@ export class WebCrawler { $("a").each((_, element) => { const href = $(element).attr("href"); if (href) { - let fullUrl = href; - if (!href.startsWith("http")) { - fullUrl = new URL(href, this.baseUrl).toString(); - } - const urlObj = new URL(fullUrl); - const path = urlObj.pathname; - - if (this.isInternalLink(fullUrl)) { // INTERNAL LINKS - if (this.isInternalLink(fullUrl) && - this.noSections(fullUrl) && - !this.matchesExcludes(path) && - this.isRobotsAllowed(fullUrl) - ) { - links.push({ url: fullUrl, html: content, pageStatusCode, pageError }); - } - } else { // EXTERNAL LINKS - if ( - this.isInternalLink(url) && - this.allowExternalContentLinks && - !this.isSocialMediaOrEmail(fullUrl) && - !this.matchesExcludes(fullUrl, true) && - !this.isExternalMainPage(fullUrl) - ) { - links.push({ url: fullUrl, html: content, pageStatusCode, pageError }); - } + const u = this.filterURL(href, url); + if (u !== null) { + links.push({ url: u, html: content, pageStatusCode, pageError }); } } }); @@ -465,9 +491,13 @@ export class WebCrawler { } } catch (error) { Logger.debug(`Failed to fetch sitemap with axios from ${sitemapUrl}: ${error}`); - const response = await getLinksFromSitemap({ sitemapUrl, mode: 'fire-engine' }); - if (response) { - sitemapLinks = response; + if (error instanceof AxiosError && error.response?.status === 404) { + // ignore 404 + } else { + const response = await getLinksFromSitemap({ sitemapUrl, mode: 'fire-engine' }); + if (response) { + sitemapLinks = response; + } } } @@ -480,7 +510,11 @@ export class WebCrawler { } } catch (error) { Logger.debug(`Failed to fetch sitemap from ${baseUrlSitemap}: ${error}`); - sitemapLinks = await getLinksFromSitemap({ sitemapUrl: baseUrlSitemap, mode: 'fire-engine' }); + if (error instanceof AxiosError && error.response?.status === 404) { + // ignore 404 + } else { + sitemapLinks = await getLinksFromSitemap({ sitemapUrl: baseUrlSitemap, mode: 'fire-engine' }); + } } } diff --git a/apps/api/src/scraper/WebScraper/utils/utils.ts b/apps/api/src/scraper/WebScraper/utils/utils.ts index dd5906b0..872adc6e 100644 --- a/apps/api/src/scraper/WebScraper/utils/utils.ts +++ b/apps/api/src/scraper/WebScraper/utils/utils.ts @@ -41,10 +41,10 @@ export function extractLinks(html: string, baseUrl: string): string[] { links.push(href); } else if (href.startsWith('/')) { // Relative URL starting with '/', append to origin - links.push(`${origin}${href}`); + links.push(new URL(href, baseUrl).href); } else if (!href.startsWith('#') && !href.startsWith('mailto:')) { // Relative URL not starting with '/', append to base URL - links.push(`${baseUrl}/${href}`); + links.push(new URL(href, baseUrl).href); } else if (href.startsWith('mailto:')) { // mailto: links, add as is links.push(href); diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 
f7e82dc2..8f6b7093 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -18,6 +18,11 @@ import { ScrapeEvents } from "../lib/scrape-events"; import { Worker } from "bullmq"; import systemMonitor from "./system-monitor"; import { v4 as uuidv4 } from "uuid"; +import { WebCrawler } from "../scraper/WebScraper/crawler"; +import { getAdjustedMaxDepth } from "../scraper/WebScraper/utils/maxDepthUtils"; +import { addCrawlJob, crawlToCrawler, getCrawl, lockURL } from "../lib/crawl-redis"; +import { StoredCrawl } from "../lib/crawl-redis"; +import { addScrapeJob } from "./queue-jobs"; if (process.env.ENV === "production") { initSDK({ @@ -40,8 +45,6 @@ const cantAcceptConnectionInterval = const connectionMonitorInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10; const gotJobInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 20; -const wsq = getWebScraperQueue(); -const sq = getScrapeQueue(); const processJobInternal = async (token: string, job: Job) => { const extendLockInterval = setInterval(async () => { @@ -128,18 +131,10 @@ async function processJob(job: Job, token: string) { const end = Date.now(); const timeTakenInSeconds = (end - start) / 1000; - const isCancelled = await (await getWebScraperQueue().client).exists("cancelled:" + job.id); - - if (isCancelled) { - await job.discard(); - await job.moveToFailed(Error("Job cancelled by user"), job.token); - await job.discard(); - } - const data = { success, result: { - links: isCancelled ? [] : docs.map((doc) => { + links: docs.map((doc) => { return { content: doc, source: doc?.metadata?.sourceURL ?? doc?.url ?? "", @@ -147,20 +142,20 @@ async function processJob(job: Job, token: string) { }), }, project_id: job.data.project_id, - error: isCancelled ? "Job cancelled by user" : message /* etc... */, - docs: isCancelled ? [] : docs, + error: message /* etc... */, + docs, }; - if (job.data.mode === "crawl" && !isCancelled) { + if (job.data.mode === "crawl") { await callWebhook(job.data.team_id, job.id as string, data); } await logJob({ job_id: job.id as string, - success: success && !isCancelled, - message: isCancelled ? "Job cancelled by user" : message, - num_docs: isCancelled ? 0 : docs.length, - docs: isCancelled ? [] : docs, + success: success, + message: message, + num_docs: docs.length, + docs: docs, time_taken: timeTakenInSeconds, team_id: job.data.team_id, mode: job.data.mode, @@ -168,7 +163,44 @@ async function processJob(job: Job, token: string) { crawlerOptions: job.data.crawlerOptions, pageOptions: job.data.pageOptions, origin: job.data.origin, + crawl_id: job.data.crawl_id, }); + + if (job.data.crawl_id) { + if (!job.data.sitemapped) { + const sc = await getCrawl(job.data.crawl_id) as StoredCrawl; + + if (!sc.cancelled) { + const crawler = crawlToCrawler(job.data.crawl_id, sc); + + const links = crawler.filterLinks((data.docs[0].linksOnPage as string[]) + .map(href => crawler.filterURL(href, sc.originUrl)) + .filter(x => x !== null), + Infinity, + sc.crawlerOptions?.maxDepth ?? 
10 + ) + + for (const link of links) { + if (await lockURL(job.data.crawl_id, sc, link)) { + console.log("Locked", link + "!"); + + const newJob = await addScrapeJob({ + url: link, + mode: "single_urls", + crawlerOptions: sc.crawlerOptions, + team_id: sc.team_id, + pageOptions: sc.pageOptions, + origin: job.data.origin, + crawl_id: job.data.crawl_id, + }); + + await addCrawlJob(job.data.crawl_id, newJob.id); + } + } + } + } + } + Logger.info(`🐂 Job done ${job.id}`); return data; } catch (error) { @@ -216,11 +248,12 @@ async function processJob(job: Job, token: string) { docs: [], time_taken: 0, team_id: job.data.team_id, - mode: "crawl", + mode: job.data.mode, url: job.data.url, crawlerOptions: job.data.crawlerOptions, pageOptions: job.data.pageOptions, origin: job.data.origin, + crawl_id: job.data.crawl_id, }); // done(null, data); return data; diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts index 3b28b765..c02aba1c 100644 --- a/apps/api/src/types.ts +++ b/apps/api/src/types.ts @@ -28,6 +28,8 @@ export interface WebScraperOptions { extractorOptions?: any; team_id: string; origin?: string; + crawl_id?: string; + sitemapped?: boolean; } export interface RunWebScraperParams { @@ -65,6 +67,7 @@ export interface FirecrawlJob { extractor_options?: ExtractorOptions, num_tokens?: number, retry?: boolean, + crawl_id?: string; } export interface FirecrawlScrapeResponse {
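
The centre of the change is the new crawl-redis module: a crawl is no longer one long-running queue job but a JSON record (StoredCrawl) in Redis plus two sets, crawl:<id>:jobs for the scrape-job ids and crawl:<id>:visited for de-duplication, all expiring after 24 hours. A minimal sketch of how the helpers fit together; the placeholder job id and the limit value are illustrative only:

import { v4 as uuidv4 } from "uuid";
import {
  saveCrawl,
  getCrawl,
  addCrawlJob,
  getCrawlJobs,
  lockURL,
  StoredCrawl,
} from "./lib/crawl-redis";

async function createCrawlRecord(url: string, team_id: string): Promise<string> {
  const id = uuidv4();

  const sc: StoredCrawl = {
    originUrl: url,
    crawlerOptions: { limit: 100 }, // illustrative options
    pageOptions: {},
    team_id,
  };

  // SET crawl:<id> followed by EXPIRE ... NX, so the record lives 24h from creation.
  await saveCrawl(id, sc);

  // lockURL adds the URL to crawl:<id>:visited and returns true only on first
  // insertion (and false once crawlerOptions.limit visited URLs exist), so it
  // acts as both de-duplication and the crawl-wide page limit.
  if (await lockURL(id, sc, url)) {
    await addCrawlJob(id, "scrape-job-id"); // placeholder id; normally addScrapeJob(...).id
  }

  return id;
}

async function readCrawlRecord(id: string) {
  const sc = await getCrawl(id);         // null once the keys have expired
  const jobIDs = await getCrawlJobs(id); // SMEMBERS crawl:<id>:jobs
  return { cancelled: sc?.cancelled ?? false, jobIDs };
}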
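On top of those helpers, crawlController now only seeds the crawl: it generates the id itself, stores the StoredCrawl, and enqueues either every sitemap URL or just the origin URL as ordinary single_urls scrape jobs tagged with crawl_id. The sketch below condenses that flow; it builds the WebCrawler via crawlToCrawler first and calls getRobotsTxt on that instance, which is an assumed ordering rather than a verbatim copy of the controller, and the origin value is illustrative.

import { v4 as uuidv4 } from "uuid";
import { saveCrawl, addCrawlJob, lockURL, crawlToCrawler, StoredCrawl } from "./lib/crawl-redis";
import { addScrapeJob } from "./services/queue-jobs";

async function seedCrawl(url: string, team_id: string, crawlerOptions: any, pageOptions: any) {
  const id = uuidv4();
  const sc: StoredCrawl = { originUrl: url, crawlerOptions, pageOptions, team_id };

  // Build the crawler up front so robots.txt can be fetched through it and
  // cached on the stored crawl for later workers.
  const crawler = crawlToCrawler(id, sc);
  try {
    sc.robots = await crawler.getRobotsTxt();
    crawler.importRobotsTxt(sc.robots);
  } catch (_) {
    // robots.txt is optional; the crawl proceeds without it.
  }

  await saveCrawl(id, sc);

  // Prefer the sitemap when allowed; otherwise fall back to the origin URL.
  const sitemap = crawlerOptions?.ignoreSitemap ? null : await crawler.tryGetSitemap();
  const seeds = sitemap !== null ? sitemap.map((x) => x.url) : [url];

  for (const seed of seeds) {
    if (!(await lockURL(id, sc, seed))) continue; // already seen or over the limit

    const job = await addScrapeJob({
      url: seed,
      mode: "single_urls",
      crawlerOptions,
      pageOptions,
      team_id,
      origin: "api", // illustrative; the controller uses req.body.origin ?? defaultOrigin
      crawl_id: id,
      sitemapped: sitemap !== null,
    });
    await addCrawlJob(id, job.id);
  }

  return id; // returned to the client as { jobId: id }
}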
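The queue worker closes the loop: when a scrape job that belongs to a crawl finishes (and was not seeded from a sitemap), it re-runs the crawler's link filters over the page's outgoing links, locks each new URL, and enqueues a follow-up scrape job under the same crawl_id. A trimmed sketch of that step, assuming the scraper fills docs[0].linksOnPage:

import { getCrawl, crawlToCrawler, lockURL, addCrawlJob, StoredCrawl } from "../lib/crawl-redis";
import { addScrapeJob } from "./queue-jobs";

async function enqueueDiscoveredLinks(crawl_id: string, origin: string, linksOnPage: string[]) {
  const sc = (await getCrawl(crawl_id)) as StoredCrawl;
  if (!sc || sc.cancelled) return; // a cancelled crawl stops growing, but running jobs finish

  const crawler = crawlToCrawler(crawl_id, sc);

  // filterURL resolves relative hrefs and applies the include/exclude, robots
  // and external-link rules; filterLinks then enforces maxDepth. The limit is
  // left at Infinity because lockURL already enforces crawlerOptions.limit.
  const links = crawler.filterLinks(
    linksOnPage
      .map((href) => crawler.filterURL(href, sc.originUrl))
      .filter((x): x is string => x !== null),
    Infinity,
    sc.crawlerOptions?.maxDepth ?? 10
  );

  for (const link of links) {
    // Only the first worker to lock a URL enqueues it, so concurrent jobs
    // discovering the same link do not duplicate work.
    if (!(await lockURL(crawl_id, sc, link))) continue;

    const newJob = await addScrapeJob({
      url: link,
      mode: "single_urls",
      crawlerOptions: sc.crawlerOptions,
      pageOptions: sc.pageOptions,
      team_id: sc.team_id,
      origin,
      crawl_id,
    });
    await addCrawlJob(crawl_id, newJob.id);
  }
}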
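Because a crawl is now a set of scrape jobs, crawl-cancel and crawl-status work against that set instead of a single queue job: cancel just flips cancelled on the StoredCrawl, and status is derived from the individual job states. A condensed sketch of the aggregation, mirroring the controller logic; jobs missing from the queue are not handled in this sketch:

import { Queue, Job } from "bullmq";

type CrawlStatus = "completed" | "failed" | "active";

// Cancellation wins, then "completed" only when every job completed,
// "failed" when any job failed, otherwise the crawl is still active.
async function aggregateCrawlStatus(scrapeQueue: Queue, jobIDs: string[], cancelled: boolean) {
  const jobs = (await Promise.all(jobIDs.map((id) => scrapeQueue.getJob(id)))) as Job[];
  const states = await Promise.all(jobs.map((j) => j.getState()));

  const status: CrawlStatus = cancelled
    ? "failed"
    : states.every((s) => s === "completed")
    ? "completed"
    : states.some((s) => s === "failed")
    ? "failed"
    : "active";

  // Scrape jobs return an array of documents; the controller unwraps the first element.
  const data = jobs.map((j) => (Array.isArray(j.returnvalue) ? j.returnvalue[0] : j.returnvalue));

  return {
    status,
    current: states.filter((s) => s === "completed" || s === "failed").length, // terminal jobs
    total: jobs.length,
    data: status === "completed" ? data : null,
    partial_data: status === "completed" ? [] : data.filter((x) => x !== null),
  };
}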
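Separately, extractLinks now resolves relative hrefs with the WHATWG URL constructor instead of string concatenation, so links are resolved against the page's full URL (including its path) rather than just its origin. A small illustration with a hypothetical page URL:

const pageUrl = "https://example.com/docs/guide"; // hypothetical page being scraped

// Old behaviour: "getting-started" became `${pageUrl}/getting-started`,
// i.e. https://example.com/docs/guide/getting-started (treats the page as a directory).

// URL-based resolution follows RFC 3986 relative-reference rules:
new URL("/pricing", pageUrl).href;        // "https://example.com/pricing"
new URL("getting-started", pageUrl).href; // "https://example.com/docs/getting-started"
new URL("../api", pageUrl).href;          // "https://example.com/api"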