feat: crawl to scrape conversion

Gergo Moricz 2024-08-13 20:51:43 +02:00
parent e28c415cf4
commit 86e136beca
8 changed files with 313 additions and 160 deletions

View File

@@ -1,10 +1,9 @@
 import { Request, Response } from "express";
 import { authenticateUser } from "./auth";
 import { RateLimiterMode } from "../../src/types";
-import { getWebScraperQueue } from "../../src/services/queue-service";
 import { supabase_service } from "../../src/services/supabase";
-import { billTeam } from "../../src/services/billing/credit_billing";
 import { Logger } from "../../src/lib/logger";
+import { getCrawl, saveCrawl } from "../../src/lib/crawl-redis";

 export async function crawlCancelController(req: Request, res: Response) {
   try {
@@ -18,8 +17,9 @@ export async function crawlCancelController(req: Request, res: Response) {
     if (!success) {
       return res.status(status).json({ error });
     }
-    const job = await getWebScraperQueue().getJob(req.params.jobId);
-    if (!job) {
+    const sc = await getCrawl(req.params.jobId);
+    if (!sc) {
       return res.status(404).json({ error: "Job not found" });
     }
@@ -39,27 +39,9 @@
       }
     }

-    const jobState = await job.getState();
-    let progress = job.progress;
-    if (typeof progress !== 'object') {
-      progress = {
-        partialDocs: []
-      }
-    }
-    const {
-      partialDocs = []
-    } = progress as { partialDocs: any[] };
-
-    if (partialDocs && partialDocs.length > 0 && jobState === "active") {
-      Logger.info("Billing team for partial docs...");
-      // Note: the credits that we will bill them here might be lower than the actual
-      // due to promises that are not yet resolved
-      await billTeam(team_id, partialDocs.length);
-    }
-
     try {
-      await (await getWebScraperQueue().client).set("cancelled:" + job.id, "true", "EX", 60 * 60);
-      await job.discard();
+      sc.cancelled = true;
+      await saveCrawl(req.params.jobId, sc);
     } catch (error) {
       Logger.error(error);
     }
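Note: with the queue-level cancel flag and partial-doc billing gone, cancellation reduces to flipping a field on the stored crawl. A minimal sketch of the new flow, using the crawl-redis helpers introduced later in this commit (the wrapper function is illustrative, not part of the diff):

import { getCrawl, saveCrawl } from "../../src/lib/crawl-redis";

// Illustrative wrapper: mark a crawl as cancelled so workers stop expanding it.
async function cancelCrawl(crawlId: string): Promise<boolean> {
  const sc = await getCrawl(crawlId);
  if (sc === null) return false; // unknown or expired crawl
  sc.cancelled = true;
  await saveCrawl(crawlId, sc);
  return true;
}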

View File

@@ -1,10 +1,9 @@
 import { Request, Response } from "express";
 import { authenticateUser } from "./auth";
 import { RateLimiterMode } from "../../src/types";
-import { addWebScraperJob } from "../../src/services/queue-jobs";
-import { getWebScraperQueue } from "../../src/services/queue-service";
-import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
+import { getScrapeQueue } from "../../src/services/queue-service";
 import { Logger } from "../../src/lib/logger";
+import { getCrawl, getCrawlJobs } from "../../src/lib/crawl-redis";

 export async function crawlStatusController(req: Request, res: Response) {
   try {
@@ -16,51 +15,41 @@ export async function crawlStatusController(req: Request, res: Response) {
     if (!success) {
       return res.status(status).json({ error });
     }
-    const job = await getWebScraperQueue().getJob(req.params.jobId);
-    if (!job) {
+    // const job = await getWebScraperQueue().getJob(req.params.jobId);
+    // if (!job) {
+    //   return res.status(404).json({ error: "Job not found" });
+    // }
+    // const isCancelled = await (await getWebScraperQueue().client).exists("cancelled:" + req.params.jobId);
+
+    const sc = await getCrawl(req.params.jobId);
+    if (!sc) {
       return res.status(404).json({ error: "Job not found" });
     }

-    const isCancelled = await (await getWebScraperQueue().client).exists("cancelled:" + req.params.jobId);
-
-    let progress = job.progress;
-    if (typeof progress !== 'object') {
-      progress = {
-        current: 0,
-        current_url: '',
-        total: 0,
-        current_step: '',
-        partialDocs: []
-      }
-    }
-    const {
-      current = 0,
-      current_url = '',
-      total = 0,
-      current_step = '',
-      partialDocs = []
-    } = progress as { current: number, current_url: string, total: number, current_step: string, partialDocs: any[] };
-
-    let data = job.returnvalue;
-    if (process.env.USE_DB_AUTHENTICATION === "true") {
-      const supabaseData = await supabaseGetJobById(req.params.jobId);
-
-      if (supabaseData) {
-        data = supabaseData.docs;
-      }
-    }
-
-    const jobStatus = await job.getState();
+    const jobIDs = await getCrawlJobs(req.params.jobId);
+
+    // let data = job.returnvalue;
+    // if (process.env.USE_DB_AUTHENTICATION === "true") {
+    //   const supabaseData = await supabaseGetJobById(req.params.jobId);
+    //   if (supabaseData) {
+    //     data = supabaseData.docs;
+    //   }
+    // }
+
+    const jobs = await Promise.all(jobIDs.map(x => getScrapeQueue().getJob(x)));
+    const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
+    const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "active";
+
+    const data = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);

     res.json({
-      status: isCancelled ? "failed" : jobStatus,
-      // progress: job.progress(),
-      current,
-      current_url,
-      current_step,
-      total,
-      data: data && !isCancelled ? data : null,
-      partial_data: jobStatus == 'completed' && !isCancelled ? [] : partialDocs,
+      status: jobStatus,
+      current: jobStatuses.filter(x => x === "completed" || x === "failed").length,
+      total: jobs.length,
+      data: jobStatus === "completed" ? data : null,
+      partial_data: jobStatus === "completed" ? [] : data.filter(x => x !== null),
     });
   } catch (error) {
     Logger.error(error);
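Note: the status endpoint now derives one aggregate status from the individual scrape jobs instead of reading a single crawl job's progress. A hypothetical response for a three-page crawl with one job still running (shape taken from the res.json call above, values illustrative):

const exampleResponse = {
  status: "active",                     // "completed" | "failed" | "active"
  current: 2,                           // jobs that have settled (completed or failed)
  total: 3,                             // scrape jobs spawned for this crawl so far
  data: null,                           // only populated once status === "completed"
  partial_data: [/* returnvalues of the two finished scrape jobs */],
};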

View File

@@ -4,7 +4,7 @@ import { billTeam } from "../../src/services/billing/credit_billing";
 import { checkTeamCredits } from "../../src/services/billing/credit_billing";
 import { authenticateUser } from "./auth";
 import { RateLimiterMode } from "../../src/types";
-import { addWebScraperJob } from "../../src/services/queue-jobs";
+import { addScrapeJob, addWebScraperJob } from "../../src/services/queue-jobs";
 import { isUrlBlocked } from "../../src/scraper/WebScraper/utils/blocklist";
 import { logCrawl } from "../../src/services/logging/crawl_log";
 import { validateIdempotencyKey } from "../../src/services/idempotency/validate";
@@ -12,6 +12,7 @@ import { createIdempotencyKey } from "../../src/services/idempotency/create";
 import { defaultCrawlPageOptions, defaultCrawlerOptions, defaultOrigin } from "../../src/lib/default-values";
 import { v4 as uuidv4 } from "uuid";
 import { Logger } from "../../src/lib/logger";
+import { addCrawlJob, crawlToCrawler, lockURL, saveCrawl, StoredCrawl } from "../../src/lib/crawl-redis";

 export async function crawlController(req: Request, res: Response) {
   try {
@@ -62,47 +63,89 @@
     const crawlerOptions = { ...defaultCrawlerOptions, ...req.body.crawlerOptions };
     const pageOptions = { ...defaultCrawlPageOptions, ...req.body.pageOptions };

-    if (mode === "single_urls" && !url.includes(",")) { // NOTE: do we need this?
-      try {
-        const a = new WebScraperDataProvider();
-        await a.setOptions({
-          jobId: uuidv4(),
-          mode: "single_urls",
-          urls: [url],
-          crawlerOptions: { ...crawlerOptions, returnOnlyUrls: true },
-          pageOptions: pageOptions,
-        });
-
-        const docs = await a.getDocuments(false, (progress) => {
-          job.updateProgress({
-            current: progress.current,
-            total: progress.total,
-            current_step: "SCRAPING",
-            current_url: progress.currentDocumentUrl,
-          });
-        });
-        return res.json({
-          success: true,
-          documents: docs,
-        });
-      } catch (error) {
-        Logger.error(error);
-        return res.status(500).json({ error: error.message });
-      }
-    }
-
-    const job = await addWebScraperJob({
-      url: url,
-      mode: mode ?? "crawl", // fix for single urls not working
-      crawlerOptions: crawlerOptions,
-      team_id: team_id,
-      pageOptions: pageOptions,
-      origin: req.body.origin ?? defaultOrigin,
-    });
-
-    await logCrawl(job.id.toString(), team_id);
-
-    res.json({ jobId: job.id });
+    // if (mode === "single_urls" && !url.includes(",")) { // NOTE: do we need this?
+    //   try {
+    //     const a = new WebScraperDataProvider();
+    //     await a.setOptions({
+    //       jobId: uuidv4(),
+    //       mode: "single_urls",
+    //       urls: [url],
+    //       crawlerOptions: { ...crawlerOptions, returnOnlyUrls: true },
+    //       pageOptions: pageOptions,
+    //     });
+    //
+    //     const docs = await a.getDocuments(false, (progress) => {
+    //       job.updateProgress({
+    //         current: progress.current,
+    //         total: progress.total,
+    //         current_step: "SCRAPING",
+    //         current_url: progress.currentDocumentUrl,
+    //       });
+    //     });
+    //     return res.json({
+    //       success: true,
+    //       documents: docs,
+    //     });
+    //   } catch (error) {
+    //     Logger.error(error);
+    //     return res.status(500).json({ error: error.message });
+    //   }
+    // }
+
+    const id = uuidv4();
+
+    await logCrawl(id, team_id);
+
+    let robots;
+
+    try {
+      robots = await this.getRobotsTxt();
+    } catch (_) {}
+
+    const sc: StoredCrawl = {
+      originUrl: url,
+      crawlerOptions,
+      pageOptions,
+      team_id,
+      robots,
+    };
+
+    await saveCrawl(id, sc);
+
+    const crawler = crawlToCrawler(id, sc);
+    const sitemap = sc.crawlerOptions?.ignoreSitemap ? null : await crawler.tryGetSitemap();
+
+    if (sitemap !== null) {
+      for (const url of sitemap.map(x => x.url)) {
+        await lockURL(id, sc, url);
+        const job = await addScrapeJob({
+          url,
+          mode: "single_urls",
+          crawlerOptions: crawlerOptions,
+          team_id: team_id,
+          pageOptions: pageOptions,
+          origin: req.body.origin ?? defaultOrigin,
+          crawl_id: id,
+          sitemapped: true,
+        });
+        await addCrawlJob(id, job.id);
+      }
+    } else {
+      await lockURL(id, sc, url);
+      const job = await addScrapeJob({
+        url,
+        mode: "single_urls",
+        crawlerOptions: crawlerOptions,
+        team_id: team_id,
+        pageOptions: pageOptions,
+        origin: req.body.origin ?? defaultOrigin,
+        crawl_id: id,
+      });
+      await addCrawlJob(id, job.id);
+    }
+
+    res.json({ jobId: id });
   } catch (error) {
     Logger.error(error);
     return res.status(500).json({ error: error.message });
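Note: the controller now replies with the generated crawl id rather than a queue job id, so clients poll the same id they were handed. A rough client-side sketch (the /v0/crawl and /v0/crawl/status paths are assumptions, they are not part of this diff):

// Illustrative client flow; endpoint paths and auth header are assumed.
async function startAndPollCrawl(baseUrl: string, apiKey: string, url: string) {
  const startRes = await fetch(`${baseUrl}/v0/crawl`, {
    method: "POST",
    headers: { "Content-Type": "application/json", Authorization: `Bearer ${apiKey}` },
    body: JSON.stringify({ url }),
  });
  const { jobId } = await startRes.json(); // this is now the crawl id, not a queue job id

  for (;;) {
    const statusRes = await fetch(`${baseUrl}/v0/crawl/status/${jobId}`, {
      headers: { Authorization: `Bearer ${apiKey}` },
    });
    const status = await statusRes.json();
    if (status.status === "completed" || status.status === "failed") return status;
    await new Promise((r) => setTimeout(r, 1000)); // poll once a second
  }
}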

View File

@@ -0,0 +1,69 @@
+import { WebCrawler } from "../scraper/WebScraper/crawler";
+import { redisConnection } from "../services/queue-service";
+
+export type StoredCrawl = {
+  originUrl: string;
+  crawlerOptions: any;
+  pageOptions: any;
+  team_id: string;
+  robots?: string;
+  cancelled?: boolean;
+};
+
+export async function saveCrawl(id: string, crawl: StoredCrawl) {
+  await redisConnection.set("crawl:" + id, JSON.stringify(crawl));
+  await redisConnection.expire("crawl:" + id, 24 * 60 * 60, "NX");
+}
+
+export async function getCrawl(id: string): Promise<StoredCrawl | null> {
+  const x = await redisConnection.get("crawl:" + id);
+
+  if (x === null) {
+    return null;
+  }
+
+  return JSON.parse(x);
+}
+
+export async function addCrawlJob(id: string, job_id: string) {
+  await redisConnection.sadd("crawl:" + id + ":jobs", job_id);
+  await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60, "NX");
+}
+
+export async function getCrawlJobs(id: string): Promise<string[]> {
+  return await redisConnection.smembers("crawl:" + id + ":jobs");
+}
+
+export async function lockURL(id: string, sc: StoredCrawl, url: string): Promise<boolean> {
+  if (typeof sc.crawlerOptions?.limit === "number") {
+    if (await redisConnection.scard("crawl:" + id + ":visited") >= sc.crawlerOptions.limit) {
+      return false;
+    }
+  }
+
+  const res = (await redisConnection.sadd("crawl:" + id + ":visited", url)) !== 0
+  await redisConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60, "NX");
+  return res;
+}
+
+export function crawlToCrawler(id: string, sc: StoredCrawl): WebCrawler {
+  const crawler = new WebCrawler({
+    jobId: id,
+    initialUrl: sc.originUrl,
+    includes: sc.crawlerOptions?.includes ?? [],
+    excludes: sc.crawlerOptions?.excludes ?? [],
+    maxCrawledLinks: sc.crawlerOptions?.maxCrawledLinks ?? 1000,
+    maxCrawledDepth: sc.crawlerOptions?.maxDepth ?? 10,
+    limit: sc.crawlerOptions?.limit ?? 10000,
+    generateImgAltText: sc.crawlerOptions?.generateImgAltText ?? false,
+    allowBackwardCrawling: sc.crawlerOptions?.allowBackwardCrawling ?? false,
+    allowExternalContentLinks: sc.crawlerOptions?.allowExternalContentLinks ?? false,
+  });
+
+  if (sc.robots !== undefined) {
+    try {
+      crawler.importRobotsTxt(sc.robots);
+    } catch (_) {}
+  }
+
+  return crawler;
+}
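Note: a compressed usage sketch of these helpers, mirroring how the controller and worker elsewhere in this commit use them (ids and options are illustrative; the import path assumes a caller sitting next to this module):

import { v4 as uuidv4 } from "uuid";
import { addCrawlJob, crawlToCrawler, getCrawl, getCrawlJobs, lockURL, saveCrawl, StoredCrawl } from "./crawl-redis";

async function exampleCrawlLifecycle() {
  const id = uuidv4();
  const sc: StoredCrawl = {
    originUrl: "https://example.com",
    crawlerOptions: { limit: 100 },
    pageOptions: {},
    team_id: "team_123", // illustrative
  };
  await saveCrawl(id, sc); // stored under crawl:<id> with a 24h TTL

  // lockURL returns true only the first time a URL is seen for this crawl,
  // so concurrent workers cannot enqueue the same page twice.
  if (await lockURL(id, sc, sc.originUrl)) {
    await addCrawlJob(id, "queue-job-id"); // record the scrape job against the crawl (id illustrative)
  }

  const jobIDs = await getCrawlJobs(id);       // every scrape job spawned so far
  const stored = await getCrawl(id);           // null once the TTL has expired
  const crawler = crawlToCrawler(id, stored!); // WebCrawler configured from the stored options
}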

View File

@@ -1,4 +1,4 @@
-import axios from "axios";
+import axios, { AxiosError } from "axios";
 import cheerio, { load } from "cheerio";
 import { URL } from "url";
 import { getLinksFromSitemap } from "./sitemap";
@@ -22,7 +22,7 @@ export class WebCrawler {
   private crawledUrls: Map<string, string> = new Map();
   private limit: number;
   private robotsTxtUrl: string;
-  private robots: any;
+  public robots: any;
   private generateImgAltText: boolean;
   private allowBackwardCrawling: boolean;
   private allowExternalContentLinks: boolean;
@@ -66,7 +66,7 @@ export class WebCrawler {
     this.allowExternalContentLinks = allowExternalContentLinks ?? false;
   }

-  private filterLinks(sitemapLinks: string[], limit: number, maxDepth: number): string[] {
+  public filterLinks(sitemapLinks: string[], limit: number, maxDepth: number): string[] {
     return sitemapLinks
       .filter((link) => {
         const url = new URL(link.trim(), this.baseUrl);
@@ -130,6 +130,25 @@ export class WebCrawler {
       .slice(0, limit);
   }

+  public async getRobotsTxt(): Promise<string> {
+    const response = await axios.get(this.robotsTxtUrl, { timeout: axiosTimeout });
+    return response.data;
+  }
+
+  public importRobotsTxt(txt: string) {
+    this.robots = robotsParser(this.robotsTxtUrl, txt);
+  }
+
+  public async tryGetSitemap(): Promise<{ url: string; html: string; }[] | null> {
+    Logger.debug(`Fetching sitemap links from ${this.initialUrl}`);
+    const sitemapLinks = await this.tryFetchSitemapLinks(this.initialUrl);
+    if (sitemapLinks.length > 0) {
+      let filteredLinks = this.filterLinks(sitemapLinks, this.limit, this.maxCrawledDepth);
+      return filteredLinks.map(link => ({ url: link, html: "" }));
+    }
+    return null;
+  }
+
   public async start(
     inProgress?: (progress: Progress) => void,
     pageOptions?: PageOptions,
@@ -142,19 +161,17 @@
     Logger.debug(`Crawler starting with ${this.initialUrl}`);
     // Fetch and parse robots.txt
     try {
-      const response = await axios.get(this.robotsTxtUrl, { timeout: axiosTimeout });
-      this.robots = robotsParser(this.robotsTxtUrl, response.data);
+      const txt = await this.getRobotsTxt();
+      this.importRobotsTxt(txt);
       Logger.debug(`Crawler robots.txt fetched with ${this.robotsTxtUrl}`);
     } catch (error) {
       Logger.debug(`Failed to fetch robots.txt from ${this.robotsTxtUrl}`);
     }

     if (!crawlerOptions?.ignoreSitemap){
-      Logger.debug(`Fetching sitemap links from ${this.initialUrl}`);
-      const sitemapLinks = await this.tryFetchSitemapLinks(this.initialUrl);
-      if (sitemapLinks.length > 0) {
-        let filteredLinks = this.filterLinks(sitemapLinks, limit, maxDepth);
-        return filteredLinks.map(link => ({ url: link, html: "" }));
+      const sm = await this.tryGetSitemap();
+      if (sm !== null) {
+        return sm;
       }
     }
@@ -241,6 +258,37 @@
     return Array.from(this.crawledUrls.entries()).map(([url, html]) => ({ url, html }));
   }

+  public filterURL(href: string, url: string): string | null {
+    let fullUrl = href;
+    if (!href.startsWith("http")) {
+      fullUrl = new URL(href, this.baseUrl).toString();
+    }
+    const urlObj = new URL(fullUrl);
+    const path = urlObj.pathname;
+
+    if (this.isInternalLink(fullUrl)) { // INTERNAL LINKS
+      if (this.isInternalLink(fullUrl) &&
+        this.noSections(fullUrl) &&
+        !this.matchesExcludes(path) &&
+        this.isRobotsAllowed(fullUrl)
+      ) {
+        return fullUrl;
+      }
+    } else { // EXTERNAL LINKS
+      if (
+        this.isInternalLink(url) &&
+        this.allowExternalContentLinks &&
+        !this.isSocialMediaOrEmail(fullUrl) &&
+        !this.matchesExcludes(fullUrl, true) &&
+        !this.isExternalMainPage(fullUrl)
+      ) {
+        return fullUrl;
+      }
+    }
+
+    return null;
+  }
+
   async crawl(url: string, pageOptions: PageOptions): Promise<{url: string, html: string, pageStatusCode?: number, pageError?: string}[]> {
     if (this.visited.has(url) || !this.robots.isAllowed(url, "FireCrawlAgent")) {
       return [];
@@ -287,31 +335,9 @@
       $("a").each((_, element) => {
         const href = $(element).attr("href");
         if (href) {
-          let fullUrl = href;
-          if (!href.startsWith("http")) {
-            fullUrl = new URL(href, this.baseUrl).toString();
-          }
-          const urlObj = new URL(fullUrl);
-          const path = urlObj.pathname;
-
-          if (this.isInternalLink(fullUrl)) { // INTERNAL LINKS
-            if (this.isInternalLink(fullUrl) &&
-              this.noSections(fullUrl) &&
-              !this.matchesExcludes(path) &&
-              this.isRobotsAllowed(fullUrl)
-            ) {
-              links.push({ url: fullUrl, html: content, pageStatusCode, pageError });
-            }
-          } else { // EXTERNAL LINKS
-            if (
-              this.isInternalLink(url) &&
-              this.allowExternalContentLinks &&
-              !this.isSocialMediaOrEmail(fullUrl) &&
-              !this.matchesExcludes(fullUrl, true) &&
-              !this.isExternalMainPage(fullUrl)
-            ) {
-              links.push({ url: fullUrl, html: content, pageStatusCode, pageError });
-            }
+          const u = this.filterURL(href, url);
+          if (u !== null) {
+            links.push({ url: u, html: content, pageStatusCode, pageError });
           }
         }
       });
@@ -465,11 +491,15 @@
         }
       } catch (error) {
         Logger.debug(`Failed to fetch sitemap with axios from ${sitemapUrl}: ${error}`);
+        if (error instanceof AxiosError && error.response?.status === 404) {
+          // ignore 404
+        } else {
           const response = await getLinksFromSitemap({ sitemapUrl, mode: 'fire-engine' });
           if (response) {
             sitemapLinks = response;
           }
+        }
       }

     if (sitemapLinks.length === 0) {
       const baseUrlSitemap = `${this.baseUrl}/sitemap.xml`;
@@ -480,9 +510,13 @@
       }
     } catch (error) {
       Logger.debug(`Failed to fetch sitemap from ${baseUrlSitemap}: ${error}`);
+      if (error instanceof AxiosError && error.response?.status === 404) {
+        // ignore 404
+      } else {
         sitemapLinks = await getLinksFromSitemap({ sitemapUrl: baseUrlSitemap, mode: 'fire-engine' });
+      }
     }

     const normalizedUrl = normalizeUrl(url);
     const normalizedSitemapLinks = sitemapLinks.map(link => normalizeUrl(link));
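Note: making these steps public lets callers drive the crawler outside of start(). A minimal sketch of that usage, assuming a WebCrawler built via crawlToCrawler from crawl-redis.ts (the helper function and example href are illustrative):

import { WebCrawler } from "./crawler"; // path assumed

async function seedFromSitemap(crawler: WebCrawler): Promise<string[]> {
  try {
    crawler.importRobotsTxt(await crawler.getRobotsTxt()); // fetch + parse robots.txt up front
  } catch (_) {
    // a missing or unreachable robots.txt is not fatal
  }

  const sitemap = await crawler.tryGetSitemap(); // null when no sitemap links were found
  if (sitemap !== null) {
    return sitemap.map((x) => x.url);
  }

  // Fall back to per-link filtering: filterURL applies the include/exclude,
  // section, robots and external-link rules to a single href found on a page.
  const next = crawler.filterURL("/docs/page-2", "https://example.com/docs"); // illustrative href
  return next !== null ? [next] : [];
}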

View File

@@ -41,10 +41,10 @@ export function extractLinks(html: string, baseUrl: string): string[] {
       links.push(href);
     } else if (href.startsWith('/')) {
       // Relative URL starting with '/', append to origin
-      links.push(`${origin}${href}`);
+      links.push(new URL(href, baseUrl).href);
     } else if (!href.startsWith('#') && !href.startsWith('mailto:')) {
       // Relative URL not starting with '/', append to base URL
-      links.push(`${baseUrl}/${href}`);
+      links.push(new URL(href, baseUrl).href);
     } else if (href.startsWith('mailto:')) {
       // mailto: links, add as is
       links.push(href);
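Note: the old string concatenation ignored URL resolution rules for relative hrefs found on nested pages; new URL(href, baseUrl) resolves both cases per the WHATWG URL standard. For example:

const baseUrl = "https://example.com/docs/guide";

// Old concatenation:
console.log(`${baseUrl}/../api`);               // "https://example.com/docs/guide/../api"

// New standards-compliant resolution:
console.log(new URL("../api", baseUrl).href);   // "https://example.com/api"
console.log(new URL("pricing", baseUrl).href);  // "https://example.com/docs/pricing"
console.log(new URL("/pricing", baseUrl).href); // "https://example.com/pricing"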

View File

@@ -18,6 +18,11 @@ import { ScrapeEvents } from "../lib/scrape-events";
 import { Worker } from "bullmq";
 import systemMonitor from "./system-monitor";
 import { v4 as uuidv4 } from "uuid";
+import { WebCrawler } from "../scraper/WebScraper/crawler";
+import { getAdjustedMaxDepth } from "../scraper/WebScraper/utils/maxDepthUtils";
+import { addCrawlJob, crawlToCrawler, getCrawl, lockURL } from "../lib/crawl-redis";
+import { StoredCrawl } from "../lib/crawl-redis";
+import { addScrapeJob } from "./queue-jobs";

 if (process.env.ENV === "production") {
   initSDK({
@@ -40,8 +45,6 @@ const cantAcceptConnectionInterval =
 const connectionMonitorInterval =
   Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10;
 const gotJobInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 20;
-const wsq = getWebScraperQueue();
-const sq = getScrapeQueue();

 const processJobInternal = async (token: string, job: Job) => {
   const extendLockInterval = setInterval(async () => {
@@ -128,18 +131,10 @@ async function processJob(job: Job, token: string) {
     const end = Date.now();
     const timeTakenInSeconds = (end - start) / 1000;

-    const isCancelled = await (await getWebScraperQueue().client).exists("cancelled:" + job.id);
-
-    if (isCancelled) {
-      await job.discard();
-      await job.moveToFailed(Error("Job cancelled by user"), job.token);
-      await job.discard();
-    }
-
     const data = {
       success,
       result: {
-        links: isCancelled ? [] : docs.map((doc) => {
+        links: docs.map((doc) => {
           return {
             content: doc,
             source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
@@ -147,20 +142,20 @@
         }),
       },
       project_id: job.data.project_id,
-      error: isCancelled ? "Job cancelled by user" : message /* etc... */,
-      docs: isCancelled ? [] : docs,
+      error: message /* etc... */,
+      docs,
     };

-    if (job.data.mode === "crawl" && !isCancelled) {
+    if (job.data.mode === "crawl") {
       await callWebhook(job.data.team_id, job.id as string, data);
     }

     await logJob({
       job_id: job.id as string,
-      success: success && !isCancelled,
-      message: isCancelled ? "Job cancelled by user" : message,
-      num_docs: isCancelled ? 0 : docs.length,
-      docs: isCancelled ? [] : docs,
+      success: success,
+      message: message,
+      num_docs: docs.length,
+      docs: docs,
       time_taken: timeTakenInSeconds,
       team_id: job.data.team_id,
       mode: job.data.mode,
@@ -168,7 +163,44 @@
       crawlerOptions: job.data.crawlerOptions,
       pageOptions: job.data.pageOptions,
       origin: job.data.origin,
+      crawl_id: job.data.crawl_id,
     });

+    if (job.data.crawl_id) {
+      if (!job.data.sitemapped) {
+        const sc = await getCrawl(job.data.crawl_id) as StoredCrawl;
+
+        if (!sc.cancelled) {
+          const crawler = crawlToCrawler(job.data.crawl_id, sc);
+
+          const links = crawler.filterLinks((data.docs[0].linksOnPage as string[])
+            .map(href => crawler.filterURL(href, sc.originUrl))
+            .filter(x => x !== null),
+            Infinity,
+            sc.crawlerOptions?.maxDepth ?? 10
+          )
+
+          for (const link of links) {
+            if (await lockURL(job.data.crawl_id, sc, link)) {
+              console.log("Locked", link + "!");
+              const newJob = await addScrapeJob({
+                url: link,
+                mode: "single_urls",
+                crawlerOptions: sc.crawlerOptions,
+                team_id: sc.team_id,
+                pageOptions: sc.pageOptions,
+                origin: job.data.origin,
+                crawl_id: job.data.crawl_id,
+              });
+
+              await addCrawlJob(job.data.crawl_id, newJob.id);
+            }
+          }
+        }
+      }
+    }
+
     Logger.info(`🐂 Job done ${job.id}`);
     return data;
   } catch (error) {
@@ -216,11 +248,12 @@ async function processJob(job: Job, token: string) {
       docs: [],
       time_taken: 0,
       team_id: job.data.team_id,
-      mode: "crawl",
+      mode: job.data.mode,
       url: job.data.url,
       crawlerOptions: job.data.crawlerOptions,
       pageOptions: job.data.pageOptions,
       origin: job.data.origin,
+      crawl_id: job.data.crawl_id,
     });
     // done(null, data);
     return data;

View File

@@ -28,6 +28,8 @@ export interface WebScraperOptions {
   extractorOptions?: any;
   team_id: string;
   origin?: string;
+  crawl_id?: string;
+  sitemapped?: boolean;
 }

 export interface RunWebScraperParams {
@@ -65,6 +67,7 @@ export interface FirecrawlJob {
   extractor_options?: ExtractorOptions,
   num_tokens?: number,
   retry?: boolean,
+  crawl_id?: string;
 }

 export interface FirecrawlScrapeResponse {