From 18a7462feaa7b0b7839d60d847c1fd9be2201382 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Mon, 2 Jun 2025 17:07:21 +0200 Subject: [PATCH] feat(index): batch insert --- .../v0/admin/index-queue-prometheus.ts | 12 +++++++ apps/api/src/routes/admin.ts | 6 ++++ .../src/scraper/scrapeURL/engines/index.ts | 3 +- .../scraper/scrapeURL/engines/index/index.ts | 13 +++----- apps/api/src/services/index.ts | 33 ++++++++++++++++++- .../api/src/services/indexing/index-worker.ts | 15 ++++++++- 6 files changed, 71 insertions(+), 11 deletions(-) create mode 100644 apps/api/src/controllers/v0/admin/index-queue-prometheus.ts diff --git a/apps/api/src/controllers/v0/admin/index-queue-prometheus.ts b/apps/api/src/controllers/v0/admin/index-queue-prometheus.ts new file mode 100644 index 00000000..d1476869 --- /dev/null +++ b/apps/api/src/controllers/v0/admin/index-queue-prometheus.ts @@ -0,0 +1,12 @@ +import type { Request, Response } from "express"; +import { getIndexInsertQueueLength } from "../../../services"; + +export async function indexQueuePrometheus(req: Request, res: Response) { + const queueLength = await getIndexInsertQueueLength(); + res.setHeader("Content-Type", "text/plain"); + res.send(`\ +# HELP firecrawl_index_queue_length The number of items in the index insert queue +# TYPE firecrawl_index_queue_length gauge +firecrawl_index_queue_length ${queueLength} +`); +} \ No newline at end of file diff --git a/apps/api/src/routes/admin.ts b/apps/api/src/routes/admin.ts index 27545486..ea1fcf0f 100644 --- a/apps/api/src/routes/admin.ts +++ b/apps/api/src/routes/admin.ts @@ -10,6 +10,7 @@ import { wrap } from "./v1"; import { acucCacheClearController } from "../controllers/v0/admin/acuc-cache-clear"; import { checkFireEngine } from "../controllers/v0/admin/check-fire-engine"; import { cclogController } from "../controllers/v0/admin/cclog"; +import { indexQueuePrometheus } from "../controllers/v0/admin/index-queue-prometheus"; export const 
adminRouter = express.Router(); @@ -49,3 +50,8 @@ adminRouter.get( `/admin/${process.env.BULL_AUTH_KEY}/cclog`, wrap(cclogController), ); + +adminRouter.get( + `/admin/${process.env.BULL_AUTH_KEY}/index-queue-prometheus`, + wrap(indexQueuePrometheus), +); diff --git a/apps/api/src/scraper/scrapeURL/engines/index.ts b/apps/api/src/scraper/scrapeURL/engines/index.ts index a935f6ca..711f6bee 100644 --- a/apps/api/src/scraper/scrapeURL/engines/index.ts +++ b/apps/api/src/scraper/scrapeURL/engines/index.ts @@ -433,7 +433,8 @@ export function buildFallbackList(meta: Meta): { } const shouldUseIndex = - !meta.options.formats.includes("changeTracking") + useIndex + && !meta.options.formats.includes("changeTracking") && meta.options.maxAge !== 0 && ( meta.options.headers === undefined diff --git a/apps/api/src/scraper/scrapeURL/engines/index/index.ts b/apps/api/src/scraper/scrapeURL/engines/index/index.ts index 940bf135..79093a68 100644 --- a/apps/api/src/scraper/scrapeURL/engines/index/index.ts +++ b/apps/api/src/scraper/scrapeURL/engines/index/index.ts @@ -1,7 +1,7 @@ import { Document } from "../../../../controllers/v1/types"; import { EngineScrapeResult } from ".."; import { Meta } from "../.."; -import { getIndexFromGCS, hashURL, index_supabase_service, normalizeURLForIndex, saveIndexToGCS, generateURLSplits } from "../../../../services"; +import { getIndexFromGCS, hashURL, index_supabase_service, normalizeURLForIndex, saveIndexToGCS, generateURLSplits, addIndexInsertJob } from "../../../../services"; import { EngineError, IndexMissError } from "../../error"; import crypto from "crypto"; @@ -51,9 +51,8 @@ export async function sendDocumentToIndex(meta: Meta, document: Document) { return document; } - const { error } = await index_supabase_service - .from("index") - .insert({ + try { + await addIndexInsertJob({ id: indexId, url: normalizedURL, url_hash: urlHash, @@ -73,12 +72,10 @@ export async function sendDocumentToIndex(meta: Meta, document: Document) { 
[`url_split_${i}_hash`]: x, }), {})), }); - - if (error) { - meta.logger.error("Failed to save document to index", { + } catch (error) { + meta.logger.error("Failed to add document to index insert queue", { error, }); - return document; } return document; diff --git a/apps/api/src/services/index.ts b/apps/api/src/services/index.ts index 9b01a5f1..85b686d0 100644 --- a/apps/api/src/services/index.ts +++ b/apps/api/src/services/index.ts @@ -3,6 +3,7 @@ import { logger } from "../lib/logger"; import { configDotenv } from "dotenv"; import { Storage } from "@google-cloud/storage"; import crypto from "crypto"; +import { redisEvictConnection } from "./redis"; configDotenv(); // SupabaseService class initializes the Supabase client conditionally based on environment variables. @@ -182,4 +183,34 @@ export function generateURLSplits(url: string): string[] { urls.push(url); return [...new Set(urls.map(x => normalizeURLForIndex(x)))]; -} \ No newline at end of file +} + +const INDEX_INSERT_QUEUE_KEY = "index-insert-queue"; +const INDEX_INSERT_BATCH_SIZE = 1000; + +export async function addIndexInsertJob(data: any) { + await redisEvictConnection.rpush(INDEX_INSERT_QUEUE_KEY, JSON.stringify(data)); +} + +export async function getIndexInsertJobs(): Promise<any[]> { + const jobs = (await redisEvictConnection.lpop(INDEX_INSERT_QUEUE_KEY, INDEX_INSERT_BATCH_SIZE)) ?? 
[]; + return jobs.map(x => JSON.parse(x)); +} + +export async function processIndexInsertJobs() { + const jobs = await getIndexInsertJobs(); + if (jobs.length === 0) { + return; + } + logger.info(`Index inserter found jobs to insert`, { jobCount: jobs.length }); + try { + await index_supabase_service.from("index").insert(jobs); + logger.info(`Index inserter inserted jobs`, { jobCount: jobs.length }); + } catch (error) { + logger.error(`Index inserter failed to insert jobs`, { error, jobCount: jobs.length }); + } +} + +export async function getIndexInsertQueueLength(): Promise<number> { + return await redisEvictConnection.llen(INDEX_INSERT_QUEUE_KEY) ?? 0; +} diff --git a/apps/api/src/services/indexing/index-worker.ts b/apps/api/src/services/indexing/index-worker.ts index 30285978..677d33fa 100644 --- a/apps/api/src/services/indexing/index-worker.ts +++ b/apps/api/src/services/indexing/index-worker.ts @@ -14,6 +14,7 @@ import { saveCrawlMap } from "./crawl-maps-index"; import { processBillingBatch, queueBillingOperation, startBillingBatchProcessing } from "../billing/batch_billing"; import systemMonitor from "../system-monitor"; import { v4 as uuidv4 } from "uuid"; +import { processIndexInsertJobs } from ".."; const workerLockDuration = Number(process.env.WORKER_LOCK_DURATION) || 60000; const workerStalledCheckInterval = @@ -226,6 +227,8 @@ const workerFun = async (queue: Queue, jobProcessor: (token: string, job: Job) = process.exit(0); }; +const INDEX_INSERT_INTERVAL = 15000; + // Start the workers (async () => { // Start index worker @@ -234,7 +237,17 @@ const workerFun = async (queue: Queue, jobProcessor: (token: string, job: Job) = // Start billing worker and batch processing startBillingBatchProcessing(); const billingWorkerPromise = workerFun(getBillingQueue(), processBillingJobInternal); - + + const indexInserterInterval = setInterval(async () => { + if (isShuttingDown) { + return; + } + + await processIndexInsertJobs(); + }, INDEX_INSERT_INTERVAL); + // Wait for 
both workers to complete (which should only happen on shutdown) await Promise.all([indexWorkerPromise, billingWorkerPromise]); + + clearInterval(indexInserterInterval); })();