Merge remote-tracking branch 'origin/mog/no-cluster' into nsc/no-cluster-all

This commit is contained in:
Nicolas 2024-10-09 19:26:11 -03:00
commit 7847404e1f
2 changed files with 149 additions and 172 deletions

View File

@ -28,9 +28,6 @@ RUN cd /app/src/lib/go-html-to-md && \
chmod +x html-to-markdown.so chmod +x html-to-markdown.so
FROM base FROM base
RUN apt-get update -qq && \
apt-get install --no-install-recommends -y chromium chromium-sandbox && \
rm -rf /var/lib/apt/lists /var/cache/apt/archives
COPY --from=prod-deps /app/node_modules /app/node_modules COPY --from=prod-deps /app/node_modules /app/node_modules
COPY --from=build /app /app COPY --from=build /app /app
COPY --from=go-base /app/src/lib/go-html-to-md/html-to-markdown.so /app/dist/src/lib/go-html-to-md/html-to-markdown.so COPY --from=go-base /app/src/lib/go-html-to-md/html-to-markdown.so /app/dist/src/lib/go-html-to-md/html-to-markdown.so

View File

@ -7,7 +7,6 @@ import cors from "cors";
import { getScrapeQueue } from "./services/queue-service"; import { getScrapeQueue } from "./services/queue-service";
import { v0Router } from "./routes/v0"; import { v0Router } from "./routes/v0";
import { initSDK } from "@hyperdx/node-opentelemetry"; import { initSDK } from "@hyperdx/node-opentelemetry";
import cluster from "cluster";
import os from "os"; import os from "os";
import { Logger } from "./lib/logger"; import { Logger } from "./lib/logger";
import { adminRouter } from "./routes/admin"; import { adminRouter } from "./routes/admin";
@ -37,198 +36,179 @@ const cacheable = new CacheableLookup({
cacheable.install(http.globalAgent); cacheable.install(http.globalAgent);
cacheable.install(https.globalAgent) cacheable.install(https.globalAgent)
if (cluster.isMaster) { const ws = expressWs(express());
Logger.info(`Master ${process.pid} is running`); const app = ws.app;
// Fork workers. global.isProduction = process.env.IS_PRODUCTION === "true";
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on("exit", (worker, code, signal) => { app.use(bodyParser.urlencoded({ extended: true }));
if (code !== null) { app.use(bodyParser.json({ limit: "10mb" }));
Logger.info(`Worker ${worker.process.pid} exited`);
Logger.info("Starting a new worker"); app.use(cors()); // Add this line to enable CORS
cluster.fork();
} const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
queues: [new BullAdapter(getScrapeQueue())],
serverAdapter: serverAdapter,
});
app.use(
`/admin/${process.env.BULL_AUTH_KEY}/queues`,
serverAdapter.getRouter()
);
app.get("/", (req, res) => {
res.send("SCRAPERS-JS: Hello, world! Fly.io");
});
//write a simple test function
app.get("/test", async (req, res) => {
res.send("Hello, world!");
});
// register router
app.use(v0Router);
app.use("/v1", v1Router);
app.use(adminRouter);
const DEFAULT_PORT = process.env.PORT ?? 3002;
const HOST = process.env.HOST ?? "localhost";
// HyperDX OpenTelemetry
if (process.env.ENV === "production") {
initSDK({ consoleCapture: true, additionalInstrumentations: [] });
}
function startServer(port = DEFAULT_PORT) {
const server = app.listen(Number(port), HOST, () => {
Logger.info(`Worker ${process.pid} listening on port ${port}`);
Logger.info(
`For the Queue UI, open: http://${HOST}:${port}/admin/${process.env.BULL_AUTH_KEY}/queues`
);
}); });
} else { return server;
const ws = expressWs(express()); }
const app = ws.app;
global.isProduction = process.env.IS_PRODUCTION === "true"; if (require.main === module) {
startServer();
}
app.use(bodyParser.urlencoded({ extended: true })); app.get(`/serverHealthCheck`, async (req, res) => {
app.use(bodyParser.json({ limit: "10mb" })); try {
const scrapeQueue = getScrapeQueue();
const [waitingJobs] = await Promise.all([
scrapeQueue.getWaitingCount(),
]);
app.use(cors()); // Add this line to enable CORS const noWaitingJobs = waitingJobs === 0;
// 200 if no active jobs, 503 if there are active jobs
const serverAdapter = new ExpressAdapter(); return res.status(noWaitingJobs ? 200 : 500).json({
serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`); waitingJobs,
const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
queues: [new BullAdapter(getScrapeQueue())],
serverAdapter: serverAdapter,
});
app.use(
`/admin/${process.env.BULL_AUTH_KEY}/queues`,
serverAdapter.getRouter()
);
app.get("/", (req, res) => {
res.send("SCRAPERS-JS: Hello, world! Fly.io");
});
//write a simple test function
app.get("/test", async (req, res) => {
res.send("Hello, world!");
});
// register router
app.use(v0Router);
app.use("/v1", v1Router);
app.use(adminRouter);
const DEFAULT_PORT = process.env.PORT ?? 3002;
const HOST = process.env.HOST ?? "localhost";
// HyperDX OpenTelemetry
if (process.env.ENV === "production") {
initSDK({ consoleCapture: true, additionalInstrumentations: [] });
}
function startServer(port = DEFAULT_PORT) {
const server = app.listen(Number(port), HOST, () => {
Logger.info(`Worker ${process.pid} listening on port ${port}`);
Logger.info(
`For the Queue UI, open: http://${HOST}:${port}/admin/${process.env.BULL_AUTH_KEY}/queues`
);
}); });
return server; } catch (error) {
Sentry.captureException(error);
Logger.error(error);
return res.status(500).json({ error: error.message });
} }
});
if (require.main === module) { app.get("/serverHealthCheck/notify", async (req, res) => {
startServer(); if (process.env.SLACK_WEBHOOK_URL) {
} const treshold = 1; // The treshold value for the active jobs
const timeout = 60000; // 1 minute // The timeout value for the check in milliseconds
app.get(`/serverHealthCheck`, async (req, res) => { const getWaitingJobsCount = async () => {
try {
const scrapeQueue = getScrapeQueue(); const scrapeQueue = getScrapeQueue();
const [waitingJobs] = await Promise.all([ const [waitingJobsCount] = await Promise.all([
scrapeQueue.getWaitingCount(), scrapeQueue.getWaitingCount(),
]); ]);
const noWaitingJobs = waitingJobs === 0; return waitingJobsCount;
// 200 if no active jobs, 503 if there are active jobs };
return res.status(noWaitingJobs ? 200 : 500).json({
waitingJobs,
});
} catch (error) {
Sentry.captureException(error);
Logger.error(error);
return res.status(500).json({ error: error.message });
}
});
app.get("/serverHealthCheck/notify", async (req, res) => { res.status(200).json({ message: "Check initiated" });
if (process.env.SLACK_WEBHOOK_URL) {
const treshold = 1; // The treshold value for the active jobs
const timeout = 60000; // 1 minute // The timeout value for the check in milliseconds
const getWaitingJobsCount = async () => { const checkWaitingJobs = async () => {
const scrapeQueue = getScrapeQueue(); try {
const [waitingJobsCount] = await Promise.all([ let waitingJobsCount = await getWaitingJobsCount();
scrapeQueue.getWaitingCount(), if (waitingJobsCount >= treshold) {
]); setTimeout(async () => {
// Re-check the waiting jobs count after the timeout
waitingJobsCount = await getWaitingJobsCount();
if (waitingJobsCount >= treshold) {
const slackWebhookUrl = process.env.SLACK_WEBHOOK_URL;
const message = {
text: `⚠️ Warning: The number of active jobs (${waitingJobsCount}) has exceeded the threshold (${treshold}) for more than ${
timeout / 60000
} minute(s).`,
};
return waitingJobsCount; const response = await fetch(slackWebhookUrl, {
}; method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(message),
});
res.status(200).json({ message: "Check initiated" }); if (!response.ok) {
Logger.error("Failed to send Slack notification");
const checkWaitingJobs = async () => {
try {
let waitingJobsCount = await getWaitingJobsCount();
if (waitingJobsCount >= treshold) {
setTimeout(async () => {
// Re-check the waiting jobs count after the timeout
waitingJobsCount = await getWaitingJobsCount();
if (waitingJobsCount >= treshold) {
const slackWebhookUrl = process.env.SLACK_WEBHOOK_URL;
const message = {
text: `⚠️ Warning: The number of active jobs (${waitingJobsCount}) has exceeded the threshold (${treshold}) for more than ${
timeout / 60000
} minute(s).`,
};
const response = await fetch(slackWebhookUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(message),
});
if (!response.ok) {
Logger.error("Failed to send Slack notification");
}
} }
}, timeout); }
} }, timeout);
} catch (error) {
Sentry.captureException(error);
Logger.debug(error);
} }
}; } catch (error) {
Sentry.captureException(error);
checkWaitingJobs(); Logger.debug(error);
}
});
app.get("/is-production", (req, res) => {
res.send({ isProduction: global.isProduction });
});
app.use((err: unknown, req: Request<{}, ErrorResponse, undefined>, res: Response<ErrorResponse>, next: NextFunction) => {
if (err instanceof ZodError) {
if (Array.isArray(err.errors) && err.errors.find(x => x.message === "URL uses unsupported protocol")) {
Logger.warn("Unsupported protocol error: " + JSON.stringify(req.body));
}
res.status(400).json({ success: false, error: "Bad Request", details: err.errors });
} else {
next(err);
}
});
Sentry.setupExpressErrorHandler(app);
app.use((err: unknown, req: Request<{}, ErrorResponse, undefined>, res: ResponseWithSentry<ErrorResponse>, next: NextFunction) => {
if (err instanceof SyntaxError && 'status' in err && err.status === 400 && 'body' in err) {
return res.status(400).json({ success: false, error: 'Bad request, malformed JSON' });
}
const id = res.sentry ?? uuidv4();
let verbose = JSON.stringify(err);
if (verbose === "{}") {
if (err instanceof Error) {
verbose = JSON.stringify({
message: err.message,
name: err.name,
stack: err.stack,
});
} }
};
checkWaitingJobs();
}
});
app.get("/is-production", (req, res) => {
res.send({ isProduction: global.isProduction });
});
app.use((err: unknown, req: Request<{}, ErrorResponse, undefined>, res: Response<ErrorResponse>, next: NextFunction) => {
if (err instanceof ZodError) {
if (Array.isArray(err.errors) && err.errors.find(x => x.message === "URL uses unsupported protocol")) {
Logger.warn("Unsupported protocol error: " + JSON.stringify(req.body));
}
res.status(400).json({ success: false, error: "Bad Request", details: err.errors });
} else {
next(err);
}
});
Sentry.setupExpressErrorHandler(app);
app.use((err: unknown, req: Request<{}, ErrorResponse, undefined>, res: ResponseWithSentry<ErrorResponse>, next: NextFunction) => {
if (err instanceof SyntaxError && 'status' in err && err.status === 400 && 'body' in err) {
return res.status(400).json({ success: false, error: 'Bad request, malformed JSON' });
}
const id = res.sentry ?? uuidv4();
let verbose = JSON.stringify(err);
if (verbose === "{}") {
if (err instanceof Error) {
verbose = JSON.stringify({
message: err.message,
name: err.name,
stack: err.stack,
});
} }
}
Logger.error("Error occurred in request! (" + req.path + ") -- ID " + id + " -- " + verbose); Logger.error("Error occurred in request! (" + req.path + ") -- ID " + id + " -- " + verbose);
res.status(500).json({ success: false, error: "An unexpected error occurred. Please contact hello@firecrawl.com for help. Your exception ID is " + id }); res.status(500).json({ success: false, error: "An unexpected error occurred. Please contact hello@firecrawl.com for help. Your exception ID is " + id });
}); });
Logger.info(`Worker ${process.pid} started`);
}
Logger.info(`Worker ${process.pid} started`);
// const sq = getScrapeQueue(); // const sq = getScrapeQueue();