mirror of https://git.mirrors.martin98.com/https://github.com/mendableai/firecrawl
synced 2025-08-12 11:19:02 +08:00

Revert "feat(crawl-status): retrieve job data from GCS (#1427)"

This reverts commit 673bf6a2dea0f2340f964ba9c25cab8e92d929e4.

This commit is contained in:
parent 673bf6a2de
commit 670e4a6bf1
@@ -13,7 +13,7 @@ import {
   getDoneJobsOrderedLength,
   isCrawlKickoffFinished,
 } from "../../lib/crawl-redis";
-import { getScrapeQueue } from "../../services/queue-service";
+import { getScrapeQueue, QueueFunction } from "../../services/queue-service";
 import {
   supabaseGetJobById,
   supabaseGetJobsById,
@@ -23,7 +23,6 @@ import type { Job, JobState, Queue } from "bullmq";
 import { logger } from "../../lib/logger";
 import { supabase_rr_service, supabase_service } from "../../services/supabase";
 import { getConcurrencyLimitedJobs } from "../../lib/concurrency-limit";
-import { getJobFromGCS } from "../../lib/gcs-jobs";
 configDotenv();

 export type PseudoJob<T> = {
@@ -40,20 +39,14 @@ export type PseudoJob<T> = {
 export type DBJob = { docs: any, success: boolean, page_options: any, date_added: any, message: string | null }

 export async function getJob(id: string): Promise<PseudoJob<any> | null> {
-  const [bullJob, dbJob, gcsJob] = await Promise.all([
+  const [bullJob, dbJob] = await Promise.all([
     getScrapeQueue().getJob(id),
     (process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobById(id) : null) as Promise<DBJob | null>,
-    (process.env.GCS_BUCKET_NAME ? getJobFromGCS(id) : null) as Promise<any | null>,
   ]);

   if (!bullJob && !dbJob) return null;

-  const data = gcsJob ?? dbJob?.docs ?? bullJob?.returnvalue;
-  if (gcsJob === null && data) {
-    logger.warn("GCS Job not found", {
-      jobId: id,
-    });
-  }
+  const data = dbJob?.docs ?? bullJob?.returnvalue;

   const job: PseudoJob<any> = {
     id,
@@ -72,15 +65,13 @@ export async function getJob(id: string): Promise<PseudoJob<any> | null> {
 }

 export async function getJobs(ids: string[]): Promise<PseudoJob<any>[]> {
-  const [bullJobs, dbJobs, gcsJobs] = await Promise.all([
+  const [bullJobs, dbJobs] = await Promise.all([
     Promise.all(ids.map((x) => getScrapeQueue().getJob(x))).then(x => x.filter(x => x)) as Promise<(Job<any, any, string> & { id: string })[]>,
     process.env.USE_DB_AUTHENTICATION === "true" ? supabaseGetJobsById(ids) : [],
-    process.env.GCS_BUCKET_NAME ? Promise.all(ids.map(async (x) => ({ id: x, job: await getJobFromGCS(x) }))).then(x => x.filter(x => x.job)) as Promise<({ id: string, job: any | null })[]> : [],
   ]);

   const bullJobMap = new Map<string, PseudoJob<any>>();
   const dbJobMap = new Map<string, DBJob>();
-  const gcsJobMap = new Map<string, any>();

   for (const job of bullJobs) {
     bullJobMap.set(job.id, job);
@@ -90,25 +81,15 @@ export async function getJobs(ids: string[]): Promise<PseudoJob<any>[]> {
     dbJobMap.set(job.job_id, job);
   }

-  for (const job of gcsJobs) {
-    gcsJobMap.set(job.id, job.job);
-  }
-
   const jobs: PseudoJob<any>[] = [];

   for (const id of ids) {
     const bullJob = bullJobMap.get(id);
     const dbJob = dbJobMap.get(id);
-    const gcsJob = gcsJobMap.get(id);

     if (!bullJob && !dbJob) continue;

-    const data = gcsJob ?? dbJob?.docs ?? bullJob?.returnvalue;
-    if (gcsJob === null && data) {
-      logger.warn("GCS Job not found", {
-        jobId: id,
-      });
-    }
+    const data = dbJob?.docs ?? bullJob?.returnvalue;

     const job: PseudoJob<any> = {
       id,
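The hunks above revert getJob and getJobs to resolving document data from the database row or the BullMQ job only; the GCS lookup and the "GCS Job not found" warning are removed. Below is a minimal sketch of the post-revert fallback order, with hypothetical type aliases standing in for the Supabase row and BullMQ job shapes (resolveJobData is not a real helper in the codebase; it only isolates the ?? chain both functions now use):

```ts
// Hypothetical, pared-down shapes for illustration only.
type DBJobLike = { docs: any } | null;
type BullJobLike = { returnvalue: any } | null;

// After the revert: prefer persisted docs, then the queue's return value; GCS is never consulted.
function resolveJobData(dbJob: DBJobLike, bullJob: BullJobLike): any {
  return dbJob?.docs ?? bullJob?.returnvalue;
}
```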
@@ -1,104 +0,0 @@
-import { FirecrawlJob } from "../types";
-import { Storage } from "@google-cloud/storage";
-import { logger } from "./logger";
-
-const credentials = process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined;
-
-export async function saveJobToGCS(job: FirecrawlJob): Promise<void> {
-  try {
-    if (!process.env.GCS_BUCKET_NAME) {
-      return;
-    }
-
-    const storage = new Storage({ credentials });
-    const bucket = storage.bucket(process.env.GCS_BUCKET_NAME);
-    const blob = bucket.file(`${job.job_id}.json`);
-    for (let i = 0; i < 3; i++) {
-      try {
-        await blob.save(JSON.stringify(job.docs), {
-          contentType: "application/json",
-        });
-        break;
-      } catch (error) {
-        if (i === 2) {
-          throw error;
-        } else {
-          logger.error(`Error saving job to GCS, retrying`, {
-            error,
-            scrapeId: job.job_id,
-            jobId: job.job_id,
-            i,
-          });
-        }
-      }
-    }
-    for (let i = 0; i < 3; i++) {
-      try {
-        await blob.setMetadata({
-          metadata: {
-            job_id: job.job_id ?? null,
-            success: job.success,
-            message: job.message ?? null,
-            num_docs: job.num_docs,
-            time_taken: job.time_taken,
-            team_id: (job.team_id === "preview" || job.team_id?.startsWith("preview_")) ? null : job.team_id,
-            mode: job.mode,
-            url: job.url,
-            crawler_options: JSON.stringify(job.crawlerOptions),
-            page_options: JSON.stringify(job.scrapeOptions),
-            origin: job.origin,
-            num_tokens: job.num_tokens ?? null,
-            retry: !!job.retry,
-            crawl_id: job.crawl_id ?? null,
-            tokens_billed: job.tokens_billed ?? null,
-          },
-        });
-        break;
-      } catch (error) {
-        if (i === 2) {
-          throw error;
-        } else {
-          logger.error(`Error saving job metadata to GCS, retrying`, {
-            error,
-            scrapeId: job.job_id,
-            jobId: job.job_id,
-            i,
-          });
-        }
-      }
-    }
-  } catch (error) {
-    logger.error(`Error saving job to GCS`, {
-      error,
-      scrapeId: job.job_id,
-      jobId: job.job_id,
-    });
-  }
-}
-
-export async function getJobFromGCS(jobId: string): Promise<Document[] | null> {
-  try {
-    if (!process.env.GCS_BUCKET_NAME) {
-      return null;
-    }
-
-    const storage = new Storage({ credentials });
-    const bucket = storage.bucket(process.env.GCS_BUCKET_NAME);
-    const blob = bucket.file(`${jobId}.json`);
-    const [exists] = await blob.exists();
-    if (!exists) {
-      return null;
-    }
-    const [content] = await blob.download();
-    const x = JSON.parse(content.toString());
-    console.log("Downloaded file ", jobId, x);
-    return x;
-  } catch (error) {
-    logger.error(`Error getting job from GCS`, {
-      error,
-      jobId,
-      scrapeId: jobId,
-    });
-    return null;
-  }
-}
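The block above deletes the helper module the controller imported as "../../lib/gcs-jobs". Its read path fetched a `<job_id>.json` object from the bucket named by GCS_BUCKET_NAME and parsed it back into documents. A condensed sketch of that read path follows, assuming GCS_CREDENTIALS holds a base64-encoded service-account JSON key; the retry loops from saveJobToGCS and the debug console.log are omitted:

```ts
import { Storage } from "@google-cloud/storage";

// Assumed: GCS_CREDENTIALS is a base64-encoded service-account key (atob is a Node >= 16 global).
const credentials = process.env.GCS_CREDENTIALS
  ? JSON.parse(atob(process.env.GCS_CREDENTIALS))
  : undefined;

// Condensed version of the deleted getJobFromGCS: returns the stored docs,
// or null when no bucket is configured or the object does not exist.
export async function getJobFromGCS(jobId: string): Promise<any | null> {
  if (!process.env.GCS_BUCKET_NAME) return null;
  const blob = new Storage({ credentials })
    .bucket(process.env.GCS_BUCKET_NAME)
    .file(`${jobId}.json`);
  const [exists] = await blob.exists();
  if (!exists) return null;
  const [content] = await blob.download();
  return JSON.parse(content.toString());
}
```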
@@ -4,7 +4,7 @@ import { posthog } from "../posthog";
 import "dotenv/config";
 import { logger } from "../../lib/logger";
 import { configDotenv } from "dotenv";
-import { saveJobToGCS } from "../../lib/gcs-jobs";
+import { Storage } from "@google-cloud/storage";
 configDotenv();

 function cleanOfNull<T>(x: T): T {
@@ -21,6 +21,45 @@ function cleanOfNull<T>(x: T): T {
   }
 }

+
+async function saveJobToGCS(job: FirecrawlJob, bucketName: string): Promise<void> {
+  try {
+    const storage = new Storage({
+      credentials: process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined,
+    });
+    const bucket = storage.bucket(bucketName);
+    const blob = bucket.file(`${job.job_id}.json`);
+    await blob.save(JSON.stringify(job.docs), {
+      contentType: "application/json",
+    });
+    await blob.setMetadata({
+      metadata: {
+        job_id: job.job_id ?? null,
+        success: job.success,
+        message: job.message ?? null,
+        num_docs: job.num_docs,
+        time_taken: job.time_taken,
+        team_id: (job.team_id === "preview" || job.team_id?.startsWith("preview_"))? null : job.team_id,
+        mode: job.mode,
+        url: job.url,
+        crawler_options: job.crawlerOptions,
+        page_options: job.scrapeOptions,
+        origin: job.origin,
+        num_tokens: job.num_tokens ?? null,
+        retry: !!job.retry,
+        crawl_id: job.crawl_id ?? null,
+        tokens_billed: job.tokens_billed ?? null,
+      },
+    })
+  } catch (error) {
+    logger.error(`Error saving job to GCS`, {
+      error,
+      scrapeId: job.job_id,
+      jobId: job.job_id,
+    });
+  }
+}
+
 async function indexJob(job: FirecrawlJob): Promise<void> {
   try {
     if (job.mode !== "single_urls" && job.mode !== "scrape") {
@@ -109,7 +148,7 @@ export async function logJob(job: FirecrawlJob, force: boolean = false) {
   }

   if (process.env.GCS_BUCKET_NAME) {
-    await saveJobToGCS(job);
+    await saveJobToGCS(job, process.env.GCS_BUCKET_NAME);
   }

   if (force) {
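The hunks above restore the pre-#1427 arrangement: logJob carries its own saveJobToGCS(job, bucketName) that performs a single blob.save plus blob.setMetadata with no retry loop, and the call site passes the bucket name explicitly. A trimmed sketch of that restored shape; the job parameter is reduced to the two fields used here, and the full metadata object appears in the hunk above:

```ts
import { Storage } from "@google-cloud/storage";

// Trimmed illustration of the restored helper: one save, one setMetadata, no retries.
async function saveJobToGCS(job: { job_id: string; docs: any }, bucketName: string): Promise<void> {
  const blob = new Storage().bucket(bucketName).file(`${job.job_id}.json`);
  await blob.save(JSON.stringify(job.docs), { contentType: "application/json" });
  await blob.setMetadata({ metadata: { job_id: job.job_id } }); // full metadata set omitted here
}

// Call-site guard as restored by the final hunk (the job value is illustrative).
async function logJobSketch(job: { job_id: string; docs: any }) {
  if (process.env.GCS_BUCKET_NAME) {
    await saveJobToGCS(job, process.env.GCS_BUCKET_NAME);
  }
}
```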