feat(index): batch insert

This commit is contained in:
Gergő Móricz 2025-06-02 17:07:21 +02:00
parent 369a8f6050
commit 18a7462fea
6 changed files with 71 additions and 11 deletions

View File

@ -0,0 +1,12 @@
import type { Request, Response } from "express";
import { getIndexInsertQueueLength } from "../../../services";
/**
 * Prometheus scrape endpoint: reports the current length of the
 * Redis-backed index insert queue as a single gauge metric in the
 * Prometheus text exposition format.
 */
export async function indexQueuePrometheus(req: Request, res: Response) {
  const length = await getIndexInsertQueueLength();
  const body = [
    "# HELP firecrawl_index_queue_length The number of items in the index insert queue",
    "# TYPE firecrawl_index_queue_length gauge",
    `firecrawl_index_queue_length ${length}`,
    "",
  ].join("\n");
  res.setHeader("Content-Type", "text/plain");
  res.send(body);
}

View File

@ -10,6 +10,7 @@ import { wrap } from "./v1";
import { acucCacheClearController } from "../controllers/v0/admin/acuc-cache-clear";
import { checkFireEngine } from "../controllers/v0/admin/check-fire-engine";
import { cclogController } from "../controllers/v0/admin/cclog";
import { indexQueuePrometheus } from "../controllers/v0/admin/index-queue-prometheus";
export const adminRouter = express.Router();
@ -49,3 +50,8 @@ adminRouter.get(
`/admin/${process.env.BULL_AUTH_KEY}/cclog`,
wrap(cclogController),
);
// Prometheus metrics for the index insert queue; path is guarded by the
// BULL_AUTH_KEY segment like the other admin routes in this router.
adminRouter.get(
`/admin/${process.env.BULL_AUTH_KEY}/index-queue-prometheus`,
wrap(indexQueuePrometheus),
);

View File

@ -433,7 +433,8 @@ export function buildFallbackList(meta: Meta): {
}
const shouldUseIndex =
!meta.options.formats.includes("changeTracking")
useIndex
&& !meta.options.formats.includes("changeTracking")
&& meta.options.maxAge !== 0
&& (
meta.options.headers === undefined

View File

@ -1,7 +1,7 @@
import { Document } from "../../../../controllers/v1/types";
import { EngineScrapeResult } from "..";
import { Meta } from "../..";
import { getIndexFromGCS, hashURL, index_supabase_service, normalizeURLForIndex, saveIndexToGCS, generateURLSplits } from "../../../../services";
import { getIndexFromGCS, hashURL, index_supabase_service, normalizeURLForIndex, saveIndexToGCS, generateURLSplits, addIndexInsertJob } from "../../../../services";
import { EngineError, IndexMissError } from "../../error";
import crypto from "crypto";
@ -51,9 +51,8 @@ export async function sendDocumentToIndex(meta: Meta, document: Document) {
return document;
}
const { error } = await index_supabase_service
.from("index")
.insert({
try {
await addIndexInsertJob({
id: indexId,
url: normalizedURL,
url_hash: urlHash,
@ -73,12 +72,10 @@ export async function sendDocumentToIndex(meta: Meta, document: Document) {
[`url_split_${i}_hash`]: x,
}), {})),
});
if (error) {
meta.logger.error("Failed to save document to index", {
} catch (error) {
meta.logger.error("Failed to add document to index insert queue", {
error,
});
return document;
}
return document;

View File

@ -3,6 +3,7 @@ import { logger } from "../lib/logger";
import { configDotenv } from "dotenv";
import { Storage } from "@google-cloud/storage";
import crypto from "crypto";
import { redisEvictConnection } from "./redis";
configDotenv();
// SupabaseService class initializes the Supabase client conditionally based on environment variables.
@ -182,4 +183,34 @@ export function generateURLSplits(url: string): string[] {
urls.push(url);
return [...new Set(urls.map(x => normalizeURLForIndex(x)))];
}
}
// Redis list used as a FIFO queue of pending index rows.
const INDEX_INSERT_QUEUE_KEY = "index-insert-queue";
// Upper bound on rows drained per batch by the index inserter.
const INDEX_INSERT_BATCH_SIZE = 1000;

/**
 * Enqueues one index row (serialized as JSON) onto the Redis-backed
 * insert queue; rows are later drained in batches by the worker.
 */
export async function addIndexInsertJob(data: any) {
  const payload = JSON.stringify(data);
  await redisEvictConnection.rpush(INDEX_INSERT_QUEUE_KEY, payload);
}
/**
 * Pops up to INDEX_INSERT_BATCH_SIZE serialized rows off the queue and
 * deserializes them. Returns an empty array when the queue is empty.
 * NOTE(review): multi-element LPOP requires Redis >= 6.2 — confirm the
 * deployed Redis version supports it.
 */
export async function getIndexInsertJobs(): Promise<any[]> {
  const raw = await redisEvictConnection.lpop(INDEX_INSERT_QUEUE_KEY, INDEX_INSERT_BATCH_SIZE);
  return (raw ?? []).map(entry => JSON.parse(entry));
}
/**
 * Drains one batch of queued rows and bulk-inserts them into the
 * Supabase "index" table. Invoked periodically by the index worker.
 *
 * NOTE(review): rows are popped from Redis before the insert, so a
 * failed insert drops the whole batch — consider re-queueing on failure.
 */
export async function processIndexInsertJobs() {
  const jobs = await getIndexInsertJobs();
  if (jobs.length === 0) {
    return;
  }
  logger.info(`Index inserter found jobs to insert`, { jobCount: jobs.length });
  try {
    // The supabase-js client does not throw on a failed insert — it
    // reports failure via the `error` field of the response, so it must
    // be checked explicitly (the pre-batching code did exactly this).
    const { error } = await index_supabase_service.from("index").insert(jobs);
    if (error) {
      logger.error(`Index inserter failed to insert jobs`, { error, jobCount: jobs.length });
      return;
    }
    logger.info(`Index inserter inserted jobs`, { jobCount: jobs.length });
  } catch (error) {
    // Catches transport-level failures (network errors, etc.).
    logger.error(`Index inserter failed to insert jobs`, { error, jobCount: jobs.length });
  }
}
/**
 * Returns the number of rows currently waiting in the index insert
 * queue (0 when the queue does not exist).
 */
export async function getIndexInsertQueueLength(): Promise<number> {
  const length = await redisEvictConnection.llen(INDEX_INSERT_QUEUE_KEY);
  return length ?? 0;
}

View File

@ -14,6 +14,7 @@ import { saveCrawlMap } from "./crawl-maps-index";
import { processBillingBatch, queueBillingOperation, startBillingBatchProcessing } from "../billing/batch_billing";
import systemMonitor from "../system-monitor";
import { v4 as uuidv4 } from "uuid";
import { processIndexInsertJobs } from "..";
const workerLockDuration = Number(process.env.WORKER_LOCK_DURATION) || 60000;
const workerStalledCheckInterval =
@ -226,6 +227,8 @@ const workerFun = async (queue: Queue, jobProcessor: (token: string, job: Job) =
process.exit(0);
};
const INDEX_INSERT_INTERVAL = 15000;
// Start the workers
(async () => {
// Start index worker
@ -234,7 +237,17 @@ const workerFun = async (queue: Queue, jobProcessor: (token: string, job: Job) =
// Start billing worker and batch processing
startBillingBatchProcessing();
const billingWorkerPromise = workerFun(getBillingQueue(), processBillingJobInternal);
const indexInserterInterval = setInterval(async () => {
if (isShuttingDown) {
return;
}
await processIndexInsertJobs();
}, INDEX_INSERT_INTERVAL);
// Wait for both workers to complete (which should only happen on shutdown)
await Promise.all([indexWorkerPromise, billingWorkerPromise]);
clearInterval(indexInserterInterval);
})();