diff --git a/apps/api/src/controllers/admin/queue.ts b/apps/api/src/controllers/admin/queue.ts
index 3f1e9323..095e7ca7 100644
--- a/apps/api/src/controllers/admin/queue.ts
+++ b/apps/api/src/controllers/admin/queue.ts
@@ -2,7 +2,7 @@
 import { Request, Response } from "express";
 import { Job } from "bullmq";
 import { Logger } from "../../lib/logger";
-import { getWebScraperQueue } from "../../services/queue-service";
+import { getScrapeQueue } from "../../services/queue-service";
 import { checkAlerts } from "../../services/alerts";
 
 export async function cleanBefore24hCompleteJobsController(
@@ -11,13 +11,13 @@ export async function cleanBefore24hCompleteJobsController(
 ) {
   Logger.info("🐂 Cleaning jobs older than 24h");
   try {
-    const webScraperQueue = getWebScraperQueue();
+    const scrapeQueue = getScrapeQueue();
     const batchSize = 10;
     const numberOfBatches = 9; // Adjust based on your needs
     const completedJobsPromises: Promise<Job[]>[] = [];
     for (let i = 0; i < numberOfBatches; i++) {
       completedJobsPromises.push(
-        webScraperQueue.getJobs(
+        scrapeQueue.getJobs(
           ["completed"],
           i * batchSize,
           i * batchSize + batchSize,
@@ -68,10 +68,10 @@ export async function checkQueuesController(req: Request, res: Response) {
 
 // Use this as a "health check" that way we dont destroy the server
 export async function queuesController(req: Request, res: Response) {
   try {
-    const webScraperQueue = getWebScraperQueue();
+    const scrapeQueue = getScrapeQueue();
 
     const [webScraperActive] = await Promise.all([
-      webScraperQueue.getActiveCount(),
+      scrapeQueue.getActiveCount(),
     ]);
 
     const noActiveJobs = webScraperActive === 0;
diff --git a/apps/api/src/controllers/crawl-status.ts b/apps/api/src/controllers/crawl-status.ts
index b82a6088..e1528712 100644
--- a/apps/api/src/controllers/crawl-status.ts
+++ b/apps/api/src/controllers/crawl-status.ts
@@ -15,12 +15,6 @@ export async function crawlStatusController(req: Request, res: Response) {
     if (!success) {
       return res.status(status).json({ error });
     }
-    // const job = await getWebScraperQueue().getJob(req.params.jobId);
-    // if (!job) {
-    //   return res.status(404).json({ error: "Job not found" });
-    // }
-
-    // const isCancelled = await (await getWebScraperQueue().client).exists("cancelled:" + req.params.jobId);
 
     const sc = await getCrawl(req.params.jobId);
     if (!sc) {
diff --git a/apps/api/src/controllers/crawl.ts b/apps/api/src/controllers/crawl.ts
index a6971a22..f31e81f0 100644
--- a/apps/api/src/controllers/crawl.ts
+++ b/apps/api/src/controllers/crawl.ts
@@ -1,10 +1,8 @@
 import { Request, Response } from "express";
-import { WebScraperDataProvider } from "../../src/scraper/WebScraper";
-import { billTeam } from "../../src/services/billing/credit_billing";
 import { checkTeamCredits } from "../../src/services/billing/credit_billing";
 import { authenticateUser } from "./auth";
 import { RateLimiterMode } from "../../src/types";
-import { addScrapeJob, addWebScraperJob } from "../../src/services/queue-jobs";
+import { addScrapeJob } from "../../src/services/queue-jobs";
 import { isUrlBlocked } from "../../src/scraper/WebScraper/utils/blocklist";
 import { logCrawl } from "../../src/services/logging/crawl_log";
 import { validateIdempotencyKey } from "../../src/services/idempotency/validate";
diff --git a/apps/api/src/controllers/crawlPreview.ts b/apps/api/src/controllers/crawlPreview.ts
index 7c5c804d..cd762646 100644
--- a/apps/api/src/controllers/crawlPreview.ts
+++ b/apps/api/src/controllers/crawlPreview.ts
@@ -1,44 +1,124 @@
 import { Request, Response } from "express";
 import { authenticateUser } from "./auth";
 import { RateLimiterMode } from "../../src/types";
-import { addWebScraperJob } from "../../src/services/queue-jobs";
 import { isUrlBlocked } from "../../src/scraper/WebScraper/utils/blocklist";
+import { v4 as uuidv4 } from "uuid";
 import { Logger } from "../../src/lib/logger";
+import { addCrawlJob, crawlToCrawler, lockURL, saveCrawl, StoredCrawl } from "../../src/lib/crawl-redis";
+import { addScrapeJob } from "../../src/services/queue-jobs";
 
 export async function crawlPreviewController(req: Request, res: Response) {
   try {
-    const { success, team_id, error, status } = await authenticateUser(
+    const { success, error, status } = await authenticateUser(
       req,
       res,
       RateLimiterMode.Preview
     );
+
+    const team_id = "preview";
+
     if (!success) {
       return res.status(status).json({ error });
     }
 
-    // authenticate on supabase
+
     const url = req.body.url;
     if (!url) {
       return res.status(400).json({ error: "Url is required" });
     }
 
     if (isUrlBlocked(url)) {
-      return res.status(403).json({ error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it." });
+      return res
+        .status(403)
+        .json({
+          error:
+            "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.",
+        });
     }
 
-    const mode = req.body.mode ?? "crawl";
     const crawlerOptions = req.body.crawlerOptions ?? {};
     const pageOptions = req.body.pageOptions ?? { onlyMainContent: false, includeHtml: false, removeTags: [] };
 
-    const job = await addWebScraperJob({
-      url: url,
-      mode: mode ?? "crawl", // fix for single urls not working
-      crawlerOptions: { ...crawlerOptions, limit: 5, maxCrawledLinks: 5 },
-      team_id: "preview",
-      pageOptions: pageOptions,
-      origin: "website-preview",
-    });
+    // if (mode === "single_urls" && !url.includes(",")) { // NOTE: do we need this?
+    //   try {
+    //     const a = new WebScraperDataProvider();
+    //     await a.setOptions({
+    //       jobId: uuidv4(),
+    //       mode: "single_urls",
+    //       urls: [url],
+    //       crawlerOptions: { ...crawlerOptions, returnOnlyUrls: true },
+    //       pageOptions: pageOptions,
+    //     });
-
-    res.json({ jobId: job.id });
+    //     const docs = await a.getDocuments(false, (progress) => {
+    //       job.updateProgress({
+    //         current: progress.current,
+    //         total: progress.total,
+    //         current_step: "SCRAPING",
+    //         current_url: progress.currentDocumentUrl,
+    //       });
+    //     });
+    //     return res.json({
+    //       success: true,
+    //       documents: docs,
+    //     });
+    //   } catch (error) {
+    //     Logger.error(error);
+    //     return res.status(500).json({ error: error.message });
+    //   }
+    // }
+
+    const id = uuidv4();
+
+    let robots;
+
+    try {
+      robots = await this.getRobotsTxt();
+    } catch (_) {}
+
+    const sc: StoredCrawl = {
+      originUrl: url,
+      crawlerOptions,
+      pageOptions,
+      team_id,
+      robots,
+    };
+
+    await saveCrawl(id, sc);
+
+    const crawler = crawlToCrawler(id, sc);
+
+    const sitemap = sc.crawlerOptions?.ignoreSitemap ? null : await crawler.tryGetSitemap();
+
+    if (sitemap !== null) {
+      for (const url of sitemap.map(x => x.url)) {
+        await lockURL(id, sc, url);
+        const job = await addScrapeJob({
+          url,
+          mode: "single_urls",
+          crawlerOptions: crawlerOptions,
+          team_id: team_id,
+          pageOptions: pageOptions,
+          origin: "website-preview",
+          crawl_id: id,
+          sitemapped: true,
+        });
+        await addCrawlJob(id, job.id);
+      }
+    } else {
+      await lockURL(id, sc, url);
+      const job = await addScrapeJob({
+        url,
+        mode: "single_urls",
+        crawlerOptions: crawlerOptions,
+        team_id: team_id,
+        pageOptions: pageOptions,
+        origin: "website-preview",
+        crawl_id: id,
+      });
+      await addCrawlJob(id, job.id);
+    }
+
+    res.json({ jobId: id });
   } catch (error) {
     Logger.error(error);
     return res.status(500).json({ error: error.message });
diff --git a/apps/api/src/controllers/status.ts b/apps/api/src/controllers/status.ts
index 6437bea0..8dccfde3 100644
--- a/apps/api/src/controllers/status.ts
+++ b/apps/api/src/controllers/status.ts
@@ -1,56 +1,38 @@
 import { Request, Response } from "express";
-import { getWebScraperQueue } from "../../src/services/queue-service";
-import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
 import { Logger } from "../../src/lib/logger";
+import { getCrawl, getCrawlJobs } from "../../src/lib/crawl-redis";
+import { getScrapeQueue } from "../../src/services/queue-service";
 
 export async function crawlJobStatusPreviewController(req: Request, res: Response) {
   try {
-    const job = await getWebScraperQueue().getJob(req.params.jobId);
-    if (!job) {
+    const sc = await getCrawl(req.params.jobId);
+    if (!sc) {
       return res.status(404).json({ error: "Job not found" });
     }
 
-    let progress = job.progress;
-    if(typeof progress !== 'object') {
-      progress = {
-        current: 0,
-        current_url: '',
-        total: 0,
-        current_step: '',
-        partialDocs: []
-      }
-    }
-    const {
-      current = 0,
-      current_url = '',
-      total = 0,
-      current_step = '',
-      partialDocs = []
-    } = progress as { current: number, current_url: string, total: number, current_step: string, partialDocs: any[] };
+    const jobIDs = await getCrawlJobs(req.params.jobId);
 
-    let data = job.returnvalue;
-    if (process.env.USE_DB_AUTHENTICATION === "true") {
-      const supabaseData = await supabaseGetJobById(req.params.jobId);
+    // let data = job.returnvalue;
+    // if (process.env.USE_DB_AUTHENTICATION === "true") {
+    //   const supabaseData = await supabaseGetJobById(req.params.jobId);
 
-      if (supabaseData) {
-        data = supabaseData.docs;
-      }
-    }
+    //   if (supabaseData) {
+    //     data = supabaseData.docs;
+    //   }
+    // }
 
-    let jobStatus = await job.getState();
-    if (jobStatus === 'waiting' || jobStatus === 'delayed' || jobStatus === 'waiting-children' || jobStatus === 'unknown' || jobStatus === 'prioritized') {
-      jobStatus = 'active';
-    }
+    const jobs = await Promise.all(jobIDs.map(x => getScrapeQueue().getJob(x)));
+    const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
+    const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "active";
+
+    const data = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);
 
     res.json({
       status: jobStatus,
-      // progress: job.progress(),
-      current,
-      current_url,
-      current_step,
-      total,
-      data: data ? data : null,
-      partial_data: jobStatus == 'completed' ? [] : partialDocs,
+      current: jobStatuses.filter(x => x === "completed" || x === "failed").length,
+      total: jobs.length,
+      data: jobStatus === "completed" ? data : null,
+      partial_data: jobStatus === "completed" ? [] : data.filter(x => x !== null),
     });
   } catch (error) {
     Logger.error(error);
diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts
index cac63aee..f9fab1cb 100644
--- a/apps/api/src/index.ts
+++ b/apps/api/src/index.ts
@@ -2,7 +2,7 @@ import express from "express";
 import bodyParser from "body-parser";
 import cors from "cors";
 import "dotenv/config";
-import { getScrapeQueue, getWebScraperQueue } from "./services/queue-service";
+import { getScrapeQueue } from "./services/queue-service";
 import { v0Router } from "./routes/v0";
 import { initSDK } from "@hyperdx/node-opentelemetry";
 import cluster from "cluster";
@@ -58,7 +58,7 @@ if (cluster.isMaster) {
   serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
 
   const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
-    queues: [new BullAdapter(getWebScraperQueue()), new BullAdapter(getScrapeQueue())],
+    queues: [new BullAdapter(getScrapeQueue())],
     serverAdapter: serverAdapter,
   });
 
@@ -104,9 +104,9 @@ if (cluster.isMaster) {
 
   app.get(`/serverHealthCheck`, async (req, res) => {
     try {
-      const webScraperQueue = getWebScraperQueue();
+      const scrapeQueue = getScrapeQueue();
       const [waitingJobs] = await Promise.all([
-        webScraperQueue.getWaitingCount(),
+        scrapeQueue.getWaitingCount(),
       ]);
 
       const noWaitingJobs = waitingJobs === 0;
@@ -126,9 +126,9 @@ if (cluster.isMaster) {
       const timeout = 60000; // 1 minute // The timeout value for the check in milliseconds
 
       const getWaitingJobsCount = async () => {
-        const webScraperQueue = getWebScraperQueue();
+        const scrapeQueue = getScrapeQueue();
         const [waitingJobsCount] = await Promise.all([
-          webScraperQueue.getWaitingCount(),
+          scrapeQueue.getWaitingCount(),
         ]);
 
         return waitingJobsCount;
@@ -181,12 +181,12 @@ if (cluster.isMaster) {
   Logger.info(`Worker ${process.pid} started`);
 }
 
-// const wsq = getWebScraperQueue();
+// const sq = getScrapeQueue();
 
-// wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
-// wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
-// wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
-// wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
-// wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
-// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
+// sq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
+// sq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
+// sq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
+// sq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
+// sq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
+// sq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts
index 0c053b77..81c06e9f 100644
--- a/apps/api/src/main/runWebScraper.ts
+++ b/apps/api/src/main/runWebScraper.ts
@@ -12,7 +12,7 @@ import { Document } from "../lib/entities";
 import { supabase_service } from "../services/supabase";
 import { Logger } from "../lib/logger";
 import { ScrapeEvents } from "../lib/scrape-events";
-import { getWebScraperQueue } from "../services/queue-service";
+import { getScrapeQueue } from "../services/queue-service";
 
 export async function startWebScraperPipeline({
   job,
@@ -106,19 +106,15 @@ export async function runWebScraper({
       })
     : docs;
 
-    const isCancelled = await (await getWebScraperQueue().client).exists("cancelled:" + bull_job_id);
+    const billingResult = await billTeam(team_id, filteredDocs.length);
-    if (!isCancelled) {
-      const billingResult = await billTeam(team_id, filteredDocs.length);
-
-      if (!billingResult.success) {
-        // throw new Error("Failed to bill team, no subscription was found");
-        return {
-          success: false,
-          message: "Failed to bill team, no subscription was found",
-          docs: [],
-        };
-      }
+
+    if (!billingResult.success) {
+      // throw new Error("Failed to bill team, no subscription was found");
+      return {
+        success: false,
+        message: "Failed to bill team, no subscription was found",
+        docs: [],
+      };
     }
 
     // This is where the returnvalue from the job is set
diff --git a/apps/api/src/scraper/WebScraper/index.ts b/apps/api/src/scraper/WebScraper/index.ts
index 859127bd..a03553bc 100644
--- a/apps/api/src/scraper/WebScraper/index.ts
+++ b/apps/api/src/scraper/WebScraper/index.ts
@@ -16,7 +16,7 @@ import {
   replacePathsWithAbsolutePaths,
 } from "./utils/replacePaths";
 import { generateCompletions } from "../../lib/LLM-extraction";
-import { getWebScraperQueue } from "../../../src/services/queue-service";
+import { getScrapeQueue } from "../../../src/services/queue-service";
 import { fetchAndProcessDocx } from "./utils/docxProcessor";
 import { getAdjustedMaxDepth, getURLDepth } from "./utils/maxDepthUtils";
 import { Logger } from "../../lib/logger";
@@ -88,21 +88,6 @@ export class WebScraperDataProvider {
           results[i + index] = result;
         })
       );
-      try {
-        if (this.mode === "crawl" && this.bullJobId) {
-          const job = await getWebScraperQueue().getJob(this.bullJobId);
-          const jobStatus = await job.getState();
-          if (jobStatus === "failed") {
-            Logger.info(
-              "Job has failed or has been cancelled by the user. Stopping the job..."
-            );
-            return [] as Document[];
-          }
-        }
-      } catch (error) {
-        Logger.error(error.message);
-        return [] as Document[];
-      }
     }
     return results.filter((result) => result !== null) as Document[];
   }
diff --git a/apps/api/src/services/alerts/index.ts b/apps/api/src/services/alerts/index.ts
index 0376f4c2..cb953e2e 100644
--- a/apps/api/src/services/alerts/index.ts
+++ b/apps/api/src/services/alerts/index.ts
@@ -1,5 +1,5 @@
 import { Logger } from "../../../src/lib/logger";
-import { getWebScraperQueue } from "../queue-service";
+import { getScrapeQueue } from "../queue-service";
 import { sendSlackWebhook } from "./slack";
 
 export async function checkAlerts() {
@@ -13,8 +13,8 @@ export async function checkAlerts() {
     Logger.info("Initializing alerts");
     const checkActiveJobs = async () => {
       try {
-        const webScraperQueue = getWebScraperQueue();
-        const activeJobs = await webScraperQueue.getActiveCount();
+        const scrapeQueue = getScrapeQueue();
+        const activeJobs = await scrapeQueue.getActiveCount();
         if (activeJobs > Number(process.env.ALERT_NUM_ACTIVE_JOBS)) {
           Logger.warn(
             `Alert: Number of active jobs is over ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}.`
@@ -34,8 +34,8 @@
     };
 
     const checkWaitingQueue = async () => {
-      const webScraperQueue = getWebScraperQueue();
-      const waitingJobs = await webScraperQueue.getWaitingCount();
+      const scrapeQueue = getScrapeQueue();
+      const waitingJobs = await scrapeQueue.getWaitingCount();
 
       if (waitingJobs > Number(process.env.ALERT_NUM_WAITING_JOBS)) {
         Logger.warn(
diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts
index 46c2fb22..add37c45 100644
--- a/apps/api/src/services/queue-jobs.ts
+++ b/apps/api/src/services/queue-jobs.ts
@@ -1,22 +1,8 @@
 import { Job, Queue } from "bullmq";
-import {
-  getScrapeQueue,
-  getWebScraperQueue,
-} from "./queue-service";
+import { getScrapeQueue } from "./queue-service";
 import { v4 as uuidv4 } from "uuid";
 import { WebScraperOptions } from "../types";
 
-export async function addWebScraperJob(
-  webScraperOptions: WebScraperOptions,
-  options: any = {},
-  jobId: string = uuidv4(),
-): Promise<Job> {
-  return await getWebScraperQueue().add(jobId, webScraperOptions, {
-    ...options,
-    jobId,
-  });
-}
-
 export async function addScrapeJob(
   webScraperOptions: WebScraperOptions,
   options: any = {},
diff --git a/apps/api/src/services/queue-service.ts b/apps/api/src/services/queue-service.ts
index 348e3d7c..b13489a6 100644
--- a/apps/api/src/services/queue-service.ts
+++ b/apps/api/src/services/queue-service.ts
@@ -2,38 +2,13 @@ import { Queue } from "bullmq";
 import { Logger } from "../lib/logger";
 import IORedis from "ioredis";
 
-let webScraperQueue: Queue;
 let scrapeQueue: Queue;
 
 export const redisConnection = new IORedis(process.env.REDIS_URL, {
   maxRetriesPerRequest: null,
 });
 
-export const webScraperQueueName = "{crawlQueue}";
 export const scrapeQueueName = "{scrapeQueue}";
 
-export function getWebScraperQueue() {
-  if (!webScraperQueue) {
-    webScraperQueue = new Queue(
-      webScraperQueueName,
-      {
-        connection: redisConnection,
-      }
-      // {
-      //   settings: {
-      //     lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds,
-      //     lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
-      //     stalledInterval: 30 * 1000,
-      //     maxStalledCount: 10,
-      //   },
-      //   defaultJobOptions:{
-      //     attempts: 5
-      //   }
-      // }
-    );
-    Logger.info("Web scraper queue created");
-  }
-  return webScraperQueue;
-}
 
 export function getScrapeQueue() {
   if (!scrapeQueue) {
@@ -62,5 +37,4 @@
 
 import { QueueEvents } from 'bullmq';
 
-export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection });
-export const webScraperQueueEvents = new QueueEvents(webScraperQueueName, { connection: redisConnection });
\ No newline at end of file
+export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection });
\ No newline at end of file
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 8f6b7093..aec93f64 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -1,9 +1,7 @@
 import { CustomError } from "../lib/custom-error";
 import {
-  getWebScraperQueue,
   getScrapeQueue,
   redisConnection,
-  webScraperQueueName,
   scrapeQueueName,
 } from "./queue-service";
 import "dotenv/config";
@@ -110,7 +108,6 @@ const workerFun = async (queueName: string, processJobInternal: (token: string,
   }
 };
 
-workerFun(webScraperQueueName, processJobInternal);
 workerFun(scrapeQueueName, processJobInternal);
 
 async function processJob(job: Job, token: string) {
@@ -205,10 +202,6 @@ async function processJob(job: Job, token: string) {
     return data;
   } catch (error) {
     Logger.error(`🐂 Job errored ${job.id} - ${error}`);
-    if (await getWebScraperQueue().isPaused()) {
-      Logger.debug("🐂Queue is paused, ignoring");
-      return;
-    }
 
     if (error instanceof CustomError) {
       // Here we handle the error, then save the failed job