Nicolas 2025-01-19 13:09:29 -03:00
parent 513f61a2d1
commit 92b8d97be3
9 changed files with 439 additions and 58 deletions

View File

@@ -13,6 +13,9 @@ if [ $FLY_PROCESS_GROUP = "app" ]; then
elif [ $FLY_PROCESS_GROUP = "worker" ]; then
echo "RUNNING worker"
node --max-old-space-size=8192 dist/src/services/queue-worker.js
elif [ $FLY_PROCESS_GROUP = "index-worker" ]; then
echo "RUNNING index worker"
node --max-old-space-size=8192 dist/src/services/indexing/index-worker.js
else
echo "NO FLY PROCESS GROUP"
node --max-old-space-size=8192 dist/src/index.js

View File

@@ -18,6 +18,8 @@
"test:snips": "npx jest --detectOpenHandles --forceExit --openHandlesTimeout=120000 --watchAll=false src/__tests__/snips/*.test.ts",
"workers": "nodemon --exec ts-node src/services/queue-worker.ts",
"worker:production": "node dist/src/services/queue-worker.js",
"index-worker": "nodemon --exec ts-node src/services/indexing/index-worker.ts",
"index-worker:production": "node dist/src/services/indexing/index-worker.js",
"mongo-docker": "docker run -d -p 2717:27017 -v ./mongo-data:/data/db --name mongodb mongo:latest",
"mongo-docker-console": "docker exec -it mongodb mongosh",
"run-example": "npx ts-node src/example.ts",

View File

@@ -22,6 +22,7 @@ import { performCosineSimilarity } from "../../lib/map-cosine";
import { logger } from "../../lib/logger";
import Redis from "ioredis";
import { querySitemapIndex } from "../../scraper/WebScraper/sitemap-index";
import { getIndexQueue } from "../../services/queue-service";
configDotenv();
const redis = new Redis(process.env.REDIS_URL!);
@@ -228,6 +229,17 @@ export async function getMapResults({
//
await getIndexQueue().add(
id,
{
originUrl: url,
visitedUrls: linksToReturn,
},
{
priority: 10,
}
);
return {
success: true,
links: linksToReturn,
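
For reference, the payload enqueued here is the same shape the new index worker destructures in processJobInternal. A minimal sketch of that implied contract (the interface name is hypothetical; only these two fields appear in this commit):

interface IndexJobData {
  // Hypothetical name; the shape is implied by the producers (map.ts and the
  // crawl finisher in queue-worker.ts) and the consumer (index-worker.ts).
  originUrl: string;      // origin of the map/crawl request
  visitedUrls: string[];  // URLs discovered for that origin
}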

View File

@@ -4,7 +4,7 @@ import * as Sentry from "@sentry/node";
import express, { NextFunction, Request, Response } from "express";
import bodyParser from "body-parser";
import cors from "cors";
import { getExtractQueue, getScrapeQueue } from "./services/queue-service";
import { getExtractQueue, getScrapeQueue, getIndexQueue } from "./services/queue-service";
import { v0Router } from "./routes/v0";
import os from "os";
import { logger } from "./lib/logger";
@@ -48,6 +48,7 @@ const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
queues: [
new BullAdapter(getScrapeQueue()),
new BullAdapter(getExtractQueue()),
new BullAdapter(getIndexQueue()),
],
serverAdapter: serverAdapter,
});

View File

@@ -24,21 +24,40 @@ export async function getLinksFromSitemap(
let content: string = "";
try {
if (mode === "fire-engine" && useFireEngine) {
const response = await scrapeURL(
"sitemap",
// Try TLS client first
const tlsResponse = await scrapeURL(
"sitemap",
sitemapUrl,
scrapeOptions.parse({ formats: ["rawHtml"] }),
{ forceEngine: "fire-engine;tlsclient", v0DisableJsDom: true },
);
if (!response.success) {
logger.debug(
"Failed to scrape sitemap via TLSClient, falling back to axios...",
{ error: response.error },
);
const ar = await axios.get(sitemapUrl, { timeout: axiosTimeout });
content = ar.data;
if (tlsResponse.success) {
content = tlsResponse.document.rawHtml!;
} else {
content = response.document.rawHtml!;
logger.debug(
"Failed to scrape sitemap via TLSClient, trying Chrome CDP...",
{ error: tlsResponse.error },
);
// Try Chrome CDP next
const cdpResponse = await scrapeURL(
"sitemap",
sitemapUrl,
scrapeOptions.parse({ formats: ["rawHtml"] }),
{ forceEngine: "fire-engine;chrome-cdp" },
);
if (cdpResponse.success) {
content = cdpResponse.document.rawHtml!;
} else {
logger.debug(
"Failed to scrape sitemap via Chrome CDP, falling back to axios...",
{ error: cdpResponse.error },
);
const ar = await axios.get(sitemapUrl, { timeout: axiosTimeout });
content = ar.data;
}
}
} else {
const response = await axios.get(sitemapUrl, { timeout: axiosTimeout });
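
A condensed sketch of the resulting fallback order for fetching a sitemap, fire-engine TLS client first, then fire-engine Chrome CDP, then a plain axios GET (the helper name is hypothetical, it only mirrors the logic above, and the v0DisableJsDom flag on the TLS attempt is omitted for brevity):

// Hypothetical helper summarizing the fallback chain used above.
async function fetchSitemapContent(sitemapUrl: string): Promise<string> {
  for (const forceEngine of ["fire-engine;tlsclient", "fire-engine;chrome-cdp"] as const) {
    const response = await scrapeURL(
      "sitemap",
      sitemapUrl,
      scrapeOptions.parse({ formats: ["rawHtml"] }),
      { forceEngine },
    );
    if (response.success) return response.document.rawHtml!;
  }
  const fallback = await axios.get(sitemapUrl, { timeout: axiosTimeout });
  return fallback.data;
}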

View File

@@ -4,59 +4,213 @@ import {
normalizeUrlOnlyHostname,
} from "../../lib/canonical-url";
import { supabase_service } from "../supabase";
import { redisConnection } from "../queue-service";
const BATCH_KEY = "crawl_maps_batch";
const BATCH_LOCK_KEY = "crawl_maps_batch_lock";
const BATCH_SIZE = 10;
const BATCH_TIMEOUT = 10000; // 10 seconds
const LOCK_TIMEOUT = 30000; // 30 seconds
interface CrawlMapOperation {
originUrl: string;
standardizedUrls: string[];
timestamp: string;
}
async function acquireLock(): Promise<boolean> {
const redis = redisConnection;
// Set lock with NX (only if it doesn't exist) and PX (millisecond expiry)
const result = await redis.set(BATCH_LOCK_KEY, "1", "PX", LOCK_TIMEOUT, "NX");
const acquired = result === "OK";
if (acquired) {
logger.info("🔒 Acquired batch processing lock");
}
return acquired;
}
async function releaseLock() {
const redis = redisConnection;
await redis.del(BATCH_LOCK_KEY);
logger.info("🔓 Released batch processing lock");
}
async function processBatch() {
const redis = redisConnection;
// Try to acquire lock
if (!(await acquireLock())) {
return;
}
export async function saveCrawlMap(originUrl: string, visitedUrls: string[]) {
originUrl = normalizeUrlOnlyHostname(originUrl);
// Fire and forget the upload to Supabase
try {
// Standardize URLs to canonical form (https, no www)
const standardizedUrls = [
...new Set(
visitedUrls.map((url) => {
return normalizeUrl(url);
}),
),
];
// First check if entry exists for this origin URL
const { data: existingMap } = await supabase_service
// Get all operations from Redis list
const operations: CrawlMapOperation[] = [];
while (operations.length < BATCH_SIZE) {
const op = await redis.lpop(BATCH_KEY);
if (!op) break;
operations.push(JSON.parse(op));
}
if (operations.length === 0) {
logger.info("No operations to process in batch");
return;
}
logger.info(`📦 Processing batch of ${operations.length} operations`, {
origins: operations.map((op) => op.originUrl),
});
// Get existing maps for all origins in batch
const origins = operations.map((op) => op.originUrl);
const { data: existingMaps } = await supabase_service
.from("crawl_maps")
.select("urls")
.eq("origin_url", originUrl)
.single();
.select("origin_url, urls")
.in("origin_url", origins);
if (existingMap) {
// Merge URLs, removing duplicates
const mergedUrls = [
...new Set([...existingMap.urls, ...standardizedUrls]),
];
const existingMapsByOrigin = new Map(
existingMaps?.map((map) => [map.origin_url, map.urls]) || [],
);
const { error } = await supabase_service
.from("crawl_maps")
.update({
// Prepare updates and inserts
interface CrawlMapRecord {
origin_url: string;
urls: string[];
num_urls: number;
updated_at: string;
created_at?: string;
}
const updates: CrawlMapRecord[] = [];
const inserts: CrawlMapRecord[] = [];
for (const op of operations) {
const existingUrls = existingMapsByOrigin.get(op.originUrl);
if (existingUrls) {
// Merge URLs for update
const mergedUrls = [
...new Set([...existingUrls, ...op.standardizedUrls]),
];
updates.push({
origin_url: op.originUrl,
urls: mergedUrls,
num_urls: mergedUrls.length,
updated_at: new Date().toISOString(),
})
.eq("origin_url", originUrl);
if (error) {
logger.error("Failed to update crawl map", { error });
}
} else {
// Insert new entry if none exists
const { error } = await supabase_service.from("crawl_maps").insert({
origin_url: originUrl,
urls: standardizedUrls,
num_urls: standardizedUrls.length,
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
});
if (error) {
logger.error("Failed to save crawl map", { error });
updated_at: op.timestamp,
});
} else {
// Prepare insert
inserts.push({
origin_url: op.originUrl,
urls: op.standardizedUrls,
num_urls: op.standardizedUrls.length,
created_at: op.timestamp,
updated_at: op.timestamp,
});
}
}
// Execute batch operations
if (updates.length > 0) {
logger.info(`🔄 Updating ${updates.length} existing crawl maps`, {
origins: updates.map((u) => u.origin_url),
});
const { error: updateError } = await supabase_service
.from("crawl_maps")
.upsert(updates);
if (updateError) {
logger.error("Failed to batch update crawl maps", {
error: updateError,
});
}
}
if (inserts.length > 0) {
logger.info(`Inserting ${inserts.length} new crawl maps`, {
origins: inserts.map((i) => i.origin_url),
});
const { error: insertError } = await supabase_service
.from("crawl_maps")
.insert(inserts);
if (insertError) {
logger.error("Failed to batch insert crawl maps", {
error: insertError,
});
}
}
logger.info("✅ Batch processing completed successfully");
} catch (error) {
logger.error("Error saving crawl map", { error });
logger.error("Error processing crawl map batch", { error });
} finally {
await releaseLock();
}
}
// Start periodic batch processing
let batchInterval: NodeJS.Timeout | null = null;
function startBatchProcessing() {
if (batchInterval) return;
logger.info("🔄 Starting periodic batch processing");
batchInterval = setInterval(async () => {
const queueLength = await redisConnection.llen(BATCH_KEY);
logger.info(`Checking batch queue (${queueLength} items pending)`);
await processBatch();
}, BATCH_TIMEOUT);
// Unref to not keep process alive
batchInterval.unref();
}
export async function saveCrawlMap(originUrl: string, visitedUrls: string[]) {
logger.info("Queueing crawl map", { originUrl });
originUrl = normalizeUrlOnlyHostname(originUrl);
try {
// Standardize URLs to canonical form
const standardizedUrls = [
...new Set(visitedUrls.map((url) => normalizeUrl(url))),
];
const operation: CrawlMapOperation = {
originUrl,
standardizedUrls,
timestamp: new Date().toISOString(),
};
// Add operation to Redis list
const redis = redisConnection;
await redis.rpush(BATCH_KEY, JSON.stringify(operation));
const queueLength = await redis.llen(BATCH_KEY);
logger.info(`📥 Added operation to queue (${queueLength} total pending)`, {
originUrl,
});
// Start batch processing if not already started
startBatchProcessing();
// If we have enough items, trigger immediate processing
if (queueLength >= BATCH_SIZE) {
logger.info(
"🔄 Queue reached batch size, triggering immediate processing",
);
await processBatch();
}
} catch (error) {
logger.error("Error queueing crawl map", { error });
}
}
// Cleanup on exit
process.on("beforeExit", async () => {
if (batchInterval) {
clearInterval(batchInterval);
batchInterval = null;
logger.info("Stopped periodic batch processing");
}
await processBatch();
});
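
As a self-contained illustration of the Redis primitives the batching above relies on, here is a rough sketch using the same key names and plain ioredis calls; the origin and URLs are made-up values:

import Redis from "ioredis";

const redis = new Redis(process.env.REDIS_URL!);

async function example() {
  // Producer side: append one serialized CrawlMapOperation to the batch list.
  await redis.rpush(
    "crawl_maps_batch",
    JSON.stringify({
      originUrl: "example.com",
      standardizedUrls: ["https://example.com", "https://example.com/docs"],
      timestamp: new Date().toISOString(),
    }),
  );

  // Consumer side: SET ... PX ... NX acquires a lock only one worker can hold,
  // and the PX expiry releases it automatically if the holder crashes.
  const lock = await redis.set("crawl_maps_batch_lock", "1", "PX", 30000, "NX");
  if (lock === "OK") {
    try {
      const raw = await redis.lpop("crawl_maps_batch");
      if (raw) console.log(JSON.parse(raw));
    } finally {
      await redis.del("crawl_maps_batch_lock");
    }
  }
}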

View File

@@ -0,0 +1,161 @@
import "dotenv/config";
import "../sentry";
import * as Sentry from "@sentry/node";
import { Job, Queue, Worker } from "bullmq";
import { logger as _logger, logger } from "../../lib/logger";
import { redisConnection, indexQueueName, getIndexQueue } from "../queue-service";
import { saveCrawlMap } from "./crawl-maps-index";
import systemMonitor from "../system-monitor";
import { v4 as uuidv4 } from "uuid";
const workerLockDuration = Number(process.env.WORKER_LOCK_DURATION) || 60000;
const workerStalledCheckInterval = Number(process.env.WORKER_STALLED_CHECK_INTERVAL) || 30000;
const jobLockExtendInterval = Number(process.env.JOB_LOCK_EXTEND_INTERVAL) || 15000;
const jobLockExtensionTime = Number(process.env.JOB_LOCK_EXTENSION_TIME) || 60000;
const cantAcceptConnectionInterval = Number(process.env.CANT_ACCEPT_CONNECTION_INTERVAL) || 2000;
const connectionMonitorInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10;
const gotJobInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 20;
const runningJobs: Set<string> = new Set();
const processJobInternal = async (token: string, job: Job) => {
if (!job.id) {
throw new Error("Job has no ID");
}
const logger = _logger.child({
module: "index-worker",
method: "processJobInternal",
jobId: job.id,
});
const extendLockInterval = setInterval(async () => {
logger.info(`🔄 Worker extending lock on job ${job.id}`);
await job.extendLock(token, jobLockExtensionTime);
}, jobLockExtendInterval);
let err = null;
try {
const { originUrl, visitedUrls } = job.data;
await saveCrawlMap(originUrl, visitedUrls);
await job.moveToCompleted({ success: true }, token, false);
} catch (error) {
logger.error("Error processing index job", { error });
Sentry.captureException(error);
err = error;
await job.moveToFailed(error, token, false);
} finally {
clearInterval(extendLockInterval);
}
return err;
};
let isShuttingDown = false;
process.on("SIGINT", () => {
logger.info("Received SIGTERM. Shutting down gracefully...");
isShuttingDown = true;
});
process.on("SIGTERM", () => {
logger.info("Received SIGTERM. Shutting down gracefully...");
isShuttingDown = true;
});
let cantAcceptConnectionCount = 0;
const workerFun = async (queue: Queue) => {
const logger = _logger.child({ module: "index-worker", method: "workerFun" });
const worker = new Worker(queue.name, null, {
connection: redisConnection,
lockDuration: workerLockDuration,
stalledInterval: workerStalledCheckInterval,
maxStalledCount: 10,
});
worker.startStalledCheckTimer();
const monitor = await systemMonitor;
while (true) {
if (isShuttingDown) {
logger.info("No longer accepting new jobs. SIGINT");
break;
}
const token = uuidv4();
const canAcceptConnection = await monitor.acceptConnection();
if (!canAcceptConnection) {
logger.info("Cant accept connection");
cantAcceptConnectionCount++;
if (cantAcceptConnectionCount >= 25) {
logger.error("WORKER STALLED", {
cpuUsage: await monitor.checkCpuUsage(),
memoryUsage: await monitor.checkMemoryUsage(),
});
}
await new Promise(resolve => setTimeout(resolve, cantAcceptConnectionInterval));
continue;
} else {
cantAcceptConnectionCount = 0;
}
const job = await worker.getNextJob(token);
if (job) {
if (job.id) {
runningJobs.add(job.id);
}
if (job.data && job.data.sentry && Sentry.isInitialized()) {
Sentry.continueTrace(
{
sentryTrace: job.data.sentry.trace,
baggage: job.data.sentry.baggage,
},
() => {
Sentry.startSpan(
{
name: "Index job",
attributes: {
job: job.id,
worker: process.env.FLY_MACHINE_ID ?? worker.id,
},
},
async () => {
await processJobInternal(token, job);
},
);
},
);
} else {
await processJobInternal(token, job);
}
if (job.id) {
runningJobs.delete(job.id);
}
await new Promise(resolve => setTimeout(resolve, gotJobInterval));
} else {
await new Promise(resolve => setTimeout(resolve, connectionMonitorInterval));
}
}
logger.info("Worker loop ended. Waiting for running jobs to finish...");
while (runningJobs.size > 0) {
await new Promise(resolve => setTimeout(resolve, 500));
}
logger.info("All jobs finished. Worker exiting!");
process.exit(0);
};
// Start the worker
(async () => {
await workerFun(getIndexQueue());
})();
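
The worker above uses BullMQ's manual fetching mode: a Worker constructed with a null processor, driven by getNextJob / moveToCompleted / moveToFailed and periodic extendLock calls. A stripped-down sketch of just that pattern, independent of this codebase:

import { Queue, Worker } from "bullmq";
import IORedis from "ioredis";
import { v4 as uuidv4 } from "uuid";

const connection = new IORedis(process.env.REDIS_URL!, { maxRetriesPerRequest: null });
const queue = new Queue("{indexQueue}", { connection });

async function pollOnce() {
  // A null processor means BullMQ will not process jobs automatically;
  // the caller fetches and settles them explicitly using a token.
  const worker = new Worker(queue.name, null, { connection });
  const token = uuidv4();
  const job = await worker.getNextJob(token);
  if (!job) {
    await worker.close();
    return;
  }
  try {
    // ... do the actual work here ...
    await job.moveToCompleted({ success: true }, token, false);
  } catch (error) {
    await job.moveToFailed(error as Error, token, false);
  } finally {
    await worker.close();
  }
}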

View File

@@ -5,6 +5,7 @@ import IORedis from "ioredis";
let scrapeQueue: Queue;
let extractQueue: Queue;
let loggingQueue: Queue;
let indexQueue: Queue;
export const redisConnection = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
@@ -13,6 +14,7 @@ export const redisConnection = new IORedis(process.env.REDIS_URL!, {
export const scrapeQueueName = "{scrapeQueue}";
export const extractQueueName = "{extractQueue}";
export const loggingQueueName = "{loggingQueue}";
export const indexQueueName = "{indexQueue}";
export function getScrapeQueue() {
if (!scrapeQueue) {
@@ -50,6 +52,24 @@ export function getExtractQueue() {
return extractQueue;
}
export function getIndexQueue() {
if (!indexQueue) {
indexQueue = new Queue(indexQueueName, {
connection: redisConnection,
defaultJobOptions: {
removeOnComplete: {
age: 90000, // 25 hours
},
removeOnFail: {
age: 90000, // 25 hours
},
},
});
logger.info("Index queue created");
}
return indexQueue;
}
// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
// import { QueueEvents } from 'bullmq';
// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });

View File

@@ -8,6 +8,7 @@ import {
redisConnection,
scrapeQueueName,
extractQueueName,
getIndexQueue,
} from "./queue-service";
import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook";
@@ -103,9 +104,17 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
job.data.crawlerOptions !== null &&
originUrl
) {
saveCrawlMap(originUrl, visitedUrls).catch((e) => {
_logger.error("Error saving crawl map", { error: e });
});
// Queue the indexing job instead of doing it directly
await getIndexQueue().add(
job.data.crawl_id,
{
originUrl,
visitedUrls,
},
{
priority: 10,
}
);
}
})();