Merge branch 'main' into v1-webscraper

This commit is contained in:
Nicolas 2024-08-26 16:22:05 -03:00
commit 4d0acc9722
9 changed files with 692 additions and 31 deletions

View File

@ -4,6 +4,7 @@ import { Job } from "bullmq";
import { Logger } from "../../../lib/logger";
import { getScrapeQueue } from "../../../services/queue-service";
import { checkAlerts } from "../../../services/alerts";
import { sendSlackWebhook } from "../../../services/alerts/slack";
export async function cleanBefore24hCompleteJobsController(
req: Request,
@ -54,34 +55,145 @@ export async function cleanBefore24hCompleteJobsController(
}
}
export async function checkQueuesController(req: Request, res: Response) {
try {
await checkAlerts();
return res.status(200).send("Alerts initialized");
} catch (error) {
Logger.debug(`Failed to initialize alerts: ${error}`);
return res.status(500).send("Failed to initialize alerts");
}
try {
await checkAlerts();
return res.status(200).send("Alerts initialized");
} catch (error) {
Logger.debug(`Failed to initialize alerts: ${error}`);
return res.status(500).send("Failed to initialize alerts");
}
}
// Use this as a "health check" that way we dont destroy the server
// Use this as a "health check" that way we dont destroy the server
export async function queuesController(req: Request, res: Response) {
try {
const scrapeQueue = getScrapeQueue();
try {
const scrapeQueue = getScrapeQueue();
const [webScraperActive] = await Promise.all([
const [webScraperActive] = await Promise.all([
scrapeQueue.getActiveCount(),
]);
const noActiveJobs = webScraperActive === 0;
// 200 if no active jobs, 503 if there are active jobs
return res.status(noActiveJobs ? 200 : 500).json({
webScraperActive,
noActiveJobs,
});
} catch (error) {
Logger.error(error);
return res.status(500).json({ error: error.message });
}
}
export async function autoscalerController(req: Request, res: Response) {
try {
const maxNumberOfMachines = 80;
const minNumberOfMachines = 20;
const scrapeQueue = getScrapeQueue();
const [webScraperActive, webScraperWaiting, webScraperPriority] =
await Promise.all([
scrapeQueue.getActiveCount(),
scrapeQueue.getWaitingCount(),
scrapeQueue.getPrioritizedCount(),
]);
const noActiveJobs = webScraperActive === 0;
// 200 if no active jobs, 503 if there are active jobs
return res.status(noActiveJobs ? 200 : 500).json({
webScraperActive,
noActiveJobs,
});
} catch (error) {
Logger.error(error);
return res.status(500).json({ error: error.message });
let waitingAndPriorityCount = webScraperWaiting + webScraperPriority;
// get number of machines active
const request = await fetch(
"https://api.machines.dev/v1/apps/firecrawl-scraper-js/machines",
{
headers: {
Authorization: `Bearer ${process.env.FLY_API_TOKEN}`,
},
}
);
const machines = await request.json();
// Only worker machines
const activeMachines = machines.filter(
(machine) =>
(machine.state === "started" ||
machine.state === "starting" ||
machine.state === "replacing") &&
machine.config.env["FLY_PROCESS_GROUP"] === "worker"
).length;
let targetMachineCount = activeMachines;
const baseScaleUp = 10;
// Slow scale down
const baseScaleDown = 2;
// Scale up logic
if (webScraperActive > 9000 || waitingAndPriorityCount > 2000) {
targetMachineCount = Math.min(
maxNumberOfMachines,
activeMachines + baseScaleUp * 3
);
} else if (webScraperActive > 5000 || waitingAndPriorityCount > 1000) {
targetMachineCount = Math.min(
maxNumberOfMachines,
activeMachines + baseScaleUp * 2
);
} else if (webScraperActive > 1000 || waitingAndPriorityCount > 500) {
targetMachineCount = Math.min(
maxNumberOfMachines,
activeMachines + baseScaleUp
);
}
}
// Scale down logic
if (webScraperActive < 100 && waitingAndPriorityCount < 50) {
targetMachineCount = Math.max(
minNumberOfMachines,
activeMachines - baseScaleDown * 3
);
} else if (webScraperActive < 500 && waitingAndPriorityCount < 200) {
targetMachineCount = Math.max(
minNumberOfMachines,
activeMachines - baseScaleDown * 2
);
} else if (webScraperActive < 1000 && waitingAndPriorityCount < 500) {
targetMachineCount = Math.max(
minNumberOfMachines,
activeMachines - baseScaleDown
);
}
if (targetMachineCount !== activeMachines) {
Logger.info(
`🐂 Scaling from ${activeMachines} to ${targetMachineCount} - ${webScraperActive} active, ${webScraperWaiting} waiting`
);
if (targetMachineCount > activeMachines) {
sendSlackWebhook(
`🐂 Scaling from ${activeMachines} to ${targetMachineCount} - ${webScraperActive} active, ${webScraperWaiting} waiting - Current DateTime: ${new Date().toISOString()}`,
false,
process.env.SLACK_AUTOSCALER ?? ""
);
} else {
sendSlackWebhook(
`🐂 Scaling from ${activeMachines} to ${targetMachineCount} - ${webScraperActive} active, ${webScraperWaiting} waiting - Current DateTime: ${new Date().toISOString()}`,
false,
process.env.SLACK_AUTOSCALER ?? ""
);
}
return res.status(200).json({
mode: "scale-descale",
count: targetMachineCount,
});
}
return res.status(200).json({
mode: "normal",
count: activeMachines,
});
} catch (error) {
Logger.error(error);
return res.status(500).send("Failed to initialize autoscaler");
}
}

View File

@ -222,7 +222,8 @@ export async function supaAuthenticateUser(
rateLimiter = getRateLimiter(
RateLimiterMode.Scrape,
token,
subscriptionData.plan
subscriptionData.plan,
teamId
);
break;
case RateLimiterMode.Search:

View File

@ -1,5 +1,5 @@
export function parseMarkdown(html: string) {
export async function parseMarkdown(html: string) {
var TurndownService = require("turndown");
var turndownPluginGfm = require('joplin-turndown-plugin-gfm')
@ -21,7 +21,27 @@ export function parseMarkdown(html: string) {
});
var gfm = turndownPluginGfm.gfm;
turndownService.use(gfm);
let markdownContent = turndownService.turndown(html);
let markdownContent = "";
const turndownPromise = new Promise<string>((resolve, reject) => {
try {
const result = turndownService.turndown(html);
resolve(result);
} catch (error) {
reject("Error converting HTML to Markdown: " + error);
}
});
const timeoutPromise = new Promise<string>((resolve, reject) => {
const timeout = 5000; // Timeout in milliseconds
setTimeout(() => reject("Conversion timed out after " + timeout + "ms"), timeout);
});
try {
markdownContent = await Promise.race([turndownPromise, timeoutPromise]);
} catch (error) {
console.error(error);
return ""; // Optionally return an empty string or handle the error as needed
}
// multiple line links
let insideLinkContent = false;

View File

@ -1,6 +1,7 @@
import express from "express";
import { redisHealthController } from "../controllers/v0/admin/redis-health";
import {
autoscalerController,
checkQueuesController,
cleanBefore24hCompleteJobsController,
queuesController,
@ -27,3 +28,8 @@ adminRouter.get(
`/admin/${process.env.BULL_AUTH_KEY}/queues`,
queuesController
);
adminRouter.get(
`/admin/${process.env.BULL_AUTH_KEY}/autoscaler`,
autoscalerController
);

View File

@ -24,8 +24,8 @@ import { clientSideError } from "../../strings";
dotenv.config();
export const baseScrapers = [
"fire-engine",
"fire-engine;chrome-cdp",
"fire-engine",
"scrapingBee",
process.env.USE_DB_AUTHENTICATION ? undefined : "playwright",
"scrapingBeeLoad",
@ -85,8 +85,8 @@ function getScrapingFallbackOrder(
});
let defaultOrder = [
!process.env.USE_DB_AUTHENTICATION ? undefined : "fire-engine",
!process.env.USE_DB_AUTHENTICATION ? undefined : "fire-engine;chrome-cdp",
!process.env.USE_DB_AUTHENTICATION ? undefined : "fire-engine",
"scrapingBee",
process.env.USE_DB_AUTHENTICATION ? undefined : "playwright",
"scrapingBeeLoad",

View File

@ -15,7 +15,8 @@ const socialMediaBlocklist = [
'wechat.com',
'telegram.org',
'researchhub.com',
'youtube.com'
'youtube.com',
'corterix.com',
];
const allowedKeywords = [

View File

@ -3,9 +3,9 @@ import { Logger } from "../../../src/lib/logger";
export async function sendSlackWebhook(
message: string,
alertEveryone: boolean = false
alertEveryone: boolean = false,
webhookUrl: string = process.env.SLACK_WEBHOOK_URL ?? ""
) {
const webhookUrl = process.env.SLACK_WEBHOOK_URL;
const messagePrefix = alertEveryone ? "<!channel> " : "";
const payload = {
text: `${messagePrefix} ${message}`,

View File

@ -97,16 +97,28 @@ export const testSuiteRateLimiter = new RateLimiterRedis({
duration: 60, // Duration in seconds
});
export const devBRateLimiter = new RateLimiterRedis({
storeClient: redisRateLimitClient,
keyPrefix: "dev-b",
points: 1200,
duration: 60, // Duration in seconds
});
export function getRateLimiter(
mode: RateLimiterMode,
token: string,
plan?: string
plan?: string,
teamId?: string
) {
if (token.includes("a01ccae") || token.includes("6254cf9") || token.includes("0f96e673")) {
return testSuiteRateLimiter;
}
if(teamId === process.env.DEV_B_TEAM_ID) {
return devBRateLimiter;
}
const rateLimitConfig = RATE_LIMITS[mode]; // {default : 5}
if (!rateLimitConfig) return serverRateLimiter;

File diff suppressed because one or more lines are too long