Merge pull request #1073 from mendableai/nsc/index-queue
(feat/index) Index/Insertion queue
This commit is contained in: commit 34ad9ec25d
@@ -13,6 +13,9 @@ if [ $FLY_PROCESS_GROUP = "app" ]; then
elif [ $FLY_PROCESS_GROUP = "worker" ]; then
    echo "RUNNING worker"
    node --max-old-space-size=8192 dist/src/services/queue-worker.js
elif [ $FLY_PROCESS_GROUP = "index-worker" ]; then
    echo "RUNNING index worker"
    node --max-old-space-size=8192 dist/src/services/indexing/index-worker.js
else
    echo "NO FLY PROCESS GROUP"
    node --max-old-space-size=8192 dist/src/index.js
@@ -18,6 +18,8 @@
    "test:snips": "npx jest --detectOpenHandles --forceExit --openHandlesTimeout=120000 --watchAll=false src/__tests__/snips/*.test.ts",
    "workers": "nodemon --exec ts-node src/services/queue-worker.ts",
    "worker:production": "node dist/src/services/queue-worker.js",
    "index-worker": "nodemon --exec ts-node src/services/indexing/index-worker.ts",
    "index-worker:production": "node dist/src/services/indexing/index-worker.js",
    "mongo-docker": "docker run -d -p 2717:27017 -v ./mongo-data:/data/db --name mongodb mongo:latest",
    "mongo-docker-console": "docker exec -it mongodb mongosh",
    "run-example": "npx ts-node src/example.ts",
@@ -22,6 +22,7 @@ import { performCosineSimilarity } from "../../lib/map-cosine";
import { logger } from "../../lib/logger";
import Redis from "ioredis";
import { querySitemapIndex } from "../../scraper/WebScraper/sitemap-index";
import { getIndexQueue } from "../../services/queue-service";

configDotenv();
const redis = new Redis(process.env.REDIS_URL!);
@@ -228,6 +229,17 @@ export async function getMapResults({

  //

  await getIndexQueue().add(
    id,
    {
      originUrl: url,
      visitedUrls: linksToReturn,
    },
    {
      priority: 10,
    }
  );

  return {
    success: true,
    links: linksToReturn,
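The payload enqueued above is what the index worker later destructures as { originUrl, visitedUrls }. A minimal sketch of that contract as a shared type (hypothetical; the commit passes the object literally rather than defining a type):

// Hypothetical shape of an index-queue job payload, assumed by both
// producers (map controller, queue-worker) and the index worker.
export interface IndexJobData {
  originUrl: string;      // origin the crawl map is grouped under
  visitedUrls: string[];  // URLs discovered for that origin
}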
@@ -4,7 +4,7 @@ import * as Sentry from "@sentry/node";
import express, { NextFunction, Request, Response } from "express";
import bodyParser from "body-parser";
import cors from "cors";
import { getExtractQueue, getScrapeQueue } from "./services/queue-service";
import { getExtractQueue, getScrapeQueue, getIndexQueue } from "./services/queue-service";
import { v0Router } from "./routes/v0";
import os from "os";
import { logger } from "./lib/logger";
@@ -48,6 +48,7 @@ const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
  queues: [
    new BullAdapter(getScrapeQueue()),
    new BullAdapter(getExtractQueue()),
    new BullAdapter(getIndexQueue()),
  ],
  serverAdapter: serverAdapter,
});
@@ -24,21 +24,40 @@ export async function getLinksFromSitemap(
  let content: string = "";
  try {
    if (mode === "fire-engine" && useFireEngine) {
      const response = await scrapeURL(
        "sitemap",
      // Try TLS client first
      const tlsResponse = await scrapeURL(
        "sitemap",
        sitemapUrl,
        scrapeOptions.parse({ formats: ["rawHtml"] }),
        { forceEngine: "fire-engine;tlsclient", v0DisableJsDom: true },
      );
      if (!response.success) {
        logger.debug(
          "Failed to scrape sitemap via TLSClient, falling back to axios...",
          { error: response.error },
        );
        const ar = await axios.get(sitemapUrl, { timeout: axiosTimeout });
        content = ar.data;

      if (tlsResponse.success) {
        content = tlsResponse.document.rawHtml!;
      } else {
        content = response.document.rawHtml!;
        logger.debug(
          "Failed to scrape sitemap via TLSClient, trying Chrome CDP...",
          { error: tlsResponse.error },
        );

        // Try Chrome CDP next
        const cdpResponse = await scrapeURL(
          "sitemap",
          sitemapUrl,
          scrapeOptions.parse({ formats: ["rawHtml"] }),
          { forceEngine: "fire-engine;chrome-cdp" },
        );

        if (cdpResponse.success) {
          content = cdpResponse.document.rawHtml!;
        } else {
          logger.debug(
            "Failed to scrape sitemap via Chrome CDP, falling back to axios...",
            { error: cdpResponse.error },
          );
          const ar = await axios.get(sitemapUrl, { timeout: axiosTimeout });
          content = ar.data;
        }
      }
    } else {
      const response = await axios.get(sitemapUrl, { timeout: axiosTimeout });
@@ -4,59 +4,213 @@ import {
  normalizeUrlOnlyHostname,
} from "../../lib/canonical-url";
import { supabase_service } from "../supabase";
import { redisConnection } from "../queue-service";

const BATCH_KEY = "crawl_maps_batch";
const BATCH_LOCK_KEY = "crawl_maps_batch_lock";
const BATCH_SIZE = 20;
const BATCH_TIMEOUT = 20000; // 20 seconds
const LOCK_TIMEOUT = 30000; // 30 seconds

interface CrawlMapOperation {
  originUrl: string;
  standardizedUrls: string[];
  timestamp: string;
}

async function acquireLock(): Promise<boolean> {
  const redis = redisConnection;
  // Set lock with NX (only if it doesn't exist) and PX (millisecond expiry)
  const result = await redis.set(BATCH_LOCK_KEY, "1", "PX", LOCK_TIMEOUT, "NX");
  const acquired = result === "OK";
  if (acquired) {
    logger.info("🔒 Acquired batch processing lock");
  }
  return acquired;
}

async function releaseLock() {
  const redis = redisConnection;
  await redis.del(BATCH_LOCK_KEY);
  logger.info("🔓 Released batch processing lock");
}

async function processBatch() {
  const redis = redisConnection;

  // Try to acquire lock
  if (!(await acquireLock())) {
    return;
  }

export async function saveCrawlMap(originUrl: string, visitedUrls: string[]) {
  originUrl = normalizeUrlOnlyHostname(originUrl);
  // Fire and forget the upload to Supabase
  try {
    // Standardize URLs to canonical form (https, no www)
    const standardizedUrls = [
      ...new Set(
        visitedUrls.map((url) => {
          return normalizeUrl(url);
        }),
      ),
    ];
    // First check if entry exists for this origin URL
    const { data: existingMap } = await supabase_service
  // Get all operations from Redis list
  const operations: CrawlMapOperation[] = [];
  while (operations.length < BATCH_SIZE) {
    const op = await redis.lpop(BATCH_KEY);
    if (!op) break;
    operations.push(JSON.parse(op));
  }

  if (operations.length === 0) {
    logger.info("No operations to process in batch");
    return;
  }

  logger.info(`📦 Processing batch of ${operations.length} operations`, {
    origins: operations.map((op) => op.originUrl),
  });

  // Get existing maps for all origins in batch
  const origins = operations.map((op) => op.originUrl);
  const { data: existingMaps } = await supabase_service
    .from("crawl_maps")
    .select("urls")
    .eq("origin_url", originUrl)
    .single();
    .select("origin_url, urls")
    .in("origin_url", origins);

  if (existingMap) {
    // Merge URLs, removing duplicates
    const mergedUrls = [
      ...new Set([...existingMap.urls, ...standardizedUrls]),
    ];
  const existingMapsByOrigin = new Map(
    existingMaps?.map((map) => [map.origin_url, map.urls]) || [],
  );

    const { error } = await supabase_service
      .from("crawl_maps")
      .update({
  // Prepare updates and inserts
  interface CrawlMapRecord {
    origin_url: string;
    urls: string[];
    num_urls: number;
    updated_at: string;
    created_at?: string;
  }

  const updates: CrawlMapRecord[] = [];
  const inserts: CrawlMapRecord[] = [];

  for (const op of operations) {
    const existingUrls = existingMapsByOrigin.get(op.originUrl);

    if (existingUrls) {
      // Merge URLs for update
      const mergedUrls = [
        ...new Set([...existingUrls, ...op.standardizedUrls]),
      ];
      updates.push({
        origin_url: op.originUrl,
        urls: mergedUrls,
        num_urls: mergedUrls.length,
        updated_at: new Date().toISOString(),
      })
      .eq("origin_url", originUrl);

      if (error) {
        logger.error("Failed to update crawl map", { error });
      }
    } else {
      // Insert new entry if none exists
      const { error } = await supabase_service.from("crawl_maps").insert({
        origin_url: originUrl,
        urls: standardizedUrls,
        num_urls: standardizedUrls.length,
        created_at: new Date().toISOString(),
        updated_at: new Date().toISOString(),
      });

      if (error) {
        logger.error("Failed to save crawl map", { error });
        updated_at: op.timestamp,
      });
    } else {
      // Prepare insert
      inserts.push({
        origin_url: op.originUrl,
        urls: op.standardizedUrls,
        num_urls: op.standardizedUrls.length,
        created_at: op.timestamp,
        updated_at: op.timestamp,
      });
    }
  }

  // Execute batch operations
  if (updates.length > 0) {
    logger.info(`🔄 Updating ${updates.length} existing crawl maps`, {
      origins: updates.map((u) => u.origin_url),
    });
    const { error: updateError } = await supabase_service
      .from("crawl_maps")
      .upsert(updates);

    if (updateError) {
      logger.error("Failed to batch update crawl maps", {
        error: updateError,
      });
    }
  }

  if (inserts.length > 0) {
    logger.info(`➕ Inserting ${inserts.length} new crawl maps`, {
      origins: inserts.map((i) => i.origin_url),
    });
    const { error: insertError } = await supabase_service
      .from("crawl_maps")
      .insert(inserts);

    if (insertError) {
      logger.error("Failed to batch insert crawl maps", {
        error: insertError,
      });
    }
  }

logger.info("✅ Batch processing completed successfully");
|
||||
} catch (error) {
|
||||
logger.error("Error saving crawl map", { error });
|
||||
logger.error("Error processing crawl map batch", { error });
|
||||
} finally {
|
||||
await releaseLock();
|
||||
}
|
||||
}
|
||||
|
||||
// Start periodic batch processing
|
||||
let batchInterval: NodeJS.Timeout | null = null;
|
||||
|
||||
function startBatchProcessing() {
|
||||
if (batchInterval) return;
|
||||
|
||||
logger.info("🔄 Starting periodic batch processing");
|
||||
batchInterval = setInterval(async () => {
|
||||
const queueLength = await redisConnection.llen(BATCH_KEY);
|
||||
logger.info(`Checking batch queue (${queueLength} items pending)`);
|
||||
await processBatch();
|
||||
}, BATCH_TIMEOUT);
|
||||
|
||||
// Unref to not keep process alive
|
||||
batchInterval.unref();
|
||||
}
|
||||
|
||||
export async function saveCrawlMap(originUrl: string, visitedUrls: string[]) {
|
||||
logger.info("Queueing crawl map", { originUrl });
|
||||
originUrl = normalizeUrlOnlyHostname(originUrl);
|
||||
|
||||
try {
|
||||
// Standardize URLs to canonical form
|
||||
const standardizedUrls = [
|
||||
...new Set(visitedUrls.map((url) => normalizeUrl(url))),
|
||||
];
|
||||
|
||||
const operation: CrawlMapOperation = {
|
||||
originUrl,
|
||||
standardizedUrls,
|
||||
timestamp: new Date().toISOString(),
|
||||
};
|
||||
|
||||
// Add operation to Redis list
|
||||
const redis = redisConnection;
|
||||
await redis.rpush(BATCH_KEY, JSON.stringify(operation));
|
||||
const queueLength = await redis.llen(BATCH_KEY);
|
||||
logger.info(`📥 Added operation to queue (${queueLength} total pending)`, {
|
||||
originUrl,
|
||||
});
|
||||
|
||||
// Start batch processing if not already started
|
||||
startBatchProcessing();
|
||||
|
||||
// If we have enough items, trigger immediate processing
|
||||
if (queueLength >= BATCH_SIZE) {
|
||||
logger.info(
|
||||
"🔄 Queue reached batch size, triggering immediate processing",
|
||||
);
|
||||
await processBatch();
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error("Error queueing crawl map", { error });
|
||||
}
|
||||
}
|
||||
|
||||
// Cleanup on exit
|
||||
process.on("beforeExit", async () => {
|
||||
if (batchInterval) {
|
||||
clearInterval(batchInterval);
|
||||
batchInterval = null;
|
||||
logger.info("Stopped periodic batch processing");
|
||||
}
|
||||
await processBatch();
|
||||
});
|
||||
|
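The batcher coordinates concurrent workers with a single Redis key: SET ... PX <ttl> NX succeeds only if the key does not already exist, so at most one process drains the batch at a time, and the TTL guarantees the lock expires even if the holder crashes mid-batch. A minimal standalone sketch of that pattern (the withBatchLock helper is hypothetical, not part of this commit):

import Redis from "ioredis";

const redis = new Redis(process.env.REDIS_URL!);

// Run fn only if the lock can be acquired; returns false if another
// process already holds it. The PX expiry acts as a crash safety net.
async function withBatchLock(fn: () => Promise<void>): Promise<boolean> {
  const acquired =
    (await redis.set("crawl_maps_batch_lock", "1", "PX", 30000, "NX")) === "OK";
  if (!acquired) return false;
  try {
    await fn();
  } finally {
    await redis.del("crawl_maps_batch_lock");
  }
  return true;
}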
apps/api/src/services/indexing/index-worker.ts (new file, 161 lines)
@@ -0,0 +1,161 @@
import "dotenv/config";
import "../sentry";
import * as Sentry from "@sentry/node";
import { Job, Queue, Worker } from "bullmq";
import { logger as _logger, logger } from "../../lib/logger";
import { redisConnection, indexQueueName, getIndexQueue } from "../queue-service";
import { saveCrawlMap } from "./crawl-maps-index";
import systemMonitor from "../system-monitor";
import { v4 as uuidv4 } from "uuid";

const workerLockDuration = Number(process.env.WORKER_LOCK_DURATION) || 60000;
const workerStalledCheckInterval = Number(process.env.WORKER_STALLED_CHECK_INTERVAL) || 30000;
const jobLockExtendInterval = Number(process.env.JOB_LOCK_EXTEND_INTERVAL) || 15000;
const jobLockExtensionTime = Number(process.env.JOB_LOCK_EXTENSION_TIME) || 60000;

const cantAcceptConnectionInterval = Number(process.env.CANT_ACCEPT_CONNECTION_INTERVAL) || 2000;
const connectionMonitorInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10;
const gotJobInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 20;

const runningJobs: Set<string> = new Set();

const processJobInternal = async (token: string, job: Job) => {
  if (!job.id) {
    throw new Error("Job has no ID");
  }

  const logger = _logger.child({
    module: "index-worker",
    method: "processJobInternal",
    jobId: job.id,
  });

  const extendLockInterval = setInterval(async () => {
    logger.info(`🔄 Worker extending lock on job ${job.id}`);
    await job.extendLock(token, jobLockExtensionTime);
  }, jobLockExtendInterval);

  let err = null;
  try {
    const { originUrl, visitedUrls } = job.data;
    await saveCrawlMap(originUrl, visitedUrls);
    await job.moveToCompleted({ success: true }, token, false);
  } catch (error) {
    logger.error("Error processing index job", { error });
    Sentry.captureException(error);
    err = error;
    await job.moveToFailed(error, token, false);
  } finally {
    clearInterval(extendLockInterval);
  }

  return err;
};

let isShuttingDown = false;

process.on("SIGINT", () => {
  logger.info("Received SIGINT. Shutting down gracefully...");
  isShuttingDown = true;
});

process.on("SIGTERM", () => {
  logger.info("Received SIGTERM. Shutting down gracefully...");
  isShuttingDown = true;
});

let cantAcceptConnectionCount = 0;

const workerFun = async (queue: Queue) => {
  const logger = _logger.child({ module: "index-worker", method: "workerFun" });

  const worker = new Worker(queue.name, null, {
    connection: redisConnection,
    lockDuration: workerLockDuration,
    stalledInterval: workerStalledCheckInterval,
    maxStalledCount: 10,
  });

  worker.startStalledCheckTimer();

  const monitor = await systemMonitor;

  while (true) {
    if (isShuttingDown) {
      logger.info("No longer accepting new jobs. SIGINT");
      break;
    }

    const token = uuidv4();
    const canAcceptConnection = await monitor.acceptConnection();

    if (!canAcceptConnection) {
      logger.info("Cant accept connection");
      cantAcceptConnectionCount++;

      if (cantAcceptConnectionCount >= 25) {
        logger.error("WORKER STALLED", {
          cpuUsage: await monitor.checkCpuUsage(),
          memoryUsage: await monitor.checkMemoryUsage(),
        });
      }

      await new Promise(resolve => setTimeout(resolve, cantAcceptConnectionInterval));
      continue;
    } else {
      cantAcceptConnectionCount = 0;
    }

    const job = await worker.getNextJob(token);
    if (job) {
      if (job.id) {
        runningJobs.add(job.id);
      }

      if (job.data && job.data.sentry && Sentry.isInitialized()) {
        Sentry.continueTrace(
          {
            sentryTrace: job.data.sentry.trace,
            baggage: job.data.sentry.baggage,
          },
          () => {
            Sentry.startSpan(
              {
                name: "Index job",
                attributes: {
                  job: job.id,
                  worker: process.env.FLY_MACHINE_ID ?? worker.id,
                },
              },
              async () => {
                await processJobInternal(token, job);
              },
            );
          },
        );
      } else {
        await processJobInternal(token, job);
      }

      if (job.id) {
        runningJobs.delete(job.id);
      }

      await new Promise(resolve => setTimeout(resolve, gotJobInterval));
    } else {
      await new Promise(resolve => setTimeout(resolve, connectionMonitorInterval));
    }
  }

  logger.info("Worker loop ended. Waiting for running jobs to finish...");
  while (runningJobs.size > 0) {
    await new Promise(resolve => setTimeout(resolve, 500));
  }
  logger.info("All jobs finished. Worker exiting!");
  process.exit(0);
};

// Start the worker
(async () => {
  await workerFun(getIndexQueue());
})();
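For reference, the same consumer could be written with BullMQ's callback-style Worker, which manages job locks, completion, and failure automatically; the commit instead drives getNextJob manually so it can gate intake on system load via systemMonitor. A minimal sketch under that simpler model (illustrative only, not part of this commit):

import { Worker } from "bullmq";
import { redisConnection, indexQueueName } from "../queue-service";
import { saveCrawlMap } from "./crawl-maps-index";

// BullMQ invokes the processor per job and handles lock renewal,
// moveToCompleted, and moveToFailed on our behalf.
const indexWorker = new Worker(
  indexQueueName,
  async (job) => {
    const { originUrl, visitedUrls } = job.data;
    await saveCrawlMap(originUrl, visitedUrls);
    return { success: true };
  },
  { connection: redisConnection, lockDuration: 60000 },
);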
@@ -5,6 +5,7 @@ import IORedis from "ioredis";
let scrapeQueue: Queue;
let extractQueue: Queue;
let loggingQueue: Queue;
let indexQueue: Queue;

export const redisConnection = new IORedis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null,
@@ -13,6 +14,7 @@ export const redisConnection = new IORedis(process.env.REDIS_URL!, {
export const scrapeQueueName = "{scrapeQueue}";
export const extractQueueName = "{extractQueue}";
export const loggingQueueName = "{loggingQueue}";
export const indexQueueName = "{indexQueue}";

export function getScrapeQueue() {
  if (!scrapeQueue) {
@@ -50,6 +52,24 @@ export function getExtractQueue() {
  return extractQueue;
}

export function getIndexQueue() {
  if (!indexQueue) {
    indexQueue = new Queue(indexQueueName, {
      connection: redisConnection,
      defaultJobOptions: {
        removeOnComplete: {
          age: 90000, // 25 hours
        },
        removeOnFail: {
          age: 90000, // 25 hours
        },
      },
    });
    logger.info("Index queue created");
  }
  return indexQueue;
}

// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
// import { QueueEvents } from 'bullmq';
// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });
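The index queue prunes completed and failed jobs once they are 90000 seconds (25 hours) old, so it does not grow without bound. A minimal producer sketch against this queue, assuming the exports above (the crawl id and URLs are placeholders):

import { getIndexQueue } from "./queue-service";

(async () => {
  // Enqueue a crawl map for the index worker to persist; priority 10
  // matches the value used by the map controller and queue-worker.
  await getIndexQueue().add(
    "example-crawl-id",
    {
      originUrl: "https://example.com",
      visitedUrls: ["https://example.com/", "https://example.com/docs"],
    },
    { priority: 10 },
  );
})();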
@@ -8,6 +8,7 @@ import {
  redisConnection,
  scrapeQueueName,
  extractQueueName,
  getIndexQueue,
} from "./queue-service";
import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook";
@@ -103,9 +104,17 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
      job.data.crawlerOptions !== null &&
      originUrl
    ) {
      saveCrawlMap(originUrl, visitedUrls).catch((e) => {
        _logger.error("Error saving crawl map", { error: e });
      });
      // Queue the indexing job instead of doing it directly
      await getIndexQueue().add(
        job.data.crawl_id,
        {
          originUrl,
          visitedUrls,
        },
        {
          priority: 10,
        }
      );
    }
  })();
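Queueing the crawl map decouples crawl completion from the Supabase write: the heavy merge now happens in the index worker, and finishCrawlIfNeeded only pays for a Redis enqueue. A sketch of guarding that enqueue so an index-queue outage cannot fail the crawl (the try/catch wrapper is illustrative, not part of the commit):

try {
  await getIndexQueue().add(
    job.data.crawl_id,
    { originUrl, visitedUrls },
    { priority: 10 },
  );
} catch (error) {
  // Indexing is best-effort; log and keep finishing the crawl.
  _logger.error("Failed to queue crawl map for indexing", { error });
}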