From 92b8d97be3134dd13e30a1e4ce7b1aa52f535465 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 19 Jan 2025 13:09:29 -0300 Subject: [PATCH 1/2] Nick: --- apps/api/docker-entrypoint.sh | 3 + apps/api/package.json | 2 + apps/api/src/controllers/v1/map.ts | 12 + apps/api/src/index.ts | 3 +- apps/api/src/scraper/WebScraper/sitemap.ts | 39 ++- .../src/services/indexing/crawl-maps-index.ts | 242 ++++++++++++++---- .../api/src/services/indexing/index-worker.ts | 161 ++++++++++++ apps/api/src/services/queue-service.ts | 20 ++ apps/api/src/services/queue-worker.ts | 15 +- 9 files changed, 439 insertions(+), 58 deletions(-) create mode 100644 apps/api/src/services/indexing/index-worker.ts diff --git a/apps/api/docker-entrypoint.sh b/apps/api/docker-entrypoint.sh index 269e982f..a8bd3e2a 100755 --- a/apps/api/docker-entrypoint.sh +++ b/apps/api/docker-entrypoint.sh @@ -13,6 +13,9 @@ if [ $FLY_PROCESS_GROUP = "app" ]; then elif [ $FLY_PROCESS_GROUP = "worker" ]; then echo "RUNNING worker" node --max-old-space-size=8192 dist/src/services/queue-worker.js +elif [ $FLY_PROCESS_GROUP = "index-worker" ]; then + echo "RUNNING index worker" + node --max-old-space-size=8192 dist/src/services/indexing/index-worker.js else echo "NO FLY PROCESS GROUP" node --max-old-space-size=8192 dist/src/index.js diff --git a/apps/api/package.json b/apps/api/package.json index 88233235..c4cb5775 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -18,6 +18,8 @@ "test:snips": "npx jest --detectOpenHandles --forceExit --openHandlesTimeout=120000 --watchAll=false src/__tests__/snips/*.test.ts", "workers": "nodemon --exec ts-node src/services/queue-worker.ts", "worker:production": "node dist/src/services/queue-worker.js", + "index-worker": "nodemon --exec ts-node src/services/indexing/index-worker.ts", + "index-worker:production": "node dist/src/services/indexing/index-worker.js", "mongo-docker": "docker run -d -p 2717:27017 -v ./mongo-data:/data/db --name mongodb mongo:latest", "mongo-docker-console": "docker exec -it mongodb mongosh", "run-example": "npx ts-node src/example.ts", diff --git a/apps/api/src/controllers/v1/map.ts b/apps/api/src/controllers/v1/map.ts index 6f8c0b15..75e253ba 100644 --- a/apps/api/src/controllers/v1/map.ts +++ b/apps/api/src/controllers/v1/map.ts @@ -22,6 +22,7 @@ import { performCosineSimilarity } from "../../lib/map-cosine"; import { logger } from "../../lib/logger"; import Redis from "ioredis"; import { querySitemapIndex } from "../../scraper/WebScraper/sitemap-index"; +import { getIndexQueue } from "../../services/queue-service"; configDotenv(); const redis = new Redis(process.env.REDIS_URL!); @@ -228,6 +229,17 @@ export async function getMapResults({ // + await getIndexQueue().add( + id, + { + originUrl: url, + visitedUrls: linksToReturn, + }, + { + priority: 10, + } + ); + return { success: true, links: linksToReturn, diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 5b5208f3..c1dc4c04 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -4,7 +4,7 @@ import * as Sentry from "@sentry/node"; import express, { NextFunction, Request, Response } from "express"; import bodyParser from "body-parser"; import cors from "cors"; -import { getExtractQueue, getScrapeQueue } from "./services/queue-service"; +import { getExtractQueue, getScrapeQueue, getIndexQueue } from "./services/queue-service"; import { v0Router } from "./routes/v0"; import os from "os"; import { logger } from "./lib/logger"; @@ -48,6 +48,7 @@ const { addQueue, removeQueue, setQueues, replaceQueues 
} = createBullBoard({
   queues: [
     new BullAdapter(getScrapeQueue()),
     new BullAdapter(getExtractQueue()),
+    new BullAdapter(getIndexQueue()),
   ],
   serverAdapter: serverAdapter,
 });
diff --git a/apps/api/src/scraper/WebScraper/sitemap.ts b/apps/api/src/scraper/WebScraper/sitemap.ts
index 679ac5b9..7ea04663 100644
--- a/apps/api/src/scraper/WebScraper/sitemap.ts
+++ b/apps/api/src/scraper/WebScraper/sitemap.ts
@@ -24,21 +24,40 @@ export async function getLinksFromSitemap(
   let content: string = "";
   try {
     if (mode === "fire-engine" && useFireEngine) {
-      const response = await scrapeURL(
-        "sitemap",
+      // Try TLS client first
+      const tlsResponse = await scrapeURL(
+        "sitemap",
         sitemapUrl,
         scrapeOptions.parse({ formats: ["rawHtml"] }),
         { forceEngine: "fire-engine;tlsclient", v0DisableJsDom: true },
       );
-      if (!response.success) {
-        logger.debug(
-          "Failed to scrape sitemap via TLSClient, falling back to axios...",
-          { error: response.error },
-        );
-        const ar = await axios.get(sitemapUrl, { timeout: axiosTimeout });
-        content = ar.data;
+
+      if (tlsResponse.success) {
+        content = tlsResponse.document.rawHtml!;
       } else {
-        content = response.document.rawHtml!;
+        logger.debug(
+          "Failed to scrape sitemap via TLSClient, trying Chrome CDP...",
+          { error: tlsResponse.error },
+        );
+
+        // Try Chrome CDP next
+        const cdpResponse = await scrapeURL(
+          "sitemap",
+          sitemapUrl,
+          scrapeOptions.parse({ formats: ["rawHtml"] }),
+          { forceEngine: "fire-engine;chrome-cdp" },
+        );
+
+        if (cdpResponse.success) {
+          content = cdpResponse.document.rawHtml!;
+        } else {
+          logger.debug(
+            "Failed to scrape sitemap via Chrome CDP, falling back to axios...",
+            { error: cdpResponse.error },
+          );
+          const ar = await axios.get(sitemapUrl, { timeout: axiosTimeout });
+          content = ar.data;
+        }
+      }
     } else {
       const response = await axios.get(sitemapUrl, { timeout: axiosTimeout });
diff --git a/apps/api/src/services/indexing/crawl-maps-index.ts b/apps/api/src/services/indexing/crawl-maps-index.ts
index d5a08547..4cccd763 100644
--- a/apps/api/src/services/indexing/crawl-maps-index.ts
+++ b/apps/api/src/services/indexing/crawl-maps-index.ts
@@ -4,59 +4,213 @@ import {
   normalizeUrlOnlyHostname,
 } from "../../lib/canonical-url";
 import { supabase_service } from "../supabase";
+import { redisConnection } from "../queue-service";
+
+const BATCH_KEY = "crawl_maps_batch";
+const BATCH_LOCK_KEY = "crawl_maps_batch_lock";
+const BATCH_SIZE = 10;
+const BATCH_TIMEOUT = 10000; // 10 seconds
+const LOCK_TIMEOUT = 30000; // 30 seconds
+
+interface CrawlMapOperation {
+  originUrl: string;
+  standardizedUrls: string[];
+  timestamp: string;
+}
+
+async function acquireLock(): Promise<boolean> {
+  const redis = redisConnection;
+  // Set lock with NX (only if it doesn't exist) and PX (millisecond expiry)
+  const result = await redis.set(BATCH_LOCK_KEY, "1", "PX", LOCK_TIMEOUT, "NX");
+  const acquired = result === "OK";
+  if (acquired) {
+    logger.info("🔒 Acquired batch processing lock");
+  }
+  return acquired;
+}
+
+async function releaseLock() {
+  const redis = redisConnection;
+  await redis.del(BATCH_LOCK_KEY);
+  logger.info("🔓 Released batch processing lock");
+}
+
+async function processBatch() {
+  const redis = redisConnection;
+
+  // Try to acquire lock
+  if (!(await acquireLock())) {
+    return;
+  }
-export async function saveCrawlMap(originUrl: string, visitedUrls: string[]) {
-  originUrl = normalizeUrlOnlyHostname(originUrl);
-  // Fire and forget the upload to Supabase
   try {
-    // Standardize URLs to canonical form (https, no www)
-    const standardizedUrls = [
-
...new Set( - visitedUrls.map((url) => { - return normalizeUrl(url); - }), - ), - ]; - // First check if entry exists for this origin URL - const { data: existingMap } = await supabase_service + // Get all operations from Redis list + const operations: CrawlMapOperation[] = []; + while (operations.length < BATCH_SIZE) { + const op = await redis.lpop(BATCH_KEY); + if (!op) break; + operations.push(JSON.parse(op)); + } + + if (operations.length === 0) { + logger.info("No operations to process in batch"); + return; + } + + logger.info(`📦 Processing batch of ${operations.length} operations`, { + origins: operations.map((op) => op.originUrl), + }); + + // Get existing maps for all origins in batch + const origins = operations.map((op) => op.originUrl); + const { data: existingMaps } = await supabase_service .from("crawl_maps") - .select("urls") - .eq("origin_url", originUrl) - .single(); + .select("origin_url, urls") + .in("origin_url", origins); - if (existingMap) { - // Merge URLs, removing duplicates - const mergedUrls = [ - ...new Set([...existingMap.urls, ...standardizedUrls]), - ]; + const existingMapsByOrigin = new Map( + existingMaps?.map((map) => [map.origin_url, map.urls]) || [], + ); - const { error } = await supabase_service - .from("crawl_maps") - .update({ + // Prepare updates and inserts + interface CrawlMapRecord { + origin_url: string; + urls: string[]; + num_urls: number; + updated_at: string; + created_at?: string; + } + + const updates: CrawlMapRecord[] = []; + const inserts: CrawlMapRecord[] = []; + + for (const op of operations) { + const existingUrls = existingMapsByOrigin.get(op.originUrl); + + if (existingUrls) { + // Merge URLs for update + const mergedUrls = [ + ...new Set([...existingUrls, ...op.standardizedUrls]), + ]; + updates.push({ + origin_url: op.originUrl, urls: mergedUrls, num_urls: mergedUrls.length, - updated_at: new Date().toISOString(), - }) - .eq("origin_url", originUrl); - - if (error) { - logger.error("Failed to update crawl map", { error }); - } - } else { - // Insert new entry if none exists - const { error } = await supabase_service.from("crawl_maps").insert({ - origin_url: originUrl, - urls: standardizedUrls, - num_urls: standardizedUrls.length, - created_at: new Date().toISOString(), - updated_at: new Date().toISOString(), - }); - - if (error) { - logger.error("Failed to save crawl map", { error }); + updated_at: op.timestamp, + }); + } else { + // Prepare insert + inserts.push({ + origin_url: op.originUrl, + urls: op.standardizedUrls, + num_urls: op.standardizedUrls.length, + created_at: op.timestamp, + updated_at: op.timestamp, + }); } } + + // Execute batch operations + if (updates.length > 0) { + logger.info(`🔄 Updating ${updates.length} existing crawl maps`, { + origins: updates.map((u) => u.origin_url), + }); + const { error: updateError } = await supabase_service + .from("crawl_maps") + .upsert(updates); + + if (updateError) { + logger.error("Failed to batch update crawl maps", { + error: updateError, + }); + } + } + + if (inserts.length > 0) { + logger.info(`➕ Inserting ${inserts.length} new crawl maps`, { + origins: inserts.map((i) => i.origin_url), + }); + const { error: insertError } = await supabase_service + .from("crawl_maps") + .insert(inserts); + + if (insertError) { + logger.error("Failed to batch insert crawl maps", { + error: insertError, + }); + } + } + + logger.info("✅ Batch processing completed successfully"); } catch (error) { - logger.error("Error saving crawl map", { error }); + logger.error("Error processing crawl map 
batch", { error });
+  } finally {
+    await releaseLock();
+  }
 }
+
+// Start periodic batch processing
+let batchInterval: NodeJS.Timeout | null = null;
+
+function startBatchProcessing() {
+  if (batchInterval) return;
+
+  logger.info("🔄 Starting periodic batch processing");
+  batchInterval = setInterval(async () => {
+    const queueLength = await redisConnection.llen(BATCH_KEY);
+    logger.info(`Checking batch queue (${queueLength} items pending)`);
+    await processBatch();
+  }, BATCH_TIMEOUT);
+
+  // Unref to not keep process alive
+  batchInterval.unref();
+}
+
+export async function saveCrawlMap(originUrl: string, visitedUrls: string[]) {
+  logger.info("Queueing crawl map", { originUrl });
+  originUrl = normalizeUrlOnlyHostname(originUrl);
+
+  try {
+    // Standardize URLs to canonical form
+    const standardizedUrls = [
+      ...new Set(visitedUrls.map((url) => normalizeUrl(url))),
+    ];
+
+    const operation: CrawlMapOperation = {
+      originUrl,
+      standardizedUrls,
+      timestamp: new Date().toISOString(),
+    };
+
+    // Add operation to Redis list
+    const redis = redisConnection;
+    await redis.rpush(BATCH_KEY, JSON.stringify(operation));
+    const queueLength = await redis.llen(BATCH_KEY);
+    logger.info(`📥 Added operation to queue (${queueLength} total pending)`, {
+      originUrl,
+    });
+
+    // Start batch processing if not already started
+    startBatchProcessing();
+
+    // If we have enough items, trigger immediate processing
+    if (queueLength >= BATCH_SIZE) {
+      logger.info(
+        "🔄 Queue reached batch size, triggering immediate processing",
+      );
+      await processBatch();
+    }
+  } catch (error) {
+    logger.error("Error queueing crawl map", { error });
+  }
+}
+
+// Cleanup on exit
+process.on("beforeExit", async () => {
+  if (batchInterval) {
+    clearInterval(batchInterval);
+    batchInterval = null;
+    logger.info("Stopped periodic batch processing");
+  }
+  await processBatch();
+});
diff --git a/apps/api/src/services/indexing/index-worker.ts b/apps/api/src/services/indexing/index-worker.ts
new file mode 100644
index 00000000..0d964477
--- /dev/null
+++ b/apps/api/src/services/indexing/index-worker.ts
@@ -0,0 +1,161 @@
+import "dotenv/config";
+import "../sentry";
+import * as Sentry from "@sentry/node";
+import { Job, Queue, Worker } from "bullmq";
+import { logger as _logger, logger } from "../../lib/logger";
+import { redisConnection, indexQueueName, getIndexQueue } from "../queue-service";
+import { saveCrawlMap } from "./crawl-maps-index";
+import systemMonitor from "../system-monitor";
+import { v4 as uuidv4 } from "uuid";
+
+const workerLockDuration = Number(process.env.WORKER_LOCK_DURATION) || 60000;
+const workerStalledCheckInterval = Number(process.env.WORKER_STALLED_CHECK_INTERVAL) || 30000;
+const jobLockExtendInterval = Number(process.env.JOB_LOCK_EXTEND_INTERVAL) || 15000;
+const jobLockExtensionTime = Number(process.env.JOB_LOCK_EXTENSION_TIME) || 60000;
+
+const cantAcceptConnectionInterval = Number(process.env.CANT_ACCEPT_CONNECTION_INTERVAL) || 2000;
+const connectionMonitorInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10;
+const gotJobInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 20;
+
+const runningJobs: Set<string> = new Set();
+
+const processJobInternal = async (token: string, job: Job) => {
+  if (!job.id) {
+    throw new Error("Job has no ID");
+  }
+
+  const logger = _logger.child({
+    module: "index-worker",
+    method: "processJobInternal",
+    jobId: job.id,
+  });
+
+  const extendLockInterval = setInterval(async () => {
+    logger.info(`🔄 Worker extending lock on job ${job.id}`);
+    await
job.extendLock(token, jobLockExtensionTime); + }, jobLockExtendInterval); + + let err = null; + try { + const { originUrl, visitedUrls } = job.data; + await saveCrawlMap(originUrl, visitedUrls); + await job.moveToCompleted({ success: true }, token, false); + } catch (error) { + logger.error("Error processing index job", { error }); + Sentry.captureException(error); + err = error; + await job.moveToFailed(error, token, false); + } finally { + clearInterval(extendLockInterval); + } + + return err; +}; + +let isShuttingDown = false; + +process.on("SIGINT", () => { + logger.info("Received SIGTERM. Shutting down gracefully..."); + isShuttingDown = true; +}); + +process.on("SIGTERM", () => { + logger.info("Received SIGTERM. Shutting down gracefully..."); + isShuttingDown = true; +}); + +let cantAcceptConnectionCount = 0; + +const workerFun = async (queue: Queue) => { + const logger = _logger.child({ module: "index-worker", method: "workerFun" }); + + const worker = new Worker(queue.name, null, { + connection: redisConnection, + lockDuration: workerLockDuration, + stalledInterval: workerStalledCheckInterval, + maxStalledCount: 10, + }); + + worker.startStalledCheckTimer(); + + const monitor = await systemMonitor; + + while (true) { + if (isShuttingDown) { + logger.info("No longer accepting new jobs. SIGINT"); + break; + } + + const token = uuidv4(); + const canAcceptConnection = await monitor.acceptConnection(); + + if (!canAcceptConnection) { + logger.info("Cant accept connection"); + cantAcceptConnectionCount++; + + if (cantAcceptConnectionCount >= 25) { + logger.error("WORKER STALLED", { + cpuUsage: await monitor.checkCpuUsage(), + memoryUsage: await monitor.checkMemoryUsage(), + }); + } + + await new Promise(resolve => setTimeout(resolve, cantAcceptConnectionInterval)); + continue; + } else { + cantAcceptConnectionCount = 0; + } + + const job = await worker.getNextJob(token); + if (job) { + if (job.id) { + runningJobs.add(job.id); + } + + if (job.data && job.data.sentry && Sentry.isInitialized()) { + Sentry.continueTrace( + { + sentryTrace: job.data.sentry.trace, + baggage: job.data.sentry.baggage, + }, + () => { + Sentry.startSpan( + { + name: "Index job", + attributes: { + job: job.id, + worker: process.env.FLY_MACHINE_ID ?? worker.id, + }, + }, + async () => { + await processJobInternal(token, job); + }, + ); + }, + ); + } else { + await processJobInternal(token, job); + } + + if (job.id) { + runningJobs.delete(job.id); + } + + await new Promise(resolve => setTimeout(resolve, gotJobInterval)); + } else { + await new Promise(resolve => setTimeout(resolve, connectionMonitorInterval)); + } + } + + logger.info("Worker loop ended. Waiting for running jobs to finish..."); + while (runningJobs.size > 0) { + await new Promise(resolve => setTimeout(resolve, 500)); + } + logger.info("All jobs finished. 
Worker exiting!");
+  process.exit(0);
+};
+
+// Start the worker
+(async () => {
+  await workerFun(getIndexQueue());
+})();
\ No newline at end of file
diff --git a/apps/api/src/services/queue-service.ts b/apps/api/src/services/queue-service.ts
index db222814..688b84a2 100644
--- a/apps/api/src/services/queue-service.ts
+++ b/apps/api/src/services/queue-service.ts
@@ -5,6 +5,7 @@ import IORedis from "ioredis";
 let scrapeQueue: Queue;
 let extractQueue: Queue;
 let loggingQueue: Queue;
+let indexQueue: Queue;
 
 export const redisConnection = new IORedis(process.env.REDIS_URL!, {
   maxRetriesPerRequest: null,
@@ -13,6 +14,7 @@ export const redisConnection = new IORedis(process.env.REDIS_URL!, {
 export const scrapeQueueName = "{scrapeQueue}";
 export const extractQueueName = "{extractQueue}";
 export const loggingQueueName = "{loggingQueue}";
+export const indexQueueName = "{indexQueue}";
 
 export function getScrapeQueue() {
   if (!scrapeQueue) {
@@ -50,6 +52,24 @@ export function getExtractQueue() {
   return extractQueue;
 }
 
+export function getIndexQueue() {
+  if (!indexQueue) {
+    indexQueue = new Queue(indexQueueName, {
+      connection: redisConnection,
+      defaultJobOptions: {
+        removeOnComplete: {
+          age: 90000, // 25 hours
+        },
+        removeOnFail: {
+          age: 90000, // 25 hours
+        },
+      },
+    });
+    logger.info("Index queue created");
+  }
+  return indexQueue;
+}
+
 // === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
 // import { QueueEvents } from 'bullmq';
 // export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index ee24e115..9828bbeb 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -8,6 +8,7 @@ import {
   redisConnection,
   scrapeQueueName,
   extractQueueName,
+  getIndexQueue,
 } from "./queue-service";
 import { startWebScraperPipeline } from "../main/runWebScraper";
 import { callWebhook } from "./webhook";
@@ -103,9 +104,17 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
       job.data.crawlerOptions !== null &&
       originUrl
     ) {
-      saveCrawlMap(originUrl, visitedUrls).catch((e) => {
-        _logger.error("Error saving crawl map", { error: e });
-      });
+      // Queue the indexing job instead of doing it directly
+      await getIndexQueue().add(
+        job.data.crawl_id,
+        {
+          originUrl,
+          visitedUrls,
+        },
+        {
+          priority: 10,
+        }
+      );
     }
   })();

From baa2f9476593bd31a90792bd9acd2a7273d14c7f Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Sun, 19 Jan 2025 13:15:20 -0300
Subject: [PATCH 2/2] Update crawl-maps-index.ts

---
 apps/api/src/services/indexing/crawl-maps-index.ts | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/apps/api/src/services/indexing/crawl-maps-index.ts b/apps/api/src/services/indexing/crawl-maps-index.ts
index 4cccd763..2687e814 100644
--- a/apps/api/src/services/indexing/crawl-maps-index.ts
+++ b/apps/api/src/services/indexing/crawl-maps-index.ts
@@ -8,8 +8,8 @@ import { redisConnection } from "../queue-service";
 
 const BATCH_KEY = "crawl_maps_batch";
 const BATCH_LOCK_KEY = "crawl_maps_batch_lock";
-const BATCH_SIZE = 10;
-const BATCH_TIMEOUT = 10000; // 10 seconds
+const BATCH_SIZE = 20;
+const BATCH_TIMEOUT = 20000; // 20 seconds
 const LOCK_TIMEOUT = 30000; // 30 seconds
 
 interface CrawlMapOperation {
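
Note: the batching in crawl-maps-index.ts comes down to two Redis primitives: a list used as a FIFO buffer (RPUSH to enqueue, LPOP to drain) and a SET NX PX key used as an expiring lock so only one process flushes at a time. The standalone TypeScript sketch below shows that pattern in isolation. It is illustrative only and not part of the patch; the key names mirror the patch, while the connection setup, function names, and the stubbed persistence step are assumptions.

import IORedis from "ioredis";

// Key names mirror crawl-maps-index.ts; everything else is illustrative.
const redis = new IORedis(process.env.REDIS_URL ?? "redis://localhost:6379", {
  maxRetriesPerRequest: null,
});

const BATCH_KEY = "crawl_maps_batch";
const BATCH_LOCK_KEY = "crawl_maps_batch_lock";
const LOCK_TIMEOUT = 30000; // lock expiry in ms, so a crashed worker cannot wedge the queue
const BATCH_SIZE = 20;

// Producer: append one operation to the Redis list and report the backlog size.
async function enqueue(originUrl: string, urls: string[]): Promise<number> {
  const op = { originUrl, standardizedUrls: urls, timestamp: new Date().toISOString() };
  await redis.rpush(BATCH_KEY, JSON.stringify(op));
  return redis.llen(BATCH_KEY);
}

// Consumer: drain up to BATCH_SIZE items, guarded by an NX/PX lock so that
// concurrent workers never flush the same batch twice.
async function flush(): Promise<void> {
  const locked = await redis.set(BATCH_LOCK_KEY, "1", "PX", LOCK_TIMEOUT, "NX");
  if (locked !== "OK") return; // another process holds the lock

  try {
    const batch: unknown[] = [];
    while (batch.length < BATCH_SIZE) {
      const raw = await redis.lpop(BATCH_KEY);
      if (!raw) break;
      batch.push(JSON.parse(raw));
    }
    if (batch.length > 0) {
      // In the real service this is where the Supabase update/insert happens.
      console.log(`flushing ${batch.length} crawl-map operations`);
    }
  } finally {
    await redis.del(BATCH_LOCK_KEY); // always release the lock
  }
}

// Example: enqueue a couple of operations, then flush.
(async () => {
  await enqueue("example.com", ["https://example.com/a", "https://example.com/b"]);
  if ((await enqueue("example.com", ["https://example.com/c"])) >= BATCH_SIZE) {
    await flush(); // size-triggered flush, as in saveCrawlMap
  }
  await flush(); // the timer-triggered flush does the same thing
  await redis.quit();
})();

One design consequence worth noting: LPOP removes items before they are persisted, so a crash mid-flush drops that batch; the PX expiry on the lock at least guarantees another worker can take over once it lapses.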
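Note: the index worker drives BullMQ in manual-fetch mode rather than handing it a processor callback: the Worker is constructed with a null processor, jobs are pulled with getNextJob(token), kept alive with extendLock, and settled explicitly with moveToCompleted / moveToFailed. Below is a condensed, illustrative sketch of that loop, not the patch itself; the queue name and payload shape follow the patch, while the monitoring, Sentry, and graceful-shutdown wiring are omitted and the timing constants are assumed.

import { Job, Queue, Worker } from "bullmq";
import IORedis from "ioredis";
import { randomUUID } from "crypto";

const connection = new IORedis(process.env.REDIS_URL ?? "redis://localhost:6379", {
  maxRetriesPerRequest: null,
});

// The braces in "{indexQueue}" are a Redis Cluster hash tag, keeping the queue's keys in one slot.
const indexQueueName = "{indexQueue}";
export const indexQueue = new Queue(indexQueueName, { connection });

export async function runIndexWorker(): Promise<void> {
  // A null processor tells BullMQ that jobs will be fetched and settled manually.
  const worker = new Worker(indexQueueName, null, {
    connection,
    lockDuration: 60000,
  });
  worker.startStalledCheckTimer();

  while (true) {
    const token = randomUUID();
    const job: Job | undefined = await worker.getNextJob(token);
    if (!job) {
      await new Promise((resolve) => setTimeout(resolve, 10));
      continue;
    }

    // Keep the job lock alive while the (potentially slow) handler runs.
    const extendLock = setInterval(() => {
      job.extendLock(token, 60000).catch(() => {});
    }, 15000);

    try {
      const { originUrl, visitedUrls } = job.data;
      console.log("indexing crawl map", originUrl, visitedUrls.length);
      await job.moveToCompleted({ success: true }, token, false);
    } catch (error) {
      await job.moveToFailed(error as Error, token, false);
    } finally {
      clearInterval(extendLock);
    }
  }
}

On the producer side, enqueuing is a single call of the form indexQueue.add(someJobId, { originUrl, visitedUrls }, { priority: 10 }), which is what the map controller and finishCrawlIfNeeded do in the patch.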