From ccfada98caea89e0cd065945b74cd9c842d49844 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?=
Date: Tue, 7 Jan 2025 19:15:23 +0100
Subject: [PATCH] various queue fixes

---
 apps/api/src/controllers/v1/crawl.ts       |  10 +-
 apps/api/src/lib/crawl-redis.ts            |  16 +++
 apps/api/src/scraper/WebScraper/crawler.ts |   2 +-
 apps/api/src/services/queue-worker.ts      | 114 +++++++++++----------
 4 files changed, 78 insertions(+), 64 deletions(-)

diff --git a/apps/api/src/controllers/v1/crawl.ts b/apps/api/src/controllers/v1/crawl.ts
index a759f448..a01106d0 100644
--- a/apps/api/src/controllers/v1/crawl.ts
+++ b/apps/api/src/controllers/v1/crawl.ts
@@ -8,21 +8,13 @@ import {
   toLegacyCrawlerOptions,
 } from "./types";
 import {
-  addCrawlJob,
-  addCrawlJobs,
   crawlToCrawler,
-  lockURL,
-  lockURLs,
   saveCrawl,
   StoredCrawl,
 } from "../../lib/crawl-redis";
 import { logCrawl } from "../../services/logging/crawl_log";
-import { getScrapeQueue } from "../../services/queue-service";
-import { _addScrapeJobToBullMQ, addScrapeJob, addScrapeJobs } from "../../services/queue-jobs";
+import { _addScrapeJobToBullMQ } from "../../services/queue-jobs";
 import { logger as _logger } from "../../lib/logger";
-import { getJobPriority } from "../../lib/job-priority";
-import { callWebhook } from "../../services/webhook";
-import { scrapeOptions as scrapeOptionsSchema } from "./types";
 
 export async function crawlController(
   req: RequestWithAuth<{}, CrawlResponse, CrawlRequest>,
diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts
index 6ecb0b8f..80720bc6 100644
--- a/apps/api/src/lib/crawl-redis.ts
+++ b/apps/api/src/lib/crawl-redis.ts
@@ -315,6 +315,22 @@ export async function lockURLs(
   return res;
 }
 
+export async function lockURLsIndividually(
+  id: string,
+  sc: StoredCrawl,
+  jobs: { id: string; url: string; }[],
+) {
+  const out: typeof jobs = [];
+
+  for (const job of jobs) {
+    if (await lockURL(id, sc, job.url)) {
+      out.push(job);
+    }
+  }
+
+  return out;
+}
+
 export function crawlToCrawler(
   id: string,
   sc: StoredCrawl,
diff --git a/apps/api/src/scraper/WebScraper/crawler.ts b/apps/api/src/scraper/WebScraper/crawler.ts
index dd949238..c958306d 100644
--- a/apps/api/src/scraper/WebScraper/crawler.ts
+++ b/apps/api/src/scraper/WebScraper/crawler.ts
@@ -471,7 +471,7 @@ export class WebCrawler {
   }
 
   private async tryFetchSitemapLinks(url: string, urlsHandler: (urls: string[]) => unknown): Promise<number> {
-    const sitemapUrl = url.endsWith(".xml") ? url : `${url}/sitemap.xml`;
+    const sitemapUrl = url.endsWith(".xml") ? url : `${url}${url.endsWith("/") ? "" : "/"}sitemap.xml`;
 
     let sitemapCount: number = 0;
 
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 9e895216..2f27c69b 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -26,7 +26,9 @@ import {
   getCrawlJobs,
   lockURL,
   lockURLs,
+  lockURLsIndividually,
   normalizeURL,
+  saveCrawl,
 } from "../lib/crawl-redis";
 import { StoredCrawl } from "../lib/crawl-redis";
 import { addScrapeJob, addScrapeJobs } from "./queue-jobs";
@@ -480,6 +482,47 @@ async function processKickoffJob(job: Job & { id: string }, token: string) {
     const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
     const crawler = crawlToCrawler(job.data.crawl_id, sc);
 
+    logger.debug("Locking URL...");
+    await lockURL(job.data.crawl_id, sc, job.data.url);
+    const jobId = uuidv4();
+    logger.debug("Adding scrape job to Redis...", { jobId });
+    await addScrapeJob(
+      {
+        url: job.data.url,
+        mode: "single_urls",
+        team_id: job.data.team_id,
+        crawlerOptions: job.data.crawlerOptions,
+        scrapeOptions: scrapeOptions.parse(job.data.scrapeOptions),
+        internalOptions: sc.internalOptions,
+        plan: job.data.plan!,
+        origin: job.data.origin,
+        crawl_id: job.data.crawl_id,
+        webhook: job.data.webhook,
+        v1: job.data.v1,
+        isCrawlSourceScrape: true,
+      },
+      {
+        priority: 15,
+      },
+      jobId,
+    );
+    logger.debug("Adding scrape job to BullMQ...", { jobId });
+    await addCrawlJob(job.data.crawl_id, jobId);
+
+    if (job.data.webhook) {
+      logger.debug("Calling webhook with crawl.started...", {
+        webhook: job.data.webhook,
+      });
+      await callWebhook(
+        job.data.team_id,
+        job.data.crawl_id,
+        null,
+        job.data.webhook,
+        true,
+        "crawl.started",
+      );
+    }
+
     const sitemap = sc.crawlerOptions.ignoreSitemap
       ? 0
       : await crawler.tryGetSitemap(async (urls) => {
@@ -522,68 +565,29 @@ async function processKickoffJob(job: Job & { id: string }, token: string) {
           });
 
           logger.debug("Locking URLs...");
-          await lockURLs(
+          const lockedIds = await lockURLsIndividually(
             job.data.crawl_id,
             sc,
-            jobs.map((x) => x.data.url),
+            jobs.map((x) => ({ id: x.opts.jobId, url: x.data.url })),
           );
+          const lockedJobs = jobs.filter(x => lockedIds.find(y => y.id === x.opts.jobId));
           logger.debug("Adding scrape jobs to Redis...");
           await addCrawlJobs(
             job.data.crawl_id,
-            jobs.map((x) => x.opts.jobId),
+            lockedJobs.map((x) => x.opts.jobId),
           );
           logger.debug("Adding scrape jobs to BullMQ...");
-          await addScrapeJobs(jobs);
+          await addScrapeJobs(lockedJobs);
         });
 
     if (sitemap === 0) {
       logger.debug("Sitemap not found or ignored.", {
         ignoreSitemap: sc.crawlerOptions.ignoreSitemap,
       });
-
-      logger.debug("Locking URL...");
-      await lockURL(job.data.crawl_id, sc, job.data.url);
-      const jobId = uuidv4();
-      logger.debug("Adding scrape job to Redis...", { jobId });
-      await addScrapeJob(
-        {
-          url: job.data.url,
-          mode: "single_urls",
-          team_id: job.data.team_id,
-          crawlerOptions: job.data.crawlerOptions,
-          scrapeOptions: scrapeOptions.parse(job.data.scrapeOptions),
-          internalOptions: sc.internalOptions,
-          plan: job.data.plan!,
-          origin: job.data.origin,
-          crawl_id: job.data.crawl_id,
-          webhook: job.data.webhook,
-          v1: job.data.v1,
-          isCrawlSourceScrape: true,
-        },
-        {
-          priority: 15,
-        },
-        jobId,
-      );
-      logger.debug("Adding scrape job to BullMQ...", { jobId });
-      await addCrawlJob(job.data.crawl_id, jobId);
     }
+
     logger.debug("Done queueing jobs!");
 
-    if (job.data.webhook) {
-      logger.debug("Calling webhook with crawl.started...", {
-        webhook: job.data.webhook,
-      });
-      await callWebhook(
-        job.data.team_id,
-        job.data.crawl_id,
-        null,
-        job.data.webhook,
-        true,
-        "crawl.started",
-      );
-    }
-
     return { success: true };
   } catch (error) {
     logger.error("An error occurred!", { error });
@@ -720,16 +724,18 @@ async function processJob(job: Job & { id: string }, token: string) {
       ) {
         const crawler = crawlToCrawler(job.data.crawl_id, sc);
         if (
-          crawler.filterURL(doc.metadata.url, doc.metadata.sourceURL) === null
+          crawler.filterURL(doc.metadata.url, doc.metadata.sourceURL) === null &&
+          !job.data.isCrawlSourceScrape
         ) {
-          if (job.data.isCrawlSourceScrape) {
-            // TODO: re-fetch sitemap for redirect target domain
-            // TODO: reset crawl source url to new target
-          } else {
-            throw new Error(
-              "Redirected target URL is not allowed by crawlOptions",
-            ); // TODO: make this its own error type that is ignored by error tracking
-          }
+          throw new Error(
+            "Redirected target URL is not allowed by crawlOptions",
+          ); // TODO: make this its own error type that is ignored by error tracking
+        }
+
+        if (job.data.isCrawlSourceScrape) {
+          // TODO: re-fetch sitemap for redirect target domain
+          sc.originUrl = doc.metadata.url;
+          await saveCrawl(job.data.crawl_id, sc);
         }
 
         if (isUrlBlocked(doc.metadata.url)) {
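
The central queue fix above is a lock-then-filter pattern in processKickoffJob: sitemap URLs are locked one at a time through the new lockURLsIndividually helper, and only the jobs whose URL lock succeeded are registered with addCrawlJobs and enqueued with addScrapeJobs, so an already-seen URL no longer produces a duplicate scrape job. The TypeScript below is a minimal standalone sketch of that pattern, not the production code: the types, the in-memory lockURL, and the enqueueLocked helper are simplified stand-ins for the Redis-backed implementations in crawl-redis.ts and queue-worker.ts.

type StoredCrawl = { originUrl?: string };
type ScrapeJob = { opts: { jobId: string }; data: { url: string } };

// Stand-in for the Redis-backed lockURL in crawl-redis.ts: returns true only
// the first time a given URL is seen for a crawl id (in-memory for this sketch).
const seen = new Map<string, Set<string>>();
async function lockURL(id: string, _sc: StoredCrawl, url: string): Promise<boolean> {
  const urls = seen.get(id) ?? new Set<string>();
  seen.set(id, urls);
  if (urls.has(url)) return false;
  urls.add(url);
  return true;
}

// Same shape as lockURLsIndividually in the patch: lock one URL at a time and
// keep only the jobs whose lock succeeded.
async function lockURLsIndividually(
  id: string,
  sc: StoredCrawl,
  jobs: { id: string; url: string }[],
) {
  const out: typeof jobs = [];
  for (const job of jobs) {
    if (await lockURL(id, sc, job.url)) {
      out.push(job);
    }
  }
  return out;
}

// Hypothetical helper mirroring the kickoff step: only jobs that won their
// URL lock are kept for addCrawlJobs/addScrapeJobs.
async function enqueueLocked(crawlId: string, sc: StoredCrawl, jobs: ScrapeJob[]) {
  const lockedIds = await lockURLsIndividually(
    crawlId,
    sc,
    jobs.map((x) => ({ id: x.opts.jobId, url: x.data.url })),
  );
  return jobs.filter((x) => lockedIds.some((y) => y.id === x.opts.jobId));
}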