various queue fixes

This commit is contained in:
Gergő Móricz 2025-01-07 19:15:23 +01:00
parent 7a03275575
commit ccfada98ca
4 changed files with 78 additions and 64 deletions

View File

@ -8,21 +8,13 @@ import {
toLegacyCrawlerOptions,
} from "./types";
import {
addCrawlJob,
addCrawlJobs,
crawlToCrawler,
lockURL,
lockURLs,
saveCrawl,
StoredCrawl,
} from "../../lib/crawl-redis";
import { logCrawl } from "../../services/logging/crawl_log";
import { getScrapeQueue } from "../../services/queue-service";
import { _addScrapeJobToBullMQ, addScrapeJob, addScrapeJobs } from "../../services/queue-jobs";
import { _addScrapeJobToBullMQ } from "../../services/queue-jobs";
import { logger as _logger } from "../../lib/logger";
import { getJobPriority } from "../../lib/job-priority";
import { callWebhook } from "../../services/webhook";
import { scrapeOptions as scrapeOptionsSchema } from "./types";
export async function crawlController(
req: RequestWithAuth<{}, CrawlResponse, CrawlRequest>,

View File

@ -315,6 +315,22 @@ export async function lockURLs(
return res;
}
export async function lockURLsIndividually(
id: string,
sc: StoredCrawl,
jobs: { id: string; url: string; }[],
) {
const out: typeof jobs = [];
for (const job of jobs) {
if (await lockURL(id, sc, job.url)) {
out.push(job);
}
}
return out;
}
export function crawlToCrawler(
id: string,
sc: StoredCrawl,

View File

@ -471,7 +471,7 @@ export class WebCrawler {
}
private async tryFetchSitemapLinks(url: string, urlsHandler: (urls: string[]) => unknown): Promise<number> {
const sitemapUrl = url.endsWith(".xml") ? url : `${url}/sitemap.xml`;
const sitemapUrl = url.endsWith(".xml") ? url : `${url}${url.endsWith("/") ? "" : "/"}sitemap.xml`;
let sitemapCount: number = 0;

View File

@ -26,7 +26,9 @@ import {
getCrawlJobs,
lockURL,
lockURLs,
lockURLsIndividually,
normalizeURL,
saveCrawl,
} from "../lib/crawl-redis";
import { StoredCrawl } from "../lib/crawl-redis";
import { addScrapeJob, addScrapeJobs } from "./queue-jobs";
@ -480,6 +482,47 @@ async function processKickoffJob(job: Job & { id: string }, token: string) {
const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
const crawler = crawlToCrawler(job.data.crawl_id, sc);
logger.debug("Locking URL...");
await lockURL(job.data.crawl_id, sc, job.data.url);
const jobId = uuidv4();
logger.debug("Adding scrape job to Redis...", { jobId });
await addScrapeJob(
{
url: job.data.url,
mode: "single_urls",
team_id: job.data.team_id,
crawlerOptions: job.data.crawlerOptions,
scrapeOptions: scrapeOptions.parse(job.data.scrapeOptions),
internalOptions: sc.internalOptions,
plan: job.data.plan!,
origin: job.data.origin,
crawl_id: job.data.crawl_id,
webhook: job.data.webhook,
v1: job.data.v1,
isCrawlSourceScrape: true,
},
{
priority: 15,
},
jobId,
);
logger.debug("Adding scrape job to BullMQ...", { jobId });
await addCrawlJob(job.data.crawl_id, jobId);
if (job.data.webhook) {
logger.debug("Calling webhook with crawl.started...", {
webhook: job.data.webhook,
});
await callWebhook(
job.data.team_id,
job.data.crawl_id,
null,
job.data.webhook,
true,
"crawl.started",
);
}
const sitemap = sc.crawlerOptions.ignoreSitemap
? 0
: await crawler.tryGetSitemap(async (urls) => {
@ -522,68 +565,29 @@ async function processKickoffJob(job: Job & { id: string }, token: string) {
});
logger.debug("Locking URLs...");
await lockURLs(
const lockedIds = await lockURLsIndividually(
job.data.crawl_id,
sc,
jobs.map((x) => x.data.url),
jobs.map((x) => ({ id: x.opts.jobId, url: x.data.url })),
);
const lockedJobs = jobs.filter(x => lockedIds.find(y => y.id === x.opts.jobId));
logger.debug("Adding scrape jobs to Redis...");
await addCrawlJobs(
job.data.crawl_id,
jobs.map((x) => x.opts.jobId),
lockedJobs.map((x) => x.opts.jobId),
);
logger.debug("Adding scrape jobs to BullMQ...");
await addScrapeJobs(jobs);
await addScrapeJobs(lockedJobs);
});
if (sitemap === 0) {
logger.debug("Sitemap not found or ignored.", {
ignoreSitemap: sc.crawlerOptions.ignoreSitemap,
});
logger.debug("Locking URL...");
await lockURL(job.data.crawl_id, sc, job.data.url);
const jobId = uuidv4();
logger.debug("Adding scrape job to Redis...", { jobId });
await addScrapeJob(
{
url: job.data.url,
mode: "single_urls",
team_id: job.data.team_id,
crawlerOptions: job.data.crawlerOptions,
scrapeOptions: scrapeOptions.parse(job.data.scrapeOptions),
internalOptions: sc.internalOptions,
plan: job.data.plan!,
origin: job.data.origin,
crawl_id: job.data.crawl_id,
webhook: job.data.webhook,
v1: job.data.v1,
isCrawlSourceScrape: true,
},
{
priority: 15,
},
jobId,
);
logger.debug("Adding scrape job to BullMQ...", { jobId });
await addCrawlJob(job.data.crawl_id, jobId);
}
logger.debug("Done queueing jobs!");
if (job.data.webhook) {
logger.debug("Calling webhook with crawl.started...", {
webhook: job.data.webhook,
});
await callWebhook(
job.data.team_id,
job.data.crawl_id,
null,
job.data.webhook,
true,
"crawl.started",
);
}
return { success: true };
} catch (error) {
logger.error("An error occurred!", { error });
@ -720,16 +724,18 @@ async function processJob(job: Job & { id: string }, token: string) {
) {
const crawler = crawlToCrawler(job.data.crawl_id, sc);
if (
crawler.filterURL(doc.metadata.url, doc.metadata.sourceURL) === null
crawler.filterURL(doc.metadata.url, doc.metadata.sourceURL) === null &&
!job.data.isCrawlSourceScrape
) {
if (job.data.isCrawlSourceScrape) {
// TODO: re-fetch sitemap for redirect target domain
// TODO: reset crawl source url to new target
} else {
throw new Error(
"Redirected target URL is not allowed by crawlOptions",
); // TODO: make this its own error type that is ignored by error tracking
}
throw new Error(
"Redirected target URL is not allowed by crawlOptions",
); // TODO: make this its own error type that is ignored by error tracking
}
if (job.data.isCrawlSourceScrape) {
// TODO: re-fetch sitemap for redirect target domain
sc.originUrl = doc.metadata.url;
await saveCrawl(job.data.crawl_id, sc);
}
if (isUrlBlocked(doc.metadata.url)) {