Merge branch 'main' into nsc/job-priority

Nicolas 2024-08-27 15:01:58 -03:00
commit 1e08e6d317
36 changed files with 1341 additions and 137 deletions


@ -4,6 +4,8 @@ import { Job } from "bullmq";
import { Logger } from "../../lib/logger";
import { getScrapeQueue } from "../../services/queue-service";
import { checkAlerts } from "../../services/alerts";
import { exec } from "node:child_process";
import { sendSlackWebhook } from "../../services/alerts/slack";
export async function cleanBefore24hCompleteJobsController(
req: Request,
@ -54,34 +56,109 @@ export async function cleanBefore24hCompleteJobsController(
}
}
export async function checkQueuesController(req: Request, res: Response) {
try {
await checkAlerts();
return res.status(200).send("Alerts initialized");
} catch (error) {
Logger.debug(`Failed to initialize alerts: ${error}`);
return res.status(500).send("Failed to initialize alerts");
}
}
// Use this as a "health check" so that we don't destroy the server
export async function queuesController(req: Request, res: Response) {
try {
const scrapeQueue = getScrapeQueue();
const [webScraperActive] = await Promise.all([
scrapeQueue.getActiveCount(),
]);
const noActiveJobs = webScraperActive === 0;
// 200 if no active jobs, 500 if there are active jobs
return res.status(noActiveJobs ? 200 : 500).json({
webScraperActive,
noActiveJobs,
});
} catch (error) {
Logger.error(error);
return res.status(500).json({ error: error.message });
}
}
export async function autoscalerController(req: Request, res: Response) {
try {
const maxNumberOfMachines = 80;
const minNumberOfMachines = 20;
const scrapeQueue = getScrapeQueue();
const [webScraperActive, webScraperWaiting, webScraperPriority] = await Promise.all([
scrapeQueue.getActiveCount(),
scrapeQueue.getWaitingCount(),
scrapeQueue.getPrioritizedCount(),
]);
let waitingAndPriorityCount = webScraperWaiting + webScraperPriority;
// get number of machines active
const request = await fetch('https://api.machines.dev/v1/apps/firecrawl-scraper-js/machines',
{
headers: {
'Authorization': `Bearer ${process.env.FLY_API_TOKEN}`
}
}
)
const machines = await request.json();
// Only worker machines
const activeMachines = machines.filter(machine => (machine.state === 'started' || machine.state === "starting" || machine.state === "replacing") && machine.config.env["FLY_PROCESS_GROUP"] === "worker").length;
let targetMachineCount = activeMachines;
const baseScaleUp = 10;
// Slow scale down
const baseScaleDown = 2;
// Scale up logic
if (webScraperActive > 9000 || waitingAndPriorityCount > 2000) {
targetMachineCount = Math.min(maxNumberOfMachines, activeMachines + (baseScaleUp * 3));
} else if (webScraperActive > 5000 || waitingAndPriorityCount > 1000) {
targetMachineCount = Math.min(maxNumberOfMachines, activeMachines + (baseScaleUp * 2));
} else if (webScraperActive > 1000 || waitingAndPriorityCount > 500) {
targetMachineCount = Math.min(maxNumberOfMachines, activeMachines + baseScaleUp);
}
// Scale down logic
if (webScraperActive < 100 && waitingAndPriorityCount < 50) {
targetMachineCount = Math.max(minNumberOfMachines, activeMachines - (baseScaleDown * 3));
} else if (webScraperActive < 500 && waitingAndPriorityCount < 200) {
targetMachineCount = Math.max(minNumberOfMachines, activeMachines - (baseScaleDown * 2));
} else if (webScraperActive < 1000 && waitingAndPriorityCount < 500) {
targetMachineCount = Math.max(minNumberOfMachines, activeMachines - baseScaleDown);
}
if (targetMachineCount !== activeMachines) {
Logger.info(`🐂 Scaling from ${activeMachines} to ${targetMachineCount} - ${webScraperActive} active, ${webScraperWaiting} waiting`);
if(targetMachineCount > activeMachines) {
sendSlackWebhook(`🐂 Scaling from ${activeMachines} to ${targetMachineCount} - ${webScraperActive} active, ${webScraperWaiting} waiting - Current DateTime: ${new Date().toISOString()}`, false, process.env.SLACK_AUTOSCALER ?? "");
} else {
sendSlackWebhook(`🐂 Scaling from ${activeMachines} to ${targetMachineCount} - ${webScraperActive} active, ${webScraperWaiting} waiting - Current DateTime: ${new Date().toISOString()}`, false, process.env.SLACK_AUTOSCALER ?? "");
}
return res.status(200).json({
mode: "scale-descale",
count: targetMachineCount,
});
}
return res.status(200).json({
mode: "normal",
count: activeMachines,
});
} catch (error) {
Logger.error(error);
return res.status(500).send("Failed to initialize autoscaler");
}
}
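The scale-up and scale-down thresholds above are easiest to sanity-check when restated as a pure function. A minimal sketch for illustration only: the `decideTargetMachineCount` helper below is hypothetical and not part of this diff, but it applies the same thresholds and caps.

```ts
// Hypothetical restatement of the autoscaler thresholds above (illustration only).
function decideTargetMachineCount(
  activeMachines: number,   // currently running worker machines
  activeJobs: number,       // webScraperActive
  waitingJobs: number,      // waiting + prioritized jobs
  min = 20,
  max = 80,
  baseScaleUp = 10,
  baseScaleDown = 2
): number {
  let target = activeMachines;

  // Scale up: larger backlogs add bigger multiples of baseScaleUp, capped at max.
  if (activeJobs > 9000 || waitingJobs > 2000) target = Math.min(max, activeMachines + baseScaleUp * 3);
  else if (activeJobs > 5000 || waitingJobs > 1000) target = Math.min(max, activeMachines + baseScaleUp * 2);
  else if (activeJobs > 1000 || waitingJobs > 500) target = Math.min(max, activeMachines + baseScaleUp);

  // Scale down: quiet queues subtract multiples of baseScaleDown, floored at min.
  if (activeJobs < 100 && waitingJobs < 50) target = Math.max(min, activeMachines - baseScaleDown * 3);
  else if (activeJobs < 500 && waitingJobs < 200) target = Math.max(min, activeMachines - baseScaleDown * 2);
  else if (activeJobs < 1000 && waitingJobs < 500) target = Math.max(min, activeMachines - baseScaleDown);

  return target;
}

// Example: 40 machines, 6000 active jobs, 300 waiting -> Math.min(80, 40 + 20) = 60.
console.log(decideTargetMachineCount(40, 6000, 300)); // 60
```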


@ -62,8 +62,10 @@ async function getKeyAndPriceId(normalizedApi: string): Promise<{
};
}
if (!data || data.length === 0) {
Logger.warn(`Error fetching api key: ${error.message} or data is empty`);
Sentry.captureException(error);
if (error) {
Logger.warn(`Error fetching api key: ${error.message} or data is empty`);
Sentry.captureException(error);
}
// TODO: change this error code ?
return {
success: false,
@ -221,7 +223,8 @@ export async function supaAuthenticateUser(
rateLimiter = getRateLimiter(
RateLimiterMode.Scrape,
token,
subscriptionData.plan
subscriptionData.plan,
teamId
);
break;
case RateLimiterMode.Search:
@ -310,8 +313,8 @@ export async function supaAuthenticateUser(
if (error || !data || data.length === 0) {
if (error) {
Sentry.captureException(error);
Logger.warn(`Error fetching api key: ${error.message} or data is empty`);
}
Logger.warn(`Error fetching api key: ${error.message} or data is empty`);
return {
success: false,
error: "Unauthorized: Invalid token",


@ -58,6 +58,26 @@ export async function crawlController(req: Request, res: Response) {
};
const pageOptions = { ...defaultCrawlPageOptions, ...req.body.pageOptions };
if (Array.isArray(crawlerOptions.includes)) {
for (const x of crawlerOptions.includes) {
try {
new RegExp(x);
} catch (e) {
return res.status(400).json({ error: e.message });
}
}
}
if (Array.isArray(crawlerOptions.excludes)) {
for (const x of crawlerOptions.excludes) {
try {
new RegExp(x);
} catch (e) {
return res.status(400).json({ error: e.message });
}
}
}
const limitCheck = req.body?.crawlerOptions?.limit ?? 1;
const { success: creditsCheckSuccess, message: creditsCheckMessage, remainingCredits } =
await checkTeamCredits(team_id, limitCheck);
@ -73,6 +93,9 @@ export async function crawlController(req: Request, res: Response) {
if (!url) {
return res.status(400).json({ error: "Url is required" });
}
if (typeof url !== "string") {
return res.status(400).json({ error: "URL must be a string" });
}
try {
url = checkAndUpdateURL(url).url;
} catch (e) {
@ -88,8 +111,6 @@ export async function crawlController(req: Request, res: Response) {
});
}
const mode = req.body.mode ?? "crawl";
// if (mode === "single_urls" && !url.includes(",")) { // NOTE: do we need this?
// try {
// const a = new WebScraperDataProvider();
@ -144,8 +165,8 @@ export async function crawlController(req: Request, res: Response) {
? null
: await crawler.tryGetSitemap();
if (sitemap !== null) {
if (sitemap !== null && sitemap.length > 0) {
let jobPriority = 20;
// If it is over 1000, we need to get the job priority,
// otherwise we can use the default priority of 20
@ -153,7 +174,6 @@ export async function crawlController(req: Request, res: Response) {
// set base to 21
jobPriority = await getJobPriority({plan, team_id, basePriority: 21})
}
const jobs = sitemap.map((x) => {
const url = x.url;
const uuid = uuidv4();
@ -184,7 +204,14 @@ export async function crawlController(req: Request, res: Response) {
id,
jobs.map((x) => x.opts.jobId)
);
await getScrapeQueue().addBulk(jobs);
if (Sentry.isInitialized()) {
for (const job of jobs) {
// add with sentry instrumentation
await addScrapeJob(job.data as any, {}, job.opts.jobId);
}
} else {
await getScrapeQueue().addBulk(jobs);
}
} else {
await lockURL(id, sc, url);


@ -1,4 +1,4 @@
import { ExtractorOptions, PageOptions } from './../lib/entities';
import { Request, Response } from "express";
import { billTeam, checkTeamCredits } from "../services/billing/credit_billing";
import { authenticateUser } from "./auth";
@ -9,7 +9,7 @@ import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import
import { numTokensFromString } from '../lib/LLM-extraction/helpers';
import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../lib/default-values';
import { addScrapeJob } from '../services/queue-jobs';
import { scrapeQueueEvents } from '../services/queue-service';
import { getScrapeQueue } from '../services/queue-service';
import { v4 as uuidv4 } from "uuid";
import { Logger } from '../lib/logger';
import { getJobPriority } from '../lib/job-priority';
@ -52,18 +52,51 @@ export async function scrapeHelper(
}, {}, jobId, jobPriority);
let doc;
try {
doc = (await job.waitUntilFinished(scrapeQueueEvents, timeout))[0]; //60 seconds timeout
} catch (e) {
if (e instanceof Error && e.message.startsWith("Job wait")) {
return {
success: false,
error: "Request timed out",
returnCode: 408,
const err = await Sentry.startSpan({ name: "Wait for job to finish", op: "bullmq.wait", attributes: { job: jobId } }, async (span) => {
try {
doc = (await new Promise((resolve, reject) => {
const start = Date.now();
const int = setInterval(async () => {
if (Date.now() >= start + timeout) {
clearInterval(int);
reject(new Error("Job wait "));
} else {
const state = await job.getState();
if (state === "completed") {
clearInterval(int);
resolve((await getScrapeQueue().getJob(job.id)).returnvalue);
} else if (state === "failed") {
clearInterval(int);
reject((await getScrapeQueue().getJob(job.id)).failedReason);
}
}
}, 1000);
}))[0]
} catch (e) {
if (e instanceof Error && e.message.startsWith("Job wait")) {
span.setAttribute("timedOut", true);
return {
success: false,
error: "Request timed out",
returnCode: 408,
}
} else if (typeof e === "string" && (e.includes("Error generating completions: ") || e.includes("Invalid schema for function") || e.includes("LLM extraction did not match the extraction schema you provided."))) {
return {
success: false,
error: e,
returnCode: 500,
};
} else {
throw e;
}
} else {
throw e;
}
span.setAttribute("result", JSON.stringify(doc));
return null;
});
if (err !== null) {
return err;
}
await job.remove();
@ -108,6 +141,10 @@ export async function scrapeController(req: Request, res: Response) {
let timeout = req.body.timeout ?? defaultTimeout;
if (extractorOptions.mode.includes("llm-extraction")) {
if (typeof extractorOptions.extractionSchema !== "object" || extractorOptions.extractionSchema === null) {
return res.status(400).json({ error: "extractorOptions.extractionSchema must be an object if llm-extraction mode is specified" });
}
pageOptions.onlyMainContent = true;
timeout = req.body.timeout ?? 90000;
}
@ -192,6 +229,6 @@ export async function scrapeController(req: Request, res: Response) {
} catch (error) {
Sentry.captureException(error);
Logger.error(error);
return res.status(500).json({ error: error.message });
return res.status(500).json({ error: typeof error === "string" ? error : (error?.message ?? "Internal Server Error") });
}
}
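This controller (and the search controller below) replaces `job.waitUntilFinished(scrapeQueueEvents, timeout)` with an inline loop that polls the job state every second, since the `QueueEvents`-based wait is being removed as unreliable. A rough sketch of that pattern factored into a single helper, assuming a BullMQ `Queue` and `Job`; the name `waitForJobResult` is illustrative and not part of the diff:

```ts
import { Job, Queue } from "bullmq";

// Illustrative helper (not in this diff): poll a job's state every second until it
// completes, fails, or the timeout elapses. Mirrors the inline loops in the
// scrape and search controllers.
async function waitForJobResult(queue: Queue, job: Job, timeout: number): Promise<any> {
  const start = Date.now();
  return new Promise<any>((resolve, reject) => {
    const int = setInterval(async () => {
      if (Date.now() >= start + timeout) {
        clearInterval(int);
        // Callers match on the "Job wait" prefix to map this to a 408 response.
        reject(new Error("Job wait timed out"));
        return;
      }
      const state = await job.getState();
      if (state === "completed") {
        clearInterval(int);
        const finished = await queue.getJob(job.id!);
        resolve(finished?.returnvalue);
      } else if (state === "failed") {
        clearInterval(int);
        const failed = await queue.getJob(job.id!);
        reject(failed?.failedReason);
      }
    }, 1000);
  });
}
```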


@ -9,9 +9,10 @@ import { search } from "../search";
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
import { v4 as uuidv4 } from "uuid";
import { Logger } from "../lib/logger";
import { getScrapeQueue, scrapeQueueEvents } from "../services/queue-service";
import { getJobPriority } from "../lib/job-priority";
import { getScrapeQueue } from "../services/queue-service";
import * as Sentry from "@sentry/node";
import { addScrapeJob } from "../services/queue-jobs";
export async function searchHelper(
jobId: string,
@ -99,10 +100,36 @@ export async function searchHelper(
}
};
})
const jobs = await getScrapeQueue().addBulk(jobDatas);
const docs = (await Promise.all(jobs.map(x => x.waitUntilFinished(scrapeQueueEvents, 60000)))).map(x => x[0]);
let jobs = [];
if (Sentry.isInitialized()) {
for (const job of jobDatas) {
// add with sentry instrumentation
jobs.push(await addScrapeJob(job.data as any, {}, job.opts.jobId));
}
} else {
jobs = await getScrapeQueue().addBulk(jobDatas);
await getScrapeQueue().addBulk(jobs);
}
const docs = (await Promise.all(jobs.map(x => new Promise((resolve, reject) => {
const start = Date.now();
const int = setInterval(async () => {
if (Date.now() >= start + 60000) {
clearInterval(int);
reject(new Error("Job wait "));
} else {
const state = await x.getState();
if (state === "completed") {
clearInterval(int);
resolve((await getScrapeQueue().getJob(x.id)).returnvalue);
} else if (state === "failed") {
clearInterval(int);
reject((await getScrapeQueue().getJob(x.id)).failedReason);
}
}
}, 1000);
})))).map(x => x[0]);
if (docs.length === 0) {
return { success: true, error: "No search results found", returnCode: 200 };
@ -112,7 +139,7 @@ export async function searchHelper(
// make sure doc.content is not empty
const filteredDocs = docs.filter(
(doc: { content?: string }) => doc.content && doc.content.trim().length > 0
(doc: { content?: string }) => doc && doc.content && doc.content.trim().length > 0
);
if (filteredDocs.length === 0) {
@ -191,6 +218,10 @@ export async function searchController(req: Request, res: Response) {
});
return res.status(result.returnCode).json(result);
} catch (error) {
if (error instanceof Error && error.message.startsWith("Job wait")) {
return res.status(408).json({ error: "Request timed out" });
}
Sentry.captureException(error);
Logger.error(error);
return res.status(500).json({ error: error.message });


@ -1,8 +1,6 @@
import { Request, Response } from "express";
import { Logger } from "../../src/lib/logger";
import { getCrawl, getCrawlJobs } from "../../src/lib/crawl-redis";
import { getScrapeQueue } from "../../src/services/queue-service";
import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
import { getJobs } from "./crawl-status";
import * as Sentry from "@sentry/node";


@ -119,6 +119,7 @@ if (cluster.isMaster) {
waitingJobs,
});
} catch (error) {
Sentry.captureException(error);
Logger.error(error);
return res.status(500).json({ error: error.message });
}
@ -170,6 +171,7 @@ if (cluster.isMaster) {
}, timeout);
}
} catch (error) {
Sentry.captureException(error);
Logger.debug(error);
}
};


@ -46,7 +46,7 @@ export async function generateCompletions(
return completionResult;
} catch (error) {
Logger.error(`Error generating completions: ${error}`);
throw new Error(`Error generating completions: ${error.message}`);
throw error;
}
default:
throw new Error("Invalid client");


@ -15,7 +15,7 @@ const defaultPrompt =
function prepareOpenAIDoc(
document: Document,
mode: "markdown" | "raw-html"
): [OpenAI.Chat.Completions.ChatCompletionContentPart[], number] {
): [OpenAI.Chat.Completions.ChatCompletionContentPart[], number] | null {
let markdown = document.markdown;
@ -27,9 +27,10 @@ function prepareOpenAIDoc(
// Check if the markdown content exists in the document
if (!extractionTarget) {
throw new Error(
`${mode} content is missing in the document. This is likely due to an error in the scraping process. Please try again or reach out to help@mendable.ai`
);
return null;
// throw new Error(
// `${mode} content is missing in the document. This is likely due to an error in the scraping process. Please try again or reach out to help@mendable.ai`
// );
}
@ -64,7 +65,16 @@ export async function generateOpenAICompletions({
mode: "markdown" | "raw-html";
}): Promise<Document> {
const openai = client as OpenAI;
const [content, numTokens] = prepareOpenAIDoc(document, mode);
const preparedDoc = prepareOpenAIDoc(document, mode);
if (preparedDoc === null) {
return {
...document,
warning: "LLM extraction was not performed since the document's content is empty or missing.",
};
}
const [content, numTokens] = preparedDoc;
const completion = await openai.chat.completions.create({
model,


@ -1,5 +1,5 @@
export function parseMarkdown(html: string) {
export async function parseMarkdown(html: string) {
var TurndownService = require("turndown");
var turndownPluginGfm = require('joplin-turndown-plugin-gfm')
@ -21,7 +21,27 @@ export function parseMarkdown(html: string) {
});
var gfm = turndownPluginGfm.gfm;
turndownService.use(gfm);
let markdownContent = turndownService.turndown(html);
let markdownContent = "";
const turndownPromise = new Promise<string>((resolve, reject) => {
try {
const result = turndownService.turndown(html);
resolve(result);
} catch (error) {
reject("Error converting HTML to Markdown: " + error);
}
});
const timeoutPromise = new Promise<string>((resolve, reject) => {
const timeout = 5000; // Timeout in milliseconds
setTimeout(() => reject("Conversion timed out after " + timeout + "ms"), timeout);
});
try {
markdownContent = await Promise.race([turndownPromise, timeoutPromise]);
} catch (error) {
console.error(error);
return ""; // Optionally return an empty string or handle the error as needed
}
// multiple line links
let insideLinkContent = false;
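Note that `parseMarkdown` is now `async`: the turndown conversion is raced against a 5-second timeout and resolves to an empty string on failure, so existing call sites must await it. A hypothetical call site for illustration (the import path is an assumption):

```ts
import { parseMarkdown } from "../lib/html-to-markdown"; // path assumed for illustration

async function htmlToMarkdownExample(html: string): Promise<string> {
  // parseMarkdown now returns a Promise<string>; it resolves to "" if the
  // conversion throws or exceeds the 5000 ms timeout, instead of throwing.
  return await parseMarkdown(html);
}
```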


@ -12,7 +12,6 @@ import { Document } from "../lib/entities";
import { supabase_service } from "../services/supabase";
import { Logger } from "../lib/logger";
import { ScrapeEvents } from "../lib/scrape-events";
import { getScrapeQueue } from "../services/queue-service";
export async function startWebScraperPipeline({
job,


@ -1,6 +1,7 @@
import express from "express";
import { redisHealthController } from "../controllers/admin/redis-health";
import {
autoscalerController,
checkQueuesController,
cleanBefore24hCompleteJobsController,
queuesController,
@ -27,3 +28,8 @@ adminRouter.get(
`/admin/${process.env.BULL_AUTH_KEY}/queues`,
queuesController
);
adminRouter.get(
`/admin/${process.env.BULL_AUTH_KEY}/autoscaler`,
autoscalerController
);


@ -53,8 +53,8 @@ export class WebCrawler {
this.jobId = jobId;
this.initialUrl = initialUrl;
this.baseUrl = new URL(initialUrl).origin;
this.includes = includes ?? [];
this.excludes = excludes ?? [];
this.includes = Array.isArray(includes) ? includes : [];
this.excludes = Array.isArray(excludes) ? excludes : [];
this.limit = limit;
this.robotsTxtUrl = `${this.baseUrl}/robots.txt`;
this.robots = robotsParser(this.robotsTxtUrl, "");
@ -108,7 +108,12 @@ export class WebCrawler {
// Normalize the initial URL and the link to account for www and non-www versions
const normalizedInitialUrl = new URL(this.initialUrl);
const normalizedLink = new URL(link);
let normalizedLink;
try {
normalizedLink = new URL(link);
} catch (_) {
return false;
}
const initialHostname = normalizedInitialUrl.hostname.replace(/^www\./, '');
const linkHostname = normalizedLink.hostname.replace(/^www\./, '');


@ -16,7 +16,6 @@ import {
replacePathsWithAbsolutePaths,
} from "./utils/replacePaths";
import { generateCompletions } from "../../lib/LLM-extraction";
import { getScrapeQueue } from "../../../src/services/queue-service";
import { fetchAndProcessDocx } from "./utils/docxProcessor";
import { getAdjustedMaxDepth, getURLDepth } from "./utils/maxDepthUtils";
import { Logger } from "../../lib/logger";


@ -5,6 +5,7 @@ import { generateRequestParams } from "../single_url";
import { fetchAndProcessPdf } from "../utils/pdfProcessor";
import { universalTimeout } from "../global";
import { Logger } from "../../../lib/logger";
import * as Sentry from "@sentry/node";
/**
* Scrapes a URL with Fire-Engine
@ -92,27 +93,35 @@ export async function scrapWithFireEngine({
});
const startTime = Date.now();
const _response = await axiosInstance.post(
process.env.FIRE_ENGINE_BETA_URL + endpoint,
{
url: url,
wait: waitParam,
screenshot: screenshotParam,
fullPageScreenshot: fullPageScreenshotParam,
headers: headers,
pageOptions: pageOptions,
disableJsDom: pageOptions?.disableJsDom ?? false,
priority,
engine,
instantReturn: true,
...fireEngineOptionsParam,
},
{
headers: {
"Content-Type": "application/json",
const _response = await Sentry.startSpan({
name: "Call to fire-engine"
}, async span => {
return await axiosInstance.post(
process.env.FIRE_ENGINE_BETA_URL + endpoint,
{
url: url,
wait: waitParam,
screenshot: screenshotParam,
fullPageScreenshot: fullPageScreenshotParam,
headers: headers,
pageOptions: pageOptions,
disableJsDom: pageOptions?.disableJsDom ?? false,
priority,
engine,
instantReturn: true,
...fireEngineOptionsParam,
},
{
headers: {
"Content-Type": "application/json",
...(Sentry.isInitialized() ? ({
"sentry-trace": Sentry.spanToTraceHeader(span),
"baggage": Sentry.spanToBaggageHeader(span),
}) : {}),
}
}
}
);
);
});
let checkStatusResponse = await axiosInstance.get(`${process.env.FIRE_ENGINE_BETA_URL}/scrape/${_response.data.jobId}`);
while (checkStatusResponse.data.processing && Date.now() - startTime < universalTimeout + waitParam) {


@ -24,8 +24,8 @@ import { clientSideError } from "../../strings";
dotenv.config();
export const baseScrapers = [
"fire-engine",
"fire-engine;chrome-cdp",
"fire-engine",
"scrapingBee",
process.env.USE_DB_AUTHENTICATION ? undefined : "playwright",
"scrapingBeeLoad",
@ -85,8 +85,8 @@ function getScrapingFallbackOrder(
});
let defaultOrder = [
!process.env.USE_DB_AUTHENTICATION ? undefined : "fire-engine",
!process.env.USE_DB_AUTHENTICATION ? undefined : "fire-engine;chrome-cdp",
!process.env.USE_DB_AUTHENTICATION ? undefined : "fire-engine",
"scrapingBee",
process.env.USE_DB_AUTHENTICATION ? undefined : "playwright",
"scrapingBeeLoad",


@ -8,7 +8,6 @@ describe('Blocklist Functionality', () => {
'https://twitter.com/home',
'https://instagram.com/explore',
'https://linkedin.com/in/johndoe',
'https://pinterest.com/pin/create',
'https://snapchat.com/add/johndoe',
'https://tiktok.com/@johndoe',
'https://reddit.com/r/funny',


@ -8,7 +8,6 @@ describe('isUrlBlocked', () => {
'https://twitter.com/someuser',
'https://instagram.com/someuser',
'https://www.linkedin.com/in/someuser',
'https://pinterest.com/someuser',
'https://snapchat.com/someuser',
'https://tiktok.com/@someuser',
'https://reddit.com/r/somesubreddit',


@ -6,7 +6,6 @@ const socialMediaBlocklist = [
'twitter.com',
'instagram.com',
'linkedin.com',
'pinterest.com',
'snapchat.com',
'tiktok.com',
'reddit.com',
@ -16,7 +15,8 @@ const socialMediaBlocklist = [
'wechat.com',
'telegram.org',
'researchhub.com',
'youtube.com'
'youtube.com',
'corterix.com',
];
const allowedKeywords = [


@ -3,9 +3,9 @@ import { Logger } from "../../../src/lib/logger";
export async function sendSlackWebhook(
message: string,
alertEveryone: boolean = false
alertEveryone: boolean = false,
webhookUrl: string = process.env.SLACK_WEBHOOK_URL ?? ""
) {
const webhookUrl = process.env.SLACK_WEBHOOK_URL;
const messagePrefix = alertEveryone ? "<!channel> " : "";
const payload = {
text: `${messagePrefix} ${message}`,


@ -317,21 +317,21 @@ export async function supaCheckTeamCredits(team_id: string, credits: number) {
// Compare the adjusted total credits used with the credits allowed by the plan
if (adjustedCreditsUsed + credits > price.credits) {
await sendNotification(
team_id,
NotificationType.LIMIT_REACHED,
subscription.current_period_start,
subscription.current_period_end
);
// await sendNotification(
// team_id,
// NotificationType.LIMIT_REACHED,
// subscription.current_period_start,
// subscription.current_period_end
// );
return { success: false, message: "Insufficient credits, please upgrade!", remainingCredits: creditLimit - adjustedCreditsUsed };
} else if (creditUsagePercentage >= 0.8) {
// Send email notification for approaching credit limit
await sendNotification(
team_id,
NotificationType.APPROACHING_LIMIT,
subscription.current_period_start,
subscription.current_period_end
);
// await sendNotification(
// team_id,
// NotificationType.APPROACHING_LIMIT,
// subscription.current_period_start,
// subscription.current_period_end
// );
}
return { success: true, message: "Sufficient credits available", remainingCredits: creditLimit - adjustedCreditsUsed };


@ -2,6 +2,20 @@ import { Job, Queue } from "bullmq";
import { getScrapeQueue } from "./queue-service";
import { v4 as uuidv4 } from "uuid";
import { WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node";
async function addScrapeJobRaw(
webScraperOptions: any,
options: any,
jobId: string,
jobPriority: number = 10
): Promise<Job> {
return await getScrapeQueue().add(jobId, webScraperOptions, {
...options,
priority: jobPriority,
jobId,
});
}
export async function addScrapeJob(
webScraperOptions: WebScraperOptions,
@ -9,11 +23,30 @@ export async function addScrapeJob(
jobId: string = uuidv4(),
jobPriority: number = 10
): Promise<Job> {
return await getScrapeQueue().add(jobId, webScraperOptions, {
priority: jobPriority,
...options,
jobId,
});
if (Sentry.isInitialized()) {
const size = JSON.stringify(webScraperOptions).length;
return await Sentry.startSpan({
name: "Add scrape job",
op: "queue.publish",
attributes: {
"messaging.message.id": jobId,
"messaging.destination.name": getScrapeQueue().name,
"messaging.message.body.size": size,
},
}, async (span) => {
return await addScrapeJobRaw({
...webScraperOptions,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
}, options, jobId, jobPriority);
});
} else {
return await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority);
}
}


@ -35,6 +35,6 @@ export function getScrapeQueue() {
}
import { QueueEvents } from 'bullmq';
export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection });
// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
// import { QueueEvents } from 'bullmq';
// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });


@ -53,6 +53,7 @@ const processJobInternal = async (token: string, job: Job) => {
}, jobLockExtendInterval);
await addJobPriority(job.data.team_id, job.id );
let err = null;
try {
const result = await processJob(job, token);
try{
@ -65,11 +66,15 @@ const processJobInternal = async (token: string, job: Job) => {
}
} catch (error) {
console.log("Job failed, error:", error);
Sentry.captureException(error);
err = error;
await job.moveToFailed(error, token, false);
} finally {
await deleteJobPriority(job.data.team_id, job.id );
clearInterval(extendLockInterval);
}
return err;
};
let isShuttingDown = false;
@ -79,7 +84,7 @@ process.on("SIGINT", () => {
isShuttingDown = true;
});
const workerFun = async (queueName: string, processJobInternal: (token: string, job: Job) => Promise<void>) => {
const workerFun = async (queueName: string, processJobInternal: (token: string, job: Job) => Promise<any>) => {
const worker = new Worker(queueName, null, {
connection: redisConnection,
lockDuration: 1 * 60 * 1000, // 1 minute
@ -107,16 +112,47 @@ const workerFun = async (queueName: string, processJobInternal: (token: string,
const job = await worker.getNextJob(token);
if (job) {
Sentry.startSpan({
name: "Scrape job",
op: "bullmq.job",
attributes: {
job: job.id,
worker: process.env.FLY_MACHINE_ID ?? worker.id,
},
}, async () => {
await processJobInternal(token, job);
});
if (job.data && job.data.sentry && Sentry.isInitialized()) {
Sentry.continueTrace({ sentryTrace: job.data.sentry.trace, baggage: job.data.sentry.baggage }, () => {
Sentry.startSpan({
name: "Scrape job",
attributes: {
job: job.id,
worker: process.env.FLY_MACHINE_ID ?? worker.id,
},
}, async (span) => {
await Sentry.startSpan({
name: "Process scrape job",
op: "queue.process",
attributes: {
"messaging.message.id": job.id,
"messaging.destination.name": getScrapeQueue().name,
"messaging.message.body.size": job.data.sentry.size,
"messaging.message.receive.latency": Date.now() - (job.processedOn ?? job.timestamp),
"messaging.message.retry.count": job.attemptsMade,
}
}, async () => {
const res = await processJobInternal(token, job);
if (res !== null) {
span.setStatus({ code: 2 }); // ERROR
} else {
span.setStatus({ code: 1 }); // OK
}
});
});
});
} else {
Sentry.startSpan({
name: "Scrape job",
attributes: {
job: job.id,
worker: process.env.FLY_MACHINE_ID ?? worker.id,
},
}, () => {
processJobInternal(token, job);
});
}
await sleep(gotJobInterval);
} else {
await sleep(connectionMonitorInterval);


@ -17,10 +17,23 @@ const RATE_LIMITS = {
growthdouble: 50,
},
scrape: {
default: 20,
free: 10,
starter: 20,
standard: 100,
standardOld: 40,
scale: 500,
hobby: 20,
standardNew: 100,
standardnew: 100,
growth: 1000,
growthdouble: 1000,
},
search: {
default: 20,
free: 5,
starter: 20,
standard: 50,
standard: 40,
standardOld: 40,
scale: 500,
hobby: 10,
@ -29,7 +42,7 @@ const RATE_LIMITS = {
growth: 500,
growthdouble: 500,
},
search: {
map:{
default: 20,
free: 5,
starter: 20,
@ -84,16 +97,28 @@ export const testSuiteRateLimiter = new RateLimiterRedis({
duration: 60, // Duration in seconds
});
export const devBRateLimiter = new RateLimiterRedis({
storeClient: redisRateLimitClient,
keyPrefix: "dev-b",
points: 1200,
duration: 60, // Duration in seconds
});
export function getRateLimiter(
mode: RateLimiterMode,
token: string,
plan?: string
plan?: string,
teamId?: string
) {
if (token.includes("a01ccae") || token.includes("6254cf9")) {
if (token.includes("a01ccae") || token.includes("6254cf9") || token.includes("0f96e673") || token.includes("23befa1b")) {
return testSuiteRateLimiter;
}
if(teamId === process.env.DEV_B_TEAM_ID) {
return devBRateLimiter;
}
const rateLimitConfig = RATE_LIMITS[mode]; // {default : 5}
if (!rateLimitConfig) return serverRateLimiter;


@ -10,8 +10,9 @@ if (process.env.SENTRY_DSN) {
integrations: [
nodeProfilingIntegration(),
],
tracesSampleRate: 0.045,
tracesSampleRate: process.env.SENTRY_ENVIRONMENT === "dev" ? 1.0 : 0.045,
profilesSampleRate: 1.0,
serverName: process.env.FLY_MACHINE_ID,
environment: process.env.SENTRY_ENVIRONMENT ?? "production",
});
}

apps/redis/.dockerignore Normal file

@ -0,0 +1,2 @@
.git
fly.toml

apps/redis/Dockerfile Normal file

@ -0,0 +1,6 @@
ARG REDIS_VERSION=7.2.5
FROM bitnami/redis:${REDIS_VERSION}
COPY start-redis-server.sh /usr/bin/start-redis-server.sh
CMD ["/usr/bin/start-redis-server.sh"]

apps/redis/Procfile Normal file

@ -0,0 +1,2 @@
redis: /usr/bin/start-redis-server.sh
metrics: /usr/local/bin/redis_exporter -redis.addr localhost:6379 -web.listen-address ":9091"

apps/redis/README.md Normal file

@ -0,0 +1,48 @@
The official repository for Running Redis on Fly.io. Find the accompanying Docker image at [flyio/redis](https://hub.docker.com/repository/docker/flyio/redis).
## Usage
This installation requires setting a password on Redis. To do that, run `fly secrets set REDIS_PASSWORD=mypassword` before deploying. Keep
track of this password - it won't be visible again after deployment!
If you need no customizations, you can deploy using the official Docker image. See `fly.toml` in this repository for an example to get started with.
## Runtime requirements
By default, this Redis installation will only accept connections on the private IPv6 network, on the standard port 6379.
If you want to access it from the public internet, add a `[[services]]` section to your `fly.toml`. An example is included in this repo for accessing Redis on port 10000.
We recommend adding persistent storage for Redis data. If you skip this step, data will be lost across deploys or restarts. For Fly apps, the volume needs to be in the same region as the app instances. For example:
```cmd
flyctl volumes create redis_server --region ord
```
```out
Name: redis_server
Region: ord
Size GB: 10
Created at: 02 Nov 20 19:55 UTC
```
To connect this volume to the app, `fly.toml` includes a `[mounts]` entry.
```
[mounts]
source = "redis_server"
destination = "/data"
```
When the app starts, that volume will be mounted on /data.
## Cutting a release
If you have write access to this repo, you can ship a prerelease or full release with:
```
scripts/bump_version.sh
```
or
```
scripts/bump_version.sh prerel
```


@ -1,13 +1,8 @@
app = 'firecrawl-dragonfly'
primary_region = 'iad'
[experimental]
cmd = ['dragonfly','--logtostderr', '--cluster_mode=emulated', '--lock_on_hashtags', "--bind","::"]
[build]
image = 'ghcr.io/dragonflydb/dragonfly'
[[mounts]]
source = 'firecrawl_dragonfly'
source = 'firecrawl_redis'
destination = '/data'
[[services]]


@ -0,0 +1,91 @@
#!/usr/bin/env bash
set -euo pipefail
ORIGIN=${ORIGIN:-origin}
bump=${1:-patch}
prerel=${2:-none}
if [[ $bump == "prerel" ]]; then
bump="patch"
prerel="prerel"
fi
if [[ $(git status --porcelain) != "" ]]; then
echo "Error: repo is dirty. Run git status, clean repo and try again."
exit 1
elif [[ $(git status --porcelain -b | grep -e "ahead" -e "behind") != "" ]]; then
echo "Error: repo has unpushed commits. Push commits to remote and try again."
exit 1
fi
BRANCH="$(git rev-parse --abbrev-ref HEAD)"
if [[ "$prerel" == "prerel" && "$BRANCH" != "prerelease" ]]; then
# echo "❌ Sorry, you can only cut a pre-release from the 'prelease' branch"
# echo "Run 'git checkout prerelease && git pull origin prerelease' and try again."
# exit 1
echo "⚠️ Pre-releases should be cut from the 'prerelease' branch"
echo "Please make sure you're not overwriting someone else's prerelease!"
echo
read -p "Release anyway? " -n 1 -r
echo
if [[ $REPLY =~ ^[^Yy]$ ]]; then
echo Aborting.
exit 1
fi
fi
if [[ "$prerel" != "prerel" && "$BRANCH" != "main" ]]; then
echo "❌ Sorry, you can only cut a release from the 'main' branch"
echo "Run 'git checkout main && git pull origin main' and try again."
exit 1
fi
git fetch
if [[ "$(git rev-parse HEAD 2>&1)" != "$(git rev-parse '@{u}' 2>&1)" ]]; then
echo "There are upstream commits that won't be included in this release."
echo "You probably want to exit, run 'git pull', then release."
echo
read -p "Release anyway? " -n 1 -r
echo
if [[ $REPLY =~ ^[^Yy]$ ]]; then
echo Aborting.
exit 1
fi
fi
dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
previous_version="$("$dir"/../scripts/version.sh -s)"
if [[ $prerel == "prerel" ]]; then
prerelversion=$("$dir"/../scripts/semver get prerel "$previous_version")
if [[ $prerelversion == "" ]]; then
new_version=$("$dir"/../scripts/semver bump "$bump" "$previous_version")
new_version=$("$dir"/../scripts/semver bump prerel pre-1 "$new_version")
else
prerel=pre-$((${prerelversion#pre-} + 1))
new_version=$("$dir"/../scripts/semver bump prerel "$prerel" "$previous_version")
fi
else
prerelversion=$("$dir"/../scripts/semver get prerel "$previous_version")
if [[ $prerelversion == "" ]]; then
new_version=$("$dir"/../scripts/semver bump "$bump" "$previous_version")
else
new_version=${previous_version//-$prerelversion/}
fi
fi
new_version="v$new_version"
echo "Bumping version from v${previous_version} to ${new_version}"
read -p "Are you sure? " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]
then
git tag -m "release ${new_version}" -a "$new_version" && git push "${ORIGIN}" tag "$new_version"
echo "done"
fi

apps/redis/scripts/semver Executable file

@ -0,0 +1,200 @@
#!/usr/bin/env bash
set -o errexit -o nounset -o pipefail
SEMVER_REGEX="^[vV]?(0|[1-9][0-9]*)\\.(0|[1-9][0-9]*)\\.(0|[1-9][0-9]*)(\\-[0-9A-Za-z-]+(\\.[0-9A-Za-z-]+)*)?(\\+[0-9A-Za-z-]+(\\.[0-9A-Za-z-]+)*)?$"
PROG=semver
PROG_VERSION=2.1.0
USAGE="\
Usage:
$PROG bump (major|minor|patch|release|prerel <prerel>|build <build>) <version>
$PROG compare <version> <other_version>
$PROG get (major|minor|patch|release|prerel|build) <version>
$PROG --help
$PROG --version
Arguments:
<version> A version must match the following regex pattern:
\"${SEMVER_REGEX}\".
In english, the version must match X.Y.Z(-PRERELEASE)(+BUILD)
where X, Y and Z are positive integers, PRERELEASE is an optional
string composed of alphanumeric characters and hyphens and
BUILD is also an optional string composed of alphanumeric
characters and hyphens.
<other_version> See <version> definition.
<prerel> String that must be composed of alphanumeric characters and hyphens.
<build> String that must be composed of alphanumeric characters and hyphens.
Options:
-v, --version Print the version of this tool.
-h, --help Print this help message.
Commands:
bump Bump <version> by one of major, minor, patch, prerel, build
or a forced potentially conflicting version. The bumped version is
shown to stdout.
compare Compare <version> with <other_version>, output to stdout the
following values: -1 if <other_version> is newer, 0 if equal, 1 if
older.
get Extract given part of <version>, where part is one of major, minor,
patch, prerel, build."
function error {
echo -e "$1" >&2
exit 1
}
function usage-help {
error "$USAGE"
}
function usage-version {
echo -e "${PROG}: $PROG_VERSION"
exit 0
}
function validate-version {
local version=$1
if [[ "$version" =~ $SEMVER_REGEX ]]; then
# if a second argument is passed, store the result in var named by $2
if [ "$#" -eq "2" ]; then
local major=${BASH_REMATCH[1]}
local minor=${BASH_REMATCH[2]}
local patch=${BASH_REMATCH[3]}
local prere=${BASH_REMATCH[4]}
local build=${BASH_REMATCH[6]}
eval "$2=(\"$major\" \"$minor\" \"$patch\" \"$prere\" \"$build\")"
else
echo "$version"
fi
else
error "version $version does not match the semver scheme 'X.Y.Z(-PRERELEASE)(+BUILD)'. See help for more information."
fi
}
function compare-version {
validate-version "$1" V
validate-version "$2" V_
# MAJOR, MINOR and PATCH should compare numerically
for i in 0 1 2; do
local diff=$((${V[$i]} - ${V_[$i]}))
if [[ $diff -lt 0 ]]; then
echo -1; return 0
elif [[ $diff -gt 0 ]]; then
echo 1; return 0
fi
done
# PREREL should compare with the ASCII order.
if [[ -z "${V[3]}" ]] && [[ -n "${V_[3]}" ]]; then
echo 1; return 0;
elif [[ -n "${V[3]}" ]] && [[ -z "${V_[3]}" ]]; then
echo -1; return 0;
elif [[ -n "${V[3]}" ]] && [[ -n "${V_[3]}" ]]; then
if [[ "${V[3]}" > "${V_[3]}" ]]; then
echo 1; return 0;
elif [[ "${V[3]}" < "${V_[3]}" ]]; then
echo -1; return 0;
fi
fi
echo 0
}
function command-bump {
local new; local version; local sub_version; local command;
case $# in
2) case $1 in
major|minor|patch|release) command=$1; version=$2;;
*) usage-help;;
esac ;;
3) case $1 in
prerel|build) command=$1; sub_version=$2 version=$3 ;;
*) usage-help;;
esac ;;
*) usage-help;;
esac
validate-version "$version" parts
# shellcheck disable=SC2154
local major="${parts[0]}"
local minor="${parts[1]}"
local patch="${parts[2]}"
local prere="${parts[3]}"
local build="${parts[4]}"
case "$command" in
major) new="$((major + 1)).0.0";;
minor) new="${major}.$((minor + 1)).0";;
patch) new="${major}.${minor}.$((patch + 1))";;
release) new="${major}.${minor}.${patch}";;
prerel) new=$(validate-version "${major}.${minor}.${patch}-${sub_version}");;
build) new=$(validate-version "${major}.${minor}.${patch}${prere}+${sub_version}");;
*) usage-help ;;
esac
echo "$new"
exit 0
}
function command-compare {
local v; local v_;
case $# in
2) v=$(validate-version "$1"); v_=$(validate-version "$2") ;;
*) usage-help ;;
esac
compare-version "$v" "$v_"
exit 0
}
# shellcheck disable=SC2034
function command-get {
local part version
if [[ "$#" -ne "2" ]] || [[ -z "$1" ]] || [[ -z "$2" ]]; then
usage-help
exit 0
fi
part="$1"
version="$2"
validate-version "$version" parts
local major="${parts[0]}"
local minor="${parts[1]}"
local patch="${parts[2]}"
local prerel="${parts[3]:1}"
local build="${parts[4]:1}"
case "$part" in
major|minor|patch|release|prerel|build) echo "${!part}" ;;
*) usage-help ;;
esac
exit 0
}
case $# in
0) echo "Unknown command: $*"; usage-help;;
esac
case $1 in
--help|-h) echo -e "$USAGE"; exit 0;;
--version|-v) usage-version ;;
bump) shift; command-bump "$@";;
get) shift; command-get "$@";;
compare) shift; command-compare "$@";;
*) echo "Unknown arguments: $*"; usage-help;;
esac

apps/redis/scripts/version.sh Executable file

@ -0,0 +1,5 @@
ORIGIN=${ORIGIN:-origin}
version=$(git fetch --tags "${ORIGIN}" &>/dev/null | git -c "versionsort.prereleasesuffix=-pre" tag -l --sort=version:refname | grep -v dev | grep -vE '^v2$' | grep -vE '^v1$' | tail -n1 | cut -c 2-)
echo "$version"


@ -0,0 +1,30 @@
#!/bin/bash
set -e
sysctl vm.overcommit_memory=1 || true
sysctl net.core.somaxconn=1024 || true
PW_ARG=""
if [[ ! -z "${REDIS_PASSWORD}" ]]; then
PW_ARG="--requirepass $REDIS_PASSWORD"
fi
# Set maxmemory-policy to 'allkeys-lru' for caching servers that should always evict old keys
: ${MAXMEMORY_POLICY:="volatile-lru"}
: ${APPENDONLY:="no"}
: ${FLY_VM_MEMORY_MB:=512}
if [ "${NOSAVE}" = "" ] ; then
: ${SAVE:="3600 1 300 100 60 10000"}
fi
# Set maxmemory to 80% of available memory
MAXMEMORY=$(($FLY_VM_MEMORY_MB*80/100))
mkdir /data/redis
redis-server $PW_ARG \
--dir /data/redis \
--maxmemory "${MAXMEMORY}mb" \
--maxmemory-policy $MAXMEMORY_POLICY \
--appendonly $APPENDONLY \
--save "$SAVE"

File diff suppressed because one or more lines are too long