Nick: slack alerts

This commit is contained in:
Nicolas 2024-07-12 19:07:59 -04:00
parent 214a6ee608
commit fd18f2269b
4 changed files with 100 additions and 5 deletions

View File

@ -9,6 +9,8 @@ import { initSDK } from "@hyperdx/node-opentelemetry";
import cluster from "cluster";
import os from "os";
import { Job } from "bull";
import { sendSlackWebhook } from "./services/alerts/slack";
import { initAlerts } from "./services/alerts";
const { createBullBoard } = require("@bull-board/api");
const { BullAdapter } = require("@bull-board/api/bullAdapter");
@ -32,6 +34,8 @@ if (cluster.isMaster) {
cluster.fork();
}
});
initAlerts();
} else {
const app = express();
@ -290,6 +294,7 @@ if (cluster.isMaster) {
});
console.log(`Worker ${process.pid} started`);
}

View File

@ -1,5 +1,10 @@
import { Job } from "bull";
import { CrawlResult, WebScraperOptions, RunWebScraperParams, RunWebScraperResult } from "../types";
import {
CrawlResult,
WebScraperOptions,
RunWebScraperParams,
RunWebScraperResult,
} from "../types";
import { WebScraperDataProvider } from "../scraper/WebScraper";
import { DocumentUrl, Progress } from "../lib/entities";
import { billTeam } from "../services/billing/credit_billing";
@ -118,12 +123,19 @@ const saveJob = async (job: Job, result: any) => {
.eq("job_id", job.id);
if (error) throw new Error(error.message);
await job.moveToCompleted(null);
try {
await job.moveToCompleted(null);
} catch (error) {
// I think the job won't exist here anymore
}
} else {
await job.moveToCompleted(result);
try {
await job.moveToCompleted(result);
} catch (error) {
// I think the job won't exist here anymore
}
}
} catch (error) {
console.error("Failed to update job status:", error);
}
}
};

View File

@ -0,0 +1,55 @@
import { getWebScraperQueue } from "../queue-service";
import { sendSlackWebhook } from "./slack";
export function initAlerts() {
if (
process.env.SLACK_WEBHOOK_URL &&
process.env.ENV === "production" &&
process.env.ALERT_NUM_ACTIVE_JOBS &&
process.env.ALERT_NUM_WAITING_JOBS
) {
console.info("Initializing alerts");
const checkActiveJobs = async () => {
try {
const webScraperQueue = getWebScraperQueue();
const activeJobs = await webScraperQueue.getActiveCount();
if (activeJobs > Number(process.env.ALERT_NUM_ACTIVE_JOBS)) {
console.warn(
`Alert: Number of active jobs is over ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}.`
);
sendSlackWebhook(
`Alert: Number of active jobs is over ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}`,
true
);
} else {
console.info(
`Number of active jobs is under ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}`
);
}
} catch (error) {
console.error("Failed to check active jobs:", error);
}
};
const checkWaitingQueue = async () => {
const webScraperQueue = getWebScraperQueue();
const waitingJobs = await webScraperQueue.getWaitingCount();
if (waitingJobs > Number(process.env.ALERT_NUM_WAITING_JOBS)) {
console.warn(
`Alert: Number of waiting jobs is over ${process.env.ALERT_NUM_WAITING_JOBS}. Current waiting jobs: ${waitingJobs}.`
);
sendSlackWebhook(
`Alert: Number of waiting jobs is over ${process.env.ALERT_NUM_WAITING_JOBS}. Current waiting jobs: ${waitingJobs}. Scale up the number of workers with fly scale count worker=20`,
true
);
}
};
const checkAll = async () => {
await checkActiveJobs();
await checkWaitingQueue();
};
setInterval(checkAll, 5 * 60 * 1000); // Run every 5 minutes
}
}

View File

@ -0,0 +1,23 @@
import axios from "axios";
export async function sendSlackWebhook(
message: string,
alertEveryone: boolean = false
) {
const webhookUrl = process.env.SLACK_WEBHOOK_URL;
const messagePrefix = alertEveryone ? "<!channel> " : "";
const payload = {
text: `${messagePrefix} ${message}`,
};
try {
const response = await axios.post(webhookUrl, payload, {
headers: {
"Content-Type": "application/json",
},
});
console.log("Webhook sent successfully:", response.data);
} catch (error) {
console.error("Error sending webhook:", error);
}
}