From 845c2744a9e4f9278fab608823c71b0cc17a0fed Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?=
Date: Thu, 5 Dec 2024 20:50:36 +0100
Subject: [PATCH] feat(app): add extra crawl logging (app-side only for now)

---
 apps/api/src/controllers/v1/batch-scrape.ts | 10 +++++-
 apps/api/src/controllers/v1/crawl.ts        | 40 ++++++++++++++-------
 apps/api/src/lib/crawl-redis.ts             | 26 ++++++++++++--
 apps/api/src/scraper/WebScraper/crawler.ts  | 24 +++++++------
 apps/api/src/scraper/WebScraper/sitemap.ts  | 11 +++---
 5 files changed, 78 insertions(+), 33 deletions(-)

diff --git a/apps/api/src/controllers/v1/batch-scrape.ts b/apps/api/src/controllers/v1/batch-scrape.ts
index db029d40..133977be 100644
--- a/apps/api/src/controllers/v1/batch-scrape.ts
+++ b/apps/api/src/controllers/v1/batch-scrape.ts
@@ -5,6 +5,7 @@ import {
   batchScrapeRequestSchema,
   CrawlResponse,
   RequestWithAuth,
+  ScrapeOptions,
 } from "./types";
 import {
   addCrawlJobs,
@@ -14,10 +15,10 @@ import {
   StoredCrawl,
 } from "../../lib/crawl-redis";
 import { logCrawl } from "../../services/logging/crawl_log";
-import { getScrapeQueue } from "../../services/queue-service";
 import { getJobPriority } from "../../lib/job-priority";
 import { addScrapeJobs } from "../../services/queue-jobs";
 import { callWebhook } from "../../services/webhook";
+import { logger as _logger } from "../../lib/logger";
 
 export async function batchScrapeController(
   req: RequestWithAuth<{}, CrawlResponse, BatchScrapeRequest>,
   res: Response<CrawlResponse>
 ) {
   req.body = batchScrapeRequestSchema.parse(req.body);
 
   const id = req.body.appendToId ?? uuidv4();
+  const logger = _logger.child({ crawlId: id, batchScrapeId: id, module: "api/v1", method: "batchScrapeController", teamId: req.auth.team_id, plan: req.auth.plan });
+  logger.debug("Batch scrape " + id + " starting", { urlsLength: req.body.urls, appendToId: req.body.appendToId, account: req.account });
 
   if (!req.body.appendToId) {
     await logCrawl(id, req.auth.team_id);
@@ -58,6 +61,7 @@
     // set base to 21
     jobPriority = await getJobPriority({plan: req.auth.plan, team_id: req.auth.team_id, basePriority: 21})
   }
+  logger.debug("Using job priority " + jobPriority, { jobPriority });
 
   const scrapeOptions: ScrapeOptions = { ...req.body };
   delete (scrapeOptions as any).urls;
@@ -85,18 +89,22 @@
     };
   });
 
+  logger.debug("Locking URLs...");
   await lockURLs(
     id,
     sc,
     jobs.map((x) => x.data.url)
   );
+  logger.debug("Adding scrape jobs to Redis...");
   await addCrawlJobs(
     id,
     jobs.map((x) => x.opts.jobId)
   );
+  logger.debug("Adding scrape jobs to BullMQ...");
   await addScrapeJobs(jobs);
 
   if(req.body.webhook) {
+    logger.debug("Calling webhook with batch_scrape.started...", { webhook: req.body.webhook });
     await callWebhook(req.auth.team_id, id, null, req.body.webhook, true, "batch_scrape.started");
   }
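
The batch-scrape controller above hangs all of its new debug lines off a request-scoped winston child logger. A rough standalone sketch of that pattern (not part of the patch; the transport and format choices below are assumptions, and the logger stands in for apps/api/src/lib/logger.ts):

    import winston from "winston";

    // Module-level logger, standing in for the project's shared logger.
    const _logger = winston.createLogger({
      level: "debug",
      format: winston.format.json(),
      transports: [new winston.transports.Console()],
    });

    // child() stamps every subsequent entry with the crawl-scoped metadata, so a
    // single crawlId can be grepped across controller, queue and crawler logs.
    const logger = _logger.child({
      crawlId: "00000000-0000-0000-0000-000000000000", // hypothetical id
      module: "api/v1",
      method: "batchScrapeController",
    });

    logger.debug("Batch scrape starting", { urlsLength: 2 });
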
diff --git a/apps/api/src/controllers/v1/crawl.ts b/apps/api/src/controllers/v1/crawl.ts
index 2195913f..08630bfa 100644
--- a/apps/api/src/controllers/v1/crawl.ts
+++ b/apps/api/src/controllers/v1/crawl.ts
@@ -19,7 +19,7 @@
 import { logCrawl } from "../../services/logging/crawl_log";
 import { getScrapeQueue } from "../../services/queue-service";
 import { addScrapeJob } from "../../services/queue-jobs";
-import { logger } from "../../lib/logger";
+import { logger as _logger } from "../../lib/logger";
 import { getJobPriority } from "../../lib/job-priority";
 import { callWebhook } from "../../services/webhook";
 import { scrapeOptions as scrapeOptionsSchema } from "./types";
@@ -28,9 +28,12 @@ export async function crawlController(
   req: RequestWithAuth<{}, CrawlResponse, CrawlRequest>,
   res: Response<CrawlResponse>
 ) {
+  const preNormalizedBody = req.body;
   req.body = crawlRequestSchema.parse(req.body);
 
   const id = uuidv4();
+  const logger = _logger.child({ crawlId: id, module: "api/v1", method: "crawlController", teamId: req.auth.team_id, plan: req.auth.plan });
+  logger.debug("Crawl " + id + " starting", { request: req.body, originalRequest: preNormalizedBody, account: req.account });
 
   await logCrawl(id, req.auth.team_id);
@@ -68,7 +71,9 @@
     }
   }
 
+  const originalLimit = crawlerOptions.limit;
   crawlerOptions.limit = Math.min(remainingCredits, crawlerOptions.limit);
+  logger.debug("Determined limit: " + crawlerOptions.limit, { remainingCredits, bodyLimit: originalLimit, originalBodyLimit: preNormalizedBody.limit });
 
   const sc: StoredCrawl = {
     originUrl: req.body.url,
@@ -85,11 +90,7 @@
   try {
     sc.robots = await crawler.getRobotsTxt(scrapeOptions.skipTlsVerification);
   } catch (e) {
-    logger.debug(
-      `[Crawl] Failed to get robots.txt (this is probably fine!): ${JSON.stringify(
-        e
-      )}`
-    );
+    logger.debug("Failed to get robots.txt (this is probably fine!)", { error: e });
   }
 
   await saveCrawl(id, sc);
@@ -97,15 +98,18 @@
   const sitemap = sc.crawlerOptions.ignoreSitemap
     ? null
     : await crawler.tryGetSitemap();
-
+
   if (sitemap !== null && sitemap.length > 0) {
+    logger.debug("Using sitemap of length " + sitemap.length, { sitemapLength: sitemap.length });
     let jobPriority = 20;
-  // If it is over 1000, we need to get the job priority,
-  // otherwise we can use the default priority of 20
-  if(sitemap.length > 1000){
-    // set base to 21
-    jobPriority = await getJobPriority({plan: req.auth.plan, team_id: req.auth.team_id, basePriority: 21})
-  }
+    // If it is over 1000, we need to get the job priority,
+    // otherwise we can use the default priority of 20
+    if(sitemap.length > 1000){
+      // set base to 21
+      jobPriority = await getJobPriority({plan: req.auth.plan, team_id: req.auth.team_id, basePriority: 21})
+    }
+    logger.debug("Using job priority " + jobPriority, { jobPriority });
+
     const jobs = sitemap.map((x) => {
       const url = x.url;
       const uuid = uuidv4();
@@ -131,19 +135,26 @@
         };
       })
 
+    logger.debug("Locking URLs...");
     await lockURLs(
       id,
       sc,
       jobs.map((x) => x.data.url)
     );
+    logger.debug("Adding scrape jobs to Redis...");
     await addCrawlJobs(
       id,
       jobs.map((x) => x.opts.jobId)
     );
+    logger.debug("Adding scrape jobs to BullMQ...");
     await getScrapeQueue().addBulk(jobs);
   } else {
+    logger.debug("Sitemap not found or ignored.", { ignoreSitemap: sc.crawlerOptions.ignoreSitemap });
+
+    logger.debug("Locking URL...");
     await lockURL(id, sc, req.body.url);
     const jobId = uuidv4();
+    logger.debug("Adding scrape job to Redis...", { jobId });
     await addScrapeJob(
       {
         url: req.body.url,
@@ -162,10 +173,13 @@
       },
       jobId,
     );
+    logger.debug("Adding scrape job to BullMQ...", { jobId });
     await addCrawlJob(id, jobId);
   }
+  logger.debug("Done queueing jobs!");
 
   if(req.body.webhook) {
+    logger.debug("Calling webhook with crawl.started...", { webhook: req.body.webhook });
     await callWebhook(req.auth.team_id, id, null, req.body.webhook, true, "crawl.started");
   }
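
The crawl controller's new "Determined limit" debug line reports a simple clamp of the requested limit against the team's remaining credits; in isolation the logic is just the following (illustrative helper name, not from the codebase):

    // The limit actually used for the crawl is whichever is smaller: the limit
    // in the request body or the credits the team still has.
    function determineEffectiveLimit(requestedLimit: number, remainingCredits: number): number {
      return Math.min(remainingCredits, requestedLimit);
    }

    // A team with 500 credits left asking for a 1000-page crawl is clamped to 500.
    console.log(determineEffectiveLimit(1000, 500)); // 500
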
diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts
index 68087993..1e07662b 100644
--- a/apps/api/src/lib/crawl-redis.ts
+++ b/apps/api/src/lib/crawl-redis.ts
@@ -2,7 +2,7 @@ import { InternalOptions } from "../scraper/scrapeURL";
 import { ScrapeOptions } from "../controllers/v1/types";
 import { WebCrawler } from "../scraper/WebScraper/crawler";
 import { redisConnection } from "../services/queue-service";
-import { logger } from "./logger";
+import { logger as _logger } from "./logger";
 import { getAdjustedMaxDepth } from "../scraper/WebScraper/utils/maxDepthUtils";
 
 export type StoredCrawl = {
@@ -18,6 +18,7 @@
 };
 
 export async function saveCrawl(id: string, crawl: StoredCrawl) {
+  _logger.debug("Saving crawl " + id + " to Redis...", { crawl, module: "crawl-redis", method: "saveCrawl", crawlId: id, teamId: crawl.team_id, plan: crawl.plan });
   await redisConnection.set("crawl:" + id, JSON.stringify(crawl));
   await redisConnection.expire("crawl:" + id, 24 * 60 * 60, "NX");
 }
@@ -41,16 +42,19 @@ export async function getCrawlExpiry(id: string): Promise<Date> {
 }
 
 export async function addCrawlJob(id: string, job_id: string) {
+  _logger.debug("Adding crawl job " + job_id + " to Redis...", { jobId: job_id, module: "crawl-redis", method: "addCrawlJob", crawlId: id });
   await redisConnection.sadd("crawl:" + id + ":jobs", job_id);
   await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60, "NX");
 }
 
 export async function addCrawlJobs(id: string, job_ids: string[]) {
+  _logger.debug("Adding crawl jobs to Redis...", { jobIds: job_ids, module: "crawl-redis", method: "addCrawlJobs", crawlId: id });
   await redisConnection.sadd("crawl:" + id + ":jobs", ...job_ids);
   await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60, "NX");
 }
 
 export async function addCrawlJobDone(id: string, job_id: string) {
+  _logger.debug("Adding done crawl job to Redis...", { jobId: job_id, module: "crawl-redis", method: "addCrawlJobDone", crawlId: id });
   await redisConnection.sadd("crawl:" + id + ":jobs_done", job_id);
   await redisConnection.rpush("crawl:" + id + ":jobs_done_ordered", job_id);
   await redisConnection.expire("crawl:" + id + ":jobs_done", 24 * 60 * 60, "NX");
@@ -75,11 +79,14 @@ export async function isCrawlFinishedLocked(id: string) {
 
 export async function finishCrawl(id: string) {
   if (await isCrawlFinished(id)) {
+    _logger.debug("Marking crawl as finished.", { module: "crawl-redis", method: "finishCrawl", crawlId: id });
     const set = await redisConnection.setnx("crawl:" + id + ":finish", "yes");
     if (set === 1) {
       await redisConnection.expire("crawl:" + id + ":finish", 24 * 60 * 60);
     }
     return set === 1
+  } else {
+    _logger.debug("Crawl can not be finished yet, not marking as finished.", { module: "crawl-redis", method: "finishCrawl", crawlId: id });
   }
 }
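
The finishCrawl logging above sits around a SETNX-based marker; the idea, sketched against a bare ioredis client (the project uses its shared redisConnection instead, and everything else here is illustrative):

    import Redis from "ioredis";

    const redis = new Redis();

    // SETNX returns 1 only for the first caller, so exactly one worker "wins" the
    // finished transition and is the one that runs the finish side effects.
    async function markCrawlFinished(crawlId: string): Promise<boolean> {
      const set = await redis.setnx("crawl:" + crawlId + ":finish", "yes");
      if (set === 1) {
        // Expire the marker along with the rest of the crawl's keys (24h).
        await redis.expire("crawl:" + crawlId + ":finish", 24 * 60 * 60);
      }
      return set === 1;
    }
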
@@ -132,14 +139,19 @@ export function generateURLPermutations(url: string | URL): URL[] {
 }
 
 export async function lockURL(id: string, sc: StoredCrawl, url: string): Promise<boolean> {
+  const logger = _logger.child({ crawlId: id, module: "crawl-redis", method: "lockURL", preNormalizedURL: url, teamId: sc.team_id, plan: sc.plan });
+
   if (typeof sc.crawlerOptions?.limit === "number") {
     if (await redisConnection.scard("crawl:" + id + ":visited_unique") >= sc.crawlerOptions.limit) {
+      logger.debug("Crawl has already hit visited_unique limit, not locking URL.");
       return false;
     }
   }
 
   url = normalizeURL(url, sc);
+  logger.defaultMeta.url = url;
+  logger.debug("Locking URL " + JSON.stringify(url) + "...");
   await redisConnection.sadd("crawl:" + id + ":visited_unique", url);
   await redisConnection.expire("crawl:" + id + ":visited_unique", 24 * 60 * 60, "NX");
@@ -147,20 +159,25 @@ export async function lockURL(id: string, sc: StoredCrawl, url: string): Promise
   if (!sc.crawlerOptions?.deduplicateSimilarURLs) {
     res = (await redisConnection.sadd("crawl:" + id + ":visited", url)) !== 0
   } else {
-    const permutations = generateURLPermutations(url);
-    const x = (await redisConnection.sadd("crawl:" + id + ":visited", ...permutations.map(x => x.href)));
+    const permutations = generateURLPermutations(url).map(x => x.href);
+    logger.debug("Adding URL permutations for URL " + JSON.stringify(url) + "...", { permutations });
+    const x = (await redisConnection.sadd("crawl:" + id + ":visited", ...permutations));
     res = x === permutations.length;
   }
 
   await redisConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60, "NX");
+
+  logger.debug("lockURL final result: " + res, { res });
   return res;
 }
 
 /// NOTE: does not check limit. only use if limit is checked beforehand e.g. with sitemap
 export async function lockURLs(id: string, sc: StoredCrawl, urls: string[]): Promise<boolean> {
   urls = urls.map(url => normalizeURL(url, sc));
+  const logger = _logger.child({ crawlId: id, module: "crawl-redis", method: "lockURLs", teamId: sc.team_id, plan: sc.plan });
 
   // Add to visited_unique set
+  logger.debug("Locking " + urls.length + " URLs...");
   await redisConnection.sadd("crawl:" + id + ":visited_unique", ...urls);
   await redisConnection.expire("crawl:" + id + ":visited_unique", 24 * 60 * 60, "NX");
@@ -170,11 +187,14 @@
     res = x === urls.length;
   } else {
     const allPermutations = urls.flatMap(url => generateURLPermutations(url).map(x => x.href));
+    logger.debug("Adding " + allPermutations.length + " URL permutations...");
     const x = await redisConnection.sadd("crawl:" + id + ":visited", ...allPermutations);
     res = x === allPermutations.length;
   }
 
   await redisConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60, "NX");
+
+  logger.debug("lockURLs final result: " + res, { res });
   return res;
 }
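
The new lockURL/lockURLs result logging reports on a SADD-based dedupe check; a minimal sketch of that check (assuming a bare ioredis client, names illustrative):

    import Redis from "ioredis";

    const redis = new Redis();

    // SADD returns how many members were newly added, so the lock only "succeeds"
    // when every URL (or every generated permutation of it) was previously unseen.
    async function tryLockUrls(crawlId: string, urlsOrPermutations: string[]): Promise<boolean> {
      const added = await redis.sadd("crawl:" + crawlId + ":visited", ...urlsOrPermutations);
      return added === urlsOrPermutations.length;
    }
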
diff --git a/apps/api/src/scraper/WebScraper/crawler.ts b/apps/api/src/scraper/WebScraper/crawler.ts
index 2296b095..87b2c437 100644
--- a/apps/api/src/scraper/WebScraper/crawler.ts
+++ b/apps/api/src/scraper/WebScraper/crawler.ts
@@ -5,7 +5,7 @@ import { getLinksFromSitemap } from "./sitemap";
 import robotsParser from "robots-parser";
 import { getURLDepth } from "./utils/maxDepthUtils";
 import { axiosTimeout } from "../../../src/lib/timeout";
-import { logger } from "../../../src/lib/logger";
+import { logger as _logger } from "../../../src/lib/logger";
 import https from "https";
 export class WebCrawler {
   private jobId: string;
@@ -25,6 +25,7 @@ export class WebCrawler {
   private allowExternalContentLinks: boolean;
   private allowSubdomains: boolean;
   private ignoreRobotsTxt: boolean;
+  private logger: typeof _logger;
 
   constructor({
     jobId,
@@ -71,6 +72,7 @@ export class WebCrawler {
     this.allowExternalContentLinks = allowExternalContentLinks ?? false;
     this.allowSubdomains = allowSubdomains ?? false;
     this.ignoreRobotsTxt = ignoreRobotsTxt ?? false;
+    this.logger = _logger.child({ crawlId: this.jobId, module: "WebCrawler" });
   }
 
   public filterLinks(sitemapLinks: string[], limit: number, maxDepth: number, fromMap: boolean = false): string[] {
@@ -85,7 +87,7 @@
       try {
         url = new URL(link.trim(), this.baseUrl);
       } catch (error) {
-        logger.debug(`Error processing link: ${link} | Error: ${error.message}`);
+        this.logger.debug(`Error processing link: ${link}`, { link, error, method: "filterLinks" });
         return false;
       }
       const path = url.pathname;
@@ -144,7 +146,7 @@
     const isAllowed = this.ignoreRobotsTxt ? true : (this.robots.isAllowed(link, "FireCrawlAgent") ?? true);
     // Check if the link is disallowed by robots.txt
     if (!isAllowed) {
-      logger.debug(`Link disallowed by robots.txt: ${link}`);
+      this.logger.debug(`Link disallowed by robots.txt: ${link}`, { method: "filterLinks", link });
       return false;
     }
 
@@ -173,7 +175,7 @@
   }
 
   public async tryGetSitemap(fromMap: boolean = false, onlySitemap: boolean = false): Promise<{ url: string; html: string; }[] | null> {
-    logger.debug(`Fetching sitemap links from ${this.initialUrl}`);
+    this.logger.debug(`Fetching sitemap links from ${this.initialUrl}`, { method: "tryGetSitemap" });
     const sitemapLinks = await this.tryFetchSitemapLinks(this.initialUrl);
     if(fromMap && onlySitemap) {
       return sitemapLinks.map(link => ({ url: link, html: "" }));
@@ -350,7 +352,7 @@
       const urlWithoutQuery = url.split('?')[0].toLowerCase();
       return fileExtensions.some((ext) => urlWithoutQuery.endsWith(ext));
     } catch (error) {
-      logger.error(`Error processing URL in isFile: ${error}`);
+      this.logger.error(`Error processing URL in isFile`, { method: "isFile", error });
       return false;
     }
   }
@@ -390,14 +392,14 @@
     try {
       const response = await axios.get(sitemapUrl, { timeout: axiosTimeout });
       if (response.status === 200) {
-        sitemapLinks = await getLinksFromSitemap({ sitemapUrl });
+        sitemapLinks = await getLinksFromSitemap({ sitemapUrl }, this.logger);
       }
     } catch (error) {
-      logger.debug(`Failed to fetch sitemap with axios from ${sitemapUrl}: ${error}`);
+      this.logger.debug(`Failed to fetch sitemap with axios from ${sitemapUrl}`, { method: "tryFetchSitemapLinks", sitemapUrl, error });
       if (error instanceof AxiosError && error.response?.status === 404) {
         // ignore 404
       } else {
-        const response = await getLinksFromSitemap({ sitemapUrl, mode: 'fire-engine' });
+        const response = await getLinksFromSitemap({ sitemapUrl, mode: 'fire-engine' }, this.logger);
         if (response) {
           sitemapLinks = response;
         }
@@ -409,14 +411,14 @@
       try {
         const response = await axios.get(baseUrlSitemap, { timeout: axiosTimeout });
         if (response.status === 200) {
-          sitemapLinks = await getLinksFromSitemap({ sitemapUrl: baseUrlSitemap, mode: 'fire-engine' });
+          sitemapLinks = await getLinksFromSitemap({ sitemapUrl: baseUrlSitemap, mode: 'fire-engine' }, this.logger);
         }
       } catch (error) {
-        logger.debug(`Failed to fetch sitemap from ${baseUrlSitemap}: ${error}`);
+        this.logger.debug(`Failed to fetch sitemap from ${baseUrlSitemap}`, { method: "tryFetchSitemapLinks", sitemapUrl: baseUrlSitemap, error });
         if (error instanceof AxiosError && error.response?.status === 404) {
           // ignore 404
         } else {
-          sitemapLinks = await getLinksFromSitemap({ sitemapUrl: baseUrlSitemap, mode: 'fire-engine' });
+          sitemapLinks = await getLinksFromSitemap({ sitemapUrl: baseUrlSitemap, mode: 'fire-engine' }, this.logger);
         }
       }
     }
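
The crawler now keeps a per-instance child logger (typed as typeof the module logger) and hands it to getLinksFromSitemap instead of relying on a module-level logger. A rough standalone sketch of that pattern (all names here are illustrative):

    import winston from "winston";

    const _logger = winston.createLogger({
      level: "debug",
      transports: [new winston.transports.Console()],
    });

    class ExampleCrawler {
      private logger: typeof _logger;

      constructor(private jobId: string) {
        // Every log line emitted by this instance carries the crawl id.
        this.logger = _logger.child({ crawlId: jobId, module: "WebCrawler" });
      }

      async fetchSitemap(getLinks: (url: string, logger: typeof _logger) => Promise<string[]>): Promise<string[]> {
        this.logger.debug("Fetching sitemap links", { method: "fetchSitemap" });
        // The helper receives the same child logger rather than importing a global one.
        return getLinks("https://example.com/sitemap.xml", this.logger);
      }
    }
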
diff --git a/apps/api/src/scraper/WebScraper/sitemap.ts b/apps/api/src/scraper/WebScraper/sitemap.ts
index 51f90b18..0c93448c 100644
--- a/apps/api/src/scraper/WebScraper/sitemap.ts
+++ b/apps/api/src/scraper/WebScraper/sitemap.ts
@@ -2,9 +2,9 @@ import axios from "axios";
 import { axiosTimeout } from "../../lib/timeout";
 import { parseStringPromise } from "xml2js";
 import { WebCrawler } from "./crawler";
-import { logger } from "../../lib/logger";
 import { scrapeURL } from "../scrapeURL";
 import { scrapeOptions } from "../../controllers/v1/types";
+import type { Logger } from "winston";
 
 export async function getLinksFromSitemap(
   {
@@ -15,7 +15,8 @@
     sitemapUrl: string,
     allUrls?: string[],
     mode?: 'axios' | 'fire-engine'
-  }
+  },
+  logger: Logger,
 ): Promise<string[]> {
   try {
     let content: string = "";
@@ -31,7 +32,7 @@
       content = response.document.rawHtml!;
     }
   } catch (error) {
-    logger.error(`Request failed for ${sitemapUrl}: ${error.message}`);
+    logger.error(`Request failed for ${sitemapUrl}`, { method: "getLinksFromSitemap", mode, sitemapUrl, error });
 
     return allUrls;
   }
@@ -42,7 +43,7 @@
     if (root && root.sitemap) {
       const sitemapPromises = root.sitemap
         .filter(sitemap => sitemap.loc && sitemap.loc.length > 0)
-        .map(sitemap => getLinksFromSitemap({ sitemapUrl: sitemap.loc[0], allUrls, mode }));
+        .map(sitemap => getLinksFromSitemap({ sitemapUrl: sitemap.loc[0], allUrls, mode }, logger));
       await Promise.all(sitemapPromises);
     } else if (root && root.url) {
       const validUrls = root.url
@@ -51,7 +52,7 @@
       allUrls.push(...validUrls);
     }
   } catch (error) {
-    logger.debug(`Error processing sitemapUrl: ${sitemapUrl} | Error: ${error.message}`);
+    logger.debug(`Error processing sitemapUrl: ${sitemapUrl}`, { method: "getLinksFromSitemap", mode, sitemapUrl, error });
   }
 
   return allUrls;
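
getLinksFromSitemap now receives the caller's crawl-scoped winston Logger explicitly and passes the same instance to its recursive calls. Sketched in isolation (fetching and parsing stubbed out, names illustrative):

    import type { Logger } from "winston";

    async function collectSitemapUrls(
      sitemapUrl: string,
      logger: Logger,
      allUrls: string[] = [],
    ): Promise<string[]> {
      logger.debug(`Processing sitemap: ${sitemapUrl}`, { method: "collectSitemapUrls", sitemapUrl });
      // ...fetching and parsing would happen here; nested sitemap entries recurse
      // with the same logger, e.g. await collectSitemapUrls(childUrl, logger, allUrls);
      return allUrls;
    }
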