feat(crawl-status): retrieve job data from GCS

Gergő Móricz committed 2025-04-08 21:06:23 +02:00
parent 670e4a6bf1
commit 777914e968
3 changed files with 88 additions and 46 deletions

apps/api/src/controllers/v1/crawl-status.ts

@@ -13,7 +13,7 @@ import {
   getDoneJobsOrderedLength,
   isCrawlKickoffFinished,
 } from "../../lib/crawl-redis";
-import { getScrapeQueue, QueueFunction } from "../../services/queue-service";
+import { getScrapeQueue } from "../../services/queue-service";
 import {
   supabaseGetJobById,
   supabaseGetJobsById,
@ -23,6 +23,7 @@ import type { Job, JobState, Queue } from "bullmq";
import { logger } from "../../lib/logger";
import { supabase_rr_service, supabase_service } from "../../services/supabase";
import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
import { getJobFromGCS } from "../../lib/gcs-jobs";
configDotenv();
export type PseudoJob<T> = {
@ -39,14 +40,15 @@ export type PseudoJob<T> = {
export type DBJob = { docs: any, success: boolean, page_options: any, date_added: any, message: string | null }
export async function getJob(id: string): Promise<PseudoJob<any> | null> {
const [bullJob, dbJob] = await Promise.all([
const [bullJob, dbJob, gcsJob] = await Promise.all([
getScrapeQueue().getJob(id),
(process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobById(id) : null) as Promise<DBJob | null>,
(process.env.GCS_BUCKET_NAME ? getJobFromGCS(id) : null) as Promise<any | null>,
]);
if (!bullJob && !dbJob) return null;
const data = dbJob?.docs ?? bullJob?.returnvalue;
const data = gcsJob ?? dbJob?.docs ?? bullJob?.returnvalue;
const job: PseudoJob<any> = {
id,
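The new `data` line establishes a three-way precedence: the GCS copy wins, then the Supabase row's `docs`, then BullMQ's in-memory `returnvalue`. Note that `??` only falls through on `null`/`undefined`, so even an empty document array from GCS shadows the other two sources. A minimal sketch of that chain in isolation (the helper name is illustrative, not from the commit):

// pickJobData mirrors `gcsJob ?? dbJob?.docs ?? bullJob?.returnvalue`.
function pickJobData(
  gcsJob: any | null,
  dbJob: { docs: any } | null,
  bullJob: { returnvalue: any } | null,
): any {
  // ?? falls through only on null/undefined, never on [] or "".
  return gcsJob ?? dbJob?.docs ?? bullJob?.returnvalue;
}

// pickJobData([], { docs: [{ url: "..." }] }, null) === []  <- empty GCS payload still wins
// pickJobData(null, { docs: [] }, null) === []              <- falls back to the DB copy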
@ -65,13 +67,15 @@ export async function getJob(id: string): Promise<PseudoJob<any> | null> {
}
export async function getJobs(ids: string[]): Promise<PseudoJob<any>[]> {
const [bullJobs, dbJobs] = await Promise.all([
const [bullJobs, dbJobs, gcsJobs] = await Promise.all([
Promise.all(ids.map((x) => getScrapeQueue().getJob(x))).then(x => x.filter(x => x)) as Promise<(Job<any, any, string> & { id: string })[]>,
process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobsById(ids) : [],
process.env.GCS_BUCKET_NAME ? Promise.all(ids.map(async (x) => ({ id: x, job: await getJobFromGCS(x) }))).then(x => x.filter(x => x.job)) as Promise<({ id: string, job: any | null })[]> : [],
]);
const bullJobMap = new Map<string, PseudoJob<any>>();
const dbJobMap = new Map<string, DBJob>();
const gcsJobMap = new Map<string, any>();
for (const job of bullJobs) {
bullJobMap.set(job.id, job);
@ -81,15 +85,20 @@ export async function getJobs(ids: string[]): Promise<PseudoJob<any>[]> {
dbJobMap.set(job.job_id, job);
}
for (const job of gcsJobs) {
gcsJobMap.set(job.id, job.job);
}
const jobs: PseudoJob<any>[] = [];
for (const id of ids) {
const bullJob = bullJobMap.get(id);
const dbJob = dbJobMap.get(id);
const gcsJob = gcsJobMap.get(id);
if (!bullJob && !dbJob) continue;
const data = dbJob?.docs ?? bullJob?.returnvalue;
const data = gcsJob ?? dbJob?.docs ?? bullJob?.returnvalue;
const job: PseudoJob<any> = {
id,
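All three batch sources in getJobs follow the same shape: fetch per-id in parallel, keep the id attached to each result, drop the misses, and index by id so the assembly loop stays O(1) per job. A generic sketch of that pattern (names are illustrative, not from the codebase):

// Fetch per-id payloads in parallel, pairing each result with its id so
// nothing is lost when null results are filtered out, then index by id.
async function fetchById<T>(
  ids: string[],
  fetchOne: (id: string) => Promise<T | null>,
): Promise<Map<string, T>> {
  const pairs = await Promise.all(
    ids.map(async (id) => ({ id, value: await fetchOne(id) })),
  );
  const map = new Map<string, T>();
  for (const { id, value } of pairs) {
    if (value) map.set(id, value); // drop ids with no stored payload
  }
  return map;
}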

apps/api/src/lib/gcs-jobs.ts (new file)

@@ -0,0 +1,72 @@
+import { FirecrawlJob } from "../types";
+import { Storage } from "@google-cloud/storage";
+import { logger } from "./logger";
+
+const credentials = process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined;
+
+export async function saveJobToGCS(job: FirecrawlJob): Promise<void> {
+  try {
+    if (!process.env.GCS_BUCKET_NAME) {
+      return;
+    }
+    const storage = new Storage({ credentials });
+    const bucket = storage.bucket(process.env.GCS_BUCKET_NAME);
+    const blob = bucket.file(`${job.job_id}.json`);
+    await blob.save(JSON.stringify(job.docs), {
+      contentType: "application/json",
+    });
+    await blob.setMetadata({
+      metadata: {
+        job_id: job.job_id ?? null,
+        success: job.success,
+        message: job.message ?? null,
+        num_docs: job.num_docs,
+        time_taken: job.time_taken,
+        team_id: (job.team_id === "preview" || job.team_id?.startsWith("preview_")) ? null : job.team_id,
+        mode: job.mode,
+        url: job.url,
+        crawler_options: job.crawlerOptions,
+        page_options: job.scrapeOptions,
+        origin: job.origin,
+        num_tokens: job.num_tokens ?? null,
+        retry: !!job.retry,
+        crawl_id: job.crawl_id ?? null,
+        tokens_billed: job.tokens_billed ?? null,
+      },
+    })
+  } catch (error) {
+    logger.error(`Error saving job to GCS`, {
+      error,
+      scrapeId: job.job_id,
+      jobId: job.job_id,
+    });
+  }
+}
+
+export async function getJobFromGCS(jobId: string): Promise<Document[] | null> {
+  try {
+    if (!process.env.GCS_BUCKET_NAME) {
+      return null;
+    }
+
+    const storage = new Storage({ credentials });
+    const bucket = storage.bucket(process.env.GCS_BUCKET_NAME);
+    const blob = bucket.file(`${jobId}.json`);
+    const [exists] = await blob.exists();
+    if (!exists) {
+      return null;
+    }
+
+    const [content] = await blob.download();
+    const x = JSON.parse(content.toString());
+    console.log("Downloaded file ", jobId, x);
+    return x;
+  } catch (error) {
+    logger.error(`Error getting job from GCS`, {
+      error,
+      jobId,
+      scrapeId: jobId,
+    });
+    return null;
+  }
+}
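Two things worth noting in this new module. First, the declared `Document[]` return type has no matching import in the file, so it will resolve to TypeScript's global DOM `Document` type (or fail to compile without the DOM lib). Second, `GCS_CREDENTIALS` is evidently expected to hold a base64-encoded service-account JSON key, since the module decodes it with `atob` plus `JSON.parse` at load time (`atob` is a global in Node 16+). A sketch of producing that value from a key file; the path is hypothetical:

import { readFileSync } from "node:fs";

// Encode a service-account key file into the form the module above decodes.
const keyJson = readFileSync("service-account.json", "utf8"); // hypothetical path
const encoded = Buffer.from(keyJson, "utf8").toString("base64");
console.log(`GCS_CREDENTIALS=${encoded}`);

// Round-trip check mirroring the module: JSON.parse(atob(encoded))
// yields the original key object (the key file is plain ASCII JSON).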

apps/api/src/services/logging/log_job.ts

@ -4,7 +4,7 @@ import { posthog } from "../posthog";
import "dotenv/config";
import { logger } from "../../lib/logger";
import { configDotenv } from "dotenv";
import { Storage } from "@google-cloud/storage";
import { saveJobToGCS } from "../../lib/gcs-jobs";
configDotenv();
function cleanOfNull<T>(x: T): T {
@ -21,45 +21,6 @@ function cleanOfNull<T>(x: T): T {
}
}
async function saveJobToGCS(job: FirecrawlJob, bucketName: string): Promise<void> {
try {
const storage = new Storage({
credentials: process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined,
});
const bucket = storage.bucket(bucketName);
const blob = bucket.file(`${job.job_id}.json`);
await blob.save(JSON.stringify(job.docs), {
contentType: "application/json",
});
await blob.setMetadata({
metadata: {
job_id: job.job_id ?? null,
success: job.success,
message: job.message ?? null,
num_docs: job.num_docs,
time_taken: job.time_taken,
team_id: (job.team_id === "preview" || job.team_id?.startsWith("preview_"))? null : job.team_id,
mode: job.mode,
url: job.url,
crawler_options: job.crawlerOptions,
page_options: job.scrapeOptions,
origin: job.origin,
num_tokens: job.num_tokens ?? null,
retry: !!job.retry,
crawl_id: job.crawl_id ?? null,
tokens_billed: job.tokens_billed ?? null,
},
})
} catch (error) {
logger.error(`Error saving job to GCS`, {
error,
scrapeId: job.job_id,
jobId: job.job_id,
});
}
}
async function indexJob(job: FirecrawlJob): Promise<void> {
try {
if (job.mode !== "single_urls" && job.mode !== "scrape") {
@@ -148,7 +109,7 @@ export async function logJob(job: FirecrawlJob, force: boolean = false) {
   }
 
   if (process.env.GCS_BUCKET_NAME) {
-    await saveJobToGCS(job, process.env.GCS_BUCKET_NAME);
+    await saveJobToGCS(job);
   }
 
   if (force) {
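With the duplicated function deleted and the shared helper owning the `GCS_BUCKET_NAME` check, the env guard at this call site is now a harmless duplicate, and new call sites can invoke the helper unconditionally. A hedged usage sketch (the wrapper is hypothetical; import paths assume a caller sitting next to gcs-jobs under lib/):

import { FirecrawlJob } from "../types";
import { saveJobToGCS } from "./gcs-jobs";

// saveJobToGCS returns early when GCS_BUCKET_NAME is unset and logs its
// own errors, so callers need neither an env check nor a try/catch.
export async function persistJob(job: FirecrawlJob): Promise<void> {
  await saveJobToGCS(job);
}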