From 27457ed5db3700bc0b042b6fd7f21a35b26cbabb Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 3 Jan 2025 20:44:27 -0300 Subject: [PATCH 01/10] Nick: init --- apps/api/src/controllers/v1/extract-status.ts | 53 +++++++++++++++++++ apps/api/src/controllers/v1/extract.ts | 47 ++++++++++++++-- apps/api/src/controllers/v1/types.ts | 3 +- .../api/src/lib/extract/extraction-service.ts | 13 +++-- apps/api/src/routes/v1.ts | 14 ++--- apps/api/src/services/queue-service.ts | 39 +++++++++----- apps/api/src/services/queue-worker.ts | 51 ++++++++++++++++++ 7 files changed, 188 insertions(+), 32 deletions(-) create mode 100644 apps/api/src/controllers/v1/extract-status.ts diff --git a/apps/api/src/controllers/v1/extract-status.ts b/apps/api/src/controllers/v1/extract-status.ts new file mode 100644 index 00000000..8ab2d11b --- /dev/null +++ b/apps/api/src/controllers/v1/extract-status.ts @@ -0,0 +1,53 @@ +import { Response } from "express"; +import { + supabaseGetJobByIdOnlyData, + supabaseGetJobsById, +} from "../../lib/supabase-jobs"; +import { scrapeStatusRateLimiter } from "../../services/rate-limiter"; +import { RequestWithAuth } from "./types"; + +export async function extractStatusController( + req: RequestWithAuth<{ jobId: string }, any, any>, + res: Response, +) { + try { + const rateLimiter = scrapeStatusRateLimiter; + const incomingIP = (req.headers["x-forwarded-for"] || + req.socket.remoteAddress) as string; + const iptoken = incomingIP; + await rateLimiter.consume(iptoken); + + const job = await supabaseGetJobByIdOnlyData(req.params.jobId); + if (!job || job.team_id !== req.auth.team_id) { + return res.status(403).json({ + success: false, + error: "You are not allowed to access this resource.", + }); + } + + const jobData = await supabaseGetJobsById([req.params.jobId]); + if (!jobData || jobData.length === 0) { + return res.status(404).json({ + success: false, + error: "Job not found", + }); + } + + return res.status(200).json({ + success: true, + data: jobData[0].docs, + }); + } catch (error) { + if (error instanceof Error && error.message == "Too Many Requests") { + return res.status(429).json({ + success: false, + error: "Rate limit exceeded. Please try again later.", + }); + } else { + return res.status(500).json({ + success: false, + error: "An unexpected error occurred.", + }); + } + } +} diff --git a/apps/api/src/controllers/v1/extract.ts b/apps/api/src/controllers/v1/extract.ts index af63f446..2838779b 100644 --- a/apps/api/src/controllers/v1/extract.ts +++ b/apps/api/src/controllers/v1/extract.ts @@ -5,7 +5,9 @@ import { extractRequestSchema, ExtractResponse, } from "./types"; -import { performExtraction } from "../../lib/extract/extraction-service"; +import { getExtractQueue } from "../../services/queue-service"; +import * as Sentry from "@sentry/node"; +import { v4 as uuidv4 } from "uuid"; /** * Extracts data from the provided URLs based on the request parameters. @@ -29,12 +31,47 @@ export async function extractController( }); } - const result = await performExtraction({ + const extractId = crypto.randomUUID(); + const jobData = { request: req.body, teamId: req.auth.team_id, plan: req.auth.plan, - subId: req.acuc?.sub_id || undefined, - }); + subId: req.acuc?.sub_id, + extractId, + }; - return res.status(result.success ? 
200 : 400).json(result);
+  if (Sentry.isInitialized()) {
+    const size = JSON.stringify(jobData).length;
+    await Sentry.startSpan(
+      {
+        name: "Add extract job",
+        op: "queue.publish",
+        attributes: {
+          "messaging.message.id": extractId,
+          "messaging.destination.name": getExtractQueue().name,
+          "messaging.message.body.size": size,
+        },
+      },
+      async (span) => {
+        await getExtractQueue().add(extractId, {
+          ...jobData,
+          sentry: {
+            trace: Sentry.spanToTraceHeader(span),
+            baggage: Sentry.spanToBaggageHeader(span),
+            size,
+          },
+        });
+      },
+    );
+  } else {
+    await getExtractQueue().add(extractId, jobData, {
+      jobId: extractId,
+    });
+  }
+
+  return res.status(202).json({
+    success: true,
+    id: extractId,
+    urlTrace: [],
+  });
 }
diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts
index ccb11586..8727f706 100644
--- a/apps/api/src/controllers/v1/types.ts
+++ b/apps/api/src/controllers/v1/types.ts
@@ -478,10 +478,11 @@ export interface URLTrace {
 
 export interface ExtractResponse {
   success: boolean;
+  error?: string;
   data?: any;
   scrape_id?: string;
+  id?: string;
   warning?: string;
-  error?: string;
   urlTrace?: URLTrace[];
 }
 
diff --git a/apps/api/src/lib/extract/extraction-service.ts b/apps/api/src/lib/extract/extraction-service.ts
index 2791df3c..6a98ef94 100644
--- a/apps/api/src/lib/extract/extraction-service.ts
+++ b/apps/api/src/lib/extract/extraction-service.ts
@@ -20,7 +20,7 @@ interface ExtractServiceOptions {
 interface ExtractResult {
   success: boolean;
   data?: any;
-  scrapeId: string;
+  extractId: string;
   warning?: string;
   urlTrace?: URLTrace[];
   error?: string;
@@ -38,9 +38,8 @@ function getRootDomain(url: string): string {
   }
 }
 
-export async function performExtraction(options: ExtractServiceOptions): Promise<ExtractResult> {
+export async function performExtraction(extractId, options: ExtractServiceOptions): Promise<ExtractResult> {
   const { request, teamId, plan, subId } = options;
-  const scrapeId = crypto.randomUUID();
   const urlTraces: URLTrace[] = [];
   let docs: Document[] = [];
 
@@ -65,7 +64,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise<ExtractResult> {
     return {
       success: false,
       error: "No valid URLs found to scrape. Try adjusting your search criteria or including more URLs.",
-      scrapeId,
+      extractId,
       urlTrace: urlTraces,
     };
   }
@@ -89,7 +88,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise<ExtractResult> {
     return {
       success: false,
       error: error.message,
-      scrapeId,
+      extractId,
       urlTrace: urlTraces,
     };
   }
@@ -191,7 +190,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise<ExtractResult> {
 
   // Log job
   logJob({
-    job_id: scrapeId,
+    job_id: extractId,
     success: true,
     message: "Extract completed",
     num_docs: 1,
@@ -208,7 +207,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise<ExtractResult> {
   return {
     success: true,
     data: completions.extract ?? {},
-    scrapeId,
+    extractId,
     warning: completions.warning,
     urlTrace: request.urlTrace ? 
urlTraces : undefined, }; diff --git a/apps/api/src/routes/v1.ts b/apps/api/src/routes/v1.ts index b6ab2ee8..bc500325 100644 --- a/apps/api/src/routes/v1.ts +++ b/apps/api/src/routes/v1.ts @@ -24,13 +24,7 @@ import { scrapeStatusController } from "../controllers/v1/scrape-status"; import { concurrencyCheckController } from "../controllers/v1/concurrency-check"; import { batchScrapeController } from "../controllers/v1/batch-scrape"; import { extractController } from "../controllers/v1/extract"; -// import { crawlPreviewController } from "../../src/controllers/v1/crawlPreview"; -// import { crawlJobStatusPreviewController } from "../../src/controllers/v1/status"; -// import { searchController } from "../../src/controllers/v1/search"; -// import { crawlCancelController } from "../../src/controllers/v1/crawl-cancel"; -// import { keyAuthController } from "../../src/controllers/v1/keyAuth"; -// import { livenessController } from "../controllers/v1/liveness"; -// import { readinessController } from "../controllers/v1/readiness"; +import { extractStatusController } from "../controllers/v1/extract-status"; import { creditUsageController } from "../controllers/v1/credit-usage"; import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings"; import { searchController } from "../controllers/v1/search"; @@ -215,6 +209,12 @@ v1Router.post( wrap(extractController), ); +v1Router.get( + "/extract/:jobId", + authMiddleware(RateLimiterMode.CrawlStatus), + wrap(extractStatusController), +); + // v1Router.post("/crawlWebsitePreview", crawlPreviewController); v1Router.delete( diff --git a/apps/api/src/services/queue-service.ts b/apps/api/src/services/queue-service.ts index 3cfd8c91..d3d8a4e5 100644 --- a/apps/api/src/services/queue-service.ts +++ b/apps/api/src/services/queue-service.ts @@ -3,12 +3,16 @@ import { logger } from "../lib/logger"; import IORedis from "ioredis"; let scrapeQueue: Queue; +let extractQueue: Queue; +let loggingQueue: Queue; export const redisConnection = new IORedis(process.env.REDIS_URL!, { maxRetriesPerRequest: null, }); export const scrapeQueueName = "{scrapeQueue}"; +export const extractQueueName = "{extractQueue}"; +export const loggingQueueName = "{loggingQueue}"; export function getScrapeQueue() { if (!scrapeQueue) { @@ -24,24 +28,35 @@ export function getScrapeQueue() { age: 90000, // 25 hours }, }, - }, - // { - // settings: { - // lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds, - // lockRenewTime: 15 * 1000, // 15 seconds in milliseconds - // stalledInterval: 30 * 1000, - // maxStalledCount: 10, - // }, - // defaultJobOptions:{ - // attempts: 5 - // } - // } + } ); logger.info("Web scraper queue created"); } return scrapeQueue; } +export function getExtractQueue() { + if (!extractQueue) { + extractQueue = new Queue( + extractQueueName, + { + connection: redisConnection, + defaultJobOptions: { + removeOnComplete: { + age: 90000, // 25 hours + }, + removeOnFail: { + age: 90000, // 25 hours + }, + }, + } + ); + logger.info("Extraction queue created"); + } + return extractQueue; +} + + // === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE // import { QueueEvents } from 'bullmq'; // export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() }); diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index f6a033cb..c235a87b 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -4,8 +4,10 @@ import * as Sentry from "@sentry/node"; import { CustomError } 
from "../lib/custom-error"; import { getScrapeQueue, + getExtractQueue, redisConnection, scrapeQueueName, + extractQueueName, } from "./queue-service"; import { startWebScraperPipeline } from "../main/runWebScraper"; import { callWebhook } from "./webhook"; @@ -50,6 +52,7 @@ import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings"; import { indexPage } from "../lib/extract/index/pinecone"; import { Document } from "../controllers/v1/types"; +import { performExtraction } from "../lib/extract/extraction-service"; configDotenv(); @@ -243,6 +246,52 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => { return err; }; +const processExtractJobInternal = async (token: string, job: Job & { id: string }) => { + const logger = _logger.child({ + module: "extract-worker", + method: "processJobInternal", + jobId: job.id, + extractId: job.data.extractId, + teamId: job.data?.teamId ?? undefined, + }); + + const extendLockInterval = setInterval(async () => { + logger.info(`🔄 Worker extending lock on job ${job.id}`); + await job.extendLock(token, jobLockExtensionTime); + }, jobLockExtendInterval); + + try { + const result = await performExtraction(job.data.extractId, { + request: job.data.request, + teamId: job.data.teamId, + plan: job.data.plan, + subId: job.data.subId, + }); + + if (result.success) { + // Move job to completed state in Redis + await job.moveToCompleted(result, token, false); + return result; + } else { + throw new Error(result.error || "Unknown error during extraction"); + } + } catch (error) { + logger.error(`🚫 Job errored ${job.id} - ${error}`, { error }); + + Sentry.captureException(error, { + data: { + job: job.id, + }, + }); + + // Move job to failed state in Redis + await job.moveToFailed(error, token, false); + throw error; + } finally { + clearInterval(extendLockInterval); + } +}; + let isShuttingDown = false; process.on("SIGINT", () => { @@ -399,7 +448,9 @@ const workerFun = async ( } }; +// Start both workers workerFun(getScrapeQueue(), processJobInternal); +workerFun(getExtractQueue(), processExtractJobInternal); async function processKickoffJob(job: Job & { id: string }, token: string) { const logger = _logger.child({ From 86e34d7c6cf68bd1c66bf5ae5814ca2adbbd390b Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 7 Jan 2025 12:13:12 -0300 Subject: [PATCH 02/10] Nick: wip --- apps/api/src/controllers/v1/extract-status.ts | 54 ++++--------------- apps/api/src/services/queue-worker.ts | 2 +- 2 files changed, 12 insertions(+), 44 deletions(-) diff --git a/apps/api/src/controllers/v1/extract-status.ts b/apps/api/src/controllers/v1/extract-status.ts index 8ab2d11b..feba30ec 100644 --- a/apps/api/src/controllers/v1/extract-status.ts +++ b/apps/api/src/controllers/v1/extract-status.ts @@ -1,53 +1,21 @@ import { Response } from "express"; -import { - supabaseGetJobByIdOnlyData, - supabaseGetJobsById, -} from "../../lib/supabase-jobs"; -import { scrapeStatusRateLimiter } from "../../services/rate-limiter"; +import { supabaseGetJobsById } from "../../lib/supabase-jobs"; import { RequestWithAuth } from "./types"; export async function extractStatusController( req: RequestWithAuth<{ jobId: string }, any, any>, res: Response, ) { - try { - const rateLimiter = scrapeStatusRateLimiter; - const incomingIP = (req.headers["x-forwarded-for"] || - req.socket.remoteAddress) as string; - const iptoken = incomingIP; - await rateLimiter.consume(iptoken); - - const job = await 
supabaseGetJobByIdOnlyData(req.params.jobId); - if (!job || job.team_id !== req.auth.team_id) { - return res.status(403).json({ - success: false, - error: "You are not allowed to access this resource.", - }); - } - - const jobData = await supabaseGetJobsById([req.params.jobId]); - if (!jobData || jobData.length === 0) { - return res.status(404).json({ - success: false, - error: "Job not found", - }); - } - - return res.status(200).json({ - success: true, - data: jobData[0].docs, + const jobData = await supabaseGetJobsById([req.params.jobId]); + if (!jobData || jobData.length === 0) { + return res.status(404).json({ + success: false, + error: "Job not found", }); - } catch (error) { - if (error instanceof Error && error.message == "Too Many Requests") { - return res.status(429).json({ - success: false, - error: "Rate limit exceeded. Please try again later.", - }); - } else { - return res.status(500).json({ - success: false, - error: "An unexpected error occurred.", - }); - } } + + return res.status(200).json({ + success: true, + data: jobData[0].docs, + }); } diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 422a8160..a8a2e408 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -351,7 +351,7 @@ const processExtractJobInternal = async (token: string, job: Job & { id: string // Move job to failed state in Redis await job.moveToFailed(error, token, false); - throw error; + // throw error; } finally { clearInterval(extendLockInterval); } From eb254547e5c50410f2d525b7ed1888dc45147865 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 7 Jan 2025 16:16:01 -0300 Subject: [PATCH 03/10] Nick: --- apps/api/src/controllers/v1/extract-status.ts | 26 +++++++++++-- apps/api/src/controllers/v1/extract.ts | 18 ++++----- apps/api/src/lib/extract/extract-redis.ts | 37 +++++++++++++++++++ .../api/src/lib/extract/extraction-service.ts | 9 +++++ apps/api/src/services/queue-worker.ts | 1 + 5 files changed, 78 insertions(+), 13 deletions(-) create mode 100644 apps/api/src/lib/extract/extract-redis.ts diff --git a/apps/api/src/controllers/v1/extract-status.ts b/apps/api/src/controllers/v1/extract-status.ts index feba30ec..a42f17fa 100644 --- a/apps/api/src/controllers/v1/extract-status.ts +++ b/apps/api/src/controllers/v1/extract-status.ts @@ -1,21 +1,39 @@ import { Response } from "express"; import { supabaseGetJobsById } from "../../lib/supabase-jobs"; import { RequestWithAuth } from "./types"; +import { getExtract, getExtractExpiry } from "../../lib/extract/extract-redis"; export async function extractStatusController( req: RequestWithAuth<{ jobId: string }, any, any>, res: Response, ) { - const jobData = await supabaseGetJobsById([req.params.jobId]); - if (!jobData || jobData.length === 0) { + const extract = await getExtract(req.params.jobId); + + if (!extract) { return res.status(404).json({ success: false, - error: "Job not found", + error: "Extract job not found", }); } + let data: any[] = []; + + if (extract.status === "completed") { + const jobData = await supabaseGetJobsById([req.params.jobId]); + if (!jobData || jobData.length === 0) { + return res.status(404).json({ + success: false, + error: "Job not found", + }); + } + + data = jobData[0].docs; + } + return res.status(200).json({ success: true, - data: jobData[0].docs, + data: data, + status: extract.status, + expiresAt: (await getExtractExpiry(req.params.jobId)).toISOString(), }); } diff --git a/apps/api/src/controllers/v1/extract.ts 
b/apps/api/src/controllers/v1/extract.ts
index 2838779b..6668d33b 100644
--- a/apps/api/src/controllers/v1/extract.ts
+++ b/apps/api/src/controllers/v1/extract.ts
@@ -7,7 +7,7 @@ import {
 } from "./types";
 import { getExtractQueue } from "../../services/queue-service";
 import * as Sentry from "@sentry/node";
-import { v4 as uuidv4 } from "uuid";
+import { saveExtract } from "../../lib/extract/extract-redis";
 
 /**
  * Extracts data from the provided URLs based on the request parameters.
@@ -23,14 +23,6 @@ export async function extractController(
   const selfHosted = process.env.USE_DB_AUTHENTICATION !== "true";
   req.body = extractRequestSchema.parse(req.body);
 
-  if (!req.auth.plan) {
-    return res.status(400).json({
-      success: false,
-      error: "No plan specified",
-      urlTrace: [],
-    });
-  }
-
   const extractId = crypto.randomUUID();
   const jobData = {
     request: req.body,
@@ -40,6 +32,14 @@ export async function extractController(
     extractId,
   };
 
+  await saveExtract(extractId, {
+    id: extractId,
+    team_id: req.auth.team_id,
+    plan: req.auth.plan,
+    createdAt: Date.now(),
+    status: "processing",
+  });
+
   if (Sentry.isInitialized()) {
     const size = JSON.stringify(jobData).length;
     await Sentry.startSpan(
diff --git a/apps/api/src/lib/extract/extract-redis.ts b/apps/api/src/lib/extract/extract-redis.ts
new file mode 100644
index 00000000..2d9410f9
--- /dev/null
+++ b/apps/api/src/lib/extract/extract-redis.ts
@@ -0,0 +1,37 @@
+import { redisConnection } from "../../services/queue-service";
+import { logger as _logger } from "../logger";
+
+export type StoredExtract = {
+  id: string;
+  team_id: string;
+  plan?: string;
+  createdAt: number;
+  status: "processing" | "completed" | "failed" | "cancelled";
+};
+
+export async function saveExtract(id: string, extract: StoredExtract) {
+  _logger.debug("Saving extract " + id + " to Redis...");
+  await redisConnection.set("extract:" + id, JSON.stringify(extract));
+  await redisConnection.expire("extract:" + id, 24 * 60 * 60, "NX");
+}
+
+export async function getExtract(id: string): Promise<StoredExtract | null> {
+  const x = await redisConnection.get("extract:" + id);
+  return x ? JSON.parse(x) : null;
+}
+
+export async function updateExtract(id: string, extract: Partial<StoredExtract>) {
+  const current = await getExtract(id);
+  if (!current) return;
+  await redisConnection.set("extract:" + id, JSON.stringify({ ...current, ...extract }));
+  await redisConnection.expire("extract:" + id, 24 * 60 * 60, "NX");
+}
+
+
+export async function getExtractExpiry(id: string): Promise<Date> {
+  const d = new Date();
+  const ttl = await redisConnection.pttl("extract:" + id);
+  d.setMilliseconds(d.getMilliseconds() + ttl);
+  d.setMilliseconds(0);
+  return d;
+}
diff --git a/apps/api/src/lib/extract/extraction-service.ts b/apps/api/src/lib/extract/extraction-service.ts
index 6a98ef94..e4893bc9 100644
--- a/apps/api/src/lib/extract/extraction-service.ts
+++ b/apps/api/src/lib/extract/extraction-service.ts
@@ -9,6 +9,7 @@ import { billTeam } from "../../services/billing/credit_billing";
 import { logJob } from "../../services/logging/log_job";
 import { _addScrapeJobToBullMQ } from "../../services/queue-jobs";
 import { saveCrawl, StoredCrawl } from "../crawl-redis";
+import { updateExtract } from "./extract-redis";
 
 interface ExtractServiceOptions {
   request: ExtractRequest;
@@ -202,8 +203,16 @@ export async function performExtraction(extractId, options: ExtractServiceOptions): Promise<ExtractResult> {
     scrapeOptions: request,
     origin: request.origin ?? "api",
     num_tokens: completions.numTokens ?? 0,
+  }).then(() => {
+    updateExtract(extractId, {
+      status: "completed",
+    }).catch((error) => {
+      logger.error(`Failed to update extract ${extractId} status to completed: ${error}`);
+    });
   });
+
+
 
   return {
     success: true,
     data: completions.extract ?? {},
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index a8a2e408..96fad2d2 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -353,6 +353,7 @@ const processExtractJobInternal = async (token: string, job: Job & { id: string
     await job.moveToFailed(error, token, false);
     // throw error;
   } finally {
+
     clearInterval(extendLockInterval);
   }
 };

From 1f2a76fc2396078ec271768052b87198902caf34 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?=
Date: Tue, 7 Jan 2025 20:18:10 +0100
Subject: [PATCH 04/10] Update apps/api/src/lib/extract/extraction-service.ts

---
 apps/api/src/lib/extract/extraction-service.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apps/api/src/lib/extract/extraction-service.ts b/apps/api/src/lib/extract/extraction-service.ts
index e4893bc9..9793540b 100644
--- a/apps/api/src/lib/extract/extraction-service.ts
+++ b/apps/api/src/lib/extract/extraction-service.ts
@@ -39,7 +39,7 @@ function getRootDomain(url: string): string {
   }
 }
 
-export async function performExtraction(extractId, options: ExtractServiceOptions): Promise<ExtractResult> {
+export async function performExtraction(extractId: string, options: ExtractServiceOptions): Promise<ExtractResult> {
   const { request, teamId, plan, subId } = options;
   const urlTraces: URLTrace[] = [];
   let docs: Document[] = [];

From 11af214db163d4cbf1d7879696422a5ac7151777 Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Tue, 7 Jan 2025 16:21:51 -0300
Subject: [PATCH 05/10] Nick: update extract in case there is an error

---
 apps/api/src/controllers/v1/extract-status.ts | 1 +
 apps/api/src/lib/extract/extract-redis.ts     | 1 +
 apps/api/src/services/queue-worker.ts         | 6 ++++++
 3 files changed, 8 insertions(+)

diff --git a/apps/api/src/controllers/v1/extract-status.ts b/apps/api/src/controllers/v1/extract-status.ts
index a42f17fa..ce58a342 100644
--- a/apps/api/src/controllers/v1/extract-status.ts
+++ b/apps/api/src/controllers/v1/extract-status.ts
@@ -34,6 +34,7 @@ export async function extractStatusController(
     success: true,
     data: data,
     status: extract.status,
+    error: extract?.error ?? undefined,
     expiresAt: (await getExtractExpiry(req.params.jobId)).toISOString(),
   });
 }
diff --git a/apps/api/src/lib/extract/extract-redis.ts b/apps/api/src/lib/extract/extract-redis.ts
index 2d9410f9..f4ed0369 100644
--- a/apps/api/src/lib/extract/extract-redis.ts
+++ b/apps/api/src/lib/extract/extract-redis.ts
@@ -7,6 +7,7 @@ export type StoredExtract = {
   plan?: string;
   createdAt: number;
   status: "processing" | "completed" | "failed" | "cancelled";
+  error?: any;
 };
 
 export async function saveExtract(id: string, extract: StoredExtract) {
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 96fad2d2..180bef10 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -55,6 +55,7 @@ import { Document } from "../controllers/v1/types";
 import { performExtraction } from "../lib/extract/extraction-service";
 import { supabase_service } from "../services/supabase";
 import { normalizeUrl, normalizeUrlOnlyHostname } from "../lib/canonical-url";
+import { saveExtract, updateExtract } from "../lib/extract/extract-redis";
 
 configDotenv();
 
@@ -352,6 +353,11 @@ const processExtractJobInternal = async (token: string, job: Job & { id: string
     });
 
     // Move job to failed state in Redis
     await job.moveToFailed(error, token, false);
+
+    await updateExtract(job.data.extractId, {
+      status: "failed",
+      error: error.error ?? error ?? "Unknown error, please contact help@firecrawl.dev. Extract id: " + job.data.extractId,
+    });
     // throw error;
   } finally {

From 9fdcfb931460d63e6dfef1cb91d60db212585205 Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Tue, 7 Jan 2025 16:24:46 -0300
Subject: [PATCH 06/10] Update index.ts

---
 apps/api/src/index.ts | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts
index 20214d72..cc2d41b8 100644
--- a/apps/api/src/index.ts
+++ b/apps/api/src/index.ts
@@ -4,7 +4,7 @@ import * as Sentry from "@sentry/node";
 import express, { NextFunction, Request, Response } from "express";
 import bodyParser from "body-parser";
 import cors from "cors";
-import { getScrapeQueue } from "./services/queue-service";
+import { getExtractQueue, getScrapeQueue } from "./services/queue-service";
 import { v0Router } from "./routes/v0";
 import os from "os";
 import { logger } from "./lib/logger";
@@ -45,7 +45,7 @@ const serverAdapter = new ExpressAdapter();
 serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
 
 const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
-  queues: [new BullAdapter(getScrapeQueue())],
+  queues: [new BullAdapter(getScrapeQueue()), new BullAdapter(getExtractQueue())],
   serverAdapter: serverAdapter,
 });

From dd14744850b638fda8e64673f6225a4d147a8b3f Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Tue, 7 Jan 2025 16:55:55 -0300
Subject: [PATCH 07/10] Update types.ts

---
 apps/api/src/controllers/v1/types.ts | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts
index 8727f706..1c9112c5 100644
--- a/apps/api/src/controllers/v1/types.ts
+++ b/apps/api/src/controllers/v1/types.ts
@@ -186,6 +186,10 @@ export const scrapeOptions = z
 
 export type ScrapeOptions = z.infer<typeof scrapeOptions>;
 
+import Ajv from "ajv";
+
+const ajv = new Ajv();
+
 export const extractV1Options = z
   .object({
     urls: url
@@ -193,7 +197,20 @@ export const extractV1Options = z
       .array()
       .max(10, "Maximum of 10 URLs allowed per request while in beta."),
     prompt: z.string().optional(),
systemPrompt: z.string().optional(), - schema: z.any().optional(), + schema: z + .any() + .optional() + .refine((val) => { + if (!val) return true; // Allow undefined schema + try { + const validate = ajv.compile(val); + return typeof validate === "function"; + } catch (e) { + return false; + } + }, { + message: "Invalid JSON schema.", + }), limit: z.number().int().positive().finite().safe().optional(), ignoreSitemap: z.boolean().default(false), includeSubdomains: z.boolean().default(true), From 9ec08d70204db87f9a222786bc9cd4a4873045ca Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 7 Jan 2025 17:20:49 -0300 Subject: [PATCH 08/10] Nick: fixed the sdks --- apps/api/src/controllers/v1/extract.ts | 2 +- apps/js-sdk/firecrawl/src/index.ts | 36 ++++++++++++++++++-------- apps/python-sdk/firecrawl/firecrawl.py | 25 +++++++++++++++++- 3 files changed, 50 insertions(+), 13 deletions(-) diff --git a/apps/api/src/controllers/v1/extract.ts b/apps/api/src/controllers/v1/extract.ts index 6668d33b..061f5c9c 100644 --- a/apps/api/src/controllers/v1/extract.ts +++ b/apps/api/src/controllers/v1/extract.ts @@ -69,7 +69,7 @@ export async function extractController( }); } - return res.status(202).json({ + return res.status(200).json({ success: true, id: extractId, urlTrace: [], diff --git a/apps/js-sdk/firecrawl/src/index.ts b/apps/js-sdk/firecrawl/src/index.ts index 474eea83..644be7f6 100644 --- a/apps/js-sdk/firecrawl/src/index.ts +++ b/apps/js-sdk/firecrawl/src/index.ts @@ -887,18 +887,32 @@ export default class FirecrawlApp { { ...jsonData, schema: jsonSchema }, headers ); + if (response.status === 200) { - const responseData = response.data as ExtractResponse; - if (responseData.success) { - return { - success: true, - data: responseData.data, - warning: responseData.warning, - error: responseData.error - }; - } else { - throw new FirecrawlError(`Failed to scrape URL. Error: ${responseData.error}`, response.status); - } + const jobId = response.data.id; + let extractStatus; + do { + const statusResponse: AxiosResponse = await this.getRequest( + `${this.apiUrl}/v1/extract/${jobId}`, + headers + ); + extractStatus = statusResponse.data; + if (extractStatus.status === "completed") { + if (extractStatus.success) { + return { + success: true, + data: extractStatus.data, + warning: extractStatus.warning, + error: extractStatus.error + }; + } else { + throw new FirecrawlError(`Failed to extract data. Error: ${extractStatus.error}`, statusResponse.status); + } + } else if (extractStatus.status === "failed" || extractStatus.status === "cancelled") { + throw new FirecrawlError(`Extract job ${extractStatus.status}. 
Error: ${extractStatus.error}`, statusResponse.status);
+        }
+        await new Promise(resolve => setTimeout(resolve, 1000)); // Polling interval
+      } while (extractStatus.status !== "completed");
     } else {
       this.handleError(response, "extract");
     }
diff --git a/apps/python-sdk/firecrawl/firecrawl.py b/apps/python-sdk/firecrawl/firecrawl.py
index d3216405..e844b733 100644
--- a/apps/python-sdk/firecrawl/firecrawl.py
+++ b/apps/python-sdk/firecrawl/firecrawl.py
@@ -542,6 +542,7 @@ class FirecrawlApp:
         }
 
         try:
+            # Send the initial extract request
             response = self._post_request(
                 f'{self.api_url}/v1/extract',
                 request_data,
@@ -550,7 +551,29 @@ class FirecrawlApp:
             if response.status_code == 200:
                 data = response.json()
                 if data['success']:
-                    return data
+                    job_id = data.get('id')
+                    if not job_id:
+                        raise Exception('Job ID not returned from extract request.')
+
+                    # Poll for the extract status
+                    while True:
+                        status_response = self._get_request(
+                            f'{self.api_url}/v1/extract/{job_id}',
+                            headers
+                        )
+                        if status_response.status_code == 200:
+                            status_data = status_response.json()
+                            if status_data['status'] == 'completed':
+                                if status_data['success']:
+                                    return status_data
+                                else:
+                                    raise Exception(f'Failed to extract. Error: {status_data["error"]}')
+                            elif status_data['status'] in ['failed', 'cancelled']:
+                                raise Exception(f'Extract job {status_data["status"]}. Error: {status_data["error"]}')
+                        else:
+                            self._handle_error(status_response, "extract-status")
+
+                        time.sleep(2)  # Polling interval
                 else:
                     raise Exception(f'Failed to extract. Error: {data["error"]}')
             else:

From a185c05a5c1fd0ab4de7eb570be1a357cfbee116 Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Tue, 7 Jan 2025 17:27:40 -0300
Subject: [PATCH 09/10] Nick: sdk async and get status

---
 apps/js-sdk/firecrawl/src/index.ts     | 66 ++++++++++++++++++++++++++
 apps/python-sdk/firecrawl/firecrawl.py | 63 ++++++++++++++++++++++++
 2 files changed, 129 insertions(+)

diff --git a/apps/js-sdk/firecrawl/src/index.ts b/apps/js-sdk/firecrawl/src/index.ts
index 644be7f6..bb9fcf6a 100644
--- a/apps/js-sdk/firecrawl/src/index.ts
+++ b/apps/js-sdk/firecrawl/src/index.ts
@@ -922,6 +922,72 @@ export default class FirecrawlApp {
     return { success: false, error: "Internal server error." };
   }
 
+  /**
+   * Initiates an asynchronous extract job for one or more URLs using the Firecrawl API.
+   * @param urls - The URLs to extract data from.
+   * @param params - Additional parameters for the extract request.
+   * @param idempotencyKey - Optional idempotency key for the request.
+   * @returns The response from the extract operation.
+   */
+  async asyncExtract(
+    urls: string[],
+    params?: ExtractParams,
+    idempotencyKey?: string
+  ): Promise<ExtractResponse> {
+    const headers = this.prepareHeaders(idempotencyKey);
+    let jsonData: any = { urls, ...params };
+    let jsonSchema: any;
+
+    try {
+      if (params?.schema instanceof zt.ZodType) {
+        jsonSchema = zodToJsonSchema(params.schema);
+      } else {
+        jsonSchema = params?.schema;
+      }
+    } catch (error: any) {
+      throw new FirecrawlError("Invalid schema. Schema must be either a valid Zod schema or JSON schema object.", 400);
+    }
+
+    try {
+      const response: AxiosResponse = await this.postRequest(
+        this.apiUrl + `/v1/extract`,
+        { ...jsonData, schema: jsonSchema },
+        headers
+      );
+
+      if (response.status === 200) {
+        return response.data;
+      } else {
+        this.handleError(response, "start extract job");
+      }
+    } catch (error: any) {
+      throw new FirecrawlError(error.message, 500);
+    }
+    return { success: false, error: "Internal server error." };
+  }
+
+  /**
+   * Retrieves the status of an extract job. 
+   * @param jobId - The ID of the extract job.
+   * @returns The status of the extract job.
+   */
+  async getExtractStatus(jobId: string): Promise<ExtractResponse> {
+    try {
+      const response: AxiosResponse = await this.getRequest(
+        `${this.apiUrl}/v1/extract/${jobId}`,
+        this.prepareHeaders()
+      );
+
+      if (response.status === 200) {
+        return response.data;
+      } else {
+        this.handleError(response, "get extract status");
+      }
+    } catch (error: any) {
+      throw new FirecrawlError(error.message, 500);
+    }
+  }
+
   /**
    * Prepares the headers for an API request.
    * @param idempotencyKey - Optional key to ensure idempotency.
diff --git a/apps/python-sdk/firecrawl/firecrawl.py b/apps/python-sdk/firecrawl/firecrawl.py
index e844b733..3fff1c4e 100644
--- a/apps/python-sdk/firecrawl/firecrawl.py
+++ b/apps/python-sdk/firecrawl/firecrawl.py
@@ -582,6 +582,69 @@ class FirecrawlApp:
             raise ValueError(str(e), 500)
 
         return {'success': False, 'error': "Internal server error."}
+
+    def get_extract_status(self, job_id: str) -> Dict[str, Any]:
+        """
+        Retrieve the status of an extract job.
+
+        Args:
+            job_id (str): The ID of the extract job.
+
+        Returns:
+            Dict[str, Any]: The status of the extract job.
+
+        Raises:
+            ValueError: If there is an error retrieving the status.
+        """
+        headers = self._prepare_headers()
+        try:
+            response = self._get_request(f'{self.api_url}/v1/extract/{job_id}', headers)
+            if response.status_code == 200:
+                return response.json()
+            else:
+                self._handle_error(response, "get extract status")
+        except Exception as e:
+            raise ValueError(str(e), 500)
+
+    def async_extract(self, urls: List[str], params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> Dict[str, Any]:
+        """
+        Initiate an asynchronous extract job.
+
+        Args:
+            urls (List[str]): The URLs to extract data from.
+            params (Optional[Dict[str, Any]]): Additional parameters for the extract request.
+            idempotency_key (Optional[str]): A unique key to ensure idempotency of requests.
+
+        Returns:
+            Dict[str, Any]: The response from the extract operation.
+
+        Raises:
+            ValueError: If there is an error initiating the extract job.
+ """ + headers = self._prepare_headers(idempotency_key) + + schema = params.get('schema') if params else None + if schema: + if hasattr(schema, 'model_json_schema'): + # Convert Pydantic model to JSON schema + schema = schema.model_json_schema() + # Otherwise assume it's already a JSON schema dict + + jsonData = {'urls': urls, **(params or {})} + request_data = { + **jsonData, + 'allowExternalLinks': params.get('allow_external_links', False) if params else False, + 'schema': schema + } + + try: + response = self._post_request(f'{self.api_url}/v1/extract', request_data, headers) + if response.status_code == 200: + return response.json() + else: + self._handle_error(response, "async extract") + except Exception as e: + raise ValueError(str(e), 500) def _prepare_headers(self, idempotency_key: Optional[str] = None) -> Dict[str, str]: """ From b98e289f0358df126cfcf2da6700a3bf109ec483 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 7 Jan 2025 17:49:21 -0300 Subject: [PATCH 10/10] Nick: --- apps/api/src/controllers/v1/extract.ts | 25 ++++++++++++++++++++++++ apps/api/src/lib/extract/team-id-sync.ts | 19 ++++++++++++++++++ apps/js-sdk/firecrawl/src/index.ts | 2 +- apps/python-sdk/firecrawl/firecrawl.py | 3 ++- 4 files changed, 47 insertions(+), 2 deletions(-) create mode 100644 apps/api/src/lib/extract/team-id-sync.ts diff --git a/apps/api/src/controllers/v1/extract.ts b/apps/api/src/controllers/v1/extract.ts index 061f5c9c..ab69ca93 100644 --- a/apps/api/src/controllers/v1/extract.ts +++ b/apps/api/src/controllers/v1/extract.ts @@ -8,7 +8,28 @@ import { import { getExtractQueue } from "../../services/queue-service"; import * as Sentry from "@sentry/node"; import { saveExtract } from "../../lib/extract/extract-redis"; +import { getTeamIdSyncB } from "../../lib/extract/team-id-sync"; +import { performExtraction } from "../../lib/extract/extraction-service"; +export async function oldExtract(req: RequestWithAuth<{}, ExtractResponse, ExtractRequest>, res: Response, extractId: string){ + // Means that are in the non-queue system + // TODO: Remove this once all teams have transitioned to the new system + try { + const result = await performExtraction(extractId, { + request: req.body, + teamId: req.auth.team_id, + plan: req.auth.plan ?? "free", + subId: req.acuc?.sub_id ?? undefined, + }); + + return res.status(200).json(result); + } catch (error) { + return res.status(500).json({ + success: false, + error: "Internal server error", + }); + } + } /** * Extracts data from the provided URLs based on the request parameters. * Currently in beta. @@ -32,6 +53,10 @@ export async function extractController( extractId, }; + if(await getTeamIdSyncB(req.auth.team_id) && req.body.origin !== "api-sdk") { + return await oldExtract(req, res, extractId); + } + await saveExtract(extractId, { id: extractId, team_id: req.auth.team_id, diff --git a/apps/api/src/lib/extract/team-id-sync.ts b/apps/api/src/lib/extract/team-id-sync.ts new file mode 100644 index 00000000..8cf21a14 --- /dev/null +++ b/apps/api/src/lib/extract/team-id-sync.ts @@ -0,0 +1,19 @@ +import { supabase_service } from "../../services/supabase"; +import { logger } from "../logger"; + +export async function getTeamIdSyncB(teamId: string) { + try { + const { data, error } = await supabase_service + .from("eb-sync") + .select("team_id") + .eq("team_id", teamId) + .limit(1); + if (error) { + throw new Error("Error getting team id (sync b)"); + } + return data[0] ?? 
null; + } catch (error) { + logger.error("Error getting team id (sync b)", error); + return null; + } +} diff --git a/apps/js-sdk/firecrawl/src/index.ts b/apps/js-sdk/firecrawl/src/index.ts index bb9fcf6a..e7e8b65b 100644 --- a/apps/js-sdk/firecrawl/src/index.ts +++ b/apps/js-sdk/firecrawl/src/index.ts @@ -884,7 +884,7 @@ export default class FirecrawlApp { try { const response: AxiosResponse = await this.postRequest( this.apiUrl + `/v1/extract`, - { ...jsonData, schema: jsonSchema }, + { ...jsonData, schema: jsonSchema, origin: "api-sdk" }, headers ); diff --git a/apps/python-sdk/firecrawl/firecrawl.py b/apps/python-sdk/firecrawl/firecrawl.py index 3fff1c4e..41f8badf 100644 --- a/apps/python-sdk/firecrawl/firecrawl.py +++ b/apps/python-sdk/firecrawl/firecrawl.py @@ -538,7 +538,8 @@ class FirecrawlApp: request_data = { **jsonData, 'allowExternalLinks': params.get('allow_external_links', False), - 'schema': schema + 'schema': schema, + 'origin': 'api-sdk' } try:
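
---

Taken together, the ten patches turn POST /v1/extract into an asynchronous job: the controller enqueues onto the new {extractQueue}, job state is stored in Redis under extract:<id> with a 24-hour TTL (extract-redis.ts), the worker runs performExtraction and records "completed" or "failed", and clients poll GET /v1/extract/:jobId. A minimal client-side sketch of that flow in TypeScript; the helper name pollExtract, the bearer-token header, and the 2-second interval are illustrative assumptions, not part of the patches:

// Sketch of the async extract flow added by this series (assumes Node 18+ for fetch).
// Response shapes follow extract.ts and extract-status.ts above:
//   POST /v1/extract        -> { success, id, urlTrace }
//   GET  /v1/extract/:jobId -> { success, status, data, error?, expiresAt }
type ExtractJobStatus = "processing" | "completed" | "failed" | "cancelled";

interface ExtractStatusResponse {
  success: boolean;
  status: ExtractJobStatus;
  data: any[];
  error?: string;
  expiresAt?: string;
}

async function pollExtract(
  baseUrl: string,
  apiKey: string,
  body: { urls: string[]; prompt?: string; schema?: object },
): Promise<any[]> {
  const headers = {
    "Content-Type": "application/json",
    Authorization: `Bearer ${apiKey}`, // auth scheme assumed, mirroring the SDKs' prepared headers
  };

  // Kick off the job; since PATCH 08 the controller replies 200 with the job id right away.
  const startRes = await fetch(`${baseUrl}/v1/extract`, {
    method: "POST",
    headers,
    body: JSON.stringify(body),
  });
  const start = (await startRes.json()) as { success: boolean; id?: string; error?: string };
  if (!start.success || !start.id) {
    throw new Error(`Failed to start extract job: ${start.error ?? "unknown error"}`);
  }

  // Poll the status endpoint until the job leaves "processing".
  while (true) {
    const statusRes = await fetch(`${baseUrl}/v1/extract/${start.id}`, { headers });
    const status = (await statusRes.json()) as ExtractStatusResponse;

    if (status.status === "completed") {
      return status.data;
    }
    if (status.status === "failed" || status.status === "cancelled") {
      throw new Error(`Extract job ${status.status}: ${status.error ?? "unknown error"}`);
    }

    // The Python SDK polls every 2 s and the JS SDK every 1 s; any modest interval works.
    await new Promise((resolve) => setTimeout(resolve, 2000));
  }
}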
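
PATCH 09 exposes the two halves of that loop separately in the SDKs (asyncExtract/getExtractStatus in JS, async_extract/get_extract_status in Python), so a caller can start a job and check on it later. A usage sketch against the JS SDK, assuming it is consumed as the @mendable/firecrawl-js package and that the returned objects surface the id/status/data fields the API sends; the casts are only there in case the published SDK types lag the new response shape:

import FirecrawlApp from "@mendable/firecrawl-js";

const app = new FirecrawlApp({ apiKey: process.env.FIRECRAWL_API_KEY ?? "" });

async function run() {
  // Start the job without blocking; the controller replies immediately with { success, id }.
  const started = await app.asyncExtract(["https://example.com"], {
    prompt: "Extract the page title and a one-sentence summary.",
  });
  const jobId = (started as { id?: string }).id;
  if (!jobId) throw new Error("No job id returned from asyncExtract");

  // Later, possibly from another process, fetch the job's state by id.
  const status = (await app.getExtractStatus(jobId)) as {
    status?: string;
    data?: any;
    expiresAt?: string;
  };
  console.log(status.status, status.data);
}

run().catch(console.error);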