Merge branch 'main' into python-sdk/e2e-tests-all-params

This commit is contained in:
Nicolas 2025-05-16 15:54:30 -03:00
commit 94220a772b
16 changed files with 408 additions and 22 deletions

View File

@@ -35,6 +35,7 @@ env:
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
VERTEX_CREDENTIALS: ${{ secrets.VERTEX_CREDENTIALS }}
USE_GO_MARKDOWN_PARSER: true
SENTRY_ENVIRONMENT: dev
jobs:
test:
@@ -53,6 +54,7 @@ jobs:
oauth-client-id: ${{ secrets.TS_OAUTH_CLIENT_ID }}
oauth-secret: ${{ secrets.TS_OAUTH_SECRET }}
tags: tag:ci
use-cache: 'true'
- name: Install pnpm
uses: pnpm/action-setup@v4
with:

View File

@@ -0,0 +1,106 @@
import { createPdfCacheKey, savePdfResultToCache, getPdfResultFromCache } from '../../lib/gcs-pdf-cache';
jest.mock('@google-cloud/storage', () => {
const mockSave = jest.fn().mockResolvedValue(undefined);
const mockExists = jest.fn().mockResolvedValue([true]);
const mockDownload = jest.fn().mockResolvedValue([Buffer.from(JSON.stringify({
markdown: 'cached markdown',
html: 'cached html'
}))]);
const mockFile = jest.fn().mockImplementation((path) => ({
save: mockSave,
exists: mockExists,
download: mockDownload
}));
return {
Storage: jest.fn().mockImplementation(() => ({
bucket: jest.fn().mockImplementation(() => ({
file: mockFile
}))
})),
_getMockFile: () => mockFile,
_getMockSave: () => mockSave
};
});
process.env.GCS_BUCKET_NAME = 'test-bucket';
describe('PDF Caching', () => {
beforeEach(() => {
jest.clearAllMocks();
});
test('createPdfCacheKey generates consistent keys', () => {
const pdfContent1 = 'test-pdf-content';
const pdfContent2 = 'test-pdf-content';
const pdfContent3 = 'different-pdf-content';
const key1 = createPdfCacheKey(pdfContent1);
const key2 = createPdfCacheKey(pdfContent2);
const key3 = createPdfCacheKey(pdfContent3);
expect(key1).toBe(key2); // Same content should generate same key
expect(key1).not.toBe(key3); // Different content should generate different key
expect(key1).toMatch(/^[a-f0-9]{64}$/);
});
test('createPdfCacheKey works directly with base64 content', () => {
const base64Content = 'JVBERi0xLjMKJcTl8uXrp/Og0MTGCjQgMCBvYmoKPDwgL0xlbmd0aCA1IDAgUiAvRmlsdGVyIC9GbGF0ZURlY29kZSA+PgpzdHJlYW0KeAFLy';
const key = createPdfCacheKey(base64Content);
expect(key).toMatch(/^[a-f0-9]{64}$/);
expect(createPdfCacheKey(base64Content)).toBe(key);
});
test('savePdfResultToCache saves results to GCS', async () => {
const pdfContent = 'test-pdf-content';
const result = { markdown: 'test markdown', html: 'test html' };
const { _getMockFile, _getMockSave } = require('@google-cloud/storage');
const mockFile = _getMockFile();
const mockSave = _getMockSave();
mockFile.mockClear();
mockSave.mockClear();
const cacheKey = await savePdfResultToCache(pdfContent, result);
expect(cacheKey).not.toBeNull();
expect(mockFile).toHaveBeenCalledWith(expect.stringContaining('pdf-cache/'));
expect(mockSave).toHaveBeenCalledWith(JSON.stringify(result), {
contentType: 'application/json',
metadata: expect.objectContaining({
source: 'runpod_pdf_conversion',
cache_type: 'pdf_markdown',
created_at: expect.any(String)
})
});
});
test('getPdfResultFromCache retrieves results from GCS', async () => {
const pdfContent = 'test-pdf-content';
const result = await getPdfResultFromCache(pdfContent);
expect(result).not.toBeNull();
expect(result?.markdown).toBe('cached markdown');
expect(result?.html).toBe('cached html');
});
test('getPdfResultFromCache returns null when cache miss', async () => {
const { Storage } = require('@google-cloud/storage');
const mockExists = Storage().bucket().file().exists;
mockExists.mockResolvedValueOnce([false]);
const result = await getPdfResultFromCache('uncached-content');
expect(result).toBeNull();
});
});

View File

@@ -89,7 +89,7 @@ export async function crawlController(
createdAt: Date.now(),
};
-const crawler = crawlToCrawler(id, sc, req.acuc.flags ?? null);
+const crawler = crawlToCrawler(id, sc, req.acuc?.flags ?? null);
try {
sc.robots = await crawler.getRobotsTxt(scrapeOptions.skipTlsVerification);

View File

@@ -325,7 +325,7 @@ export async function mapController(
abort: abort.signal,
mock: req.body.useMock,
filterByPath: req.body.filterByPath !== false,
-flags: req.acuc.flags ?? null,
+flags: req.acuc?.flags ?? null,
}),
...(req.body.timeout !== undefined ? [
new Promise((resolve, reject) => setTimeout(() => {

View File

@@ -18,7 +18,6 @@ import { logger } from "./lib/logger";
import { adminRouter } from "./routes/admin";
import http from "node:http";
import https from "node:https";
-import CacheableLookup from "cacheable-lookup";
import { v1Router } from "./routes/v1";
import expressWs from "express-ws";
import { ErrorResponse, ResponseWithSentry } from "./controllers/v1/types";
@@ -26,6 +25,7 @@ import { ZodError } from "zod";
import { v4 as uuidv4 } from "uuid";
import { RateLimiterMode } from "./types";
import { attachWsProxy } from "./services/agentLivecastWS";
import { cacheableLookup } from "./scraper/scrapeURL/lib/cacheableLookup";
const { createBullBoard } = require("@bull-board/api");
const { BullAdapter } = require("@bull-board/api/bullAdapter");
@@ -34,11 +34,9 @@ const { ExpressAdapter } = require("@bull-board/express");
const numCPUs = process.env.ENV === "local" ? 2 : os.cpus().length;
logger.info(`Number of CPUs: ${numCPUs} available`);
-const cacheable = new CacheableLookup();
// Install cacheable lookup for all other requests
-cacheable.install(http.globalAgent);
+cacheableLookup.install(http.globalAgent);
-cacheable.install(https.globalAgent);
+cacheableLookup.install(https.globalAgent);
// Initialize Express with WebSocket support
const expressApp = express();

View File

@@ -106,6 +106,22 @@ import { getACUCTeam } from "../../../controllers/auth";
logger.error("No search results found", {
query: request.prompt,
});
logJob({
job_id: extractId,
success: false,
message: "No search results found",
num_docs: 1,
docs: [],
time_taken: (new Date().getTime() - Date.now()) / 1000,
team_id: teamId,
mode: "extract",
url: request.urls?.join(", ") || "",
scrapeOptions: request,
origin: request.origin ?? "api",
num_tokens: 0,
tokens_billed: 0,
sources,
});
return {
success: false,
error: "No search results found",
@@ -191,6 +207,22 @@ import { getACUCTeam } from "../../../controllers/auth";
logger.error("0 links! Bailing.", {
linkCount: links.length,
});
logJob({
job_id: extractId,
success: false,
message: "No valid URLs found to scrape",
num_docs: 1,
docs: [],
time_taken: (new Date().getTime() - Date.now()) / 1000,
team_id: teamId,
mode: "extract",
url: request.urls?.join(", ") || "",
scrapeOptions: request,
origin: request.origin ?? "api",
num_tokens: 0,
tokens_billed: 0,
sources,
});
return {
success: false,
error:
@@ -524,6 +556,22 @@ import { getACUCTeam } from "../../../controllers/auth";
} catch (error) {
logger.error(`Failed to transform array to object`, { error });
logJob({
job_id: extractId,
success: false,
message: "Failed to transform array to object",
num_docs: 1,
docs: [],
time_taken: (new Date().getTime() - Date.now()) / 1000,
team_id: teamId,
mode: "extract",
url: request.urls?.join(", ") || "",
scrapeOptions: request,
origin: request.origin ?? "api",
num_tokens: 0,
tokens_billed: 0,
sources,
});
return {
success: false,
error:
@@ -602,6 +650,23 @@ import { getACUCTeam } from "../../../controllers/auth";
logger.debug("Scrapes finished.", { docCount: validResults.length });
} catch (error) {
logger.error("Failed to scrape documents", { error });
logJob({
job_id: extractId,
success: false,
message: "Failed to scrape documents",
num_docs: 1,
docs: [],
time_taken: (new Date().getTime() - Date.now()) / 1000,
team_id: teamId,
mode: "extract",
url: request.urls?.join(", ") || "",
scrapeOptions: request,
origin: request.origin ?? "api",
num_tokens: 0,
tokens_billed: 0,
sources,
});
return {
success: false,
error: error.message,
@@ -614,6 +679,22 @@ import { getACUCTeam } from "../../../controllers/auth";
if (docsMap.size == 0) {
// All urls are invalid
logger.error("All provided URLs are invalid!");
logJob({
job_id: extractId,
success: false,
message: "All provided URLs are invalid",
num_docs: 1,
docs: [],
time_taken: (new Date().getTime() - Date.now()) / 1000,
team_id: teamId,
mode: "extract",
url: request.urls?.join(", ") || "",
scrapeOptions: request,
origin: request.origin ?? "api",
num_tokens: 0,
tokens_billed: 0,
sources,
});
return {
success: false,
error:
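
The five failure branches above each emit the same logJob payload, with only the message changing. Below is a minimal sketch, not part of this commit, of a helper that could build that payload once; the context type, the startedAt parameter, and the request/sources field types are assumptions for illustration (the committed code computes time_taken inline from the current time).

// Hypothetical sketch: one builder for the repeated extract failure payload.
type ExtractFailureLogContext = {
  extractId: string;
  teamId: string;
  message: string;
  request: { urls?: string[]; origin?: string };
  sources: Record<string, string[]>;
  startedAt: number; // ms timestamp captured when the extract run began (assumed)
};

function buildExtractFailureLog(ctx: ExtractFailureLogContext) {
  return {
    job_id: ctx.extractId,
    success: false,
    message: ctx.message,
    num_docs: 1,
    docs: [],
    time_taken: (Date.now() - ctx.startedAt) / 1000,
    team_id: ctx.teamId,
    mode: "extract",
    url: ctx.request.urls?.join(", ") || "",
    scrapeOptions: ctx.request,
    origin: ctx.request.origin ?? "api",
    num_tokens: 0,
    tokens_billed: 0,
    sources: ctx.sources,
  };
}

// Usage at each branch (sketch): logJob(buildExtractFailureLog({ extractId, teamId, message: "No search results found", request, sources, startedAt }));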

View File

@@ -0,0 +1,112 @@
import { Storage } from "@google-cloud/storage";
import { logger } from "./logger";
import crypto from "crypto";
const credentials = process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined;
const PDF_CACHE_PREFIX = "pdf-cache/";
/**
* Creates a SHA-256 hash of the PDF content to use as a cache key
* Directly hashes the content without any conversion
*/
export function createPdfCacheKey(pdfContent: string | Buffer): string {
return crypto
.createHash('sha256')
.update(pdfContent)
.digest('hex');
}
/**
* Save RunPod markdown results to GCS cache
*/
export async function savePdfResultToCache(
pdfContent: string,
result: { markdown: string; html: string }
): Promise<string | null> {
try {
if (!process.env.GCS_BUCKET_NAME) {
return null;
}
const cacheKey = createPdfCacheKey(pdfContent);
const storage = new Storage({ credentials });
const bucket = storage.bucket(process.env.GCS_BUCKET_NAME);
const blob = bucket.file(`${PDF_CACHE_PREFIX}${cacheKey}.json`);
for (let i = 0; i < 3; i++) {
try {
await blob.save(JSON.stringify(result), {
contentType: "application/json",
metadata: {
source: "runpod_pdf_conversion",
cache_type: "pdf_markdown",
created_at: new Date().toISOString(),
}
});
logger.info(`Saved PDF RunPod result to GCS cache`, {
cacheKey,
});
return cacheKey;
} catch (error) {
if (i === 2) {
throw error;
} else {
logger.error(`Error saving PDF RunPod result to GCS cache, retrying`, {
error,
cacheKey,
i,
});
}
}
}
return cacheKey;
} catch (error) {
logger.error(`Error saving PDF RunPod result to GCS cache`, {
error,
});
return null;
}
}
/**
* Get cached RunPod markdown results from GCS
*/
export async function getPdfResultFromCache(
pdfContent: string
): Promise<{ markdown: string; html: string } | null> {
try {
if (!process.env.GCS_BUCKET_NAME) {
return null;
}
const cacheKey = createPdfCacheKey(pdfContent);
const storage = new Storage({ credentials });
const bucket = storage.bucket(process.env.GCS_BUCKET_NAME);
const blob = bucket.file(`${PDF_CACHE_PREFIX}${cacheKey}.json`);
const [exists] = await blob.exists();
if (!exists) {
logger.debug(`PDF RunPod result not found in GCS cache`, {
cacheKey,
});
return null;
}
const [content] = await blob.download();
const result = JSON.parse(content.toString());
logger.info(`Retrieved PDF RunPod result from GCS cache`, {
cacheKey,
});
return result;
} catch (error) {
logger.error(`Error retrieving PDF RunPod result from GCS cache`, {
error,
});
return null;
}
}
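
The module above implements a straightforward cache-aside flow keyed by a SHA-256 hash of the PDF content. A minimal usage sketch follows, mirroring how the PDF scraper later in this commit wires it in; the relative import path, the convert callback, and the error handling are assumptions for illustration.

// Cache-aside sketch: check GCS before converting, write back best-effort afterwards.
import { getPdfResultFromCache, savePdfResultToCache } from "./gcs-pdf-cache";

async function convertPdfWithCache(
  base64Content: string,
  convert: (pdf: string) => Promise<{ markdown: string; html: string }>,
): Promise<{ markdown: string; html: string }> {
  // 1. A cache hit skips the expensive conversion entirely.
  const cached = await getPdfResultFromCache(base64Content);
  if (cached) return cached;

  // 2. Cache miss: run the conversion.
  const result = await convert(base64Content);

  // 3. Best-effort write-back; a cache failure should not fail the conversion.
  try {
    await savePdfResultToCache(base64Content, result);
  } catch {
    // ignore cache write errors
  }
  return result;
}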

View File

@@ -7,6 +7,7 @@ import {
ActionError,
EngineError,
SiteError,
SSLError,
UnsupportedFileError,
} from "../../error";
import { MockState } from "../../lib/mock";
@@ -169,7 +170,13 @@ export async function fireEngineCheckStatus(
typeof status.error === "string" &&
status.error.includes("Chrome error: ")
) {
-throw new SiteError(status.error.split("Chrome error: ")[1]);
+const code = status.error.split("Chrome error: ")[1];
+if (code.includes("ERR_CERT_") || code.includes("ERR_SSL_") || code.includes("ERR_BAD_SSL_")) {
+  throw new SSLError();
+} else {
+  throw new SiteError(code);
+}
} else if (
typeof status.error === "string" &&
status.error.includes("File size exceeds")

View File

@@ -17,6 +17,7 @@ import {
ActionError,
EngineError,
SiteError,
SSLError,
TimeoutError,
UnsupportedFileError,
} from "../../error";
@@ -94,6 +95,7 @@ async function performFireEngineScrape<
} else if (
error instanceof EngineError ||
error instanceof SiteError ||
error instanceof SSLError ||
error instanceof ActionError ||
error instanceof UnsupportedFileError
) {

View File

@@ -11,6 +11,7 @@ import { PDFAntibotError, RemoveFeatureError, UnsupportedFileError } from "../..
import { readFile, unlink } from "node:fs/promises";
import path from "node:path";
import type { Response } from "undici";
import { getPdfResultFromCache, savePdfResultToCache } from "../../../../lib/gcs-pdf-cache";
type PDFProcessorResult = { html: string; markdown?: string };
@@ -26,6 +27,22 @@ async function scrapePDFWithRunPodMU(
tempFilePath,
});
try {
const cachedResult = await getPdfResultFromCache(base64Content);
if (cachedResult) {
meta.logger.info("Using cached RunPod MU result for PDF", {
tempFilePath,
});
return cachedResult;
}
} catch (error) {
meta.logger.warn("Error checking PDF cache, proceeding with RunPod MU", {
error,
tempFilePath,
});
}
const result = await robustFetch({
url:
"https://api.runpod.ai/v2/" + process.env.RUNPOD_MU_POD_ID + "/runsync",
@@ -50,10 +67,21 @@ async function scrapePDFWithRunPodMU(
mock: meta.mock,
});
-return {
+const processorResult = {
markdown: result.output.markdown,
html: await marked.parse(result.output.markdown, { async: true }),
};
try {
await savePdfResultToCache(base64Content, processorResult);
} catch (error) {
meta.logger.warn("Error saving PDF to cache", {
error,
tempFilePath,
});
}
return processorResult;
}
async function scrapePDFWithParsePDF(

View File

@@ -2,6 +2,7 @@ import type { Socket } from "net";
import type { TLSSocket } from "tls";
import * as undici from "undici";
import { Address6 } from "ip-address";
import { cacheableLookup } from "../../lib/cacheableLookup";
export class InsecureConnectionError extends Error {
constructor() {
@@ -46,7 +47,7 @@ export function makeSecureDispatcher(
const agentOpts: undici.Agent.Options = {
connect: {
rejectUnauthorized: false, // bypass SSL failures -- this is fine
-// lookup: secureLookup,
+lookup: cacheableLookup.lookup,
},
maxRedirections: 5000,
...options,

View File

@@ -49,6 +49,12 @@ export class RemoveFeatureError extends Error {
}
}
export class SSLError extends Error {
constructor() {
super("An SSL error occurred while scraping the URL. If you're not inputting any sensitive data, try scraping with `skipTlsVerification: true`.");
}
}
export class SiteError extends Error {
public code: string;
constructor(code: string) {
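
The new SSLError message explicitly points callers at skipTlsVerification. A hedged sketch of how a caller might act on that hint is below; scrapeOnce and its options shape are hypothetical stand-ins and not the actual API of this repo, only the SSLError import mirrors the commit.

// Sketch: retry once without TLS verification when an SSLError surfaces.
import { SSLError } from "./error";

type ScrapeOpts = { url: string; skipTlsVerification?: boolean };

async function scrapeWithSslFallback(
  scrapeOnce: (opts: ScrapeOpts) => Promise<string>,
  url: string,
): Promise<string> {
  try {
    return await scrapeOnce({ url });
  } catch (error) {
    if (error instanceof SSLError) {
      // Certificate problem: retry without verification, as the error message suggests.
      return await scrapeOnce({ url, skipTlsVerification: true });
    }
    throw error;
  }
}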

View File

@@ -21,6 +21,7 @@ import {
SiteError,
TimeoutError,
UnsupportedFileError,
SSLError,
} from "./error"; } from "./error";
import { executeTransformers } from "./transformers"; import { executeTransformers } from "./transformers";
import { LLMRefusalError } from "./transformers/llmExtract"; import { LLMRefusalError } from "./transformers/llmExtract";
@ -323,6 +324,8 @@ async function scrapeURLLoop(meta: Meta): Promise<ScrapeUrlResponse> {
throw error; throw error;
} else if (error instanceof SiteError) { } else if (error instanceof SiteError) {
throw error; throw error;
} else if (error instanceof SSLError) {
throw error;
} else if (error instanceof ActionError) {
throw error;
} else if (error instanceof UnsupportedFileError) {
@@ -470,6 +473,8 @@ export async function scrapeURL(
// TODO: results?
} else if (error instanceof SiteError) {
meta.logger.warn("scrapeURL: Site failed to load in browser", { error });
} else if (error instanceof SSLError) {
meta.logger.warn("scrapeURL: SSL error", { error });
} else if (error instanceof ActionError) {
meta.logger.warn("scrapeURL: Action(s) failed to complete", { error });
} else if (error instanceof UnsupportedFileError) {

View File

@@ -0,0 +1,4 @@
import CacheableLookup from 'cacheable-lookup';
import dns from 'dns';
export const cacheableLookup = (process.env.SENTRY_ENVIRONMENT === "dev" ? { lookup: dns.lookup, install: () => {} } : new CacheableLookup({}));
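
This module exports either a pass-through (dns.lookup with a no-op install) in dev or a single shared CacheableLookup instance. A short sketch consolidating the two integration points used by the other hunks of this commit is below; the relative import path is an assumption.

// Sketch: how the shared lookup is consumed elsewhere in this commit.
import http from "http";
import https from "https";
import { Agent } from "undici";
import { cacheableLookup } from "./cacheableLookup";

// Node core http/https clients: patch the global agents once at startup.
cacheableLookup.install(http.globalAgent);
cacheableLookup.install(https.globalAgent);

// undici does not use the global agents, so the lookup is passed to its connector.
const agent = new Agent({
  connect: { lookup: cacheableLookup.lookup },
});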

View File

@@ -5,6 +5,7 @@ import { MockState, saveMock } from "./mock";
import { TimeoutSignal } from "../../../controllers/v1/types";
import { fireEngineURL } from "../engines/fire-engine/scrape";
import { fetch, RequestInit, Response, FormData, Agent } from "undici";
import { cacheableLookup } from "./cacheableLookup";
export type RobustFetchParams<Schema extends z.Schema<any>> = {
url: string;
@@ -82,6 +83,9 @@ export async function robustFetch<
dispatcher: new Agent({
headersTimeout: 0,
bodyTimeout: 0,
connect: {
lookup: cacheableLookup.lookup,
},
}),
...(body instanceof FormData
? {

View File

@@ -81,6 +81,10 @@ import { updateGeneratedLlmsTxt } from "../lib/generate-llmstxt/generate-llmstxt
import { performExtraction_F0 } from "../lib/extract/fire-0/extraction-service-f0";
import { CostTracking } from "../lib/extract/extraction-service";
import { getACUCTeam } from "../controllers/auth";
import Express from "express";
import http from "http";
import https from "https";
import { cacheableLookup } from "../scraper/scrapeURL/lib/cacheableLookup";
configDotenv();
@@ -108,6 +112,10 @@ const gotJobInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 20;
const runningJobs: Set<string> = new Set();
// Install cacheable lookup for all other requests
cacheableLookup.install(http.globalAgent);
cacheableLookup.install(https.globalAgent);
async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
const logger = _logger.child({
module: "queue-worker",
@@ -144,14 +152,18 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
visitedUrls: visitedUrls.size,
});
-const lastUrls: string[] = (
-  (
-    await supabase_service.rpc("diff_get_last_crawl_urls", {
-      i_team_id: job.data.team_id,
-      i_url: sc.originUrl!,
-    })
-  ).data ?? []
-).map((x) => x.url);
+let lastUrls: string[] = [];
+const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true";
+if (useDbAuthentication) {
+  lastUrls = (
+    (
+      await supabase_service.rpc("diff_get_last_crawl_urls", {
+        i_team_id: job.data.team_id,
+        i_url: sc.originUrl!,
+      })
+    ).data ?? []
+  ).map((x) => x.url);
+}
const lastUrlsSet = new Set(lastUrls);
@@ -249,7 +261,8 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
if (
visitedUrls.length > 0 &&
job.data.crawlerOptions !== null &&
-originUrl
+originUrl &&
+process.env.USE_DB_AUTHENTICATION === "true"
) {
// Queue the indexing job instead of doing it directly
await getIndexQueue().add(
@@ -688,6 +701,7 @@ const processGenerateLlmsTxtJobInternal = async (
};
let isShuttingDown = false;
let isWorkerStalled = false;
process.on("SIGINT", () => { process.on("SIGINT", () => {
console.log("Received SIGTERM. Shutting down gracefully..."); console.log("Received SIGTERM. Shutting down gracefully...");
@ -731,7 +745,9 @@ const workerFun = async (
logger.info("Can't accept connection due to RAM/CPU load"); logger.info("Can't accept connection due to RAM/CPU load");
cantAcceptConnectionCount++; cantAcceptConnectionCount++;
if (cantAcceptConnectionCount >= 25) { isWorkerStalled = cantAcceptConnectionCount >= 25;
if (isWorkerStalled) {
logger.error("WORKER STALLED", { logger.error("WORKER STALLED", {
cpuUsage: await monitor.checkCpuUsage(), cpuUsage: await monitor.checkCpuUsage(),
memoryUsage: await monitor.checkMemoryUsage(), memoryUsage: await monitor.checkMemoryUsage(),
@@ -1526,6 +1542,20 @@ async function processJob(job: Job & { id: string }, token: string) {
// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
// Start all workers
const app = Express();
app.get("/liveness", (req, res) => {
if (isWorkerStalled) {
res.status(500).json({ ok: false });
} else {
res.status(200).json({ ok: true });
}
});
app.listen(3005, () => {
_logger.info("Liveness endpoint is running on port 3005");
});
(async () => {
await Promise.all([
workerFun(getScrapeQueue(), processJobInternal),