diff --git a/apps/api/src/controllers/v1/crawl-status.ts b/apps/api/src/controllers/v1/crawl-status.ts
index bcefa9c3..96aa578e 100644
--- a/apps/api/src/controllers/v1/crawl-status.ts
+++ b/apps/api/src/controllers/v1/crawl-status.ts
@@ -13,7 +13,7 @@ import {
   getDoneJobsOrderedLength,
   isCrawlKickoffFinished,
 } from "../../lib/crawl-redis";
-import { getScrapeQueue } from "../../services/queue-service";
+import { getScrapeQueue, QueueFunction } from "../../services/queue-service";
 import {
   supabaseGetJobById,
   supabaseGetJobsById,
@@ -23,7 +23,6 @@ import type { Job, JobState, Queue } from "bullmq";
 import { logger } from "../../lib/logger";
 import { supabase_rr_service, supabase_service } from "../../services/supabase";
 import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
-import { getJobFromGCS } from "../../lib/gcs-jobs";
 configDotenv();
 
 export type PseudoJob<T> = {
@@ -40,20 +39,14 @@ export type PseudoJob<T> = {
 export type DBJob = { docs: any, success: boolean, page_options: any, date_added: any, message: string | null }
 
 export async function getJob(id: string): Promise<PseudoJob<any> | null> {
-  const [bullJob, dbJob, gcsJob] = await Promise.all([
+  const [bullJob, dbJob] = await Promise.all([
     getScrapeQueue().getJob(id),
     (process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobById(id) : null) as Promise<DBJob | null>,
-    (process.env.GCS_BUCKET_NAME ? getJobFromGCS(id) : null) as Promise<any | null>,
   ]);
 
   if (!bullJob && !dbJob) return null;
 
-  const data = gcsJob ?? dbJob?.docs ?? bullJob?.returnvalue;
-  if (gcsJob === null && data) {
-    logger.warn("GCS Job not found", {
-      jobId: id,
-    });
-  }
+  const data = dbJob?.docs ?? bullJob?.returnvalue;
 
   const job: PseudoJob<any> = {
     id,
@@ -72,15 +65,13 @@ export async function getJob(id: string): Promise<PseudoJob<any> | null> {
 }
 
 export async function getJobs(ids: string[]): Promise<PseudoJob<any>[]> {
-  const [bullJobs, dbJobs, gcsJobs] = await Promise.all([
+  const [bullJobs, dbJobs] = await Promise.all([
     Promise.all(ids.map((x) => getScrapeQueue().getJob(x))).then(x => x.filter(x => x)) as Promise<(Job & { id: string })[]>,
     process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobsById(ids) : [],
-    process.env.GCS_BUCKET_NAME ? Promise.all(ids.map(async (x) => ({ id: x, job: await getJobFromGCS(x) }))).then(x => x.filter(x => x.job)) as Promise<({ id: string, job: any | null })[]> : [],
   ]);
 
   const bullJobMap = new Map<string, Job<any>>();
   const dbJobMap = new Map<string, DBJob>();
-  const gcsJobMap = new Map<string, any>();
 
   for (const job of bullJobs) {
     bullJobMap.set(job.id, job);
@@ -90,25 +81,15 @@ export async function getJobs(ids: string[]): Promise<PseudoJob<any>[]> {
     dbJobMap.set(job.job_id, job);
   }
 
-  for (const job of gcsJobs) {
-    gcsJobMap.set(job.id, job.job);
-  }
-
   const jobs: PseudoJob<any>[] = [];
 
   for (const id of ids) {
     const bullJob = bullJobMap.get(id);
     const dbJob = dbJobMap.get(id);
-    const gcsJob = gcsJobMap.get(id);
 
     if (!bullJob && !dbJob) continue;
 
-    const data = gcsJob ?? dbJob?.docs ?? bullJob?.returnvalue;
-    if (gcsJob === null && data) {
-      logger.warn("GCS Job not found", {
-        jobId: id,
-      });
-    }
+    const data = dbJob?.docs ?? bullJob?.returnvalue;
 
     const job: PseudoJob<any> = {
       id,
diff --git a/apps/api/src/lib/gcs-jobs.ts b/apps/api/src/lib/gcs-jobs.ts
deleted file mode 100644
index 024cc410..00000000
--- a/apps/api/src/lib/gcs-jobs.ts
+++ /dev/null
@@ -1,104 +0,0 @@
-import { FirecrawlJob } from "../types";
-import { Storage } from "@google-cloud/storage";
-import { logger } from "./logger";
-
-const credentials = process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined;
-
-export async function saveJobToGCS(job: FirecrawlJob): Promise<void> {
-  try {
-    if (!process.env.GCS_BUCKET_NAME) {
-      return;
-    }
-
-    const storage = new Storage({ credentials });
-    const bucket = storage.bucket(process.env.GCS_BUCKET_NAME);
-    const blob = bucket.file(`${job.job_id}.json`);
-    for (let i = 0; i < 3; i++) {
-      try {
-        await blob.save(JSON.stringify(job.docs), {
-          contentType: "application/json",
-        });
-        break;
-      } catch (error) {
-        if (i === 2) {
-          throw error;
-        } else {
-          logger.error(`Error saving job to GCS, retrying`, {
-            error,
-            scrapeId: job.job_id,
-            jobId: job.job_id,
-            i,
-          });
-        }
-      }
-    }
-    for (let i = 0; i < 3; i++) {
-      try {
-        await blob.setMetadata({
-          metadata: {
-            job_id: job.job_id ?? null,
-            success: job.success,
-            message: job.message ?? null,
-            num_docs: job.num_docs,
-            time_taken: job.time_taken,
-            team_id: (job.team_id === "preview" || job.team_id?.startsWith("preview_")) ? null : job.team_id,
-            mode: job.mode,
-            url: job.url,
-            crawler_options: JSON.stringify(job.crawlerOptions),
-            page_options: JSON.stringify(job.scrapeOptions),
-            origin: job.origin,
-            num_tokens: job.num_tokens ?? null,
-            retry: !!job.retry,
-            crawl_id: job.crawl_id ?? null,
-            tokens_billed: job.tokens_billed ?? null,
-          },
-        });
-        break;
-      } catch (error) {
-        if (i === 2) {
-          throw error;
-        } else {
-          logger.error(`Error saving job metadata to GCS, retrying`, {
-            error,
-            scrapeId: job.job_id,
-            jobId: job.job_id,
-            i,
-          });
-        }
-      }
-    }
-  } catch (error) {
-    logger.error(`Error saving job to GCS`, {
-      error,
-      scrapeId: job.job_id,
-      jobId: job.job_id,
-    });
-  }
-}
-
-export async function getJobFromGCS(jobId: string): Promise<any | null> {
-  try {
-    if (!process.env.GCS_BUCKET_NAME) {
-      return null;
-    }
-
-    const storage = new Storage({ credentials });
-    const bucket = storage.bucket(process.env.GCS_BUCKET_NAME);
-    const blob = bucket.file(`${jobId}.json`);
-    const [exists] = await blob.exists();
-    if (!exists) {
-      return null;
-    }
-    const [content] = await blob.download();
-    const x = JSON.parse(content.toString());
-    console.log("Downloaded file ", jobId, x);
-    return x;
-  } catch (error) {
-    logger.error(`Error getting job from GCS`, {
-      error,
-      jobId,
-      scrapeId: jobId,
-    });
-    return null;
-  }
-}
\ No newline at end of file
diff --git a/apps/api/src/services/logging/log_job.ts b/apps/api/src/services/logging/log_job.ts
index ae1d66ae..9caa6716 100644
--- a/apps/api/src/services/logging/log_job.ts
+++ b/apps/api/src/services/logging/log_job.ts
@@ -4,7 +4,7 @@ import { posthog } from "../posthog";
 import "dotenv/config";
 import { logger } from "../../lib/logger";
 import { configDotenv } from "dotenv";
-import { saveJobToGCS } from "../../lib/gcs-jobs";
+import { Storage } from "@google-cloud/storage";
 configDotenv();
 
 function cleanOfNull<T>(x: T): T {
@@ -21,6 +21,45 @@
   }
 }
 
+
+async function saveJobToGCS(job: FirecrawlJob, bucketName: string): Promise<void> {
+  try {
+    const storage = new Storage({
+      credentials: process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined,
+    });
+    const bucket = storage.bucket(bucketName);
+    const blob = bucket.file(`${job.job_id}.json`);
+    await blob.save(JSON.stringify(job.docs), {
+      contentType: "application/json",
+    });
+    await blob.setMetadata({
+      metadata: {
+        job_id: job.job_id ?? null,
+        success: job.success,
+        message: job.message ?? null,
+        num_docs: job.num_docs,
+        time_taken: job.time_taken,
+        team_id: (job.team_id === "preview" || job.team_id?.startsWith("preview_"))? null : job.team_id,
+        mode: job.mode,
+        url: job.url,
+        crawler_options: job.crawlerOptions,
+        page_options: job.scrapeOptions,
+        origin: job.origin,
+        num_tokens: job.num_tokens ?? null,
+        retry: !!job.retry,
+        crawl_id: job.crawl_id ?? null,
+        tokens_billed: job.tokens_billed ?? null,
+      },
+    })
+  } catch (error) {
+    logger.error(`Error saving job to GCS`, {
+      error,
+      scrapeId: job.job_id,
+      jobId: job.job_id,
+    });
+  }
+}
+
 async function indexJob(job: FirecrawlJob): Promise<void> {
   try {
     if (job.mode !== "single_urls" && job.mode !== "scrape") {
@@ -109,7 +148,7 @@ export async function logJob(job: FirecrawlJob, force: boolean = false) {
     }
 
     if (process.env.GCS_BUCKET_NAME) {
-      await saveJobToGCS(job);
+      await saveJobToGCS(job, process.env.GCS_BUCKET_NAME);
     }
 
     if (force) {