From 7cd9bf92e33b1079cff7e92dcb28bc548021bee5 Mon Sep 17 00:00:00 2001 From: Gergo Moricz Date: Wed, 24 Jul 2024 14:31:25 +0200 Subject: [PATCH 1/8] feat: scrape event logging to DB --- apps/api/src/controllers/crawl.ts | 4 +- apps/api/src/controllers/scrape.ts | 2 + apps/api/src/controllers/search.ts | 7 +++ apps/api/src/example.ts | 1 + apps/api/src/lib/entities.ts | 1 + apps/api/src/lib/scrape-events.ts | 58 +++++++++++++++++++ apps/api/src/main/runWebScraper.ts | 2 + .../WebScraper/__tests__/crawler.test.ts | 6 ++ .../WebScraper/__tests__/single_url.test.ts | 6 +- apps/api/src/scraper/WebScraper/crawler.ts | 6 +- apps/api/src/scraper/WebScraper/index.ts | 4 ++ apps/api/src/scraper/WebScraper/single_url.ts | 28 ++++++++- 12 files changed, 118 insertions(+), 7 deletions(-) create mode 100644 apps/api/src/lib/scrape-events.ts diff --git a/apps/api/src/controllers/crawl.ts b/apps/api/src/controllers/crawl.ts index 89358fcc..3388f8a7 100644 --- a/apps/api/src/controllers/crawl.ts +++ b/apps/api/src/controllers/crawl.ts @@ -10,6 +10,7 @@ import { logCrawl } from "../../src/services/logging/crawl_log"; import { validateIdempotencyKey } from "../../src/services/idempotency/validate"; import { createIdempotencyKey } from "../../src/services/idempotency/create"; import { defaultCrawlPageOptions, defaultCrawlerOptions, defaultOrigin } from "../../src/lib/default-values"; +import { v4 as uuidv4 } from "uuid"; export async function crawlController(req: Request, res: Response) { try { @@ -60,10 +61,11 @@ export async function crawlController(req: Request, res: Response) { const crawlerOptions = { ...defaultCrawlerOptions, ...req.body.crawlerOptions }; const pageOptions = { ...defaultCrawlPageOptions, ...req.body.pageOptions }; - if (mode === "single_urls" && !url.includes(",")) { + if (mode === "single_urls" && !url.includes(",")) { // NOTE: do we need this? 
try { const a = new WebScraperDataProvider(); await a.setOptions({ + jobId: uuidv4(), mode: "single_urls", urls: [url], crawlerOptions: { ...crawlerOptions, returnOnlyUrls: true }, diff --git a/apps/api/src/controllers/scrape.ts b/apps/api/src/controllers/scrape.ts index fb57e41d..bfbb1e2a 100644 --- a/apps/api/src/controllers/scrape.ts +++ b/apps/api/src/controllers/scrape.ts @@ -9,6 +9,7 @@ import { Document } from "../lib/entities"; import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function import { numTokensFromString } from '../lib/LLM-extraction/helpers'; import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../lib/default-values'; +import { v4 as uuidv4 } from "uuid"; export async function scrapeHelper( req: Request, @@ -35,6 +36,7 @@ export async function scrapeHelper( const a = new WebScraperDataProvider(); await a.setOptions({ + jobId: uuidv4(), mode: "single_urls", urls: [url], crawlerOptions: { diff --git a/apps/api/src/controllers/search.ts b/apps/api/src/controllers/search.ts index 8cb6d55b..e6ab8e9d 100644 --- a/apps/api/src/controllers/search.ts +++ b/apps/api/src/controllers/search.ts @@ -7,8 +7,10 @@ import { logJob } from "../services/logging/log_job"; import { PageOptions, SearchOptions } from "../lib/entities"; import { search } from "../search"; import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; +import { v4 as uuidv4 } from "uuid"; export async function searchHelper( + jobId: string, req: Request, team_id: string, crawlerOptions: any, @@ -75,6 +77,7 @@ export async function searchHelper( const a = new WebScraperDataProvider(); await a.setOptions({ + jobId, mode: "single_urls", urls: res.map((r) => r.url).slice(0, searchOptions.limit ?? 7), crawlerOptions: { @@ -148,6 +151,8 @@ export async function searchController(req: Request, res: Response) { const searchOptions = req.body.searchOptions ?? { limit: 7 }; + const jobId = uuidv4(); + try { const { success: creditsCheckSuccess, message: creditsCheckMessage } = await checkTeamCredits(team_id, 1); @@ -160,6 +165,7 @@ export async function searchController(req: Request, res: Response) { } const startTime = new Date().getTime(); const result = await searchHelper( + jobId, req, team_id, crawlerOptions, @@ -169,6 +175,7 @@ export async function searchController(req: Request, res: Response) { const endTime = new Date().getTime(); const timeTakenInSeconds = (endTime - startTime) / 1000; logJob({ + job_id: jobId, success: result.success, message: result.error, num_docs: result.data ? 
result.data.length : 0, diff --git a/apps/api/src/example.ts b/apps/api/src/example.ts index 0c30ea33..edf0faef 100644 --- a/apps/api/src/example.ts +++ b/apps/api/src/example.ts @@ -4,6 +4,7 @@ async function example() { const example = new WebScraperDataProvider(); await example.setOptions({ + jobId: "TEST", mode: "crawl", urls: ["https://mendable.ai"], crawlerOptions: {}, diff --git a/apps/api/src/lib/entities.ts b/apps/api/src/lib/entities.ts index f60e197f..56cd793a 100644 --- a/apps/api/src/lib/entities.ts +++ b/apps/api/src/lib/entities.ts @@ -56,6 +56,7 @@ export type CrawlerOptions = { } export type WebScraperOptions = { + jobId: string; urls: string[]; mode: "single_urls" | "sitemap" | "crawl"; crawlerOptions?: CrawlerOptions; diff --git a/apps/api/src/lib/scrape-events.ts b/apps/api/src/lib/scrape-events.ts new file mode 100644 index 00000000..ea30ffc2 --- /dev/null +++ b/apps/api/src/lib/scrape-events.ts @@ -0,0 +1,58 @@ +import type { baseScrapers } from "../scraper/WebScraper/single_url"; +import { supabase_service as supabase } from "../services/supabase"; + +export type ScrapeErrorEvent = { + type: "error", + message: string, + stack?: string, +} + +export type ScrapeScrapeEvent = { + type: "scrape", + method: (typeof baseScrapers)[number], + result: null | { + success: boolean, + response_code?: number, + error?: string, + // proxy?: string, + time_taken: number, + }, +} + +export type ScrapeQueueEvent = { + type: "queue", + event: "created" | "started" | "interrupted" | "finished", + worker?: string, +} + +export type ScrapeEvent = ScrapeErrorEvent | ScrapeScrapeEvent | ScrapeQueueEvent; + +export class ScrapeEvents { + static async insert(jobId: string, content: ScrapeEvent) { + if (jobId === "TEST") return null; + + if (process.env.USE_DB_AUTH) { + const result = await supabase.from("scrape_events").insert({ + job_id: jobId, + type: content.type, + content: content, + // created_at + }).single(); + return (result.data as any).id; + } + + return null; + } + + static async updateScrapeResult(logId: number | null, result: ScrapeScrapeEvent["result"]) { + if (logId === null) return; + + const previousLog = (await supabase.from("scrape_events").select().eq("id", logId).single()).data as any; + await supabase.from("scrape_events").update({ + content: { + ...previousLog.content, + result, + } + }).eq("id", logId); + } +} diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts index 0a7da035..5c97ef41 100644 --- a/apps/api/src/main/runWebScraper.ts +++ b/apps/api/src/main/runWebScraper.ts @@ -60,6 +60,7 @@ export async function runWebScraper({ const provider = new WebScraperDataProvider(); if (mode === "crawl") { await provider.setOptions({ + jobId: bull_job_id, mode: mode, urls: [url], crawlerOptions: crawlerOptions, @@ -68,6 +69,7 @@ export async function runWebScraper({ }); } else { await provider.setOptions({ + jobId: bull_job_id, mode: mode, urls: url.split(","), crawlerOptions: crawlerOptions, diff --git a/apps/api/src/scraper/WebScraper/__tests__/crawler.test.ts b/apps/api/src/scraper/WebScraper/__tests__/crawler.test.ts index 32c8b0a0..20419ffa 100644 --- a/apps/api/src/scraper/WebScraper/__tests__/crawler.test.ts +++ b/apps/api/src/scraper/WebScraper/__tests__/crawler.test.ts @@ -42,6 +42,7 @@ describe('WebCrawler', () => { crawler = new WebCrawler({ + jobId: "TEST", initialUrl: initialUrl, includes: [], excludes: [], @@ -76,6 +77,7 @@ describe('WebCrawler', () => { crawler = new WebCrawler({ + jobId: "TEST", initialUrl: initialUrl, 
includes: [], excludes: [], @@ -104,6 +106,7 @@ describe('WebCrawler', () => { crawler = new WebCrawler({ + jobId: "TEST", initialUrl: initialUrl, includes: [], excludes: [], @@ -133,6 +136,7 @@ describe('WebCrawler', () => { crawler = new WebCrawler({ + jobId: "TEST", initialUrl: initialUrl, includes: [], excludes: [], @@ -161,6 +165,7 @@ describe('WebCrawler', () => { // Setup the crawler with the specific test case options const crawler = new WebCrawler({ + jobId: "TEST", initialUrl: initialUrl, includes: [], excludes: [], @@ -194,6 +199,7 @@ describe('WebCrawler', () => { const limit = 2; // Set a limit for the number of links crawler = new WebCrawler({ + jobId: "TEST", initialUrl: initialUrl, includes: [], excludes: [], diff --git a/apps/api/src/scraper/WebScraper/__tests__/single_url.test.ts b/apps/api/src/scraper/WebScraper/__tests__/single_url.test.ts index 8a9df227..4b720835 100644 --- a/apps/api/src/scraper/WebScraper/__tests__/single_url.test.ts +++ b/apps/api/src/scraper/WebScraper/__tests__/single_url.test.ts @@ -15,8 +15,8 @@ describe('scrapSingleUrl', () => { const pageOptionsWithHtml: PageOptions = { includeHtml: true }; const pageOptionsWithoutHtml: PageOptions = { includeHtml: false }; - const resultWithHtml = await scrapSingleUrl(url, pageOptionsWithHtml); - const resultWithoutHtml = await scrapSingleUrl(url, pageOptionsWithoutHtml); + const resultWithHtml = await scrapSingleUrl("TEST", url, pageOptionsWithHtml); + const resultWithoutHtml = await scrapSingleUrl("TEST", url, pageOptionsWithoutHtml); expect(resultWithHtml.html).toBeDefined(); expect(resultWithoutHtml.html).toBeUndefined(); @@ -27,7 +27,7 @@ it('should return a list of links on the mendable.ai page', async () => { const url = 'https://mendable.ai'; const pageOptions: PageOptions = { includeHtml: true }; - const result = await scrapSingleUrl(url, pageOptions); + const result = await scrapSingleUrl("TEST", url, pageOptions); // Check if the result contains a list of links expect(result.linksOnPage).toBeDefined(); diff --git a/apps/api/src/scraper/WebScraper/crawler.ts b/apps/api/src/scraper/WebScraper/crawler.ts index 3d7c267a..c592f400 100644 --- a/apps/api/src/scraper/WebScraper/crawler.ts +++ b/apps/api/src/scraper/WebScraper/crawler.ts @@ -11,6 +11,7 @@ import { axiosTimeout } from "../../../src/lib/timeout"; import { Logger } from "../../../src/lib/logger"; export class WebCrawler { + private jobId: string; private initialUrl: string; private baseUrl: string; private includes: string[]; @@ -27,6 +28,7 @@ export class WebCrawler { private allowExternalContentLinks: boolean; constructor({ + jobId, initialUrl, includes, excludes, @@ -37,6 +39,7 @@ export class WebCrawler { allowBackwardCrawling = false, allowExternalContentLinks = false }: { + jobId: string; initialUrl: string; includes?: string[]; excludes?: string[]; @@ -47,6 +50,7 @@ export class WebCrawler { allowBackwardCrawling?: boolean; allowExternalContentLinks?: boolean; }) { + this.jobId = jobId; this.initialUrl = initialUrl; this.baseUrl = new URL(initialUrl).origin; this.includes = includes ?? []; @@ -261,7 +265,7 @@ export class WebCrawler { // If it is the first link, fetch with single url if (this.visited.size === 1) { - const page = await scrapSingleUrl(url, { ...pageOptions, includeHtml: true }); + const page = await scrapSingleUrl(this.jobId, url, { ...pageOptions, includeHtml: true }); content = page.html ?? 
""; pageStatusCode = page.metadata?.pageStatusCode; pageError = page.metadata?.pageError || undefined; diff --git a/apps/api/src/scraper/WebScraper/index.ts b/apps/api/src/scraper/WebScraper/index.ts index 6506620e..eff709fa 100644 --- a/apps/api/src/scraper/WebScraper/index.ts +++ b/apps/api/src/scraper/WebScraper/index.ts @@ -22,6 +22,7 @@ import { getAdjustedMaxDepth, getURLDepth } from "./utils/maxDepthUtils"; import { Logger } from "../../lib/logger"; export class WebScraperDataProvider { + private jobId: string; private bullJobId: string; private urls: string[] = [""]; private mode: "single_urls" | "sitemap" | "crawl" = "single_urls"; @@ -66,6 +67,7 @@ export class WebScraperDataProvider { batchUrls.map(async (url, index) => { const existingHTML = allHtmls ? allHtmls[i + index] : ""; const result = await scrapSingleUrl( + this.jobId, url, this.pageOptions, this.extractorOptions, @@ -166,6 +168,7 @@ export class WebScraperDataProvider { inProgress?: (progress: Progress) => void ): Promise { const crawler = new WebCrawler({ + jobId: this.jobId, initialUrl: this.urls[0], includes: this.includes, excludes: this.excludes, @@ -500,6 +503,7 @@ export class WebScraperDataProvider { throw new Error("Urls are required"); } + this.jobId = options.jobId; this.bullJobId = options.bullJobId; this.urls = options.urls; this.mode = options.mode; diff --git a/apps/api/src/scraper/WebScraper/single_url.ts b/apps/api/src/scraper/WebScraper/single_url.ts index 1e7f1fac..68474fed 100644 --- a/apps/api/src/scraper/WebScraper/single_url.ts +++ b/apps/api/src/scraper/WebScraper/single_url.ts @@ -18,10 +18,11 @@ import { scrapWithPlaywright } from "./scrapers/playwright"; import { scrapWithScrapingBee } from "./scrapers/scrapingBee"; import { extractLinks } from "./utils/utils"; import { Logger } from "../../lib/logger"; +import { ScrapeEvents } from "../../lib/scrape-events"; dotenv.config(); -const baseScrapers = [ +export const baseScrapers = [ "fire-engine", "fire-engine;chrome-cdp", "scrapingBee", @@ -118,6 +119,7 @@ function getScrapingFallbackOrder( export async function scrapSingleUrl( + jobId: string, urlToScrap: string, pageOptions: PageOptions = { onlyMainContent: true, @@ -145,6 +147,13 @@ export async function scrapSingleUrl( } = { text: "", screenshot: "", metadata: {} }; let screenshot = ""; + const timer = Date.now(); + const logInsertPromise = ScrapeEvents.insert(jobId, { + type: "scrape", + method, + result: null, + }); + switch (method) { case "fire-engine": case "fire-engine;chrome-cdp": @@ -254,8 +263,18 @@ export async function scrapSingleUrl( } //* TODO: add an optional to return markdown or structured/extracted content let cleanedHtml = removeUnwantedElements(scraperResponse.text, pageOptions); + const text = await parseMarkdown(cleanedHtml); + + const insertedLogId = await logInsertPromise; + ScrapeEvents.updateScrapeResult(insertedLogId, { + success: !!scraperResponse.metadata.pageError && !!text, + error: scraperResponse.metadata.pageError, + response_code: scraperResponse.metadata.pageStatusCode, + time_taken: Date.now() - timer, + }); + return { - text: await parseMarkdown(cleanedHtml), + text, html: cleanedHtml, rawHtml: scraperResponse.text, screenshot: scraperResponse.screenshot, @@ -379,6 +398,11 @@ export async function scrapSingleUrl( return document; } catch (error) { Logger.debug(`⛏️ Error: ${error.message} - Failed to fetch URL: ${urlToScrap}`); + ScrapeEvents.insert(jobId, { + type: "error", + message: typeof error === "string" ? 
error : typeof error.message === "string" ? error.message : JSON.stringify(error), + stack: error.stack, + }); return { content: "", markdown: "", From 71072fef3b85c22a3f8daf6f667e17bce6efc7d5 Mon Sep 17 00:00:00 2001 From: Gergo Moricz Date: Wed, 24 Jul 2024 14:46:41 +0200 Subject: [PATCH 2/8] fix(scrape-events): bad logic --- apps/api/src/lib/scrape-events.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/api/src/lib/scrape-events.ts b/apps/api/src/lib/scrape-events.ts index ea30ffc2..d7958d14 100644 --- a/apps/api/src/lib/scrape-events.ts +++ b/apps/api/src/lib/scrape-events.ts @@ -13,7 +13,7 @@ export type ScrapeScrapeEvent = { result: null | { success: boolean, response_code?: number, - error?: string, + error?: string | object, // proxy?: string, time_taken: number, }, @@ -31,13 +31,13 @@ export class ScrapeEvents { static async insert(jobId: string, content: ScrapeEvent) { if (jobId === "TEST") return null; - if (process.env.USE_DB_AUTH) { + if (process.env.USE_DB_AUTHENTICATION) { const result = await supabase.from("scrape_events").insert({ job_id: jobId, type: content.type, content: content, // created_at - }).single(); + }).select().single(); return (result.data as any).id; } From d57dbbd0c606f55836200281e981927b1ccfc491 Mon Sep 17 00:00:00 2001 From: Gergo Moricz Date: Wed, 24 Jul 2024 15:18:12 +0200 Subject: [PATCH 3/8] fix: add jobId for scrape --- apps/api/src/controllers/scrape.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/api/src/controllers/scrape.ts b/apps/api/src/controllers/scrape.ts index bfbb1e2a..c409371d 100644 --- a/apps/api/src/controllers/scrape.ts +++ b/apps/api/src/controllers/scrape.ts @@ -12,6 +12,7 @@ import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOri import { v4 as uuidv4 } from "uuid"; export async function scrapeHelper( + jobId: string, req: Request, team_id: string, crawlerOptions: any, @@ -36,7 +37,7 @@ export async function scrapeHelper( const a = new WebScraperDataProvider(); await a.setOptions({ - jobId: uuidv4(), + jobId, mode: "single_urls", urls: [url], crawlerOptions: { @@ -129,8 +130,11 @@ export async function scrapeController(req: Request, res: Response) { checkCredits(); } + const jobId = uuidv4(); + const startTime = new Date().getTime(); const result = await scrapeHelper( + jobId, req, team_id, crawlerOptions, @@ -171,6 +175,7 @@ export async function scrapeController(req: Request, res: Response) { } logJob({ + job_id: jobId, success: result.success, message: result.error, num_docs: 1, From 64bcedeefc82e753e3939c6f29d473a7238ef8da Mon Sep 17 00:00:00 2001 From: Gergo Moricz Date: Wed, 24 Jul 2024 16:21:59 +0200 Subject: [PATCH 4/8] fix(monitoring): bad success check on scrape --- apps/api/src/scraper/WebScraper/single_url.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/api/src/scraper/WebScraper/single_url.ts b/apps/api/src/scraper/WebScraper/single_url.ts index 68474fed..61ee89af 100644 --- a/apps/api/src/scraper/WebScraper/single_url.ts +++ b/apps/api/src/scraper/WebScraper/single_url.ts @@ -267,7 +267,7 @@ export async function scrapSingleUrl( const insertedLogId = await logInsertPromise; ScrapeEvents.updateScrapeResult(insertedLogId, { - success: !!scraperResponse.metadata.pageError && !!text, + success: !scraperResponse.metadata.pageError && !!text, error: scraperResponse.metadata.pageError, response_code: scraperResponse.metadata.pageStatusCode, time_taken: Date.now() - timer, From 
4d35ad073c86e74840dc0074cdee1e7a5a05ea90 Mon Sep 17 00:00:00 2001 From: Gergo Moricz Date: Wed, 24 Jul 2024 16:43:39 +0200 Subject: [PATCH 5/8] feat(monitoring/scrape): include url, worker, response_size --- apps/api/src/lib/scrape-events.ts | 3 +++ apps/api/src/scraper/WebScraper/single_url.ts | 3 +++ 2 files changed, 6 insertions(+) diff --git a/apps/api/src/lib/scrape-events.ts b/apps/api/src/lib/scrape-events.ts index d7958d14..9b050c30 100644 --- a/apps/api/src/lib/scrape-events.ts +++ b/apps/api/src/lib/scrape-events.ts @@ -9,10 +9,13 @@ export type ScrapeErrorEvent = { export type ScrapeScrapeEvent = { type: "scrape", + url: string, + worker?: string, method: (typeof baseScrapers)[number], result: null | { success: boolean, response_code?: number, + response_size?: number, error?: string | object, // proxy?: string, time_taken: number, diff --git a/apps/api/src/scraper/WebScraper/single_url.ts b/apps/api/src/scraper/WebScraper/single_url.ts index 61ee89af..4b428d35 100644 --- a/apps/api/src/scraper/WebScraper/single_url.ts +++ b/apps/api/src/scraper/WebScraper/single_url.ts @@ -150,6 +150,8 @@ export async function scrapSingleUrl( const timer = Date.now(); const logInsertPromise = ScrapeEvents.insert(jobId, { type: "scrape", + url, + worker: process.env.FLY_MACHINE_ID, method, result: null, }); @@ -267,6 +269,7 @@ export async function scrapSingleUrl( const insertedLogId = await logInsertPromise; ScrapeEvents.updateScrapeResult(insertedLogId, { + response_size: scraperResponse.text.length, success: !scraperResponse.metadata.pageError && !!text, error: scraperResponse.metadata.pageError, response_code: scraperResponse.metadata.pageStatusCode, From 60c74357dfbb133658c2cb73b40f0573c2ed07e8 Mon Sep 17 00:00:00 2001 From: Gergo Moricz Date: Wed, 24 Jul 2024 18:44:14 +0200 Subject: [PATCH 6/8] feat(ScrapeEvents): log queue events --- apps/api/src/index.ts | 10 ++++++++++ apps/api/src/lib/scrape-events.ts | 11 ++++++++++- apps/api/src/services/queue-worker.ts | 9 +++++++++ 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 700cc98e..cbd37eb7 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -13,6 +13,7 @@ import { checkAlerts } from "./services/alerts"; import Redis from "ioredis"; import { redisRateLimitClient } from "./services/rate-limiter"; import { Logger } from "./lib/logger"; +import { ScrapeEvents } from "./lib/scrape-events"; const { createBullBoard } = require("@bull-board/api"); const { BullAdapter } = require("@bull-board/api/bullAdapter"); @@ -325,3 +326,12 @@ if (cluster.isMaster) { Logger.info(`Worker ${process.pid} started`); } + +const wsq = getWebScraperQueue(); + +wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting")); +wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active")); +wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed")); +wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused")); +wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed")); +wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed")); diff --git a/apps/api/src/lib/scrape-events.ts b/apps/api/src/lib/scrape-events.ts index 9b050c30..7015c92d 100644 --- a/apps/api/src/lib/scrape-events.ts +++ b/apps/api/src/lib/scrape-events.ts @@ -1,3 +1,4 @@ +import { Job, JobId } from "bull"; import type { baseScrapers } from "../scraper/WebScraper/single_url"; import { supabase_service as supabase } from "../services/supabase"; @@ -24,7 +25,7 @@ export type ScrapeScrapeEvent = { export type 
ScrapeQueueEvent = { type: "queue", - event: "created" | "started" | "interrupted" | "finished", + event: "waiting" | "active" | "completed" | "paused" | "resumed" | "removed", worker?: string, } @@ -58,4 +59,12 @@ export class ScrapeEvents { } }).eq("id", logId); } + + static async logJobEvent(job: Job | JobId, event: ScrapeQueueEvent["event"]) { + await this.insert(((job as any).id ? (job as any).id : job) as string, { + type: "queue", + event, + worker: process.env.FLY_MACHINE_ID, + }); + } } diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 532c5bc1..e7767809 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -8,6 +8,7 @@ import { logJob } from "./logging/log_job"; import { initSDK } from '@hyperdx/node-opentelemetry'; import { Job } from "bull"; import { Logger } from "../lib/logger"; +import { ScrapeEvents } from "../lib/scrape-events"; if (process.env.ENV === 'production') { initSDK({ @@ -20,6 +21,7 @@ const wsq = getWebScraperQueue(); async function processJob(job: Job, done) { Logger.debug(`🐂 Worker taking job ${job.id}`); + try { job.progress({ current: 1, @@ -114,3 +116,10 @@ wsq.process( Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)), processJob ); + +wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting")); +wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active")); +wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed")); +wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused")); +wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed")); +wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed")); From cc98f83fdab20efbc48b1632372a75b866857395 Mon Sep 17 00:00:00 2001 From: rafaelsideguide <150964962+rafaelsideguide@users.noreply.github.com> Date: Wed, 24 Jul 2024 15:25:36 -0300 Subject: [PATCH 7/8] added failed and completed log events --- apps/api/src/lib/scrape-events.ts | 2 +- apps/api/src/main/runWebScraper.ts | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/api/src/lib/scrape-events.ts b/apps/api/src/lib/scrape-events.ts index 7015c92d..dda03c38 100644 --- a/apps/api/src/lib/scrape-events.ts +++ b/apps/api/src/lib/scrape-events.ts @@ -25,7 +25,7 @@ export type ScrapeScrapeEvent = { export type ScrapeQueueEvent = { type: "queue", - event: "waiting" | "active" | "completed" | "paused" | "resumed" | "removed", + event: "waiting" | "active" | "completed" | "paused" | "resumed" | "removed" | "failed", worker?: string, } diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts index 5c97ef41..5e7d2279 100644 --- a/apps/api/src/main/runWebScraper.ts +++ b/apps/api/src/main/runWebScraper.ts @@ -11,6 +11,7 @@ import { billTeam } from "../services/billing/credit_billing"; import { Document } from "../lib/entities"; import { supabase_service } from "../services/supabase"; import { Logger } from "../lib/logger"; +import { ScrapeEvents } from "../lib/scrape-events"; export async function startWebScraperPipeline({ job, @@ -39,6 +40,7 @@ export async function startWebScraperPipeline({ }, onError: (error) => { Logger.error(`🐂 Job failed ${job.id}`); + ScrapeEvents.logJobEvent(job, "failed"); job.moveToFailed(error); }, team_id: job.data.team_id, @@ -140,6 +142,7 @@ const saveJob = async (job: Job, result: any) => { // I think the job won't exist here anymore } } + ScrapeEvents.logJobEvent(job, "completed"); } catch (error) { Logger.error(`🐂 Failed to update job status: ${error}`); } From 
50d2426fc496d4feba7a6bd013a790480cc5bd14 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 25 Jul 2024 16:20:29 -0400 Subject: [PATCH 8/8] Update scrape-events.ts --- apps/api/src/lib/scrape-events.ts | 52 ++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/apps/api/src/lib/scrape-events.ts b/apps/api/src/lib/scrape-events.ts index dda03c38..ab4ef681 100644 --- a/apps/api/src/lib/scrape-events.ts +++ b/apps/api/src/lib/scrape-events.ts @@ -1,6 +1,7 @@ import { Job, JobId } from "bull"; import type { baseScrapers } from "../scraper/WebScraper/single_url"; import { supabase_service as supabase } from "../services/supabase"; +import { Logger } from "./logger"; export type ScrapeErrorEvent = { type: "error", @@ -36,13 +37,18 @@ export class ScrapeEvents { if (jobId === "TEST") return null; if (process.env.USE_DB_AUTHENTICATION) { - const result = await supabase.from("scrape_events").insert({ - job_id: jobId, - type: content.type, - content: content, - // created_at - }).select().single(); - return (result.data as any).id; + try { + const result = await supabase.from("scrape_events").insert({ + job_id: jobId, + type: content.type, + content: content, + // created_at + }).select().single(); + return (result.data as any).id; + } catch (error) { + Logger.error(`Error inserting scrape event: ${error}`); + return null; + } } return null; @@ -51,20 +57,28 @@ export class ScrapeEvents { static async updateScrapeResult(logId: number | null, result: ScrapeScrapeEvent["result"]) { if (logId === null) return; - const previousLog = (await supabase.from("scrape_events").select().eq("id", logId).single()).data as any; - await supabase.from("scrape_events").update({ - content: { - ...previousLog.content, - result, - } - }).eq("id", logId); + try { + const previousLog = (await supabase.from("scrape_events").select().eq("id", logId).single()).data as any; + await supabase.from("scrape_events").update({ + content: { + ...previousLog.content, + result, + } + }).eq("id", logId); + } catch (error) { + Logger.error(`Error updating scrape result: ${error}`); + } } static async logJobEvent(job: Job | JobId, event: ScrapeQueueEvent["event"]) { - await this.insert(((job as any).id ? (job as any).id : job) as string, { - type: "queue", - event, - worker: process.env.FLY_MACHINE_ID, - }); + try { + await this.insert(((job as any).id ? (job as any).id : job) as string, { + type: "queue", + event, + worker: process.env.FLY_MACHINE_ID, + }); + } catch (error) { + Logger.error(`Error logging job event: ${error}`); + } } }
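
A note on the storage side, which these patches take for granted: the `scrape_events` table itself is never created or migrated here, only written to through `supabase_service`. Below is a minimal sketch of the row shape the inserts above imply, plus a hypothetical read-side helper — the column types and the helper are inferred from the `.insert(...)` / `.select(...)` calls in apps/api/src/lib/scrape-events.ts, not taken from an actual schema, so treat them as assumptions.

    import { supabase_service as supabase } from "../services/supabase";
    import type { ScrapeEvent } from "./scrape-events";

    // Assumed row shape for the `scrape_events` table (the real schema lives in
    // Supabase and is not part of this patch series).
    export type ScrapeEventRow = {
      id: number;                 // primary key returned via .select().single()
      job_id: string;             // bull job id, or the uuid minted in the controllers
      type: ScrapeEvent["type"];  // "scrape" | "error" | "queue"
      content: ScrapeEvent;       // full event payload; updated in place with the scrape result
      created_at?: string;        // left to the DB default (see the "// created_at" comment above)
    };

    // Hypothetical helper (not part of these patches): fetch every event logged
    // for a job, oldest first, using the same supabase-js client as the writes.
    export async function getJobEvents(jobId: string): Promise<ScrapeEventRow[]> {
      const { data, error } = await supabase
        .from("scrape_events")
        .select("*")
        .eq("job_id", jobId)
        .order("created_at", { ascending: true });
      if (error) throw error;
      return (data ?? []) as ScrapeEventRow[];
    }

With something along these lines, the events for a given job can be replayed in order — "queue" events from the bull listeners, one "scrape" event per scraper attempt, and "error" events from scrapSingleUrl — which is the monitoring view these eight patches build toward.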