diff --git a/apps/api/src/scraper/scrapeURL/engines/index/index.ts b/apps/api/src/scraper/scrapeURL/engines/index/index.ts
new file mode 100644
index 00000000..d3a4a779
--- /dev/null
+++ b/apps/api/src/scraper/scrapeURL/engines/index/index.ts
@@ -0,0 +1,40 @@
+import { EngineScrapeResult } from "..";
+import { Meta } from "../..";
+import { getIndexFromGCS, index_supabase_service } from "../../../../services";
+import { EngineError, IndexMissError } from "../../error";
+
+export async function scrapeURLWithIndex(meta: Meta): Promise<EngineScrapeResult> {
+  const { data, error } = await index_supabase_service
+    .from("pages")
+    .select("*")
+    .eq("url", meta.url)
+    .order("created_at", { ascending: false })
+    .limit(1);
+
+  if (error) {
+    throw new EngineError("Failed to scrape URL with index", {
+      cause: error,
+    });
+  }
+
+  if (data.length === 0) {
+    throw new IndexMissError();
+  }
+
+  const id = data[0].id;
+
+  const doc = await getIndexFromGCS(id + ".json");
+  if (!doc) {
+    throw new EngineError("No document found in index");
+  }
+
+  return {
+    url: doc.url,
+    html: doc.html,
+    statusCode: doc.statusCode,
+    error: doc.error,
+    screenshot: doc.screenshot,
+    markdown: doc.markdown,
+    numPages: doc.numPages,
+  };
+}
diff --git a/apps/api/src/scraper/scrapeURL/error.ts b/apps/api/src/scraper/scrapeURL/error.ts
index 33f59c1d..16a9212f 100644
--- a/apps/api/src/scraper/scrapeURL/error.ts
+++ b/apps/api/src/scraper/scrapeURL/error.ts
@@ -92,3 +92,9 @@ export class PDFInsufficientTimeError extends Error {
     super(`Insufficient time to process PDF of ${pageCount} pages. Please increase the timeout parameter in your scrape request to at least ${minTimeout}ms.`);
   }
 }
+
+export class IndexMissError extends Error {
+  constructor() {
+    super("Index doesn't have the page we're looking for");
+  }
+}
diff --git a/apps/api/src/services/index.ts b/apps/api/src/services/index.ts
new file mode 100644
index 00000000..2f0efd32
--- /dev/null
+++ b/apps/api/src/services/index.ts
@@ -0,0 +1,90 @@
+import { createClient, SupabaseClient } from "@supabase/supabase-js";
+import { logger } from "../lib/logger";
+import { configDotenv } from "dotenv";
+import { Storage } from "@google-cloud/storage";
+configDotenv();
+
+// IndexSupabaseService initializes the Supabase client conditionally based on environment variables.
+class IndexSupabaseService {
+  private client: SupabaseClient | null = null;
+
+  constructor() {
+    const supabaseUrl = process.env.INDEX_SUPABASE_URL;
+    const supabaseServiceToken = process.env.INDEX_SUPABASE_SERVICE_TOKEN;
+    const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true";
+    // Only initialize the Supabase client if both URL and Service Token are provided.
+    if (!useDbAuthentication) {
+      // Warn the user that authentication is disabled and leave the client as null
+      logger.warn(
+        "Authentication is disabled. Index supabase client will not be initialized.",
+      );
+      this.client = null;
+    } else if (!supabaseUrl || !supabaseServiceToken) {
+      logger.error(
+        "Index supabase environment variables aren't configured correctly. Index supabase client will not be initialized. Fix ENV configuration or disable DB authentication with USE_DB_AUTHENTICATION env variable",
+      );
+    } else {
+      this.client = createClient(supabaseUrl, supabaseServiceToken);
+    }
+  }
+
+  // Provides access to the initialized Supabase client, if available.
+  getClient(): SupabaseClient | null {
+    return this.client;
+  }
+}
+
+const serv = new IndexSupabaseService();
+
+// Using a Proxy to handle dynamic access to the Supabase client or service methods.
+// This approach ensures that if Supabase is not configured, any attempt to use it will result in a clear error.
+export const index_supabase_service: SupabaseClient = new Proxy(
+  serv,
+  {
+    get: function (target, prop, receiver) {
+      const client = target.getClient();
+      // If the Supabase client is not initialized, intercept property access to provide meaningful error feedback.
+      if (client === null) {
+        return () => {
+          throw new Error("Index supabase client is not configured.");
+        };
+      }
+      // Direct access to IndexSupabaseService properties takes precedence.
+      if (prop in target) {
+        return Reflect.get(target, prop, receiver);
+      }
+      // Otherwise, delegate access to the Supabase client.
+      return Reflect.get(client, prop, receiver);
+    },
+  },
+) as unknown as SupabaseClient;
+
+const credentials = process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined;
+
+export async function getIndexFromGCS(url: string): Promise<any | null> {
+  // logger.info(`Getting f-engine document from GCS`, {
+  //   url,
+  // });
+  try {
+    if (!process.env.GCS_INDEX_BUCKET_NAME) {
+      return null;
+    }
+
+    const storage = new Storage({ credentials });
+    const bucket = storage.bucket(process.env.GCS_INDEX_BUCKET_NAME);
+    const blob = bucket.file(`${url}`);
+    const [exists] = await blob.exists();
+    if (!exists) {
+      return null;
+    }
+    const [blobContent] = await blob.download();
+    const parsed = JSON.parse(blobContent.toString());
+    return parsed;
+  } catch (error) {
+    logger.error(`Error getting f-engine document from GCS`, {
+      error,
+      url,
+    });
+    return null;
+  }
+}
\ No newline at end of file
diff --git a/apps/api/src/services/logging/log_job.ts b/apps/api/src/services/logging/log_job.ts
index e73afeb5..162ddb43 100644
--- a/apps/api/src/services/logging/log_job.ts
+++ b/apps/api/src/services/logging/log_job.ts
@@ -21,47 +21,6 @@ function cleanOfNull<T>(x: T): T {
   }
 }
 
-async function indexJob(job: FirecrawlJob): Promise<void> {
-  try {
-    if (job.mode !== "single_urls" && job.mode !== "scrape") {
-      return;
-    }
-
-    const response = await fetch(`${process.env.FIRE_INDEX_SERVER_URL}/api/jobs`, {
-      method: 'POST',
-      headers: {
-        'Content-Type': 'application/json',
-      },
-      body: JSON.stringify({
-        url: job.url,
-        mode: job.mode || "scrape",
-        docs: job.docs,
-        origin: job.origin,
-        success: job.success,
-        time_taken: job.time_taken,
-        num_tokens: job.num_tokens,
-        page_options: job.scrapeOptions,
-        date_added: new Date().toISOString(),
-      }),
-    });
-
-    if (!response.ok) {
-      const errorData = await response.json();
-      // logger.error(`Failed to send job to external server: ${response.status} ${response.statusText}`, {
-      //   error: errorData,
-      //   scrapeId: job.job_id,
-      // });
-    } else {
-      // logger.debug("Job sent to external server successfully!", { scrapeId: job.job_id });
-    }
-  } catch (error) {
-    // logger.error(`Error sending job to external server: ${error.message}`, {
-    //   error,
-    //   scrapeId: job.job_id,
-    // });
-  }
-}
-
 export async function logJob(job: FirecrawlJob, force: boolean = false) {
   try {
     const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true";
@@ -106,11 +65,6 @@ export async function logJob(job: FirecrawlJob, force: boolean = false) {
       pdf_num_pages: job.pdf_num_pages ?? null,
     };
 
-    // Send job to external server
-    if (process.env.FIRE_INDEX_SERVER_URL) {
-      indexJob(job);
-    }
-
     if (process.env.GCS_BUCKET_NAME) {
      await saveJobToGCS(job);
     }
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 2738b8ee..d113a3bf 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -1046,25 +1046,6 @@ async function processKickoffJob(job: Job & { id: string }, token: string) {
   }
 }
 
-async function indexJob(job: Job & { id: string }, document: Document) {
-  if (
-    document &&
-    document.markdown &&
-    job.data.team_id === process.env.BACKGROUND_INDEX_TEAM_ID!
-  ) {
-    // indexPage({
-    //   document: document,
-    //   originUrl: job.data.crawl_id
-    //     ? (await getCrawl(job.data.crawl_id))?.originUrl!
-    //     : document.metadata.sourceURL!,
-    //   crawlId: job.data.crawl_id,
-    //   teamId: job.data.team_id,
-    // }).catch((error) => {
-    //   _logger.error("Error indexing page", { error });
-    // });
-  }
-}
-
 async function processJob(job: Job & { id: string }, token: string) {
   const logger = _logger.child({
     module: "queue-worker",
@@ -1263,8 +1244,6 @@ async function processJob(job: Job & { id: string }, token: string) {
         true,
       );
 
-      indexJob(job, doc);
-
       logger.debug("Declaring job as done...");
       await addCrawlJobDone(job.data.crawl_id, job.id, true);
 
@@ -1381,8 +1360,6 @@
         cost_tracking: costTracking,
         pdf_num_pages: doc.metadata.numPages,
       });
-
-      indexJob(job, doc);
     }
 
     if (job.data.is_scrape !== true) {
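
Taken together, these changes replace the old fire-index server round trip with a read-through index: Supabase resolves a URL to a document id, and GCS stores the cached scrape result as `<id>.json`. Below is a minimal sketch of how a caller might consume the new engine, treating `IndexMissError` as a soft miss rather than a failure. The `fallbackScrape` helper and the import paths are hypothetical, standing in for whichever engine runs next in the real fallback chain:

```ts
import { EngineScrapeResult } from "./engines"; // path illustrative
import { Meta } from "."; // path illustrative
import { IndexMissError } from "./error";
import { scrapeURLWithIndex } from "./engines/index";

// Hypothetical next engine in the fallback chain -- a stand-in only,
// not something this PR exports.
declare function fallbackScrape(meta: Meta): Promise<EngineScrapeResult>;

async function scrapeWithIndexFirst(meta: Meta): Promise<EngineScrapeResult> {
  try {
    // Fast path: serve the page from the index if Supabase has a row
    // for this URL and the JSON document exists in the GCS bucket.
    return await scrapeURLWithIndex(meta);
  } catch (error) {
    if (error instanceof IndexMissError) {
      // A miss is expected and non-fatal: fall through to a live scrape.
      return await fallbackScrape(meta);
    }
    // EngineError and anything else is a real failure; let it propagate.
    throw error;
  }
}
```

Distinguishing `IndexMissError` from `EngineError` is what lets the index sit early in the engine list without masking genuine scrape failures.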