From 98ceda9bd534350f1b441135c956499c2234fe03 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?=
Date: Mon, 2 Jun 2025 22:07:44 +0200
Subject: [PATCH 1/2] feat(search): ignore concurrency limit for search
 (FIR-2187) (#1617)

* feat(search): ignore concurrency limit for search (temp)

* feat(search): only for low tier users for good DX
---
 apps/api/src/controllers/v1/search.ts | 5 +++--
 apps/api/src/services/queue-jobs.ts   | 5 ++++-
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/apps/api/src/controllers/v1/search.ts b/apps/api/src/controllers/v1/search.ts
index 8a34c438..fe104dc1 100644
--- a/apps/api/src/controllers/v1/search.ts
+++ b/apps/api/src/controllers/v1/search.ts
@@ -76,6 +76,7 @@ async function scrapeSearchResult(
   logger: Logger,
   costTracking: CostTracking,
   flags: TeamFlags,
+  directToBullMQ: boolean = false,
 ): Promise<Document> {
   const jobId = uuidv4();
   const jobPriority = await getJobPriority({
@@ -102,11 +103,11 @@
       internalOptions: { teamId: options.teamId, useCache: true },
       origin: options.origin,
       is_scrape: true,
-    },
     {},
     jobId,
     jobPriority,
+    directToBullMQ,
   );

   const doc: Document = await waitForJob(jobId, options.timeout);

@@ -229,7 +230,7 @@
           origin: req.body.origin,
           timeout: req.body.timeout,
           scrapeOptions: req.body.scrapeOptions,
-        }, logger, costTracking, req.acuc?.flags ?? null),
+        }, logger, costTracking, req.acuc?.flags ?? null, (req.acuc?.price_credits ?? 0) <= 3000),
     );

     const docs = await Promise.all(scrapePromises);
diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts
index 3456c82c..3ea7de22 100644
--- a/apps/api/src/services/queue-jobs.ts
+++ b/apps/api/src/services/queue-jobs.ts
@@ -97,6 +97,7 @@ async function addScrapeJobRaw(
   options: any,
   jobId: string,
   jobPriority: number,
+  directToBullMQ: boolean = false,
 ) {
   const hasCrawlDelay =
     webScraperOptions.crawl_id && webScraperOptions.crawlerOptions?.delay;
@@ -127,7 +128,7 @@
   const concurrencyQueueJobs =
     await getConcurrencyQueueJobsCount(webScraperOptions.team_id);

-  if (concurrencyLimited) {
+  if (concurrencyLimited && !directToBullMQ) {
     // Detect if they hit their concurrent limit
     // If above by 2x, send them an email
     // No need to 2x as if there are more than the max concurrency in the concurrency queue, it is already 2x
@@ -161,6 +162,7 @@ export async function addScrapeJob(
   options: any = {},
   jobId: string = uuidv4(),
   jobPriority: number = 10,
+  directToBullMQ: boolean = false,
 ) {
   if (Sentry.isInitialized()) {
     const size = JSON.stringify(webScraperOptions).length;
@@ -187,6 +189,7 @@
           options,
           jobId,
           jobPriority,
+          directToBullMQ,
         );
       },
     );
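Taken together, the two files above thread a single boolean end to end: searchController derives it from the team's plan price, scrapeSearchResult forwards it, and addScrapeJobRaw consults it when deciding whether a job must wait in the concurrency queue. The sketch below is a minimal, self-contained TypeScript rendering of that decision path; the Acuc shape and both helper names are illustrative, and only the two expressions they wrap come from the patch.

// Illustrative sketch of the routing added in PATCH 1/2. The `Acuc` shape
// and the helper names are not from the patch; the two wrapped expressions are.
type Acuc = { price_credits?: number };

// Mirrors `(req.acuc?.price_credits ?? 0) <= 3000` in searchController:
// plans priced at 3000 credits or less are treated as low tier.
function isLowTier(acuc: Acuc | undefined): boolean {
  return (acuc?.price_credits ?? 0) <= 3000;
}

// Mirrors `if (concurrencyLimited && !directToBullMQ)` in addScrapeJobRaw:
// a job is parked in the concurrency queue only when the team is over its
// concurrency limit AND the job was not flagged to go straight to BullMQ.
function routeJob(
  concurrencyLimited: boolean,
  directToBullMQ: boolean,
): "concurrency-queue" | "bullmq" {
  return concurrencyLimited && !directToBullMQ ? "concurrency-queue" : "bullmq";
}

// A low-tier team at its concurrency limit still gets its search scrapes
// run immediately; a higher-tier team at its limit queues as before.
console.log(routeJob(true, isLowTier({ price_credits: 1000 }))); // "bullmq"
console.log(routeJob(true, isLowTier({ price_credits: 9000 }))); // "concurrency-queue"

Because directToBullMQ defaults to false in scrapeSearchResult, addScrapeJob, and addScrapeJobRaw alike, every existing caller keeps its current queueing behavior; only the search path opts in, and only for low-tier teams.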
From 7a8be132204b8f9042a0fae0eafe302a181022c1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?=
Date: Mon, 2 Jun 2025 22:09:50 +0200
Subject: [PATCH 2/2] remove indexes that are no longer used

---
 apps/api/package.json                      |   1 -
 apps/api/pnpm-lock.yaml                    |  12 +-
 apps/api/src/lib/extract/index/pinecone.ts | 164 ---------------------
 apps/api/src/lib/extract/reranker.ts       |   1 -
 apps/api/src/main/runWebScraper.ts         |   1 -
 apps/api/src/services/logging/log_job.ts   |  46 ------
 apps/api/src/services/queue-worker.ts      |  24 ---
 7 files changed, 1 insertion(+), 248 deletions(-)
 delete mode 100644 apps/api/src/lib/extract/index/pinecone.ts

diff --git a/apps/api/package.json b/apps/api/package.json
index 2461df18..9866c3bc 100644
--- a/apps/api/package.json
+++ b/apps/api/package.json
@@ -69,7 +69,6 @@
     "@google-cloud/storage": "^7.16.0",
     "@nangohq/node": "^0.40.8",
     "@openrouter/ai-sdk-provider": "^0.4.5",
-    "@pinecone-database/pinecone": "^4.0.0",
     "@sentry/cli": "^2.33.1",
     "@sentry/node": "^8.26.0",
     "@sentry/profiling-node": "^8.26.0",
diff --git a/apps/api/pnpm-lock.yaml b/apps/api/pnpm-lock.yaml
index 333cd85c..d394f007 100644
--- a/apps/api/pnpm-lock.yaml
+++ b/apps/api/pnpm-lock.yaml
@@ -59,9 +59,6 @@ importers:
       '@openrouter/ai-sdk-provider':
         specifier: ^0.4.5
         version: 0.4.5(zod@3.24.2)
-      '@pinecone-database/pinecone':
-        specifier: ^4.0.0
-        version: 4.0.0
       '@sentry/cli':
         specifier: ^2.33.1
         version: 2.33.1(encoding@0.1.13)
@@ -1232,10 +1229,6 @@ packages:
   '@pdf-lib/upng@1.0.1':
     resolution: {integrity: sha512-dQK2FUMQtowVP00mtIksrlZhdFXQZPC+taih1q4CvPZ5vqdxR/LKBaFg0oAfzd1GlHZXXSPdQfzQnt+ViGvEIQ==}

-  '@pinecone-database/pinecone@4.0.0':
-    resolution: {integrity: sha512-INYS+GBys9v5BRTyn0tv8srVsPTlSRvE3BPE4Wkc/lOEyAIyB9F7DEMXbeF19FOLEgRwCuHTLjzm1niENl+4FA==}
-    engines: {node: '>=18.0.0'}
-
   '@pkgjs/parseargs@0.11.0':
     resolution: {integrity: sha512-+1VkjdD0QBLPodGrJUeqarH8VAIvQODIbwh9XpP5Syisf7YoQgsJKPNFoqqLQlu+VQ/tVSshMR6loPMn8U+dPg==}
     engines: {node: '>=14'}
@@ -6331,10 +6324,6 @@ snapshots:
     dependencies:
       pako: 1.0.11

-  '@pinecone-database/pinecone@4.0.0':
-    dependencies:
-      encoding: 0.1.13
-
   '@pkgjs/parseargs@0.11.0':
     optional: true

@@ -7921,6 +7910,7 @@ snapshots:
   encoding@0.1.13:
     dependencies:
       iconv-lite: 0.6.3
+    optional: true

   end-of-stream@1.4.4:
     dependencies:
diff --git a/apps/api/src/lib/extract/index/pinecone.ts b/apps/api/src/lib/extract/index/pinecone.ts
deleted file mode 100644
index 44aed960..00000000
--- a/apps/api/src/lib/extract/index/pinecone.ts
+++ /dev/null
@@ -1,164 +0,0 @@
-import { Pinecone } from "@pinecone-database/pinecone";
-import { Document } from "../../../controllers/v1/types";
-import { logger } from "../../logger";
-import { embed } from "ai";
-import { getEmbeddingModel } from "../../generic-ai";
-
-const pinecone = new Pinecone({
-  apiKey: process.env.PINECONE_API_KEY!,
-});
-
-const INDEX_NAME = process.env.PINECONE_INDEX_NAME ?? "";
-
-const MAX_METADATA_SIZE = 30 * 1024; // 30KB in bytes
-
-export interface PageMetadata {
-  url: string;
-  originUrl: string;
-  title?: string;
-  description?: string;
-  crawlId?: string;
-  teamId?: string;
-  timestamp: number;
-  markdown?: string;
-}
-
-async function getEmbedding(text: string) {
-  const { embedding } = await embed({
-    model: getEmbeddingModel("text-embedding-3-small"),
-    value: text,
-  });
-
-  return embedding;
-}
-
-function normalizeUrl(url: string) {
-  const urlO = new URL(url);
-  if (!urlO.hostname.startsWith("www.")) {
-    urlO.hostname = "www." + urlO.hostname;
-  }
-  return urlO.href;
-}
-
-export async function indexPage({
-  document,
-  originUrl,
-  crawlId,
-  teamId,
-}: {
-  document: Document;
-  originUrl: string;
-  crawlId?: string;
-  teamId?: string;
-}) {
-  try {
-    const index = pinecone.index(INDEX_NAME);
-
-    // Trim markdown if it's too long
-    let trimmedMarkdown = document.markdown;
-    if (
-      trimmedMarkdown &&
-      Buffer.byteLength(trimmedMarkdown, "utf-8") > MAX_METADATA_SIZE
-    ) {
-      trimmedMarkdown = trimmedMarkdown.slice(
-        0,
-        Math.floor(MAX_METADATA_SIZE / 2),
-      ); // Using half the size to be safe with UTF-8 encoding
-    }
-
-    // Create text to embed
-    const textToEmbed = [
-      document.metadata.title,
-      document.metadata.description,
-      trimmedMarkdown,
-    ]
-      .filter(Boolean)
-      .join("\n\n");
-
-    // Get embedding from OpenAI
-    const embedding = await getEmbedding(textToEmbed);
-
-    const normalizedUrl = normalizeUrl(
-      document.metadata.sourceURL || document.metadata.url!,
-    );
-
-    // Prepare metadata
-    const metadata: PageMetadata = {
-      url: normalizedUrl,
-      originUrl: normalizeUrl(originUrl),
-      title: document.metadata.title ?? document.metadata.ogTitle ?? "",
-      description:
-        document.metadata.description ?? document.metadata.ogDescription ?? "",
-      crawlId,
-      teamId,
-      markdown: trimmedMarkdown,
-      timestamp: Date.now(),
-    };
-
-    // Upsert to Pinecone
-    await index.upsert([
-      {
-        id: normalizedUrl,
-        values: embedding,
-        metadata: {
-          ...metadata,
-          [document.metadata.sourceURL || document.metadata.url!]: true,
-        },
-      },
-    ]);
-
-    logger.debug("Successfully indexed page in Pinecone", {
-      url: metadata.url,
-      crawlId,
-    });
-  } catch (error) {
-    logger.error("Failed to index page in Pinecone", {
-      error,
-      url: document.metadata.sourceURL || document.metadata.url,
-      crawlId,
-    });
-  }
-}
-
-export async function searchSimilarPages(
-  query: string,
-  originUrl?: string,
-  limit: number = 1000,
-): Promise<any[]> {
-  try {
-    const index = pinecone.index(INDEX_NAME);
-
-    // Get query embedding from OpenAI
-    const queryEmbedding = await getEmbedding(query);
-
-    const queryParams: any = {
-      vector: queryEmbedding,
-      topK: limit,
-      includeMetadata: true,
-    };
-
-    const normalizedOriginUrl = originUrl ? normalizeUrl(originUrl) : undefined;
-    // Add filter if originUrl is provided
-    if (normalizedOriginUrl) {
-      queryParams.filter = {
-        originUrl: { $eq: normalizedOriginUrl },
-      };
-    }
-
-    const results = await index.query(queryParams);
-    return results.matches.map((match) => ({
-      url: match.metadata?.url,
-      title: match.metadata?.title,
-      description: match.metadata?.description,
-      score: match.score,
-      markdown: match.metadata?.markdown,
-    }));
-  } catch (error) {
-    logger.error("Failed to search similar pages in Pinecone", {
-      error,
-      query,
-      originUrl,
-    });
-    return [];
-  }
-}
diff --git a/apps/api/src/lib/extract/reranker.ts b/apps/api/src/lib/extract/reranker.ts
index 62b1d266..6ff6e77c 100644
--- a/apps/api/src/lib/extract/reranker.ts
+++ b/apps/api/src/lib/extract/reranker.ts
@@ -4,7 +4,6 @@ import { isUrlBlocked } from "../../scraper/WebScraper/utils/blocklist";
 import { logger } from "../logger";
 import { CohereClient } from "cohere-ai";
 import { extractConfig } from "./config";
-import { searchSimilarPages } from "./index/pinecone";
 import { generateCompletions } from "../../scraper/scrapeURL/transformers/llmExtract";
 import { buildRerankerUserPrompt } from "./build-prompts";
 import { buildRerankerSystemPrompt } from "./build-prompts";
diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts
index 2f9b6495..cddba97b 100644
--- a/apps/api/src/main/runWebScraper.ts
+++ b/apps/api/src/main/runWebScraper.ts
@@ -15,7 +15,6 @@ import {
   ScrapeUrlResponse,
 } from "../scraper/scrapeURL";
 import { Engine } from "../scraper/scrapeURL/engines";
-import { indexPage } from "../lib/extract/index/pinecone";
 import { CostTracking } from "../lib/extract/extraction-service";

 configDotenv();
diff --git a/apps/api/src/services/logging/log_job.ts b/apps/api/src/services/logging/log_job.ts
index e73afeb5..162ddb43 100644
--- a/apps/api/src/services/logging/log_job.ts
+++ b/apps/api/src/services/logging/log_job.ts
@@ -21,47 +21,6 @@ function cleanOfNull<T>(x: T): T {
   }
 }

-async function indexJob(job: FirecrawlJob): Promise<void> {
-  try {
-    if (job.mode !== "single_urls" && job.mode !== "scrape") {
-      return;
-    }
-
-    const response = await fetch(`${process.env.FIRE_INDEX_SERVER_URL}/api/jobs`, {
-      method: 'POST',
-      headers: {
-        'Content-Type': 'application/json',
-      },
-      body: JSON.stringify({
-        url: job.url,
-        mode: job.mode || "scrape",
-        docs: job.docs,
-        origin: job.origin,
-        success: job.success,
-        time_taken: job.time_taken,
-        num_tokens: job.num_tokens,
-        page_options: job.scrapeOptions,
-        date_added: new Date().toISOString(),
-      }),
-    });
-
-    if (!response.ok) {
-      const errorData = await response.json();
-      // logger.error(`Failed to send job to external server: ${response.status} ${response.statusText}`, {
-      //   error: errorData,
-      //   scrapeId: job.job_id,
-      // });
-    } else {
-      // logger.debug("Job sent to external server successfully!", { scrapeId: job.job_id });
-    }
-  } catch (error) {
-    // logger.error(`Error sending job to external server: ${error.message}`, {
-    //   error,
-    //   scrapeId: job.job_id,
-    // });
-  }
-}
-
 export async function logJob(job: FirecrawlJob, force: boolean = false) {
   try {
     const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true";
@@ -106,11 +65,6 @@
       pdf_num_pages: job.pdf_num_pages ?? null,
     };

-    // Send job to external server
-    if (process.env.FIRE_INDEX_SERVER_URL) {
-      indexJob(job);
-    }
-
     if (process.env.GCS_BUCKET_NAME) {
       await saveJobToGCS(job);
     }
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 9624cfda..c007e18b 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -61,7 +61,6 @@ import {
 } from "../lib/concurrency-limit";
 import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
 import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings";
-import { indexPage } from "../lib/extract/index/pinecone";
 import { Document } from "../controllers/v1/types";
 import {
   ExtractResult,
@@ -1045,25 +1044,6 @@ async function processKickoffJob(job: Job & { id: string }, token: string) {
   }
 }

-async function indexJob(job: Job & { id: string }, document: Document) {
-  if (
-    document &&
-    document.markdown &&
-    job.data.team_id === process.env.BACKGROUND_INDEX_TEAM_ID!
-  ) {
-    // indexPage({
-    //   document: document,
-    //   originUrl: job.data.crawl_id
-    //     ? (await getCrawl(job.data.crawl_id))?.originUrl!
-    //     : document.metadata.sourceURL!,
-    //   crawlId: job.data.crawl_id,
-    //   teamId: job.data.team_id,
-    // }).catch((error) => {
-    //   _logger.error("Error indexing page", { error });
-    // });
-  }
-}
-
 async function processJob(job: Job & { id: string }, token: string) {
   const logger = _logger.child({
     module: "queue-worker",
@@ -1262,8 +1242,6 @@
       );
     }

-    indexJob(job, doc);
-
     logger.debug("Declaring job as done...");
     await addCrawlJobDone(job.data.crawl_id, job.id, true);

@@ -1380,8 +1358,6 @@
       cost_tracking: costTracking,
       pdf_num_pages: doc.metadata.numPages,
     });
-
-    indexJob(job, doc);
   }

   if (job.data.is_scrape !== true) {
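One lockfile detail in PATCH 2/2 is worth spelling out: encoding flips to `optional: true` once @pinecone-database/pinecone is removed, because the Pinecone SDK was the last package declaring encoding as a hard dependency; the remaining consumers appear to reach it only through optional resolution (the lockfile still resolves @sentry/cli against encoding@0.1.13). A plausible sketch of the consumer-side pattern that makes this safe, assuming node-fetch-style lazy loading; this is illustrative, not code from this repository:

// Consumers that can live without `encoding` require it lazily and fall
// back when it is absent, which is what lets pnpm mark it optional.
let convert: ((body: Buffer, to: string, from?: string) => Buffer) | undefined;
try {
  // Resolves only while some package in the tree still installs `encoding`.
  convert = require("encoding").convert;
} catch {
  convert = undefined; // no legacy charset support; treat payloads as UTF-8
}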