poc progress

Gergő Móricz 2025-05-27 21:03:45 +02:00
parent 474e5a0543
commit 8fc02f8604
5 changed files with 136 additions and 69 deletions

View File

@@ -0,0 +1,40 @@
import { EngineScrapeResult } from "..";
import { Meta } from "../..";
import { getIndexFromGCS, index_supabase_service } from "../../../../services";
import { EngineError, IndexMissError } from "../../error";

export async function scrapeURLWithIndex(meta: Meta): Promise<EngineScrapeResult> {
  // Look up the most recent index entry for this URL.
  const { data, error } = await index_supabase_service
    .from("pages")
    .select("*")
    .eq("url", meta.url)
    .order("created_at", { ascending: false })
    .limit(1);

  if (error) {
    throw new EngineError("Failed to scrape URL with index", {
      cause: error,
    });
  }

  if (data.length === 0) {
    throw new IndexMissError();
  }

  // The indexed document itself is stored in GCS under the page record's id.
  const id = data[0].id;
  const doc = await getIndexFromGCS(id + ".json");
  if (!doc) {
    throw new EngineError("No document found in index");
  }

  return {
    url: doc.url,
    html: doc.html,
    statusCode: doc.statusCode,
    error: doc.error,
    screenshot: doc.screenshot,
    markdown: doc.markdown,
    numPages: doc.numPages,
  };
}
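
For orientation, a minimal caller sketch (not part of this commit): IndexMissError signals that the page simply is not indexed, while EngineError signals a real failure, so a caller could use the distinction to fall back to a live scrape. The fallback engine below is a hypothetical placeholder.

// Hypothetical caller sketch; scrapeURLWithLiveEngine is a placeholder, not a real Firecrawl function.
declare function scrapeURLWithLiveEngine(meta: Meta): Promise<EngineScrapeResult>;

async function scrapeViaIndexWithFallback(meta: Meta): Promise<EngineScrapeResult> {
  try {
    return await scrapeURLWithIndex(meta);
  } catch (error) {
    if (error instanceof IndexMissError) {
      // The page is not in the index yet: fall back to a live scrape.
      return await scrapeURLWithLiveEngine(meta);
    }
    // EngineError and anything unexpected propagates to the caller.
    throw error;
  }
}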

View File

@@ -92,3 +92,9 @@ export class PDFInsufficientTimeError extends Error {
    super(`Insufficient time to process PDF of ${pageCount} pages. Please increase the timeout parameter in your scrape request to at least ${minTimeout}ms.`);
  }
}

export class IndexMissError extends Error {
  constructor() {
    super("Index doesn't have the page we're looking for");
  }
}

View File

@@ -0,0 +1,90 @@
import { createClient, SupabaseClient } from "@supabase/supabase-js";
import { logger } from "../lib/logger";
import { configDotenv } from "dotenv";
import { Storage } from "@google-cloud/storage";

configDotenv();

// IndexSupabaseService initializes the Supabase client for the index database
// conditionally, based on environment variables.
class IndexSupabaseService {
  private client: SupabaseClient | null = null;

  constructor() {
    const supabaseUrl = process.env.INDEX_SUPABASE_URL;
    const supabaseServiceToken = process.env.INDEX_SUPABASE_SERVICE_TOKEN;
    const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true";

    if (!useDbAuthentication) {
      // DB authentication is disabled, so leave the client uninitialized.
      logger.warn(
        "Authentication is disabled. Index supabase client will not be initialized.",
      );
      this.client = null;
    } else if (!supabaseUrl || !supabaseServiceToken) {
      // Only initialize the Supabase client if both URL and service token are provided.
      logger.error(
        "Index supabase environment variables aren't configured correctly. Index supabase client will not be initialized. Fix ENV configuration or disable DB authentication with the USE_DB_AUTHENTICATION env variable.",
      );
    } else {
      this.client = createClient(supabaseUrl, supabaseServiceToken);
    }
  }

  // Provides access to the initialized Supabase client, if available.
  getClient(): SupabaseClient | null {
    return this.client;
  }
}

const serv = new IndexSupabaseService();

// A Proxy handles dynamic access to the Supabase client or service methods.
// If Supabase is not configured, any attempt to use it results in a clear error.
export const index_supabase_service: SupabaseClient = new Proxy(serv, {
  get: function (target, prop, receiver) {
    const client = target.getClient();
    // If the Supabase client is not initialized, intercept property access to
    // provide meaningful error feedback.
    if (client === null) {
      return () => {
        throw new Error("Index supabase client is not configured.");
      };
    }
    // Direct access to IndexSupabaseService properties takes precedence.
    if (prop in target) {
      return Reflect.get(target, prop, receiver);
    }
    // Otherwise, delegate access to the Supabase client.
    return Reflect.get(client, prop, receiver);
  },
}) as unknown as SupabaseClient;
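
For illustration (not part of the commit), the Proxy lets callers use index_supabase_service like a normal Supabase client, and a missing configuration surfaces as an explicit error at the first call rather than a silent null:

// Hypothetical caller, assuming the export above is imported.
// With INDEX_SUPABASE_URL, INDEX_SUPABASE_SERVICE_TOKEN and USE_DB_AUTHENTICATION=true
// set, this queries the index normally; with them missing, the first call such as
// .from("pages") throws "Index supabase client is not configured."
async function peekIndex(): Promise<void> {
  const { data, error } = await index_supabase_service
    .from("pages")
    .select("id, url, created_at")
    .eq("url", "https://example.com/")
    .order("created_at", { ascending: false })
    .limit(1);
  console.log({ data, error });
}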
const credentials = process.env.GCS_CREDENTIALS
  ? JSON.parse(atob(process.env.GCS_CREDENTIALS))
  : undefined;

// Despite the parameter name, `url` here is the GCS object name (the page
// record's id plus ".json"), as passed by scrapeURLWithIndex.
export async function getIndexFromGCS(url: string): Promise<any | null> {
  // logger.info(`Getting index document from GCS`, {
  //   url,
  // });
  try {
    if (!process.env.GCS_INDEX_BUCKET_NAME) {
      return null;
    }
    const storage = new Storage({ credentials });
    const bucket = storage.bucket(process.env.GCS_INDEX_BUCKET_NAME);
    const blob = bucket.file(url);
    const [exists] = await blob.exists();
    if (!exists) {
      return null;
    }
    const [blobContent] = await blob.download();
    const parsed = JSON.parse(blobContent.toString());
    return parsed;
  } catch (error) {
    logger.error(`Error getting index document from GCS`, {
      error,
      url,
    });
    return null;
  }
}
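
As a hedged sketch of how the pieces fit (paths and names below are hypothetical): GCS_CREDENTIALS is decoded with atob and JSON.parse, so it is presumably a base64-encoded service account key, and getIndexFromGCS returns null for any miss or failure rather than throwing.

// Hypothetical one-off script to produce the GCS_CREDENTIALS value
// (inferred from the atob + JSON.parse decoding above).
import { readFileSync } from "fs";
const keyJson = readFileSync("service-account.json", "utf8"); // hypothetical path
console.log(Buffer.from(keyJson).toString("base64")); // export this as GCS_CREDENTIALS

// Elsewhere, with GCS_CREDENTIALS and GCS_INDEX_BUCKET_NAME set before this
// module loads, retrieval mirrors what scrapeURLWithIndex does:
//   const doc = await getIndexFromGCS(`${id}.json`);
//   // doc is null if the bucket is unset, the object is missing, or parsing fails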

View File

@@ -21,47 +21,6 @@ function cleanOfNull<T>(x: T): T {
}
}
async function indexJob(job: FirecrawlJob): Promise<void> {
  try {
    if (job.mode !== "single_urls" && job.mode !== "scrape") {
      return;
    }

    const response = await fetch(`${process.env.FIRE_INDEX_SERVER_URL}/api/jobs`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        url: job.url,
        mode: job.mode || "scrape",
        docs: job.docs,
        origin: job.origin,
        success: job.success,
        time_taken: job.time_taken,
        num_tokens: job.num_tokens,
        page_options: job.scrapeOptions,
        date_added: new Date().toISOString(),
      }),
    });

    if (!response.ok) {
      const errorData = await response.json();
      // logger.error(`Failed to send job to external server: ${response.status} ${response.statusText}`, {
      //   error: errorData,
      //   scrapeId: job.job_id,
      // });
    } else {
      // logger.debug("Job sent to external server successfully!", { scrapeId: job.job_id });
    }
  } catch (error) {
    // logger.error(`Error sending job to external server: ${error.message}`, {
    //   error,
    //   scrapeId: job.job_id,
    // });
  }
}
export async function logJob(job: FirecrawlJob, force: boolean = false) {
try {
const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true";
@@ -106,11 +65,6 @@ export async function logJob(job: FirecrawlJob, force: boolean = false) {
pdf_num_pages: job.pdf_num_pages ?? null,
};
// Send job to external server
if (process.env.FIRE_INDEX_SERVER_URL) {
indexJob(job);
}
if (process.env.GCS_BUCKET_NAME) {
await saveJobToGCS(job);
}

View File

@@ -1046,25 +1046,6 @@ async function processKickoffJob(job: Job & { id: string }, token: string) {
}
}
async function indexJob(job: Job & { id: string }, document: Document) {
  if (
    document &&
    document.markdown &&
    job.data.team_id === process.env.BACKGROUND_INDEX_TEAM_ID!
  ) {
    // indexPage({
    //   document: document,
    //   originUrl: job.data.crawl_id
    //     ? (await getCrawl(job.data.crawl_id))?.originUrl!
    //     : document.metadata.sourceURL!,
    //   crawlId: job.data.crawl_id,
    //   teamId: job.data.team_id,
    // }).catch((error) => {
    //   _logger.error("Error indexing page", { error });
    // });
  }
}

async function processJob(job: Job & { id: string }, token: string) {
  const logger = _logger.child({
    module: "queue-worker",
@@ -1263,8 +1244,6 @@
true,
);
indexJob(job, doc);
logger.debug("Declaring job as done...");
await addCrawlJobDone(job.data.crawl_id, job.id, true);
@@ -1381,8 +1360,6 @@
cost_tracking: costTracking,
pdf_num_pages: doc.metadata.numPages,
});
indexJob(job, doc);
}
if (job.data.is_scrape !== true) {