From 2014d9dd2e18a1f57a23bb6f81eba395b7e5d360 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 25 Jul 2024 14:54:20 -0400 Subject: [PATCH] Nick: admin router --- apps/api/src/controllers/admin/queue.ts | 87 ++++++++++ .../api/src/controllers/admin/redis-health.ts | 86 ++++++++++ apps/api/src/index.ts | 161 +----------------- apps/api/src/routes/admin.ts | 29 ++++ 4 files changed, 204 insertions(+), 159 deletions(-) create mode 100644 apps/api/src/controllers/admin/queue.ts create mode 100644 apps/api/src/controllers/admin/redis-health.ts create mode 100644 apps/api/src/routes/admin.ts diff --git a/apps/api/src/controllers/admin/queue.ts b/apps/api/src/controllers/admin/queue.ts new file mode 100644 index 00000000..cb5f99ed --- /dev/null +++ b/apps/api/src/controllers/admin/queue.ts @@ -0,0 +1,87 @@ +import { Request, Response } from "express"; + +import { Job } from "bull"; +import { Logger } from "../../lib/logger"; +import { getWebScraperQueue } from "../../services/queue-service"; +import { checkAlerts } from "../../services/alerts"; + +export async function cleanBefore24hCompleteJobsController( + req: Request, + res: Response +) { + Logger.info("🐂 Cleaning jobs older than 24h"); + try { + const webScraperQueue = getWebScraperQueue(); + const batchSize = 10; + const numberOfBatches = 9; // Adjust based on your needs + const completedJobsPromises: Promise[] = []; + for (let i = 0; i < numberOfBatches; i++) { + completedJobsPromises.push( + webScraperQueue.getJobs( + ["completed"], + i * batchSize, + i * batchSize + batchSize, + true + ) + ); + } + const completedJobs: Job[] = ( + await Promise.all(completedJobsPromises) + ).flat(); + const before24hJobs = + completedJobs.filter( + (job) => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000 + ) || []; + + let count = 0; + + if (!before24hJobs) { + return res.status(200).send(`No jobs to remove.`); + } + + for (const job of before24hJobs) { + try { + await job.remove(); + count++; + } catch (jobError) { + Logger.error(`🐂 Failed to remove job with ID ${job.id}: ${jobError}`); + } + } + return res.status(200).send(`Removed ${count} completed jobs.`); + } catch (error) { + Logger.error(`🐂 Failed to clean last 24h complete jobs: ${error}`); + return res.status(500).send("Failed to clean jobs"); + } +} + + +export async function checkQueuesController(req: Request, res: Response) { + try { + await checkAlerts(); + return res.status(200).send("Alerts initialized"); + } catch (error) { + Logger.debug(`Failed to initialize alerts: ${error}`); + return res.status(500).send("Failed to initialize alerts"); + } + } + + // Use this as a "health check" that way we dont destroy the server +export async function queuesController(req: Request, res: Response) { + try { + const webScraperQueue = getWebScraperQueue(); + + const [webScraperActive] = await Promise.all([ + webScraperQueue.getActiveCount(), + ]); + + const noActiveJobs = webScraperActive === 0; + // 200 if no active jobs, 503 if there are active jobs + return res.status(noActiveJobs ? 200 : 500).json({ + webScraperActive, + noActiveJobs, + }); + } catch (error) { + Logger.error(error); + return res.status(500).json({ error: error.message }); + } + } \ No newline at end of file diff --git a/apps/api/src/controllers/admin/redis-health.ts b/apps/api/src/controllers/admin/redis-health.ts new file mode 100644 index 00000000..e35d6db9 --- /dev/null +++ b/apps/api/src/controllers/admin/redis-health.ts @@ -0,0 +1,86 @@ +import { Request, Response } from "express"; +import Redis from "ioredis"; +import { Logger } from "../../lib/logger"; +import { sendSlackWebhook } from "../../services/alerts/slack"; +import { redisRateLimitClient } from "../../services/rate-limiter"; + +export async function redisHealthController(req: Request, res: Response) { + const retryOperation = async (operation, retries = 3) => { + for (let attempt = 1; attempt <= retries; attempt++) { + try { + return await operation(); + } catch (error) { + if (attempt === retries) throw error; + Logger.warn(`Attempt ${attempt} failed: ${error.message}. Retrying...`); + await new Promise((resolve) => setTimeout(resolve, 2000)); // Wait 2 seconds before retrying + } + } + }; + + try { + const queueRedis = new Redis(process.env.REDIS_URL); + + const testKey = "test"; + const testValue = "test"; + + // Test queueRedis + let queueRedisHealth; + try { + await retryOperation(() => queueRedis.set(testKey, testValue)); + queueRedisHealth = await retryOperation(() => queueRedis.get(testKey)); + await retryOperation(() => queueRedis.del(testKey)); + } catch (error) { + Logger.error(`queueRedis health check failed: ${error}`); + queueRedisHealth = null; + } + + // Test redisRateLimitClient + let redisRateLimitHealth; + try { + await retryOperation(() => redisRateLimitClient.set(testKey, testValue)); + redisRateLimitHealth = await retryOperation(() => + redisRateLimitClient.get(testKey) + ); + await retryOperation(() => redisRateLimitClient.del(testKey)); + } catch (error) { + Logger.error(`redisRateLimitClient health check failed: ${error}`); + redisRateLimitHealth = null; + } + + const healthStatus = { + queueRedis: queueRedisHealth === testValue ? "healthy" : "unhealthy", + redisRateLimitClient: + redisRateLimitHealth === testValue ? "healthy" : "unhealthy", + }; + + if ( + healthStatus.queueRedis === "healthy" && + healthStatus.redisRateLimitClient === "healthy" + ) { + Logger.info("Both Redis instances are healthy"); + return res.status(200).json({ status: "healthy", details: healthStatus }); + } else { + Logger.info( + `Redis instances health check: ${JSON.stringify(healthStatus)}` + ); + await sendSlackWebhook( + `[REDIS DOWN] Redis instances health check: ${JSON.stringify( + healthStatus + )}`, + true + ); + return res + .status(500) + .json({ status: "unhealthy", details: healthStatus }); + } + } catch (error) { + Logger.error(`Redis health check failed: ${error}`); + await sendSlackWebhook( + `[REDIS DOWN] Redis instances health check: ${error.message}`, + true + ); + return res + .status(500) + .json({ status: "unhealthy", message: error.message }); + } +} diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 98243756..69433d6b 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -7,12 +7,8 @@ import { v0Router } from "./routes/v0"; import { initSDK } from "@hyperdx/node-opentelemetry"; import cluster from "cluster"; import os from "os"; -import { Job } from "bull"; -import { sendSlackWebhook } from "./services/alerts/slack"; -import { checkAlerts } from "./services/alerts"; -import Redis from "ioredis"; -import { redisRateLimitClient } from "./services/rate-limiter"; import { Logger } from "./lib/logger"; +import { adminRouter } from "./routes/admin"; const { createBullBoard } = require("@bull-board/api"); const { BullAdapter } = require("@bull-board/api/bullAdapter"); @@ -46,7 +42,6 @@ if (cluster.isMaster) { app.use(cors()); // Add this line to enable CORS - const serverAdapter = new ExpressAdapter(); serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`); @@ -71,6 +66,7 @@ if (cluster.isMaster) { // register router app.use(v0Router); + app.use(adminRouter); const DEFAULT_PORT = process.env.PORT ?? 3002; const HOST = process.env.HOST ?? "localhost"; @@ -94,27 +90,6 @@ if (cluster.isMaster) { startServer(); } - // Use this as a "health check" that way we dont destroy the server - app.get(`/admin/${process.env.BULL_AUTH_KEY}/queues`, async (req, res) => { - try { - const webScraperQueue = getWebScraperQueue(); - - const [webScraperActive] = await Promise.all([ - webScraperQueue.getActiveCount(), - ]); - - const noActiveJobs = webScraperActive === 0; - // 200 if no active jobs, 503 if there are active jobs - return res.status(noActiveJobs ? 200 : 500).json({ - webScraperActive, - noActiveJobs, - }); - } catch (error) { - Logger.error(error); - return res.status(500).json({ error: error.message }); - } - }); - app.get(`/serverHealthCheck`, async (req, res) => { try { const webScraperQueue = getWebScraperQueue(); @@ -187,141 +162,9 @@ if (cluster.isMaster) { } }); - app.get( - `/admin/${process.env.BULL_AUTH_KEY}/check-queues`, - async (req, res) => { - try { - await checkAlerts(); - return res.status(200).send("Alerts initialized"); - } catch (error) { - Logger.debug(`Failed to initialize alerts: ${error}`); - return res.status(500).send("Failed to initialize alerts"); - } - } - ); - - app.get( - `/admin/${process.env.BULL_AUTH_KEY}/clean-before-24h-complete-jobs`, - async (req, res) => { - Logger.info("🐂 Cleaning jobs older than 24h"); - try { - const webScraperQueue = getWebScraperQueue(); - const batchSize = 10; - const numberOfBatches = 9; // Adjust based on your needs - const completedJobsPromises: Promise[] = []; - for (let i = 0; i < numberOfBatches; i++) { - completedJobsPromises.push( - webScraperQueue.getJobs( - ["completed"], - i * batchSize, - i * batchSize + batchSize, - true - ) - ); - } - const completedJobs: Job[] = ( - await Promise.all(completedJobsPromises) - ).flat(); - const before24hJobs = - completedJobs.filter( - (job) => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000 - ) || []; - - let count = 0; - - if (!before24hJobs) { - return res.status(200).send(`No jobs to remove.`); - } - - for (const job of before24hJobs) { - try { - await job.remove(); - count++; - } catch (jobError) { - Logger.error(`🐂 Failed to remove job with ID ${job.id}: ${jobError}` ); - } - } - return res.status(200).send(`Removed ${count} completed jobs.`); - } catch (error) { - Logger.error(`🐂 Failed to clean last 24h complete jobs: ${error}`); - return res.status(500).send("Failed to clean jobs"); - } - } - ); - app.get("/is-production", (req, res) => { res.send({ isProduction: global.isProduction }); }); - app.get( - `/admin/${process.env.BULL_AUTH_KEY}/redis-health`, - async (req, res) => { - try { - const queueRedis = new Redis(process.env.REDIS_URL); - - const testKey = "test"; - const testValue = "test"; - - // Test queueRedis - let queueRedisHealth; - try { - await queueRedis.set(testKey, testValue); - queueRedisHealth = await queueRedis.get(testKey); - await queueRedis.del(testKey); - } catch (error) { - Logger.error(`queueRedis health check failed: ${error}`); - queueRedisHealth = null; - } - - // Test redisRateLimitClient - let redisRateLimitHealth; - try { - await redisRateLimitClient.set(testKey, testValue); - redisRateLimitHealth = await redisRateLimitClient.get(testKey); - await redisRateLimitClient.del(testKey); - } catch (error) { - Logger.error(`redisRateLimitClient health check failed: ${error}`); - redisRateLimitHealth = null; - } - - const healthStatus = { - queueRedis: queueRedisHealth === testValue ? "healthy" : "unhealthy", - redisRateLimitClient: - redisRateLimitHealth === testValue ? "healthy" : "unhealthy", - }; - - if ( - healthStatus.queueRedis === "healthy" && - healthStatus.redisRateLimitClient === "healthy" - ) { - Logger.info("Both Redis instances are healthy"); - return res - .status(200) - .json({ status: "healthy", details: healthStatus }); - } else { - Logger.info(`Redis instances health check: ${JSON.stringify(healthStatus)}`); - await sendSlackWebhook( - `[REDIS DOWN] Redis instances health check: ${JSON.stringify( - healthStatus - )}`, - true - ); - return res - .status(500) - .json({ status: "unhealthy", details: healthStatus }); - } - } catch (error) { - Logger.error(`Redis health check failed: ${error}`); - await sendSlackWebhook( - `[REDIS DOWN] Redis instances health check: ${error.message}`, - true - ); - return res - .status(500) - .json({ status: "unhealthy", message: error.message }); - } - } - ); - Logger.info(`Worker ${process.pid} started`); } diff --git a/apps/api/src/routes/admin.ts b/apps/api/src/routes/admin.ts new file mode 100644 index 00000000..81766184 --- /dev/null +++ b/apps/api/src/routes/admin.ts @@ -0,0 +1,29 @@ +import express from "express"; +import { redisHealthController } from "../controllers/admin/redis-health"; +import { + checkQueuesController, + cleanBefore24hCompleteJobsController, + queuesController, +} from "../controllers/admin/queue"; + +export const adminRouter = express.Router(); + +adminRouter.post( + `/admin/${process.env.BULL_AUTH_KEY}/redis-health`, + redisHealthController +); + +adminRouter.post( + `/admin/${process.env.BULL_AUTH_KEY}/clean-before-24h-complete-jobs`, + cleanBefore24hCompleteJobsController +); + +adminRouter.post( + `/admin/${process.env.BULL_AUTH_KEY}/check-queues`, + checkQueuesController +); + +adminRouter.post( + `/admin/${process.env.BULL_AUTH_KEY}/queues`, + queuesController +);