From 60c74357dfbb133658c2cb73b40f0573c2ed07e8 Mon Sep 17 00:00:00 2001 From: Gergo Moricz Date: Wed, 24 Jul 2024 18:44:14 +0200 Subject: [PATCH] feat(ScrapeEvents): log queue events --- apps/api/src/index.ts | 10 ++++++++++ apps/api/src/lib/scrape-events.ts | 11 ++++++++++- apps/api/src/services/queue-worker.ts | 9 +++++++++ 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 700cc98e..cbd37eb7 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -13,6 +13,7 @@ import { checkAlerts } from "./services/alerts"; import Redis from "ioredis"; import { redisRateLimitClient } from "./services/rate-limiter"; import { Logger } from "./lib/logger"; +import { ScrapeEvents } from "./lib/scrape-events"; const { createBullBoard } = require("@bull-board/api"); const { BullAdapter } = require("@bull-board/api/bullAdapter"); @@ -325,3 +326,12 @@ if (cluster.isMaster) { Logger.info(`Worker ${process.pid} started`); } + +const wsq = getWebScraperQueue(); + +wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting")); +wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active")); +wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed")); +wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused")); +wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed")); +wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed")); diff --git a/apps/api/src/lib/scrape-events.ts b/apps/api/src/lib/scrape-events.ts index 9b050c30..7015c92d 100644 --- a/apps/api/src/lib/scrape-events.ts +++ b/apps/api/src/lib/scrape-events.ts @@ -1,3 +1,4 @@ +import { Job, JobId } from "bull"; import type { baseScrapers } from "../scraper/WebScraper/single_url"; import { supabase_service as supabase } from "../services/supabase"; @@ -24,7 +25,7 @@ export type ScrapeScrapeEvent = { export type ScrapeQueueEvent = { type: "queue", - event: "created" | "started" | "interrupted" | "finished", + event: "waiting" | "active" | "completed" | "paused" | "resumed" | "removed", worker?: string, } @@ -58,4 +59,12 @@ export class ScrapeEvents { } }).eq("id", logId); } + + static async logJobEvent(job: Job | JobId, event: ScrapeQueueEvent["event"]) { + await this.insert(((job as any).id ? (job as any).id : job) as string, { + type: "queue", + event, + worker: process.env.FLY_MACHINE_ID, + }); + } } diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 532c5bc1..e7767809 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -8,6 +8,7 @@ import { logJob } from "./logging/log_job"; import { initSDK } from '@hyperdx/node-opentelemetry'; import { Job } from "bull"; import { Logger } from "../lib/logger"; +import { ScrapeEvents } from "../lib/scrape-events"; if (process.env.ENV === 'production') { initSDK({ @@ -20,6 +21,7 @@ const wsq = getWebScraperQueue(); async function processJob(job: Job, done) { Logger.debug(`🐂 Worker taking job ${job.id}`); + try { job.progress({ current: 1, @@ -114,3 +116,10 @@ wsq.process( Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)), processJob ); + +wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting")); +wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active")); +wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed")); +wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused")); +wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed")); +wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));