diff --git a/apps/api/src/controllers/v1/crawl-status.ts b/apps/api/src/controllers/v1/crawl-status.ts
index 96aa578e..680db785 100644
--- a/apps/api/src/controllers/v1/crawl-status.ts
+++ b/apps/api/src/controllers/v1/crawl-status.ts
@@ -13,7 +13,7 @@ import {
   getDoneJobsOrderedLength,
   isCrawlKickoffFinished,
 } from "../../lib/crawl-redis";
-import { getScrapeQueue, QueueFunction } from "../../services/queue-service";
+import { getScrapeQueue } from "../../services/queue-service";
 import {
   supabaseGetJobById,
   supabaseGetJobsById,
@@ -23,6 +23,7 @@ import type { Job, JobState, Queue } from "bullmq";
 import { logger } from "../../lib/logger";
 import { supabase_rr_service, supabase_service } from "../../services/supabase";
 import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
+import { getJobFromGCS } from "../../lib/gcs-jobs";
 configDotenv();
 
 export type PseudoJob<T> = {
@@ -39,14 +40,15 @@ export type PseudoJob<T> = {
 export type DBJob = { docs: any, success: boolean, page_options: any, date_added: any, message: string | null }
 
 export async function getJob(id: string): Promise<PseudoJob<any> | null> {
-  const [bullJob, dbJob] = await Promise.all([
+  const [bullJob, dbJob, gcsJob] = await Promise.all([
     getScrapeQueue().getJob(id),
     (process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobById(id) : null) as Promise<DBJob | null>,
+    (process.env.GCS_BUCKET_NAME ? getJobFromGCS(id) : null) as Promise<any | null>,
   ]);
 
   if (!bullJob && !dbJob) return null;
 
-  const data = dbJob?.docs ?? bullJob?.returnvalue;
+  const data = gcsJob ?? dbJob?.docs ?? bullJob?.returnvalue;
 
   const job: PseudoJob<any> = {
     id,
@@ -65,13 +67,15 @@ export async function getJob(id: string): Promise<PseudoJob<any> | null> {
 }
 
 export async function getJobs(ids: string[]): Promise<PseudoJob<any>[]> {
-  const [bullJobs, dbJobs] = await Promise.all([
+  const [bullJobs, dbJobs, gcsJobs] = await Promise.all([
     Promise.all(ids.map((x) => getScrapeQueue().getJob(x))).then(x => x.filter(x => x)) as Promise<(Job & { id: string })[]>,
     process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobsById(ids) : [],
+    process.env.GCS_BUCKET_NAME ? Promise.all(ids.map(async (x) => ({ id: x, job: await getJobFromGCS(x) }))).then(x => x.filter(x => x.job)) as Promise<({ id: string, job: any | null })[]> : [],
   ]);
 
   const bullJobMap = new Map<string, Job<any>>();
   const dbJobMap = new Map<string, DBJob>();
+  const gcsJobMap = new Map<string, any>();
 
   for (const job of bullJobs) {
     bullJobMap.set(job.id, job);
@@ -81,15 +85,20 @@ export async function getJobs(ids: string[]): Promise<PseudoJob<any>[]> {
     dbJobMap.set(job.job_id, job);
   }
 
+  for (const job of gcsJobs) {
+    gcsJobMap.set(job.id, job.job);
+  }
+
   const jobs: PseudoJob<any>[] = [];
 
   for (const id of ids) {
     const bullJob = bullJobMap.get(id);
     const dbJob = dbJobMap.get(id);
+    const gcsJob = gcsJobMap.get(id);
 
     if (!bullJob && !dbJob) continue;
 
-    const data = dbJob?.docs ?? bullJob?.returnvalue;
+    const data = gcsJob ?? dbJob?.docs ?? bullJob?.returnvalue;
 
     const job: PseudoJob<any> = {
       id,
diff --git a/apps/api/src/lib/gcs-jobs.ts b/apps/api/src/lib/gcs-jobs.ts
new file mode 100644
index 00000000..8e3ffd84
--- /dev/null
+++ b/apps/api/src/lib/gcs-jobs.ts
@@ -0,0 +1,72 @@
+import { FirecrawlJob } from "../types";
+import { Storage } from "@google-cloud/storage";
+import { logger } from "./logger";
+
+const credentials = process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined;
+
+export async function saveJobToGCS(job: FirecrawlJob): Promise<void> {
+  try {
+    if (!process.env.GCS_BUCKET_NAME) {
+      return;
+    }
+
+    const storage = new Storage({ credentials });
+    const bucket = storage.bucket(process.env.GCS_BUCKET_NAME);
+    const blob = bucket.file(`${job.job_id}.json`);
+    await blob.save(JSON.stringify(job.docs), {
+      contentType: "application/json",
+    });
+    await blob.setMetadata({
+      metadata: {
+        job_id: job.job_id ?? null,
+        success: job.success,
+        message: job.message ?? null,
+        num_docs: job.num_docs,
+        time_taken: job.time_taken,
+        team_id: (job.team_id === "preview" || job.team_id?.startsWith("preview_")) ? null : job.team_id,
+        mode: job.mode,
+        url: job.url,
+        crawler_options: job.crawlerOptions,
+        page_options: job.scrapeOptions,
+        origin: job.origin,
+        num_tokens: job.num_tokens ?? null,
+        retry: !!job.retry,
+        crawl_id: job.crawl_id ?? null,
+        tokens_billed: job.tokens_billed ?? null,
+      },
+    })
+  } catch (error) {
+    logger.error(`Error saving job to GCS`, {
+      error,
+      scrapeId: job.job_id,
+      jobId: job.job_id,
+    });
+  }
+}
+
+export async function getJobFromGCS(jobId: string): Promise<any | null> {
+  try {
+    if (!process.env.GCS_BUCKET_NAME) {
+      return null;
+    }
+
+    const storage = new Storage({ credentials });
+    const bucket = storage.bucket(process.env.GCS_BUCKET_NAME);
+    const blob = bucket.file(`${jobId}.json`);
+    const [exists] = await blob.exists();
+    if (!exists) {
+      return null;
+    }
+    const [content] = await blob.download();
+    const parsed = JSON.parse(content.toString());
+    logger.debug("Downloaded job from GCS", { jobId });
+    return parsed;
+  } catch (error) {
+    logger.error(`Error getting job from GCS`, {
+      error,
+      jobId,
+      scrapeId: jobId,
+    });
+    return null;
+  }
+}
\ No newline at end of file
diff --git a/apps/api/src/services/logging/log_job.ts b/apps/api/src/services/logging/log_job.ts
index 9caa6716..ae1d66ae 100644
--- a/apps/api/src/services/logging/log_job.ts
+++ b/apps/api/src/services/logging/log_job.ts
@@ -4,7 +4,7 @@ import { posthog } from "../posthog";
 import "dotenv/config";
 import { logger } from "../../lib/logger";
 import { configDotenv } from "dotenv";
-import { Storage } from "@google-cloud/storage";
+import { saveJobToGCS } from "../../lib/gcs-jobs";
 configDotenv();
 
 function cleanOfNull<T>(x: T): T {
@@ -21,45 +21,6 @@ function cleanOfNull<T>(x: T): T {
   }
 }
 
-
-async function saveJobToGCS(job: FirecrawlJob, bucketName: string): Promise<void> {
-  try {
-    const storage = new Storage({
-      credentials: process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined,
-    });
-    const bucket = storage.bucket(bucketName);
-    const blob = bucket.file(`${job.job_id}.json`);
-    await blob.save(JSON.stringify(job.docs), {
-      contentType: "application/json",
-    });
-    await blob.setMetadata({
-      metadata: {
-        job_id: job.job_id ?? null,
-        success: job.success,
-        message: job.message ?? null,
-        num_docs: job.num_docs,
-        time_taken: job.time_taken,
-        team_id: (job.team_id === "preview" || job.team_id?.startsWith("preview_"))? null : job.team_id,
-        mode: job.mode,
-        url: job.url,
-        crawler_options: job.crawlerOptions,
-        page_options: job.scrapeOptions,
-        origin: job.origin,
-        num_tokens: job.num_tokens ?? null,
-        retry: !!job.retry,
-        crawl_id: job.crawl_id ?? null,
-        tokens_billed: job.tokens_billed ?? null,
-      },
-    })
-  } catch (error) {
-    logger.error(`Error saving job to GCS`, {
-      error,
-      scrapeId: job.job_id,
-      jobId: job.job_id,
-    });
-  }
-}
-
 async function indexJob(job: FirecrawlJob): Promise<void> {
   try {
     if (job.mode !== "single_urls" && job.mode !== "scrape") {
@@ -148,7 +109,7 @@ export async function logJob(job: FirecrawlJob, force: boolean = false) {
   }
 
   if (process.env.GCS_BUCKET_NAME) {
-    await saveJobToGCS(job, process.env.GCS_BUCKET_NAME);
+    await saveJobToGCS(job);
   }
 
   if (force) {
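
For reviewers, a minimal round-trip sketch of the extracted helpers. The `gcsRoundTrip` harness and every field value in the literal below are hypothetical; only `saveJobToGCS`, `getJobFromGCS`, the env-var contract (`GCS_BUCKET_NAME`, plus an optional base64-encoded service-account JSON in `GCS_CREDENTIALS`), and the docs-only blob body come from the diff above.

```ts
// Sketch only; paths assume it lives under apps/api/src.
import { saveJobToGCS, getJobFromGCS } from "./lib/gcs-jobs";
import type { FirecrawlJob } from "./types";

async function gcsRoundTrip(): Promise<void> {
  // Illustrative job literal, not a real queue job; cast because only the
  // fields used by gcs-jobs.ts are filled in here.
  const job = {
    job_id: "test-job-1",            // becomes the object name: test-job-1.json
    success: true,
    message: null,
    num_docs: 1,
    docs: [{ markdown: "# hello" }], // only docs is stored as the blob body
    time_taken: 1.23,
    team_id: "team-123",             // preview teams are nulled in metadata
    mode: "scrape",
    url: "https://example.com",
    crawlerOptions: null,
    scrapeOptions: {},
    origin: "api",
  } as FirecrawlJob;

  await saveJobToGCS(job);
  // getJobFromGCS returns the parsed docs payload (or null), which is why
  // crawl-status.ts can put it first in
  // `gcsJob ?? dbJob?.docs ?? bullJob?.returnvalue`.
  const docs = await getJobFromGCS(job.job_id);
  console.assert(Array.isArray(docs) && docs.length === 1);
}
```

One behavior worth noting: `getJobs`/`getJob` still gate on `if (!bullJob && !dbJob)`, so a job that exists only in GCS is never surfaced on its own; the GCS copy only takes precedence for the docs payload once Bull or the DB knows about the job.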