feat(queue-worker): liveness check endpoint

This commit is contained in:
Gergő Móricz 2025-05-16 14:15:48 +02:00
parent b5b612c35b
commit f936befcdb

View File

@ -81,6 +81,7 @@ import { updateGeneratedLlmsTxt } from "../lib/generate-llmstxt/generate-llmstxt
import { performExtraction_F0 } from "../lib/extract/fire-0/extraction-service-f0";
import { CostTracking } from "../lib/extract/extraction-service";
import { getACUCTeam } from "../controllers/auth";
import Express from "express";
configDotenv();
@ -688,6 +689,7 @@ const processGenerateLlmsTxtJobInternal = async (
};
let isShuttingDown = false;
let isWorkerStalled = false;
process.on("SIGINT", () => {
console.log("Received SIGTERM. Shutting down gracefully...");
@ -731,7 +733,9 @@ const workerFun = async (
logger.info("Can't accept connection due to RAM/CPU load");
cantAcceptConnectionCount++;
if (cantAcceptConnectionCount >= 25) {
isWorkerStalled = cantAcceptConnectionCount >= 25;
if (isWorkerStalled) {
logger.error("WORKER STALLED", {
cpuUsage: await monitor.checkCpuUsage(),
memoryUsage: await monitor.checkMemoryUsage(),
@ -1526,6 +1530,20 @@ async function processJob(job: Job & { id: string }, token: string) {
// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
// Start all workers
const app = Express();
app.get("/liveness", (req, res) => {
if (isWorkerStalled) {
res.status(500).json({ ok: false });
} else {
res.status(200).json({ ok: true });
}
});
app.listen(3005, () => {
_logger.info("Liveness endpoint is running on port 3005");
});
(async () => {
await Promise.all([
workerFun(getScrapeQueue(), processJobInternal),