From 27457ed5db3700bc0b042b6fd7f21a35b26cbabb Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Fri, 3 Jan 2025 20:44:27 -0300
Subject: [PATCH] Nick: init

---
 apps/api/src/controllers/v1/extract-status.ts | 53 +++++++++++++++++++
 apps/api/src/controllers/v1/extract.ts        | 47 ++++++++++++++--
 apps/api/src/controllers/v1/types.ts          |  3 +-
 .../api/src/lib/extract/extraction-service.ts | 13 +++--
 apps/api/src/routes/v1.ts                     | 14 ++---
 apps/api/src/services/queue-service.ts        | 39 +++++++++-----
 apps/api/src/services/queue-worker.ts         | 51 ++++++++++++++++++
 7 files changed, 188 insertions(+), 32 deletions(-)
 create mode 100644 apps/api/src/controllers/v1/extract-status.ts

diff --git a/apps/api/src/controllers/v1/extract-status.ts b/apps/api/src/controllers/v1/extract-status.ts
new file mode 100644
index 00000000..8ab2d11b
--- /dev/null
+++ b/apps/api/src/controllers/v1/extract-status.ts
@@ -0,0 +1,53 @@
+import { Response } from "express";
+import {
+  supabaseGetJobByIdOnlyData,
+  supabaseGetJobsById,
+} from "../../lib/supabase-jobs";
+import { scrapeStatusRateLimiter } from "../../services/rate-limiter";
+import { RequestWithAuth } from "./types";
+
+export async function extractStatusController(
+  req: RequestWithAuth<{ jobId: string }, any, any>,
+  res: Response,
+) {
+  try {
+    const rateLimiter = scrapeStatusRateLimiter;
+    const incomingIP = (req.headers["x-forwarded-for"] ||
+      req.socket.remoteAddress) as string;
+    const iptoken = incomingIP;
+    await rateLimiter.consume(iptoken);
+
+    const job = await supabaseGetJobByIdOnlyData(req.params.jobId);
+    if (!job || job.team_id !== req.auth.team_id) {
+      return res.status(403).json({
+        success: false,
+        error: "You are not allowed to access this resource.",
+      });
+    }
+
+    const jobData = await supabaseGetJobsById([req.params.jobId]);
+    if (!jobData || jobData.length === 0) {
+      return res.status(404).json({
+        success: false,
+        error: "Job not found",
+      });
+    }
+
+    return res.status(200).json({
+      success: true,
+      data: jobData[0].docs,
+    });
+  } catch (error) {
+    if (error instanceof Error && error.message == "Too Many Requests") {
+      return res.status(429).json({
+        success: false,
+        error: "Rate limit exceeded. Please try again later.",
+      });
+    } else {
+      return res.status(500).json({
+        success: false,
+        error: "An unexpected error occurred.",
+      });
+    }
+  }
+}
diff --git a/apps/api/src/controllers/v1/extract.ts b/apps/api/src/controllers/v1/extract.ts
index af63f446..2838779b 100644
--- a/apps/api/src/controllers/v1/extract.ts
+++ b/apps/api/src/controllers/v1/extract.ts
@@ -5,7 +5,9 @@ import {
   extractRequestSchema,
   ExtractResponse,
 } from "./types";
-import { performExtraction } from "../../lib/extract/extraction-service";
+import { getExtractQueue } from "../../services/queue-service";
+import * as Sentry from "@sentry/node";
+import { v4 as uuidv4 } from "uuid";
 
 /**
  * Extracts data from the provided URLs based on the request parameters.
@@ -29,12 +31,47 @@ export async function extractController(
     });
   }
 
-  const result = await performExtraction({
+  const extractId = crypto.randomUUID();
+  const jobData = {
     request: req.body,
     teamId: req.auth.team_id,
     plan: req.auth.plan,
-    subId: req.acuc?.sub_id || undefined,
-  });
+    subId: req.acuc?.sub_id,
+    extractId,
+  };
 
-  return res.status(result.success ? 200 : 400).json(result);
+  if (Sentry.isInitialized()) {
+    const size = JSON.stringify(jobData).length;
+    await Sentry.startSpan(
+      {
+        name: "Add extract job",
+        op: "queue.publish",
+        attributes: {
+          "messaging.message.id": extractId,
+          "messaging.destination.name": getExtractQueue().name,
+          "messaging.message.body.size": size,
+        },
+      },
+      async (span) => {
+        await getExtractQueue().add(extractId, {
+          ...jobData,
+          sentry: {
+            trace: Sentry.spanToTraceHeader(span),
+            baggage: Sentry.spanToBaggageHeader(span),
+            size,
+          },
+        }, { jobId: extractId });
+      },
+    );
+  } else {
+    await getExtractQueue().add(extractId, jobData, {
+      jobId: extractId,
+    });
+  }
+
+  return res.status(202).json({
+    success: true,
+    id: extractId,
+    urlTrace: [],
+  });
 }
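
The controller change above flips /v1/extract from synchronous (200/400 with the extraction result) to asynchronous (202 with a job id, resolved later via GET /v1/extract/:jobId). A minimal client sketch of the new contract follows; the base URL, bearer auth header, and polling cadence are assumptions, and because the job row is only written by logJob when extraction completes, the status route can answer 403 or 404 while the job is still running, so the sketch treats those as "not finished yet":

```ts
// Hypothetical polling client for the new async extract contract.
const BASE = "https://api.example.com"; // assumption: your deployment's URL
const AUTH = { Authorization: `Bearer ${process.env.API_KEY}` };

async function extractAndWait(body: unknown): Promise<unknown> {
  // 1. Submit: the controller now replies 202 with { success, id, urlTrace: [] }.
  const submit = await fetch(`${BASE}/v1/extract`, {
    method: "POST",
    headers: { "Content-Type": "application/json", ...AUTH },
    body: JSON.stringify(body),
  });
  const { id } = (await submit.json()) as { id: string };

  // 2. Poll: until the worker has logged the finished job, the status
  //    endpoint returns 403/404, which here just means "still running".
  for (let attempt = 0; attempt < 60; attempt++) {
    const status = await fetch(`${BASE}/v1/extract/${id}`, { headers: AUTH });
    if (status.ok) {
      const { data } = (await status.json()) as { data: unknown };
      return data;
    }
    await new Promise((r) => setTimeout(r, 2000));
  }
  throw new Error(`extract job ${id} did not finish in time`);
}
```
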
diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts
index ccb11586..8727f706 100644
--- a/apps/api/src/controllers/v1/types.ts
+++ b/apps/api/src/controllers/v1/types.ts
@@ -478,10 +478,11 @@ export interface URLTrace {
 
 export interface ExtractResponse {
   success: boolean;
+  error?: string;
   data?: any;
   scrape_id?: string;
+  id?: string;
   warning?: string;
-  error?: string;
   urlTrace?: URLTrace[];
 }
 
diff --git a/apps/api/src/lib/extract/extraction-service.ts b/apps/api/src/lib/extract/extraction-service.ts
index 2791df3c..6a98ef94 100644
--- a/apps/api/src/lib/extract/extraction-service.ts
+++ b/apps/api/src/lib/extract/extraction-service.ts
@@ -20,7 +20,7 @@ interface ExtractServiceOptions {
 interface ExtractResult {
   success: boolean;
   data?: any;
-  scrapeId: string;
+  extractId: string;
   warning?: string;
   urlTrace?: URLTrace[];
   error?: string;
@@ -38,9 +38,8 @@ function getRootDomain(url: string): string {
   }
 }
 
-export async function performExtraction(options: ExtractServiceOptions): Promise<ExtractResult> {
+export async function performExtraction(extractId: string, options: ExtractServiceOptions): Promise<ExtractResult> {
   const { request, teamId, plan, subId } = options;
-  const scrapeId = crypto.randomUUID();
   const urlTraces: URLTrace[] = [];
   let docs: Document[] = [];
 
@@ -65,7 +64,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise<ExtractResult> {
     return {
       success: false,
       error: "No valid URLs found to scrape. Try adjusting your search criteria or including more URLs.",
-      scrapeId,
+      extractId,
       urlTrace: urlTraces,
     };
   }
@@ -89,7 +88,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise<ExtractResult> {
       return {
         success: false,
         error: error.message,
-        scrapeId,
+        extractId,
         urlTrace: urlTraces,
       };
     }
@@ -191,7 +190,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise<ExtractResult> {
 
   // Log job
   logJob({
-    job_id: scrapeId,
+    job_id: extractId,
     success: true,
     message: "Extract completed",
     num_docs: 1,
@@ -208,7 +207,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise<ExtractResult> {
   return {
     success: true,
     data: completions.extract ?? {},
-    scrapeId,
+    extractId,
     warning: completions.warning,
     urlTrace: request.urlTrace ? urlTraces : undefined,
   };
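
performExtraction now receives its id from the caller instead of minting one, so the HTTP response, the queue job, and the logJob row all share one identifier. A sketch of a direct (non-queued) call mirroring the worker's call site; the team and plan literals are hypothetical, and subId is passed explicitly as undefined on the assumption that the not-fully-shown ExtractServiceOptions allows it:

```ts
import { randomUUID } from "node:crypto";
// Path as seen from apps/api/src/controllers/v1 (the import the patch removes).
import { performExtraction } from "../../lib/extract/extraction-service";

async function runInline(request: any /* a validated ExtractRequest */) {
  const extractId = randomUUID();
  const result = await performExtraction(extractId, {
    request,
    teamId: "team_123", // hypothetical team
    plan: "standard",   // hypothetical plan
    subId: undefined,   // assumption: no subscription id in this context
  });
  // On success, result.extractId === extractId, and the same id keys the
  // logJob row that GET /v1/extract/:jobId later reads.
  return result;
}
```
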
diff --git a/apps/api/src/routes/v1.ts b/apps/api/src/routes/v1.ts
index b6ab2ee8..bc500325 100644
--- a/apps/api/src/routes/v1.ts
+++ b/apps/api/src/routes/v1.ts
@@ -24,13 +24,7 @@ import { scrapeStatusController } from "../controllers/v1/scrape-status";
 import { concurrencyCheckController } from "../controllers/v1/concurrency-check";
 import { batchScrapeController } from "../controllers/v1/batch-scrape";
 import { extractController } from "../controllers/v1/extract";
-// import { crawlPreviewController } from "../../src/controllers/v1/crawlPreview";
-// import { crawlJobStatusPreviewController } from "../../src/controllers/v1/status";
-// import { searchController } from "../../src/controllers/v1/search";
-// import { crawlCancelController } from "../../src/controllers/v1/crawl-cancel";
-// import { keyAuthController } from "../../src/controllers/v1/keyAuth";
-// import { livenessController } from "../controllers/v1/liveness";
-// import { readinessController } from "../controllers/v1/readiness";
+import { extractStatusController } from "../controllers/v1/extract-status";
 import { creditUsageController } from "../controllers/v1/credit-usage";
 import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings";
 import { searchController } from "../controllers/v1/search";
@@ -215,6 +209,12 @@ v1Router.post(
   wrap(extractController),
 );
 
+v1Router.get(
+  "/extract/:jobId",
+  authMiddleware(RateLimiterMode.CrawlStatus),
+  wrap(extractStatusController),
+);
+
 // v1Router.post("/crawlWebsitePreview", crawlPreviewController);
 
 v1Router.delete(
diff --git a/apps/api/src/services/queue-service.ts b/apps/api/src/services/queue-service.ts
index 3cfd8c91..d3d8a4e5 100644
--- a/apps/api/src/services/queue-service.ts
+++ b/apps/api/src/services/queue-service.ts
@@ -3,12 +3,16 @@ import { logger } from "../lib/logger";
 import IORedis from "ioredis";
 
 let scrapeQueue: Queue;
+let extractQueue: Queue;
+let loggingQueue: Queue;
 
 export const redisConnection = new IORedis(process.env.REDIS_URL!, {
   maxRetriesPerRequest: null,
 });
 
 export const scrapeQueueName = "{scrapeQueue}";
+export const extractQueueName = "{extractQueue}";
+export const loggingQueueName = "{loggingQueue}";
 
 export function getScrapeQueue() {
   if (!scrapeQueue) {
@@ -24,24 +28,35 @@ export function getScrapeQueue() {
             age: 90000, // 25 hours
           },
         },
-      },
-      // {
-      //   settings: {
-      //     lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds,
-      //     lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
-      //     stalledInterval: 30 * 1000,
-      //     maxStalledCount: 10,
-      //   },
-      //   defaultJobOptions:{
-      //     attempts: 5
-      //   }
-      // }
+      }
     );
     logger.info("Web scraper queue created");
   }
   return scrapeQueue;
 }
 
+export function getExtractQueue() {
+  if (!extractQueue) {
+    extractQueue = new Queue(
+      extractQueueName,
+      {
+        connection: redisConnection,
+        defaultJobOptions: {
+          removeOnComplete: {
+            age: 90000, // 25 hours
+          },
+          removeOnFail: {
+            age: 90000, // 25 hours
+          },
+        },
+      }
+    );
+    logger.info("Extraction queue created");
+  }
+  return extractQueue;
+}
+
+
 // === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
 // import { QueueEvents } from 'bullmq';
 // export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });
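
Context on the new queue's name and options: the braces in "{extractQueue}" are a Redis Cluster hash tag, which keeps every key BullMQ derives for the queue (waiting list, job hashes, events) in the same cluster slot, as its multi-key Lua scripts require; the removeOnComplete/removeOnFail ages are in seconds, so 90000 is the 25 hours the comments mention. A condensed standalone sketch of the same wiring:

```ts
import { Queue } from "bullmq";
import IORedis from "ioredis";

const connection = new IORedis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null, // required for connections shared with BullMQ workers
});

// "{extractQueue}": the braces are a Redis Cluster hash tag, pinning all of
// this queue's keys to one cluster slot.
export const extractQueue = new Queue("{extractQueue}", {
  connection,
  defaultJobOptions: {
    removeOnComplete: { age: 90000 }, // seconds: completed jobs kept 25 hours
    removeOnFail: { age: 90000 },
  },
});
```
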
from "../lib/custom-error"; import { getScrapeQueue, + getExtractQueue, redisConnection, scrapeQueueName, + extractQueueName, } from "./queue-service"; import { startWebScraperPipeline } from "../main/runWebScraper"; import { callWebhook } from "./webhook"; @@ -50,6 +52,7 @@ import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings"; import { indexPage } from "../lib/extract/index/pinecone"; import { Document } from "../controllers/v1/types"; +import { performExtraction } from "../lib/extract/extraction-service"; configDotenv(); @@ -243,6 +246,52 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => { return err; }; +const processExtractJobInternal = async (token: string, job: Job & { id: string }) => { + const logger = _logger.child({ + module: "extract-worker", + method: "processJobInternal", + jobId: job.id, + extractId: job.data.extractId, + teamId: job.data?.teamId ?? undefined, + }); + + const extendLockInterval = setInterval(async () => { + logger.info(`🔄 Worker extending lock on job ${job.id}`); + await job.extendLock(token, jobLockExtensionTime); + }, jobLockExtendInterval); + + try { + const result = await performExtraction(job.data.extractId, { + request: job.data.request, + teamId: job.data.teamId, + plan: job.data.plan, + subId: job.data.subId, + }); + + if (result.success) { + // Move job to completed state in Redis + await job.moveToCompleted(result, token, false); + return result; + } else { + throw new Error(result.error || "Unknown error during extraction"); + } + } catch (error) { + logger.error(`🚫 Job errored ${job.id} - ${error}`, { error }); + + Sentry.captureException(error, { + data: { + job: job.id, + }, + }); + + // Move job to failed state in Redis + await job.moveToFailed(error, token, false); + throw error; + } finally { + clearInterval(extendLockInterval); + } +}; + let isShuttingDown = false; process.on("SIGINT", () => { @@ -399,7 +448,9 @@ const workerFun = async ( } }; +// Start both workers workerFun(getScrapeQueue(), processJobInternal); +workerFun(getExtractQueue(), processExtractJobInternal); async function processKickoffJob(job: Job & { id: string }, token: string) { const logger = _logger.child({