From fd18f2269bbb56b8150a62dd6da3cde70df17d08 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 12 Jul 2024 19:07:59 -0400 Subject: [PATCH] Nick: slack alerts --- apps/api/src/index.ts | 5 +++ apps/api/src/main/runWebScraper.ts | 22 ++++++++--- apps/api/src/services/alerts/index.ts | 55 +++++++++++++++++++++++++++ apps/api/src/services/alerts/slack.ts | 23 +++++++++++ 4 files changed, 100 insertions(+), 5 deletions(-) create mode 100644 apps/api/src/services/alerts/index.ts create mode 100644 apps/api/src/services/alerts/slack.ts diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 1cd03eb0..0ef67e6f 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -9,6 +9,8 @@ import { initSDK } from "@hyperdx/node-opentelemetry"; import cluster from "cluster"; import os from "os"; import { Job } from "bull"; +import { sendSlackWebhook } from "./services/alerts/slack"; +import { initAlerts } from "./services/alerts"; const { createBullBoard } = require("@bull-board/api"); const { BullAdapter } = require("@bull-board/api/bullAdapter"); @@ -32,6 +34,8 @@ if (cluster.isMaster) { cluster.fork(); } }); + + initAlerts(); } else { const app = express(); @@ -290,6 +294,7 @@ if (cluster.isMaster) { }); + console.log(`Worker ${process.pid} started`); } diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts index 2b141daa..3c98e11e 100644 --- a/apps/api/src/main/runWebScraper.ts +++ b/apps/api/src/main/runWebScraper.ts @@ -1,5 +1,10 @@ import { Job } from "bull"; -import { CrawlResult, WebScraperOptions, RunWebScraperParams, RunWebScraperResult } from "../types"; +import { + CrawlResult, + WebScraperOptions, + RunWebScraperParams, + RunWebScraperResult, +} from "../types"; import { WebScraperDataProvider } from "../scraper/WebScraper"; import { DocumentUrl, Progress } from "../lib/entities"; import { billTeam } from "../services/billing/credit_billing"; @@ -118,12 +123,19 @@ const saveJob = async (job: Job, result: any) => { .eq("job_id", job.id); if (error) throw new Error(error.message); - await job.moveToCompleted(null); + try { + await job.moveToCompleted(null); + } catch (error) { + // I think the job won't exist here anymore + } } else { - await job.moveToCompleted(result); + try { + await job.moveToCompleted(result); + } catch (error) { + // I think the job won't exist here anymore + } } } catch (error) { console.error("Failed to update job status:", error); } -} - +}; diff --git a/apps/api/src/services/alerts/index.ts b/apps/api/src/services/alerts/index.ts new file mode 100644 index 00000000..2b52d95e --- /dev/null +++ b/apps/api/src/services/alerts/index.ts @@ -0,0 +1,55 @@ +import { getWebScraperQueue } from "../queue-service"; +import { sendSlackWebhook } from "./slack"; + +export function initAlerts() { + if ( + process.env.SLACK_WEBHOOK_URL && + process.env.ENV === "production" && + process.env.ALERT_NUM_ACTIVE_JOBS && + process.env.ALERT_NUM_WAITING_JOBS + ) { + console.info("Initializing alerts"); + const checkActiveJobs = async () => { + try { + const webScraperQueue = getWebScraperQueue(); + const activeJobs = await webScraperQueue.getActiveCount(); + if (activeJobs > Number(process.env.ALERT_NUM_ACTIVE_JOBS)) { + console.warn( + `Alert: Number of active jobs is over ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}.` + ); + sendSlackWebhook( + `Alert: Number of active jobs is over ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}`, + true + ); + } else { + console.info( + `Number of active jobs is under ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}` + ); + } + } catch (error) { + console.error("Failed to check active jobs:", error); + } + }; + + const checkWaitingQueue = async () => { + const webScraperQueue = getWebScraperQueue(); + const waitingJobs = await webScraperQueue.getWaitingCount(); + if (waitingJobs > Number(process.env.ALERT_NUM_WAITING_JOBS)) { + console.warn( + `Alert: Number of waiting jobs is over ${process.env.ALERT_NUM_WAITING_JOBS}. Current waiting jobs: ${waitingJobs}.` + ); + sendSlackWebhook( + `Alert: Number of waiting jobs is over ${process.env.ALERT_NUM_WAITING_JOBS}. Current waiting jobs: ${waitingJobs}. Scale up the number of workers with fly scale count worker=20`, + true + ); + } + }; + + const checkAll = async () => { + await checkActiveJobs(); + await checkWaitingQueue(); + }; + + setInterval(checkAll, 5 * 60 * 1000); // Run every 5 minutes + } +} diff --git a/apps/api/src/services/alerts/slack.ts b/apps/api/src/services/alerts/slack.ts new file mode 100644 index 00000000..f65035b1 --- /dev/null +++ b/apps/api/src/services/alerts/slack.ts @@ -0,0 +1,23 @@ +import axios from "axios"; + +export async function sendSlackWebhook( + message: string, + alertEveryone: boolean = false +) { + const webhookUrl = process.env.SLACK_WEBHOOK_URL; + const messagePrefix = alertEveryone ? " " : ""; + const payload = { + text: `${messagePrefix} ${message}`, + }; + + try { + const response = await axios.post(webhookUrl, payload, { + headers: { + "Content-Type": "application/json", + }, + }); + console.log("Webhook sent successfully:", response.data); + } catch (error) { + console.error("Error sending webhook:", error); + } +}