hotfix: kill zombie workers, respect timeouts better (FIR-2034) (#1575)

* feat(scrapeURL): add strict timeouts everywhere

* feat(queue-worker/liveness): add networking check

* fix(queue-worker): typo

* fix(queue-worker/liveness): do not parse

* fix(queue-worker): check local network instead

* fix(queue-worker/liveness): typo
Gergő Móricz · 2025-05-20 17:35:32 +02:00 · committed by GitHub
commit f838190ba6 (parent 5152019a05)
8 changed files with 33 additions and 7 deletions


@@ -3,9 +3,10 @@ import { EngineScrapeResult } from "..";
 import { downloadFile } from "../utils/downloadFile";
 import mammoth from "mammoth";
 
-export async function scrapeDOCX(meta: Meta): Promise<EngineScrapeResult> {
+export async function scrapeDOCX(meta: Meta, timeToRun: number | undefined): Promise<EngineScrapeResult> {
   const { response, tempFilePath } = await downloadFile(meta.id, meta.url, {
     headers: meta.options.headers,
+    signal: meta.internalOptions.abort ?? AbortSignal.timeout(timeToRun ?? 300000),
   });
 
   return {
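
The DOCX change above follows the commit's general pattern: prefer the caller-supplied abort signal, and otherwise fall back to a hard timeout (300 000 ms here). A minimal sketch of that pattern in isolation; the helper name and the plain fetch call are assumptions, not code from this repo:

// Sketch only: prefer an explicit abort signal, otherwise fall back to a hard
// timeout so no request can hang forever. Requires Node 18+ (global fetch and
// AbortSignal.timeout). fetchWithDeadline is a hypothetical helper name.
async function fetchWithDeadline(
  url: string,
  abort?: AbortSignal,   // caller-supplied signal, if any
  timeToRun?: number,    // per-job time budget in milliseconds
): Promise<Response> {
  const signal = abort ?? AbortSignal.timeout(timeToRun ?? 300_000);
  return await fetch(url, { signal });
}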


@@ -59,7 +59,7 @@ export async function scrapeURLWithFetch(
       dispatcher: await makeSecureDispatcher(meta.url),
       redirect: "follow",
       headers: meta.options.headers,
-      signal: meta.internalOptions.abort,
+      signal: meta.internalOptions.abort ?? AbortSignal.timeout(timeout),
     }),
     (async () => {
       await new Promise((resolve) =>
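
The fetch engine gets the same fallback, and the surrounding context lines suggest the request is raced against a timer. A generic sketch of that race shape, with all names assumed rather than taken from the repo:

// Sketch only: race a piece of work against a timer. The names here are
// illustrative; the repo's actual race lives inside scrapeURLWithFetch.
async function raceAgainstTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error("scrape timed out")), timeoutMs);
  });
  try {
    // Whichever settles first wins; the AbortSignal on the request itself
    // (see the diff above) additionally tears down the underlying socket.
    return await Promise.race([work, deadline]);
  } finally {
    clearTimeout(timer);
  }
}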


@@ -142,6 +142,7 @@ export async function fireEngineCheckStatus(
           : {}),
       },
       mock,
+      abort,
     });
   },
 );


@@ -9,6 +9,7 @@ export async function fireEngineDelete(
   logger: Logger,
   jobId: string,
   mock: MockState | null,
+  abort?: AbortSignal,
 ) {
   await Sentry.startSpan(
     {
@@ -33,6 +34,7 @@ export async function fireEngineDelete(
       ignoreFailure: true,
       logger: logger.child({ method: "fireEngineDelete/robustFetch", jobId }),
       mock,
+      abort,
     });
   },
 );
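
The check-status and delete helpers now accept an optional abort signal and simply forward it to the underlying HTTP call, so cancelling a scrape also cancels any in-flight fire-engine requests. A minimal sketch of that pass-through; the function name and endpoint below are hypothetical:

// Sketch only: thread an optional AbortSignal through a helper so that a
// cancelled job also cancels the follow-up calls it spawns.
// deleteRemoteJob and the URL are hypothetical, not this repo's API.
async function deleteRemoteJob(jobId: string, abort?: AbortSignal): Promise<void> {
  await fetch(`https://engine.example.invalid/jobs/${jobId}`, {
    method: "DELETE",
    signal: abort, // undefined is allowed: no signal simply means no cancellation
  });
}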


@@ -48,6 +48,7 @@ async function performFireEngineScrape<
     logger.child({ method: "fireEngineScrape" }),
     request,
     mock,
+    abort,
   );
 
   const startTime = Date.now();
@@ -56,6 +57,7 @@
   let status: FireEngineCheckStatusSuccess | undefined = undefined;
 
   while (status === undefined) {
+    abort?.throwIfAborted();
     if (errors.length >= errorLimit) {
       logger.error("Error limit hit.", { errors });
       fireEngineDelete(
@@ -236,7 +238,7 @@ export async function scrapeURLWithFireEngineChromeCDP(
     request,
     timeout,
     meta.mock,
-    meta.internalOptions.abort,
+    meta.internalOptions.abort ?? AbortSignal.timeout(timeout),
   );
 
   if (
@@ -317,7 +319,7 @@ export async function scrapeURLWithFireEnginePlaywright(
     request,
     timeout,
     meta.mock,
-    meta.internalOptions.abort,
+    meta.internalOptions.abort ?? AbortSignal.timeout(timeout),
   );
 
   if (!response.url) {
@@ -373,7 +375,7 @@ export async function scrapeURLWithFireEngineTLSClient(
     request,
     timeout,
     meta.mock,
-    meta.internalOptions.abort,
+    meta.internalOptions.abort ?? AbortSignal.timeout(timeout),
   );
 
   if (!response.url) {
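
Inside performFireEngineScrape, the status-polling loop now calls abort?.throwIfAborted() on every iteration, so a job whose budget has expired stops polling immediately instead of spinning until the error limit is hit. A self-contained sketch of that loop shape; the poll callback and status type are assumptions:

// Sketch only: an abort-aware polling loop. pollStatus and JobStatus are
// placeholders; the real loop also tracks errors and deletes the remote job.
interface JobStatus { done: boolean }

async function waitForJob(
  pollStatus: () => Promise<JobStatus>,
  abort?: AbortSignal,
  intervalMs = 250,
): Promise<JobStatus> {
  while (true) {
    // Throws the signal's abort reason as soon as it fires (Node 17.3+),
    // so a hung upstream job cannot keep the worker occupied forever.
    abort?.throwIfAborted();
    const status = await pollStatus();
    if (status.done) return status;
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}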


@@ -45,6 +45,8 @@ async function scrapePDFWithRunPodMU(
     });
   }
 
+  const timeout = timeToRun ? timeToRun - (Date.now() - preCacheCheckStartTime) : undefined;
+
   const result = await robustFetch({
     url:
       "https://api.runpod.ai/v2/" + process.env.RUNPOD_MU_POD_ID + "/runsync",
@@ -56,7 +58,7 @@
       input: {
         file_content: base64Content,
         filename: path.basename(tempFilePath) + ".pdf",
-        timeout: timeToRun ? timeToRun - (Date.now() - preCacheCheckStartTime) : undefined,
+        timeout,
         created_at: Date.now(),
       },
     },
@@ -69,6 +71,7 @@
       }),
     }),
     mock: meta.mock,
+    abort: timeout ? AbortSignal.timeout(timeout) : undefined,
   });
 
   const processorResult = {
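
The PDF engine now computes the remaining budget once (the total time to run minus what the cache check already consumed) and applies it twice: as the timeout the RunPod handler is asked to honour and as a client-side abort on the HTTP call itself. A sketch of that double use, mirroring the diff, with the call shape assumed:

// Sketch only: spend the remaining budget both server-side and client-side.
// runSync stands in for the RunPod call; its signature is an assumption.
async function callWithRemainingBudget<T>(
  runSync: (input: { timeout?: number }, abort?: AbortSignal) => Promise<T>,
  timeToRun: number | undefined, // total budget in ms, if any
  startedAt: number,             // Date.now() taken before the earlier work
): Promise<T> {
  const timeout = timeToRun ? timeToRun - (Date.now() - startedAt) : undefined;
  return await runSync(
    { timeout },                                         // cap honoured remotely
    timeout ? AbortSignal.timeout(timeout) : undefined,  // cap enforced locally
  );
}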


@@ -30,6 +30,7 @@ export async function scrapeURLWithPlaywright(
         pageError: z.string().optional(),
       }),
       mock: meta.mock,
+      abort: AbortSignal.timeout(timeout),
     }),
     (async () => {
       await new Promise((resolve) => setTimeout(() => resolve(null), timeout));


@@ -85,6 +85,7 @@ import Express from "express";
 import http from "http";
 import https from "https";
 import { cacheableLookup } from "../scraper/scrapeURL/lib/cacheableLookup";
+import { robustFetch } from "../scraper/scrapeURL/lib/fetch";
 
 configDotenv();
@@ -1546,10 +1547,25 @@ async function processJob(job: Job & { id: string }, token: string) {
 const app = Express();
 
 app.get("/liveness", (req, res) => {
+  // stalled check
   if (isWorkerStalled) {
     res.status(500).json({ ok: false });
   } else {
-    res.status(200).json({ ok: true });
+    // networking check
+    robustFetch({
+      url: "http://firecrawl-app-service:3002",
+      method: "GET",
+      mock: null,
+      logger: _logger,
+      abort: AbortSignal.timeout(5000),
+      ignoreResponse: true,
+    })
+      .then(() => {
+        res.status(200).json({ ok: true });
+      }).catch(e => {
+        _logger.error("WORKER NETWORKING CHECK FAILED", { error: e });
+        res.status(500).json({ ok: false });
+      });
   }
 });
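
With this change the worker only reports itself live when it is not stalled and can reach the app service within five seconds, so an orchestrator's liveness probe will restart workers that have lost networking. A small sketch of probing the endpoint from outside; the worker's base URL below is an assumption for illustration:

// Sketch only: probe the worker's /liveness endpoint. The base URL is an
// assumption; use whatever host/port the deployment exposes for the worker.
async function probeLiveness(baseUrl: string): Promise<boolean> {
  try {
    const res = await fetch(`${baseUrl}/liveness`, {
      // a little above the worker's own 5 s networking check
      signal: AbortSignal.timeout(6_000),
    });
    return res.ok; // 200 = healthy; 500 = stalled or app service unreachable
  } catch {
    return false; // timeout or connection error also counts as unhealthy
  }
}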