diff --git a/apps/api/src/controllers/crawl.ts b/apps/api/src/controllers/crawl.ts
index 614a5928..9480c63b 100644
--- a/apps/api/src/controllers/crawl.ts
+++ b/apps/api/src/controllers/crawl.ts
@@ -10,6 +10,7 @@ import { logCrawl } from "../../src/services/logging/crawl_log";
 import { validateIdempotencyKey } from "../../src/services/idempotency/validate";
 import { createIdempotencyKey } from "../../src/services/idempotency/create";
 import { defaultCrawlPageOptions, defaultCrawlerOptions, defaultOrigin } from "../../src/lib/default-values";
+import { v4 as uuidv4 } from "uuid";
 import { Logger } from "../../src/lib/logger";
 
 export async function crawlController(req: Request, res: Response) {
@@ -61,10 +62,11 @@ export async function crawlController(req: Request, res: Response) {
     const crawlerOptions = { ...defaultCrawlerOptions, ...req.body.crawlerOptions };
     const pageOptions = { ...defaultCrawlPageOptions, ...req.body.pageOptions };
 
-    if (mode === "single_urls" && !url.includes(",")) {
+    if (mode === "single_urls" && !url.includes(",")) { // NOTE: do we need this?
       try {
         const a = new WebScraperDataProvider();
         await a.setOptions({
+          jobId: uuidv4(),
           mode: "single_urls",
           urls: [url],
           crawlerOptions: { ...crawlerOptions, returnOnlyUrls: true },
diff --git a/apps/api/src/controllers/scrape.ts b/apps/api/src/controllers/scrape.ts
index 615f0c0e..f594eea8 100644
--- a/apps/api/src/controllers/scrape.ts
+++ b/apps/api/src/controllers/scrape.ts
@@ -9,9 +9,11 @@ import { Document } from "../lib/entities";
 import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function
 import { numTokensFromString } from '../lib/LLM-extraction/helpers';
 import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../lib/default-values';
+import { v4 as uuidv4 } from "uuid";
 import { Logger } from '../lib/logger';
 
 export async function scrapeHelper(
+  jobId: string,
   req: Request,
   team_id: string,
   crawlerOptions: any,
@@ -36,6 +38,7 @@ export async function scrapeHelper(
 
   const a = new WebScraperDataProvider();
   await a.setOptions({
+    jobId,
     mode: "single_urls",
     urls: [url],
     crawlerOptions: {
@@ -128,8 +131,11 @@ export async function scrapeController(req: Request, res: Response) {
       checkCredits();
     }
 
+    const jobId = uuidv4();
+
     const startTime = new Date().getTime();
     const result = await scrapeHelper(
+      jobId,
       req,
       team_id,
       crawlerOptions,
@@ -170,6 +176,7 @@
     }
 
     logJob({
+      job_id: jobId,
      success: result.success,
      message: result.error,
      num_docs: 1,
diff --git a/apps/api/src/controllers/search.ts b/apps/api/src/controllers/search.ts
index adacb766..dfd9b8b9 100644
--- a/apps/api/src/controllers/search.ts
+++ b/apps/api/src/controllers/search.ts
@@ -7,9 +7,11 @@ import { logJob } from "../services/logging/log_job";
 import { PageOptions, SearchOptions } from "../lib/entities";
 import { search } from "../search";
 import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
+import { v4 as uuidv4 } from "uuid";
 import { Logger } from "../lib/logger";
 
 export async function searchHelper(
+  jobId: string,
   req: Request,
   team_id: string,
   crawlerOptions: any,
@@ -76,6 +78,7 @@ export async function searchHelper(
 
   const a = new WebScraperDataProvider();
   await a.setOptions({
+    jobId,
     mode: "single_urls",
     urls: res.map((r) => r.url).slice(0, searchOptions.limit ?? 7),
     crawlerOptions: {
@@ -149,6 +152,8 @@ export async function searchController(req: Request, res: Response) {
     const searchOptions = req.body.searchOptions ?? { limit: 7 };
 
+    const jobId = uuidv4();
+
     try {
       const { success: creditsCheckSuccess, message: creditsCheckMessage } =
         await checkTeamCredits(team_id, 1);
@@ -161,6 +166,7 @@
     }
     const startTime = new Date().getTime();
     const result = await searchHelper(
+      jobId,
       req,
       team_id,
       crawlerOptions,
@@ -170,6 +176,7 @@
     const endTime = new Date().getTime();
     const timeTakenInSeconds = (endTime - startTime) / 1000;
     logJob({
+      job_id: jobId,
       success: result.success,
       message: result.error,
       num_docs: result.data ? result.data.length : 0,
diff --git a/apps/api/src/example.ts b/apps/api/src/example.ts
index 0c30ea33..edf0faef 100644
--- a/apps/api/src/example.ts
+++ b/apps/api/src/example.ts
@@ -4,6 +4,7 @@ async function example() {
   const example = new WebScraperDataProvider();
 
   await example.setOptions({
+    jobId: "TEST",
     mode: "crawl",
     urls: ["https://mendable.ai"],
     crawlerOptions: {},
diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts
index 69433d6b..3fba86f8 100644
--- a/apps/api/src/index.ts
+++ b/apps/api/src/index.ts
@@ -9,6 +9,7 @@ import cluster from "cluster";
 import os from "os";
 import { Logger } from "./lib/logger";
 import { adminRouter } from "./routes/admin";
+import { ScrapeEvents } from "./lib/scrape-events";
 
 const { createBullBoard } = require("@bull-board/api");
 const { BullAdapter } = require("@bull-board/api/bullAdapter");
@@ -168,3 +169,12 @@ if (cluster.isMaster) {
   Logger.info(`Worker ${process.pid} started`);
 }
+
+const wsq = getWebScraperQueue();
+
+wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
+wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
+wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
+wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
+wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
+wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
diff --git a/apps/api/src/lib/entities.ts b/apps/api/src/lib/entities.ts
index f60e197f..56cd793a 100644
--- a/apps/api/src/lib/entities.ts
+++ b/apps/api/src/lib/entities.ts
@@ -56,6 +56,7 @@ export type CrawlerOptions = {
 }
 
 export type WebScraperOptions = {
+  jobId: string;
   urls: string[];
   mode: "single_urls" | "sitemap" | "crawl";
   crawlerOptions?: CrawlerOptions;
diff --git a/apps/api/src/lib/scrape-events.ts b/apps/api/src/lib/scrape-events.ts
new file mode 100644
index 00000000..ab4ef681
--- /dev/null
+++ b/apps/api/src/lib/scrape-events.ts
@@ -0,0 +1,84 @@
+import { Job, JobId } from "bull";
+import type { baseScrapers } from "../scraper/WebScraper/single_url";
+import { supabase_service as supabase } from "../services/supabase";
+import { Logger } from "./logger";
+
+export type ScrapeErrorEvent = {
+  type: "error",
+  message: string,
+  stack?: string,
+}
+
+export type ScrapeScrapeEvent = {
+  type: "scrape",
+  url: string,
+  worker?: string,
+  method: (typeof baseScrapers)[number],
+  result: null | {
+    success: boolean,
+    response_code?: number,
+    response_size?: number,
+    error?: string | object,
+    // proxy?: string,
+    time_taken: number,
+  },
+}
+
+export type ScrapeQueueEvent = {
+  type: "queue",
+  event: "waiting" | "active" | "completed" | "paused" | "resumed" | "removed" | "failed",
+  worker?: string,
+}
+
+export type ScrapeEvent = ScrapeErrorEvent | ScrapeScrapeEvent | ScrapeQueueEvent;
+
+export class ScrapeEvents {
+  static async insert(jobId: string, content: ScrapeEvent) {
+    if (jobId === "TEST") return null;
+
+    if (process.env.USE_DB_AUTHENTICATION) {
+      try {
+        const result = await supabase.from("scrape_events").insert({
+          job_id: jobId,
+          type: content.type,
+          content: content,
+          // created_at
+        }).select().single();
+        return (result.data as any).id;
+      } catch (error) {
+        Logger.error(`Error inserting scrape event: ${error}`);
+        return null;
+      }
+    }
+
+    return null;
+  }
+
+  static async updateScrapeResult(logId: number | null, result: ScrapeScrapeEvent["result"]) {
+    if (logId === null) return;
+
+    try {
+      const previousLog = (await supabase.from("scrape_events").select().eq("id", logId).single()).data as any;
+      await supabase.from("scrape_events").update({
+        content: {
+          ...previousLog.content,
+          result,
+        }
+      }).eq("id", logId);
+    } catch (error) {
+      Logger.error(`Error updating scrape result: ${error}`);
+    }
+  }
+
+  static async logJobEvent(job: Job | JobId, event: ScrapeQueueEvent["event"]) {
+    try {
+      await this.insert(((job as any).id ? (job as any).id : job) as string, {
+        type: "queue",
+        event,
+        worker: process.env.FLY_MACHINE_ID,
+      });
+    } catch (error) {
+      Logger.error(`Error logging job event: ${error}`);
+    }
+  }
+}
diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts
index 0a7da035..5e7d2279 100644
--- a/apps/api/src/main/runWebScraper.ts
+++ b/apps/api/src/main/runWebScraper.ts
@@ -11,6 +11,7 @@ import { billTeam } from "../services/billing/credit_billing";
 import { Document } from "../lib/entities";
 import { supabase_service } from "../services/supabase";
 import { Logger } from "../lib/logger";
+import { ScrapeEvents } from "../lib/scrape-events";
 
 export async function startWebScraperPipeline({
   job,
@@ -39,6 +40,7 @@ export async function startWebScraperPipeline({
     },
     onError: (error) => {
       Logger.error(`🐂 Job failed ${job.id}`);
+      ScrapeEvents.logJobEvent(job, "failed");
       job.moveToFailed(error);
     },
     team_id: job.data.team_id,
@@ -60,6 +62,7 @@ export async function runWebScraper({
   const provider = new WebScraperDataProvider();
   if (mode === "crawl") {
     await provider.setOptions({
+      jobId: bull_job_id,
       mode: mode,
       urls: [url],
       crawlerOptions: crawlerOptions,
@@ -68,6 +71,7 @@ export async function runWebScraper({
     });
   } else {
     await provider.setOptions({
+      jobId: bull_job_id,
       mode: mode,
       urls: url.split(","),
       crawlerOptions: crawlerOptions,
@@ -138,6 +142,7 @@ const saveJob = async (job: Job, result: any) => {
         // I think the job won't exist here anymore
       }
     }
+    ScrapeEvents.logJobEvent(job, "completed");
   } catch (error) {
     Logger.error(`🐂 Failed to update job status: ${error}`);
   }
diff --git a/apps/api/src/scraper/WebScraper/__tests__/crawler.test.ts b/apps/api/src/scraper/WebScraper/__tests__/crawler.test.ts
index 32c8b0a0..20419ffa 100644
--- a/apps/api/src/scraper/WebScraper/__tests__/crawler.test.ts
+++ b/apps/api/src/scraper/WebScraper/__tests__/crawler.test.ts
@@ -42,6 +42,7 @@ describe('WebCrawler', () => {
     crawler = new WebCrawler({
+      jobId: "TEST",
       initialUrl: initialUrl,
       includes: [],
       excludes: [],
@@ -76,6 +77,7 @@ describe('WebCrawler', () => {
     crawler = new WebCrawler({
+      jobId: "TEST",
       initialUrl: initialUrl,
       includes: [],
       excludes: [],
@@ -104,6 +106,7 @@ describe('WebCrawler', () => {
     crawler = new WebCrawler({
+      jobId: "TEST",
       initialUrl: initialUrl,
       includes: [],
       excludes: [],
@@ -133,6 +136,7 @@ describe('WebCrawler', () => {
     crawler = new WebCrawler({
+      jobId: "TEST",
       initialUrl: initialUrl,
       includes: [],
       excludes: [],
@@ -161,6 +165,7 @@ describe('WebCrawler', () => {
     // Setup the crawler with the specific test case options
     const crawler = new WebCrawler({
+      jobId: "TEST",
       initialUrl: initialUrl,
       includes: [],
       excludes: [],
@@ -194,6 +199,7 @@ describe('WebCrawler', () => {
     const limit = 2; // Set a limit for the number of links
 
     crawler = new WebCrawler({
+      jobId: "TEST",
       initialUrl: initialUrl,
       includes: [],
       excludes: [],
diff --git a/apps/api/src/scraper/WebScraper/__tests__/single_url.test.ts b/apps/api/src/scraper/WebScraper/__tests__/single_url.test.ts
index 8a9df227..4b720835 100644
--- a/apps/api/src/scraper/WebScraper/__tests__/single_url.test.ts
+++ b/apps/api/src/scraper/WebScraper/__tests__/single_url.test.ts
@@ -15,8 +15,8 @@ describe('scrapSingleUrl', () => {
     const pageOptionsWithHtml: PageOptions = { includeHtml: true };
     const pageOptionsWithoutHtml: PageOptions = { includeHtml: false };
 
-    const resultWithHtml = await scrapSingleUrl(url, pageOptionsWithHtml);
-    const resultWithoutHtml = await scrapSingleUrl(url, pageOptionsWithoutHtml);
+    const resultWithHtml = await scrapSingleUrl("TEST", url, pageOptionsWithHtml);
+    const resultWithoutHtml = await scrapSingleUrl("TEST", url, pageOptionsWithoutHtml);
 
     expect(resultWithHtml.html).toBeDefined();
     expect(resultWithoutHtml.html).toBeUndefined();
@@ -27,7 +27,7 @@ it('should return a list of links on the mendable.ai page', async () => {
   const url = 'https://mendable.ai';
   const pageOptions: PageOptions = { includeHtml: true };
 
-  const result = await scrapSingleUrl(url, pageOptions);
+  const result = await scrapSingleUrl("TEST", url, pageOptions);
 
   // Check if the result contains a list of links
   expect(result.linksOnPage).toBeDefined();
diff --git a/apps/api/src/scraper/WebScraper/crawler.ts b/apps/api/src/scraper/WebScraper/crawler.ts
index 29819b1c..5ee8cda8 100644
--- a/apps/api/src/scraper/WebScraper/crawler.ts
+++ b/apps/api/src/scraper/WebScraper/crawler.ts
@@ -11,6 +11,7 @@ import { axiosTimeout } from "../../../src/lib/timeout";
 import { Logger } from "../../../src/lib/logger";
 
 export class WebCrawler {
+  private jobId: string;
   private initialUrl: string;
   private baseUrl: string;
   private includes: string[];
@@ -27,6 +28,7 @@ export class WebCrawler {
   private allowExternalContentLinks: boolean;
 
   constructor({
+    jobId,
     initialUrl,
     includes,
     excludes,
@@ -37,6 +39,7 @@ export class WebCrawler {
     allowBackwardCrawling = false,
     allowExternalContentLinks = false
   }: {
+    jobId: string;
     initialUrl: string;
     includes?: string[];
     excludes?: string[];
@@ -47,6 +50,7 @@ export class WebCrawler {
     allowBackwardCrawling?: boolean;
     allowExternalContentLinks?: boolean;
   }) {
+    this.jobId = jobId;
     this.initialUrl = initialUrl;
     this.baseUrl = new URL(initialUrl).origin;
     this.includes = includes ?? [];
@@ -261,7 +265,7 @@ export class WebCrawler {
 
     // If it is the first link, fetch with single url
     if (this.visited.size === 1) {
-      const page = await scrapSingleUrl(url, { ...pageOptions, includeHtml: true });
+      const page = await scrapSingleUrl(this.jobId, url, { ...pageOptions, includeHtml: true });
       content = page.html ?? "";
       pageStatusCode = page.metadata?.pageStatusCode;
       pageError = page.metadata?.pageError || undefined;
diff --git a/apps/api/src/scraper/WebScraper/index.ts b/apps/api/src/scraper/WebScraper/index.ts
index 6506620e..eff709fa 100644
--- a/apps/api/src/scraper/WebScraper/index.ts
+++ b/apps/api/src/scraper/WebScraper/index.ts
@@ -22,6 +22,7 @@ import { getAdjustedMaxDepth, getURLDepth } from "./utils/maxDepthUtils";
 import { Logger } from "../../lib/logger";
 
 export class WebScraperDataProvider {
+  private jobId: string;
   private bullJobId: string;
   private urls: string[] = [""];
   private mode: "single_urls" | "sitemap" | "crawl" = "single_urls";
@@ -66,6 +67,7 @@ export class WebScraperDataProvider {
       batchUrls.map(async (url, index) => {
         const existingHTML = allHtmls ? allHtmls[i + index] : "";
         const result = await scrapSingleUrl(
+          this.jobId,
           url,
           this.pageOptions,
           this.extractorOptions,
@@ -166,6 +168,7 @@ export class WebScraperDataProvider {
     inProgress?: (progress: Progress) => void
   ): Promise<Document[]> {
     const crawler = new WebCrawler({
+      jobId: this.jobId,
       initialUrl: this.urls[0],
       includes: this.includes,
       excludes: this.excludes,
@@ -500,6 +503,7 @@ export class WebScraperDataProvider {
       throw new Error("Urls are required");
     }
 
+    this.jobId = options.jobId;
     this.bullJobId = options.bullJobId;
     this.urls = options.urls;
     this.mode = options.mode;
diff --git a/apps/api/src/scraper/WebScraper/single_url.ts b/apps/api/src/scraper/WebScraper/single_url.ts
index 1e7f1fac..4b428d35 100644
--- a/apps/api/src/scraper/WebScraper/single_url.ts
+++ b/apps/api/src/scraper/WebScraper/single_url.ts
@@ -18,10 +18,11 @@ import { scrapWithPlaywright } from "./scrapers/playwright";
 import { scrapWithScrapingBee } from "./scrapers/scrapingBee";
 import { extractLinks } from "./utils/utils";
 import { Logger } from "../../lib/logger";
+import { ScrapeEvents } from "../../lib/scrape-events";
 
 dotenv.config();
 
-const baseScrapers = [
+export const baseScrapers = [
   "fire-engine",
   "fire-engine;chrome-cdp",
   "scrapingBee",
@@ -118,6 +119,7 @@ function getScrapingFallbackOrder(
 
 export async function scrapSingleUrl(
+  jobId: string,
   urlToScrap: string,
   pageOptions: PageOptions = {
     onlyMainContent: true,
@@ -145,6 +147,15 @@ export async function scrapSingleUrl(
     } = { text: "", screenshot: "", metadata: {} };
     let screenshot = "";
 
+    const timer = Date.now();
+    const logInsertPromise = ScrapeEvents.insert(jobId, {
+      type: "scrape",
+      url,
+      worker: process.env.FLY_MACHINE_ID,
+      method,
+      result: null,
+    });
+
     switch (method) {
       case "fire-engine":
       case "fire-engine;chrome-cdp":
@@ -254,8 +265,19 @@ export async function scrapSingleUrl(
     }
 
     //* TODO: add an optional to return markdown or structured/extracted content
     let cleanedHtml = removeUnwantedElements(scraperResponse.text, pageOptions);
+    const text = await parseMarkdown(cleanedHtml);
+
+    const insertedLogId = await logInsertPromise;
+    ScrapeEvents.updateScrapeResult(insertedLogId, {
+      response_size: scraperResponse.text.length,
+      success: !scraperResponse.metadata.pageError && !!text,
+      error: scraperResponse.metadata.pageError,
+      response_code: scraperResponse.metadata.pageStatusCode,
+      time_taken: Date.now() - timer,
+    });
+
     return {
-      text: await parseMarkdown(cleanedHtml),
+      text,
       html: cleanedHtml,
       rawHtml: scraperResponse.text,
       screenshot: scraperResponse.screenshot,
@@ -379,6 +401,11 @@ export async function scrapSingleUrl(
     return document;
   } catch (error) {
     Logger.debug(`⛏️ Error: ${error.message} - Failed to fetch URL: ${urlToScrap}`);
+    ScrapeEvents.insert(jobId, {
+      type: "error",
+      message: typeof error === "string" ? error : typeof error.message === "string" ? error.message : JSON.stringify(error),
+      stack: error.stack,
+    });
     return {
       content: "",
       markdown: "",
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 532c5bc1..e7767809 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -8,6 +8,7 @@ import { logJob } from "./logging/log_job";
 import { initSDK } from '@hyperdx/node-opentelemetry';
 import { Job } from "bull";
 import { Logger } from "../lib/logger";
+import { ScrapeEvents } from "../lib/scrape-events";
 
 if (process.env.ENV === 'production') {
   initSDK({
@@ -20,6 +21,7 @@ const wsq = getWebScraperQueue();
 
 async function processJob(job: Job, done) {
   Logger.debug(`🐂 Worker taking job ${job.id}`);
+
   try {
     job.progress({
       current: 1,
@@ -114,3 +116,10 @@ wsq.process(
   Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
   processJob
 );
+
+wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
+wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
+wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
+wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
+wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
+wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));