feat(scrape): get job result from GCS, avoid Redis (#1461)

* feat(scrape): get job result from GCS, avoid Redis

* call logjob on scrapes

* Fix inverse bool

* fix more

* migrate gracefully

* refactor

* feat(tests/search): test with scrape
This commit is contained in:
Gergő Móricz 2025-04-15 00:07:44 +02:00 committed by GitHub
parent 0ac86abead
commit b415e625a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 62 additions and 26 deletions

View File

@ -6,4 +6,18 @@ describe("Search tests", () => {
query: "firecrawl" query: "firecrawl"
}); });
}, 60000); }, 60000);
it.concurrent("works with scrape", async () => {
const res = await search({
query: "firecrawl",
limit: 5,
scrapeOptions: {
formats: ["markdown"],
},
});
for (const doc of res) {
expect(doc.markdown).toBeDefined();
}
}, 60000);
}); });

View File

@ -30,6 +30,7 @@ import { fromLegacyScrapeOptions } from "../v1/types";
import { ZodError } from "zod"; import { ZodError } from "zod";
import { Document as V0Document } from "./../../lib/entities"; import { Document as V0Document } from "./../../lib/entities";
import { BLOCKLISTED_URL_MESSAGE } from "../../lib/strings"; import { BLOCKLISTED_URL_MESSAGE } from "../../lib/strings";
import { getJobFromGCS } from "../../lib/gcs-jobs";
export async function scrapeHelper( export async function scrapeHelper(
jobId: string, jobId: string,
@ -93,7 +94,7 @@ export async function scrapeHelper(
}, },
async (span) => { async (span) => {
try { try {
doc = await waitForJob<Document>(jobId, timeout); doc = await waitForJob(jobId, timeout);
} catch (e) { } catch (e) {
if ( if (
e instanceof Error && e instanceof Error &&

View File

@ -22,6 +22,7 @@ import {
fromLegacyScrapeOptions, fromLegacyScrapeOptions,
toLegacyDocument, toLegacyDocument,
} from "../v1/types"; } from "../v1/types";
import { getJobFromGCS } from "../../lib/gcs-jobs";
export async function searchHelper( export async function searchHelper(
jobId: string, jobId: string,
@ -123,7 +124,7 @@ export async function searchHelper(
const docs = ( const docs = (
await Promise.all( await Promise.all(
jobDatas.map((x) => waitForJob<Document>(x.opts.jobId, 60000)), jobDatas.map((x) => waitForJob(x.opts.jobId, 60000)),
) )
).map((x) => toLegacyDocument(x, internalOptions)); ).map((x) => toLegacyDocument(x, internalOptions));

View File

@ -13,6 +13,8 @@ import { addScrapeJob, waitForJob } from "../../services/queue-jobs";
import { logJob } from "../../services/logging/log_job"; import { logJob } from "../../services/logging/log_job";
import { getJobPriority } from "../../lib/job-priority"; import { getJobPriority } from "../../lib/job-priority";
import { getScrapeQueue } from "../../services/queue-service"; import { getScrapeQueue } from "../../services/queue-service";
import { getJob } from "./crawl-status";
import { getJobFromGCS } from "../../lib/gcs-jobs";
export async function scrapeController( export async function scrapeController(
req: RequestWithAuth<{}, ScrapeResponse, ScrapeRequest>, req: RequestWithAuth<{}, ScrapeResponse, ScrapeRequest>,
@ -66,7 +68,7 @@ export async function scrapeController(
let doc: Document; let doc: Document;
try { try {
doc = await waitForJob<Document>(jobId, timeout + totalWait); // TODO: better types for this doc = await waitForJob(jobId, timeout + totalWait);
} catch (e) { } catch (e) {
logger.error(`Error in scrapeController: ${e}`, { logger.error(`Error in scrapeController: ${e}`, {
jobId, jobId,
@ -123,21 +125,6 @@ export async function scrapeController(
} }
} }
logJob({
job_id: jobId,
success: true,
message: "Scrape completed",
num_docs: 1,
docs: [doc],
time_taken: timeTakenInSeconds,
team_id: req.auth.team_id,
mode: "scrape",
url: req.body.url,
scrapeOptions: req.body,
origin: origin,
num_tokens: numTokens,
});
return res.status(200).json({ return res.status(200).json({
success: true, success: true,
data: doc, data: doc,

View File

@ -20,6 +20,7 @@ import * as Sentry from "@sentry/node";
import { BLOCKLISTED_URL_MESSAGE } from "../../lib/strings"; import { BLOCKLISTED_URL_MESSAGE } from "../../lib/strings";
import { logger as _logger } from "../../lib/logger"; import { logger as _logger } from "../../lib/logger";
import type { Logger } from "winston"; import type { Logger } from "winston";
import { getJobFromGCS } from "../../lib/gcs-jobs";
// Used for deep research // Used for deep research
export async function searchAndScrapeSearchResult( export async function searchAndScrapeSearchResult(
@ -99,7 +100,8 @@ async function scrapeSearchResult(
jobPriority, jobPriority,
); );
const doc = await waitForJob<Document>(jobId, options.timeout); const doc: Document = await waitForJob(jobId, options.timeout);
logger.info("Scrape job completed", { logger.info("Scrape job completed", {
scrapeId: jobId, scrapeId: jobId,
url: searchResult.url, url: searchResult.url,

View File

@ -5,6 +5,7 @@ import { waitForJob } from "../../services/queue-jobs";
import { addScrapeJob } from "../../services/queue-jobs"; import { addScrapeJob } from "../../services/queue-jobs";
import { getJobPriority } from "../job-priority"; import { getJobPriority } from "../job-priority";
import type { Logger } from "winston"; import type { Logger } from "winston";
import { getJobFromGCS } from "../gcs-jobs";
interface ScrapeDocumentOptions { interface ScrapeDocumentOptions {
url: string; url: string;
@ -53,7 +54,8 @@ export async function scrapeDocument(
jobPriority, jobPriority,
); );
const doc = await waitForJob<Document>(jobId, timeout); const doc = await waitForJob(jobId, timeout);
await getScrapeQueue().remove(jobId); await getScrapeQueue().remove(jobId);
if (trace) { if (trace) {

View File

@ -1,6 +1,7 @@
import { FirecrawlJob } from "../types"; import { FirecrawlJob } from "../types";
import { Storage } from "@google-cloud/storage"; import { Storage } from "@google-cloud/storage";
import { logger } from "./logger"; import { logger } from "./logger";
import { Document } from "../controllers/v1/types";
const credentials = process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined; const credentials = process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined;

View File

@ -13,6 +13,8 @@ import { logger } from "../lib/logger";
import { sendNotificationWithCustomDays } from './notification/email_notification'; import { sendNotificationWithCustomDays } from './notification/email_notification';
import { shouldSendConcurrencyLimitNotification } from './notification/notification-check'; import { shouldSendConcurrencyLimitNotification } from './notification/notification-check';
import { getACUC, getACUCTeam } from "../controllers/auth"; import { getACUC, getACUCTeam } from "../controllers/auth";
import { getJobFromGCS } from "../lib/gcs-jobs";
import { Document } from "../controllers/v1/types";
/** /**
* Checks if a job is a crawl or batch scrape based on its options * Checks if a job is a crawl or batch scrape based on its options
@ -263,10 +265,10 @@ export async function addScrapeJobs(
); );
} }
export function waitForJob<T = unknown>( export function waitForJob(
jobId: string, jobId: string,
timeout: number, timeout: number,
): Promise<T> { ): Promise<Document> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const start = Date.now(); const start = Date.now();
const int = setInterval(async () => { const int = setInterval(async () => {
@ -277,7 +279,18 @@ export function waitForJob<T = unknown>(
const state = await getScrapeQueue().getJobState(jobId); const state = await getScrapeQueue().getJobState(jobId);
if (state === "completed") { if (state === "completed") {
clearInterval(int); clearInterval(int);
resolve((await getScrapeQueue().getJob(jobId))!.returnvalue); let doc: Document;
doc = (await getScrapeQueue().getJob(jobId))!.returnvalue;
if (!doc) {
const docs = await getJobFromGCS(jobId);
if (!docs || docs.length === 0) {
throw new Error("Job not found in GCS");
}
doc = docs[0];
}
resolve(doc);
} else if (state === "failed") { } else if (state === "failed") {
// console.log("failed", (await getScrapeQueue().getJob(jobId)).failedReason); // console.log("failed", (await getScrapeQueue().getJob(jobId)).failedReason);
const job = await getScrapeQueue().getJob(jobId); const job = await getScrapeQueue().getJob(jobId);

View File

@ -353,11 +353,11 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => {
if (result.success) { if (result.success) {
try { try {
if ( if (
job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true" &&
process.env.USE_DB_AUTHENTICATION === "true" (job.data.crawl_id || process.env.GCS_BUCKET_NAME)
) { ) {
logger.debug( logger.debug(
"Job succeeded -- has crawl associated, putting null in Redis", "Job succeeded -- putting null in Redis",
); );
await job.moveToCompleted(null, token, false); await job.moveToCompleted(null, token, false);
} else { } else {
@ -1207,6 +1207,21 @@ async function processJob(job: Job & { id: string }, token: string) {
await finishCrawlIfNeeded(job, sc); await finishCrawlIfNeeded(job, sc);
} else { } else {
await logJob({
job_id: job.id,
success: true,
message: "Scrape completed",
num_docs: 1,
docs: [doc],
time_taken: timeTakenInSeconds,
team_id: job.data.team_id,
mode: "scrape",
url: job.data.url,
scrapeOptions: job.data.scrapeOptions,
origin: job.data.origin,
num_tokens: 0, // TODO: fix
});
indexJob(job, doc); indexJob(job, doc);
} }