diff --git a/apps/api/src/__tests__/snips/search.test.ts b/apps/api/src/__tests__/snips/search.test.ts
index 67b07674..22e7bb92 100644
--- a/apps/api/src/__tests__/snips/search.test.ts
+++ b/apps/api/src/__tests__/snips/search.test.ts
@@ -6,4 +6,18 @@ describe("Search tests", () => {
       query: "firecrawl"
     });
   }, 60000);
+
+  it.concurrent("works with scrape", async () => {
+    const res = await search({
+      query: "firecrawl",
+      limit: 5,
+      scrapeOptions: {
+        formats: ["markdown"],
+      },
+    });
+
+    for (const doc of res) {
+      expect(doc.markdown).toBeDefined();
+    }
+  }, 60000);
 });
diff --git a/apps/api/src/controllers/v0/scrape.ts b/apps/api/src/controllers/v0/scrape.ts
index 053afd50..b7bb3359 100644
--- a/apps/api/src/controllers/v0/scrape.ts
+++ b/apps/api/src/controllers/v0/scrape.ts
@@ -30,6 +30,7 @@ import { fromLegacyScrapeOptions } from "../v1/types";
 import { ZodError } from "zod";
 import { Document as V0Document } from "./../../lib/entities";
 import { BLOCKLISTED_URL_MESSAGE } from "../../lib/strings";
+import { getJobFromGCS } from "../../lib/gcs-jobs";
 
 export async function scrapeHelper(
   jobId: string,
@@ -93,7 +94,7 @@ export async function scrapeHelper(
     },
     async (span) => {
       try {
-        doc = await waitForJob<Document>(jobId, timeout);
+        doc = await waitForJob(jobId, timeout);
       } catch (e) {
         if (
           e instanceof Error &&
diff --git a/apps/api/src/controllers/v0/search.ts b/apps/api/src/controllers/v0/search.ts
index 45092b77..e216db4b 100644
--- a/apps/api/src/controllers/v0/search.ts
+++ b/apps/api/src/controllers/v0/search.ts
@@ -22,6 +22,7 @@ import {
   fromLegacyScrapeOptions,
   toLegacyDocument,
 } from "../v1/types";
+import { getJobFromGCS } from "../../lib/gcs-jobs";
 
 export async function searchHelper(
   jobId: string,
@@ -123,7 +124,7 @@ export async function searchHelper(
 
   const docs = (
     await Promise.all(
-      jobDatas.map((x) => waitForJob<Document>(x.opts.jobId, 60000)),
+      jobDatas.map((x) => waitForJob(x.opts.jobId, 60000)),
     )
   ).map((x) => toLegacyDocument(x, internalOptions));
 
diff --git a/apps/api/src/controllers/v1/scrape.ts b/apps/api/src/controllers/v1/scrape.ts
index 6259981d..15fa097d 100644
--- a/apps/api/src/controllers/v1/scrape.ts
+++ b/apps/api/src/controllers/v1/scrape.ts
@@ -13,6 +13,8 @@ import { addScrapeJob, waitForJob } from "../../services/queue-jobs";
 import { logJob } from "../../services/logging/log_job";
 import { getJobPriority } from "../../lib/job-priority";
 import { getScrapeQueue } from "../../services/queue-service";
+import { getJob } from "./crawl-status";
+import { getJobFromGCS } from "../../lib/gcs-jobs";
 
 export async function scrapeController(
   req: RequestWithAuth<{}, ScrapeResponse, ScrapeRequest>,
@@ -66,7 +68,7 @@ export async function scrapeController(
 
   let doc: Document;
   try {
-    doc = await waitForJob<Document>(jobId, timeout + totalWait); // TODO: better types for this
+    doc = await waitForJob(jobId, timeout + totalWait);
   } catch (e) {
     logger.error(`Error in scrapeController: ${e}`, {
       jobId,
@@ -123,21 +125,6 @@ export async function scrapeController(
     }
   }
 
-  logJob({
-    job_id: jobId,
-    success: true,
-    message: "Scrape completed",
-    num_docs: 1,
-    docs: [doc],
-    time_taken: timeTakenInSeconds,
-    team_id: req.auth.team_id,
-    mode: "scrape",
-    url: req.body.url,
-    scrapeOptions: req.body,
-    origin: origin,
-    num_tokens: numTokens,
-  });
-
   return res.status(200).json({
     success: true,
     data: doc,
diff --git a/apps/api/src/controllers/v1/search.ts b/apps/api/src/controllers/v1/search.ts
index bcb18166..06f26700 100644
--- a/apps/api/src/controllers/v1/search.ts
+++ b/apps/api/src/controllers/v1/search.ts
@@ -20,6 +20,7 @@ import * as Sentry from "@sentry/node";
 import { BLOCKLISTED_URL_MESSAGE } from "../../lib/strings";
 import { logger as _logger } from "../../lib/logger";
 import type { Logger } from "winston";
+import { getJobFromGCS } from "../../lib/gcs-jobs";
 
 // Used for deep research
 export async function searchAndScrapeSearchResult(
@@ -99,7 +100,8 @@ async function scrapeSearchResult(
     jobPriority,
   );
 
-  const doc = await waitForJob(jobId, options.timeout);
+  const doc: Document = await waitForJob(jobId, options.timeout);
+
   logger.info("Scrape job completed", {
     scrapeId: jobId,
     url: searchResult.url,
diff --git a/apps/api/src/lib/extract/document-scraper.ts b/apps/api/src/lib/extract/document-scraper.ts
index f5501230..b1d14193 100644
--- a/apps/api/src/lib/extract/document-scraper.ts
+++ b/apps/api/src/lib/extract/document-scraper.ts
@@ -5,6 +5,7 @@ import { waitForJob } from "../../services/queue-jobs";
 import { addScrapeJob } from "../../services/queue-jobs";
 import { getJobPriority } from "../job-priority";
 import type { Logger } from "winston";
+import { getJobFromGCS } from "../gcs-jobs";
 
 interface ScrapeDocumentOptions {
   url: string;
@@ -53,7 +54,8 @@ export async function scrapeDocument(
     jobPriority,
   );
 
-  const doc = await waitForJob<Document>(jobId, timeout);
+  const doc = await waitForJob(jobId, timeout);
+
   await getScrapeQueue().remove(jobId);
 
   if (trace) {
diff --git a/apps/api/src/lib/gcs-jobs.ts b/apps/api/src/lib/gcs-jobs.ts
index 6895c7e1..f4e68cd2 100644
--- a/apps/api/src/lib/gcs-jobs.ts
+++ b/apps/api/src/lib/gcs-jobs.ts
@@ -1,6 +1,7 @@
 import { FirecrawlJob } from "../types";
 import { Storage } from "@google-cloud/storage";
 import { logger } from "./logger";
+import { Document } from "../controllers/v1/types";
 
 const credentials = process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined;
 
diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts
index 24924d7d..7778393a 100644
--- a/apps/api/src/services/queue-jobs.ts
+++ b/apps/api/src/services/queue-jobs.ts
@@ -13,6 +13,8 @@ import { logger } from "../lib/logger";
 import { sendNotificationWithCustomDays } from './notification/email_notification';
 import { shouldSendConcurrencyLimitNotification } from './notification/notification-check';
 import { getACUC, getACUCTeam } from "../controllers/auth";
+import { getJobFromGCS } from "../lib/gcs-jobs";
+import { Document } from "../controllers/v1/types";
 
 /**
  * Checks if a job is a crawl or batch scrape based on its options
@@ -263,10 +265,10 @@ export async function addScrapeJobs(
   );
 }
 
-export function waitForJob<T = unknown>(
+export function waitForJob(
   jobId: string,
   timeout: number,
-): Promise<T> {
+): Promise<Document> {
   return new Promise((resolve, reject) => {
     const start = Date.now();
     const int = setInterval(async () => {
@@ -277,7 +279,18 @@
         const state = await getScrapeQueue().getJobState(jobId);
         if (state === "completed") {
           clearInterval(int);
-          resolve((await getScrapeQueue().getJob(jobId))!.returnvalue);
+          let doc: Document;
+          doc = (await getScrapeQueue().getJob(jobId))!.returnvalue;
+
+          if (!doc) {
+            const docs = await getJobFromGCS(jobId);
+            if (!docs || docs.length === 0) {
+              throw new Error("Job not found in GCS");
+            }
+            doc = docs[0];
+          }
+
+          resolve(doc);
         } else if (state === "failed") {
           // console.log("failed", (await getScrapeQueue().getJob(jobId)).failedReason);
           const job = await getScrapeQueue().getJob(jobId);
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index a1bf4df9..57ae8eb8 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -353,11 +353,11 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => {
   if (result.success) {
     try {
       if (
-        job.data.crawl_id &&
-        process.env.USE_DB_AUTHENTICATION === "true"
+        process.env.USE_DB_AUTHENTICATION === "true" &&
+        (job.data.crawl_id || process.env.GCS_BUCKET_NAME)
       ) {
         logger.debug(
-          "Job succeeded -- has crawl associated, putting null in Redis",
+          "Job succeeded -- putting null in Redis",
         );
         await job.moveToCompleted(null, token, false);
       } else {
@@ -1207,6 +1207,21 @@ async function processJob(job: Job & { id: string }, token: string) {
 
       await finishCrawlIfNeeded(job, sc);
     } else {
+      await logJob({
+        job_id: job.id,
+        success: true,
+        message: "Scrape completed",
+        num_docs: 1,
+        docs: [doc],
+        time_taken: timeTakenInSeconds,
+        team_id: job.data.team_id,
+        mode: "scrape",
+        url: job.data.url,
+        scrapeOptions: job.data.scrapeOptions,
+        origin: job.data.origin,
+        num_tokens: 0, // TODO: fix
+      });
+
       indexJob(job, doc);
     }