diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 2a1e47f7..04181c9e 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -81,6 +81,7 @@ import { updateGeneratedLlmsTxt } from "../lib/generate-llmstxt/generate-llmstxt import { performExtraction_F0 } from "../lib/extract/fire-0/extraction-service-f0"; import { CostTracking } from "../lib/extract/extraction-service"; import { getACUCTeam } from "../controllers/auth"; +import Express from "express"; configDotenv(); @@ -688,6 +689,7 @@ const processGenerateLlmsTxtJobInternal = async ( }; let isShuttingDown = false; +let isWorkerStalled = false; process.on("SIGINT", () => { console.log("Received SIGTERM. Shutting down gracefully..."); @@ -731,7 +733,9 @@ const workerFun = async ( logger.info("Can't accept connection due to RAM/CPU load"); cantAcceptConnectionCount++; - if (cantAcceptConnectionCount >= 25) { + isWorkerStalled = cantAcceptConnectionCount >= 25; + + if (isWorkerStalled) { logger.error("WORKER STALLED", { cpuUsage: await monitor.checkCpuUsage(), memoryUsage: await monitor.checkMemoryUsage(), @@ -1526,6 +1530,20 @@ async function processJob(job: Job & { id: string }, token: string) { // wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed")); // Start all workers +const app = Express(); + +app.get("/liveness", (req, res) => { + if (isWorkerStalled) { + res.status(500).json({ ok: false }); + } else { + res.status(200).json({ ok: true }); + } +}); + +app.listen(3005, () => { + _logger.info("Liveness endpoint is running on port 3005"); +}); + (async () => { await Promise.all([ workerFun(getScrapeQueue(), processJobInternal), @@ -1542,4 +1560,4 @@ async function processJob(job: Job & { id: string }, token: string) { console.log("All jobs finished. Worker out!"); process.exit(0); -})(); +})(); \ No newline at end of file