feat(crawl-status): retrieve job data from GCS (#1427)
* feat(crawl-status): retrieve job data from GCS
* feat(gcs-jobs/save): retrying saving metadata (might conflict)
* feat(gcs-jobs/save): retry save operation
* fix(gcs-jobs/save): respect metadata rules
* feat(crawl-status): log if gcs job is not found
Parent: ab6fb48e6e
Commit: 673bf6a2de
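The new apps/api/src/lib/gcs-jobs.ts module keys everything off two environment variables: GCS_BUCKET_NAME enables the feature, and GCS_CREDENTIALS (when set) is treated as a base64-encoded service-account JSON key decoded with atob + JSON.parse. Below is a minimal sketch of that convention; only the environment-variable handling is taken from the diff, and the gcsBucket helper is hypothetical.

    import { Storage } from "@google-cloud/storage";

    // GCS_CREDENTIALS holds base64-encoded service-account JSON; when it is unset,
    // the Storage client falls back to Application Default Credentials.
    const credentials = process.env.GCS_CREDENTIALS
      ? JSON.parse(atob(process.env.GCS_CREDENTIALS))
      : undefined;

    // Hypothetical helper: GCS_BUCKET_NAME gates the whole feature, so callers
    // get null (and skip GCS entirely) when it is not configured.
    export function gcsBucket() {
      if (!process.env.GCS_BUCKET_NAME) return null;
      return new Storage({ credentials }).bucket(process.env.GCS_BUCKET_NAME);
    }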
@@ -13,7 +13,7 @@ import {
   getDoneJobsOrderedLength,
   isCrawlKickoffFinished,
 } from "../../lib/crawl-redis";
-import { getScrapeQueue, QueueFunction } from "../../services/queue-service";
+import { getScrapeQueue } from "../../services/queue-service";
 import {
   supabaseGetJobById,
   supabaseGetJobsById,
@@ -23,6 +23,7 @@ import type { Job, JobState, Queue } from "bullmq";
 import { logger } from "../../lib/logger";
 import { supabase_rr_service, supabase_service } from "../../services/supabase";
 import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
+import { getJobFromGCS } from "../../lib/gcs-jobs";
 configDotenv();
 
 export type PseudoJob<T> = {
@@ -39,14 +40,20 @@ export type PseudoJob<T> = {
 export type DBJob = { docs: any, success: boolean, page_options: any, date_added: any, message: string | null }
 
 export async function getJob(id: string): Promise<PseudoJob<any> | null> {
-  const [bullJob, dbJob] = await Promise.all([
+  const [bullJob, dbJob, gcsJob] = await Promise.all([
     getScrapeQueue().getJob(id),
     (process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobById(id) : null) as Promise<DBJob | null>,
+    (process.env.GCS_BUCKET_NAME ? getJobFromGCS(id) : null) as Promise<any | null>,
   ]);
 
   if (!bullJob && !dbJob) return null;
 
-  const data = dbJob?.docs ?? bullJob?.returnvalue;
+  const data = gcsJob ?? dbJob?.docs ?? bullJob?.returnvalue;
+  if (gcsJob === null && data) {
+    logger.warn("GCS Job not found", {
+      jobId: id,
+    });
+  }
 
   const job: PseudoJob<any> = {
     id,
@@ -65,13 +72,15 @@ export async function getJob(id: string): Promise<PseudoJob<any> | null> {
 }
 
 export async function getJobs(ids: string[]): Promise<PseudoJob<any>[]> {
-  const [bullJobs, dbJobs] = await Promise.all([
+  const [bullJobs, dbJobs, gcsJobs] = await Promise.all([
     Promise.all(ids.map((x) => getScrapeQueue().getJob(x))).then(x => x.filter(x => x)) as Promise<(Job<any, any, string> & { id: string })[]>,
     process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobsById(ids) : [],
+    process.env.GCS_BUCKET_NAME ? Promise.all(ids.map(async (x) => ({ id: x, job: await getJobFromGCS(x) }))).then(x => x.filter(x => x.job)) as Promise<({ id: string, job: any | null })[]> : [],
   ]);
 
   const bullJobMap = new Map<string, PseudoJob<any>>();
   const dbJobMap = new Map<string, DBJob>();
+  const gcsJobMap = new Map<string, any>();
 
   for (const job of bullJobs) {
     bullJobMap.set(job.id, job);
@@ -81,15 +90,25 @@ export async function getJobs(ids: string[]): Promise<PseudoJob<any>[]> {
     dbJobMap.set(job.job_id, job);
   }
 
+  for (const job of gcsJobs) {
+    gcsJobMap.set(job.id, job.job);
+  }
+
   const jobs: PseudoJob<any>[] = [];
 
   for (const id of ids) {
     const bullJob = bullJobMap.get(id);
     const dbJob = dbJobMap.get(id);
+    const gcsJob = gcsJobMap.get(id);
 
     if (!bullJob && !dbJob) continue;
 
-    const data = dbJob?.docs ?? bullJob?.returnvalue;
+    const data = gcsJob ?? dbJob?.docs ?? bullJob?.returnvalue;
+    if (gcsJob === null && data) {
+      logger.warn("GCS Job not found", {
+        jobId: id,
+      });
+    }
 
     const job: PseudoJob<any> = {
       id,
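In both getJob and getJobs, the GCS copy now takes precedence over the Supabase row and the BullMQ return value. A short annotated restatement of that fallback chain, purely illustrative, assuming (as the diff does) that the GCS lookup resolves to null when the object is missing or the bucket is not configured:

    // GCS result first; `??` only falls through on null/undefined, so a present
    // GCS document always wins over the DB row and the queue return value.
    const data = gcsJob ?? dbJob?.docs ?? bullJob?.returnvalue;

    // If the job's data exists somewhere else but GCS came back empty, emit a
    // warning so missing GCS objects are visible without failing the request.
    if (gcsJob === null && data) {
      logger.warn("GCS Job not found", { jobId: id });
    }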
apps/api/src/lib/gcs-jobs.ts (new file, 104 lines)
@@ -0,0 +1,104 @@
+import { FirecrawlJob } from "../types";
+import { Storage } from "@google-cloud/storage";
+import { logger } from "./logger";
+
+const credentials = process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined;
+
+export async function saveJobToGCS(job: FirecrawlJob): Promise<void> {
+  try {
+    if (!process.env.GCS_BUCKET_NAME) {
+      return;
+    }
+
+    const storage = new Storage({ credentials });
+    const bucket = storage.bucket(process.env.GCS_BUCKET_NAME);
+    const blob = bucket.file(`${job.job_id}.json`);
+    for (let i = 0; i < 3; i++) {
+      try {
+        await blob.save(JSON.stringify(job.docs), {
+          contentType: "application/json",
+        });
+        break;
+      } catch (error) {
+        if (i === 2) {
+          throw error;
+        } else {
+          logger.error(`Error saving job to GCS, retrying`, {
+            error,
+            scrapeId: job.job_id,
+            jobId: job.job_id,
+            i,
+          });
+        }
+      }
+    }
+    for (let i = 0; i < 3; i++) {
+      try {
+        await blob.setMetadata({
+          metadata: {
+            job_id: job.job_id ?? null,
+            success: job.success,
+            message: job.message ?? null,
+            num_docs: job.num_docs,
+            time_taken: job.time_taken,
+            team_id: (job.team_id === "preview" || job.team_id?.startsWith("preview_")) ? null : job.team_id,
+            mode: job.mode,
+            url: job.url,
+            crawler_options: JSON.stringify(job.crawlerOptions),
+            page_options: JSON.stringify(job.scrapeOptions),
+            origin: job.origin,
+            num_tokens: job.num_tokens ?? null,
+            retry: !!job.retry,
+            crawl_id: job.crawl_id ?? null,
+            tokens_billed: job.tokens_billed ?? null,
+          },
+        });
+        break;
+      } catch (error) {
+        if (i === 2) {
+          throw error;
+        } else {
+          logger.error(`Error saving job metadata to GCS, retrying`, {
+            error,
+            scrapeId: job.job_id,
+            jobId: job.job_id,
+            i,
+          });
+        }
+      }
+    }
+  } catch (error) {
+    logger.error(`Error saving job to GCS`, {
+      error,
+      scrapeId: job.job_id,
+      jobId: job.job_id,
+    });
+  }
+}
+
+export async function getJobFromGCS(jobId: string): Promise<Document[] | null> {
+  try {
+    if (!process.env.GCS_BUCKET_NAME) {
+      return null;
+    }
+
+    const storage = new Storage({ credentials });
+    const bucket = storage.bucket(process.env.GCS_BUCKET_NAME);
+    const blob = bucket.file(`${jobId}.json`);
+    const [exists] = await blob.exists();
+    if (!exists) {
+      return null;
+    }
+    const [content] = await blob.download();
+    const x = JSON.parse(content.toString());
+    console.log("Downloaded file ", jobId, x);
+    return x;
+  } catch (error) {
+    logger.error(`Error getting job from GCS`, {
+      error,
+      jobId,
+      scrapeId: jobId,
+    });
+    return null;
+  }
+}
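How the two helpers are meant to pair up, as a hypothetical call site (only the signatures and the env-var gating come from the file above; the surrounding function is illustrative):

    import { FirecrawlJob } from "../types";
    import { saveJobToGCS, getJobFromGCS } from "./gcs-jobs";

    async function roundTrip(job: FirecrawlJob) {
      // logJob delegates here when GCS_BUCKET_NAME is set; the helper writes
      // `${job.job_id}.json` and retries the document write and the metadata
      // write up to three times each before rethrowing.
      await saveJobToGCS(job);

      // crawl-status later reads the same object back by job id; a missing
      // object (or an unset bucket) yields null instead of throwing.
      const docs = await getJobFromGCS(job.job_id);
      return docs;
    }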
@@ -4,7 +4,7 @@ import { posthog } from "../posthog";
 import "dotenv/config";
 import { logger } from "../../lib/logger";
 import { configDotenv } from "dotenv";
-import { Storage } from "@google-cloud/storage";
+import { saveJobToGCS } from "../../lib/gcs-jobs";
 configDotenv();
 
 function cleanOfNull<T>(x: T): T {
@@ -21,45 +21,6 @@ function cleanOfNull<T>(x: T): T {
   }
 }
 
-
-async function saveJobToGCS(job: FirecrawlJob, bucketName: string): Promise<void> {
-  try {
-    const storage = new Storage({
-      credentials: process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined,
-    });
-    const bucket = storage.bucket(bucketName);
-    const blob = bucket.file(`${job.job_id}.json`);
-    await blob.save(JSON.stringify(job.docs), {
-      contentType: "application/json",
-    });
-    await blob.setMetadata({
-      metadata: {
-        job_id: job.job_id ?? null,
-        success: job.success,
-        message: job.message ?? null,
-        num_docs: job.num_docs,
-        time_taken: job.time_taken,
-        team_id: (job.team_id === "preview" || job.team_id?.startsWith("preview_"))? null : job.team_id,
-        mode: job.mode,
-        url: job.url,
-        crawler_options: job.crawlerOptions,
-        page_options: job.scrapeOptions,
-        origin: job.origin,
-        num_tokens: job.num_tokens ?? null,
-        retry: !!job.retry,
-        crawl_id: job.crawl_id ?? null,
-        tokens_billed: job.tokens_billed ?? null,
-      },
-    })
-  } catch (error) {
-    logger.error(`Error saving job to GCS`, {
-      error,
-      scrapeId: job.job_id,
-      jobId: job.job_id,
-    });
-  }
-}
-
 async function indexJob(job: FirecrawlJob): Promise<void> {
   try {
     if (job.mode !== "single_urls" && job.mode !== "scrape") {
@@ -148,7 +109,7 @@ export async function logJob(job: FirecrawlJob, force: boolean = false) {
   }
 
   if (process.env.GCS_BUCKET_NAME) {
-    await saveJobToGCS(job, process.env.GCS_BUCKET_NAME);
+    await saveJobToGCS(job);
   }
 
   if (force) {