Merge pull request #424 from mendableai/nsc/seperate-rate-limit

Redis Health Checks
This commit is contained in:
Nicolas 2024-07-16 22:53:28 -04:00 committed by GitHub
commit d7f185428f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 444 additions and 80 deletions

20
.github/workflows/check-redis.yml vendored Normal file
View File

@ -0,0 +1,20 @@
name: Check Redis
on:
schedule:
- cron: '*/5 * * * *'
env:
BULL_AUTH_KEY: ${{ secrets.BULL_AUTH_KEY }}
jobs:
clean-jobs:
runs-on: ubuntu-latest
steps:
- name: Send GET request to check queues
run: |
response=$(curl --write-out '%{http_code}' --silent --output /dev/null --max-time 180 https://api.firecrawl.dev/admin/${{ secrets.BULL_AUTH_KEY }}/redis-health)
if [ "$response" -ne 200 ]; then
echo "Failed to check queues. Response: $response"
exit 1
fi
echo "Successfully checked queues. Response: $response"

View File

@ -73,7 +73,7 @@
"form-data": "^4.0.0",
"glob": "^10.4.2",
"gpt3-tokenizer": "^1.1.5",
"ioredis": "^5.3.2",
"ioredis": "^5.4.1",
"joplin-turndown-plugin-gfm": "^1.0.12",
"json-schema-to-zod": "^2.3.0",
"keyword-extractor": "^0.0.28",
@ -92,7 +92,6 @@
"promptable": "^0.0.10",
"puppeteer": "^22.12.1",
"rate-limiter-flexible": "2.4.2",
"redis": "^4.6.7",
"resend": "^3.4.0",
"robots-parser": "^3.0.1",
"scrapingbee": "^1.7.4",

View File

@ -90,7 +90,7 @@ importers:
specifier: ^1.1.5
version: 1.1.5
ioredis:
specifier: ^5.3.2
specifier: ^5.4.1
version: 5.4.1
joplin-turndown-plugin-gfm:
specifier: ^1.0.12
@ -146,9 +146,6 @@ importers:
rate-limiter-flexible:
specifier: 2.4.2
version: 2.4.2
redis:
specifier: ^4.6.7
version: 4.6.14
resend:
specifier: ^3.4.0
version: 3.4.0

View File

@ -3,7 +3,6 @@ import bodyParser from "body-parser";
import cors from "cors";
import "dotenv/config";
import { getWebScraperQueue } from "./services/queue-service";
import { redisClient } from "./services/rate-limiter";
import { v0Router } from "./routes/v0";
import { initSDK } from "@hyperdx/node-opentelemetry";
import cluster from "cluster";
@ -11,6 +10,8 @@ import os from "os";
import { Job } from "bull";
import { sendSlackWebhook } from "./services/alerts/slack";
import { checkAlerts } from "./services/alerts";
import Redis from "ioredis";
import { redisRateLimitClient } from "./services/rate-limiter";
const { createBullBoard } = require("@bull-board/api");
const { BullAdapter } = require("@bull-board/api/bullAdapter");
@ -34,11 +35,9 @@ if (cluster.isMaster) {
cluster.fork();
}
});
} else {
const app = express();
global.isProduction = process.env.IS_PRODUCTION === "true";
app.use(bodyParser.urlencoded({ extended: true }));
@ -46,6 +45,7 @@ if (cluster.isMaster) {
app.use(cors()); // Add this line to enable CORS
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
@ -73,7 +73,6 @@ if (cluster.isMaster) {
const DEFAULT_PORT = process.env.PORT ?? 3002;
const HOST = process.env.HOST ?? "localhost";
redisClient.connect();
// HyperDX OpenTelemetry
if (process.env.ENV === "production") {
@ -121,7 +120,6 @@ if (cluster.isMaster) {
});
app.post(`/admin/${process.env.BULL_AUTH_KEY}/shutdown`, async (req, res) => {
// return res.status(200).json({ ok: true });
try {
console.log("Gracefully shutting down...");
@ -138,34 +136,38 @@ if (cluster.isMaster) {
const wsq = getWebScraperQueue();
const jobs = await wsq.getActive();
console.log("Requeueing", jobs.length, "jobs...");
if (jobs.length > 0) {
console.log(" Removing", jobs.length, "jobs...");
await Promise.all(jobs.map(async x => {
try {
await wsq.client.del(await x.lockKey());
await x.takeLock();
await x.moveToFailed({ message: "interrupted" });
await x.remove();
} catch (e) {
console.warn("Failed to remove job", x.id, e);
}
}));
await Promise.all(
jobs.map(async (x) => {
try {
await wsq.client.del(await x.lockKey());
await x.takeLock();
await x.moveToFailed({ message: "interrupted" });
await x.remove();
} catch (e) {
console.warn("Failed to remove job", x.id, e);
}
})
);
console.log(" Re-adding", jobs.length, "jobs...");
await wsq.addBulk(jobs.map(x => ({
data: x.data,
opts: {
jobId: x.id,
},
})));
await wsq.addBulk(
jobs.map((x) => ({
data: x.data,
opts: {
jobId: x.id,
},
}))
);
console.log(" Done!");
}
await getWebScraperQueue().resume(false);
res.json({ ok: true });
} catch (error) {
@ -268,27 +270,32 @@ if (cluster.isMaster) {
const numberOfBatches = 9; // Adjust based on your needs
const completedJobsPromises: Promise<Job[]>[] = [];
for (let i = 0; i < numberOfBatches; i++) {
completedJobsPromises.push(webScraperQueue.getJobs(
["completed"],
i * batchSize,
i * batchSize + batchSize,
true
));
completedJobsPromises.push(
webScraperQueue.getJobs(
["completed"],
i * batchSize,
i * batchSize + batchSize,
true
)
);
}
const completedJobs: Job[] = (await Promise.all(completedJobsPromises)).flat();
const before24hJobs = completedJobs.filter(
(job) => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000
) || [];
const completedJobs: Job[] = (
await Promise.all(completedJobsPromises)
).flat();
const before24hJobs =
completedJobs.filter(
(job) => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000
) || [];
let count = 0;
if (!before24hJobs) {
return res.status(200).send(`No jobs to remove.`);
}
for (const job of before24hJobs) {
try {
await job.remove()
await job.remove();
count++;
} catch (jobError) {
console.error(`Failed to remove job with ID ${job.id}:`, jobError);
@ -306,8 +313,75 @@ if (cluster.isMaster) {
res.send({ isProduction: global.isProduction });
});
app.get(
`/admin/${process.env.BULL_AUTH_KEY}/redis-health`,
async (req, res) => {
try {
const queueRedis = new Redis(process.env.REDIS_URL);
const testKey = "test";
const testValue = "test";
// Test queueRedis
let queueRedisHealth;
try {
await queueRedis.set(testKey, testValue);
queueRedisHealth = await queueRedis.get(testKey);
await queueRedis.del(testKey);
} catch (error) {
console.error("queueRedis health check failed:", error);
queueRedisHealth = null;
}
// Test redisRateLimitClient
let redisRateLimitHealth;
try {
await redisRateLimitClient.set(testKey, testValue);
redisRateLimitHealth = await redisRateLimitClient.get(testKey);
await redisRateLimitClient.del(testKey);
} catch (error) {
console.error("redisRateLimitClient health check failed:", error);
redisRateLimitHealth = null;
}
const healthStatus = {
queueRedis: queueRedisHealth === testValue ? "healthy" : "unhealthy",
redisRateLimitClient:
redisRateLimitHealth === testValue ? "healthy" : "unhealthy",
};
if (
healthStatus.queueRedis === "healthy" &&
healthStatus.redisRateLimitClient === "healthy"
) {
console.log("Both Redis instances are healthy");
return res
.status(200)
.json({ status: "healthy", details: healthStatus });
} else {
console.log("Redis instances health check:", healthStatus);
await sendSlackWebhook(
`[REDIS DOWN] Redis instances health check: ${JSON.stringify(
healthStatus
)}`,
true
);
return res
.status(500)
.json({ status: "unhealthy", details: healthStatus });
}
} catch (error) {
console.error("Redis health check failed:", error);
await sendSlackWebhook(
`[REDIS DOWN] Redis instances health check: ${error.message}`,
true
);
return res
.status(500)
.json({ status: "unhealthy", message: error.message });
}
}
);
console.log(`Worker ${process.pid} started`);
}

View File

@ -1,48 +1,98 @@
import { getRateLimiter, serverRateLimiter, testSuiteRateLimiter, redisClient } from "./rate-limiter";
import {
getRateLimiter,
serverRateLimiter,
testSuiteRateLimiter,
redisRateLimitClient,
} from "./rate-limiter";
import { RateLimiterMode } from "../../src/types";
import { RateLimiterRedis } from "rate-limiter-flexible";
describe("Rate Limiter Service", () => {
beforeAll(async () => {
await redisClient.connect();
try {
await redisRateLimitClient.connect();
// if (process.env.REDIS_RATE_LIMIT_URL === "redis://localhost:6379") {
// console.log("Erasing all keys");
// // erase all the keys that start with "test-prefix"
// const keys = await redisRateLimitClient.keys("test-prefix:*");
// if (keys.length > 0) {
// await redisRateLimitClient.del(...keys);
// }
// }
} catch (error) {}
});
afterAll(async () => {
await redisClient.disconnect();
try {
// if (process.env.REDIS_RATE_LIMIT_URL === "redis://localhost:6379") {
await redisRateLimitClient.disconnect();
// }
} catch (error) {}
});
it("should return the testSuiteRateLimiter for specific tokens", () => {
const limiter = getRateLimiter("crawl" as RateLimiterMode, "a01ccae");
const limiter = getRateLimiter(
"crawl" as RateLimiterMode,
"test-prefix:a01ccae"
);
expect(limiter).toBe(testSuiteRateLimiter);
const limiter2 = getRateLimiter("scrape" as RateLimiterMode, "6254cf9");
const limiter2 = getRateLimiter(
"scrape" as RateLimiterMode,
"test-prefix:6254cf9"
);
expect(limiter2).toBe(testSuiteRateLimiter);
});
it("should return the serverRateLimiter if mode is not found", () => {
const limiter = getRateLimiter("nonexistent" as RateLimiterMode, "someToken");
const limiter = getRateLimiter(
"nonexistent" as RateLimiterMode,
"test-prefix:someToken"
);
expect(limiter).toBe(serverRateLimiter);
});
it("should return the correct rate limiter based on mode and plan", () => {
const limiter = getRateLimiter("crawl" as RateLimiterMode, "someToken", "free");
const limiter = getRateLimiter(
"crawl" as RateLimiterMode,
"test-prefix:someToken",
"free"
);
expect(limiter.points).toBe(2);
const limiter2 = getRateLimiter("scrape" as RateLimiterMode, "someToken", "standard");
const limiter2 = getRateLimiter(
"scrape" as RateLimiterMode,
"test-prefix:someToken",
"standard"
);
expect(limiter2.points).toBe(50);
const limiter3 = getRateLimiter("search" as RateLimiterMode, "someToken", "growth");
const limiter3 = getRateLimiter(
"search" as RateLimiterMode,
"test-prefix:someToken",
"growth"
);
expect(limiter3.points).toBe(500);
const limiter4 = getRateLimiter("crawlStatus" as RateLimiterMode, "someToken", "growth");
const limiter4 = getRateLimiter(
"crawlStatus" as RateLimiterMode,
"test-prefix:someToken",
"growth"
);
expect(limiter4.points).toBe(150);
});
it("should return the default rate limiter if plan is not provided", () => {
const limiter = getRateLimiter("crawl" as RateLimiterMode, "someToken");
const limiter = getRateLimiter(
"crawl" as RateLimiterMode,
"test-prefix:someToken"
);
expect(limiter.points).toBe(3);
const limiter2 = getRateLimiter("scrape" as RateLimiterMode, "someToken");
const limiter2 = getRateLimiter(
"scrape" as RateLimiterMode,
"test-prefix:someToken"
);
expect(limiter2.points).toBe(20);
});
@ -50,7 +100,7 @@ describe("Rate Limiter Service", () => {
const keyPrefix = "test-prefix";
const points = 10;
const limiter = new RateLimiterRedis({
storeClient: redisClient,
storeClient: redisRateLimitClient,
keyPrefix,
points,
duration: 60,
@ -62,26 +112,253 @@ describe("Rate Limiter Service", () => {
});
it("should return the correct rate limiter for 'preview' mode", () => {
const limiter = getRateLimiter("preview" as RateLimiterMode, "someToken", "free");
const limiter = getRateLimiter(
"preview" as RateLimiterMode,
"test-prefix:someToken",
"free"
);
expect(limiter.points).toBe(5);
const limiter2 = getRateLimiter("preview" as RateLimiterMode, "someToken");
const limiter2 = getRateLimiter(
"preview" as RateLimiterMode,
"test-prefix:someToken"
);
expect(limiter2.points).toBe(5);
});
it("should return the correct rate limiter for 'account' mode", () => {
const limiter = getRateLimiter("account" as RateLimiterMode, "someToken", "free");
const limiter = getRateLimiter(
"account" as RateLimiterMode,
"test-prefix:someToken",
"free"
);
expect(limiter.points).toBe(100);
const limiter2 = getRateLimiter("account" as RateLimiterMode, "someToken");
const limiter2 = getRateLimiter(
"account" as RateLimiterMode,
"test-prefix:someToken"
);
expect(limiter2.points).toBe(100);
});
it("should return the correct rate limiter for 'crawlStatus' mode", () => {
const limiter = getRateLimiter("crawlStatus" as RateLimiterMode, "someToken", "free");
const limiter = getRateLimiter(
"crawlStatus" as RateLimiterMode,
"test-prefix:someToken",
"free"
);
expect(limiter.points).toBe(150);
const limiter2 = getRateLimiter("crawlStatus" as RateLimiterMode, "someToken");
const limiter2 = getRateLimiter(
"crawlStatus" as RateLimiterMode,
"test-prefix:someToken"
);
expect(limiter2.points).toBe(150);
});
it("should consume points correctly for 'crawl' mode", async () => {
const limiter = getRateLimiter(
"crawl" as RateLimiterMode,
"test-prefix:someTokenCRAWL",
"free"
);
const consumePoints = 1;
const res = await limiter.consume(
"test-prefix:someTokenCRAWL",
consumePoints
);
expect(res.remainingPoints).toBe(1);
});
it("should consume points correctly for 'scrape' mode (DEFAULT)", async () => {
const limiter = getRateLimiter(
"scrape" as RateLimiterMode,
"test-prefix:someTokenX"
);
const consumePoints = 4;
const res = await limiter.consume("test-prefix:someTokenX", consumePoints);
expect(res.remainingPoints).toBe(16);
});
it("should consume points correctly for 'scrape' mode (HOBBY)", async () => {
const limiter = getRateLimiter(
"scrape" as RateLimiterMode,
"test-prefix:someTokenXY",
"hobby"
);
// expect hobby to have 100 points
expect(limiter.points).toBe(10);
const consumePoints = 5;
const res = await limiter.consume("test-prefix:someTokenXY", consumePoints);
expect(res.consumedPoints).toBe(5);
expect(res.remainingPoints).toBe(5);
});
it("should return the correct rate limiter for 'crawl' mode", () => {
const limiter = getRateLimiter(
"crawl" as RateLimiterMode,
"test-prefix:someToken",
"free"
);
expect(limiter.points).toBe(2);
const limiter2 = getRateLimiter(
"crawl" as RateLimiterMode,
"test-prefix:someToken",
"starter"
);
expect(limiter2.points).toBe(3);
const limiter3 = getRateLimiter(
"crawl" as RateLimiterMode,
"test-prefix:someToken",
"standard"
);
expect(limiter3.points).toBe(5);
});
it("should return the correct rate limiter for 'scrape' mode", () => {
const limiter = getRateLimiter(
"scrape" as RateLimiterMode,
"test-prefix:someToken",
"free"
);
expect(limiter.points).toBe(5);
const limiter2 = getRateLimiter(
"scrape" as RateLimiterMode,
"test-prefix:someToken",
"starter"
);
expect(limiter2.points).toBe(20);
const limiter3 = getRateLimiter(
"scrape" as RateLimiterMode,
"test-prefix:someToken",
"standard"
);
expect(limiter3.points).toBe(50);
});
it("should return the correct rate limiter for 'search' mode", () => {
const limiter = getRateLimiter(
"search" as RateLimiterMode,
"test-prefix:someToken",
"free"
);
expect(limiter.points).toBe(5);
const limiter2 = getRateLimiter(
"search" as RateLimiterMode,
"test-prefix:someToken",
"starter"
);
expect(limiter2.points).toBe(20);
const limiter3 = getRateLimiter(
"search" as RateLimiterMode,
"test-prefix:someToken",
"standard"
);
expect(limiter3.points).toBe(40);
});
it("should return the correct rate limiter for 'preview' mode", () => {
const limiter = getRateLimiter(
"preview" as RateLimiterMode,
"test-prefix:someToken",
"free"
);
expect(limiter.points).toBe(5);
const limiter2 = getRateLimiter(
"preview" as RateLimiterMode,
"test-prefix:someToken"
);
expect(limiter2.points).toBe(5);
});
it("should return the correct rate limiter for 'account' mode", () => {
const limiter = getRateLimiter(
"account" as RateLimiterMode,
"test-prefix:someToken",
"free"
);
expect(limiter.points).toBe(100);
const limiter2 = getRateLimiter(
"account" as RateLimiterMode,
"test-prefix:someToken"
);
expect(limiter2.points).toBe(100);
});
it("should return the correct rate limiter for 'crawlStatus' mode", () => {
const limiter = getRateLimiter(
"crawlStatus" as RateLimiterMode,
"test-prefix:someToken",
"free"
);
expect(limiter.points).toBe(150);
const limiter2 = getRateLimiter(
"crawlStatus" as RateLimiterMode,
"test-prefix:someToken"
);
expect(limiter2.points).toBe(150);
});
it("should return the correct rate limiter for 'testSuite' mode", () => {
const limiter = getRateLimiter(
"testSuite" as RateLimiterMode,
"test-prefix:someToken",
"free"
);
expect(limiter.points).toBe(10000);
const limiter2 = getRateLimiter(
"testSuite" as RateLimiterMode,
"test-prefix:someToken"
);
expect(limiter2.points).toBe(10000);
});
it("should throw an error when consuming more points than available", async () => {
const limiter = getRateLimiter(
"crawl" as RateLimiterMode,
"test-prefix:someToken"
);
const consumePoints = limiter.points + 1;
try {
await limiter.consume("test-prefix:someToken", consumePoints);
} catch (error) {
// expect remaining points to be 0
const res = await limiter.get("test-prefix:someToken");
expect(res.remainingPoints).toBe(0);
}
});
it("should reset points after duration", async () => {
const keyPrefix = "test-prefix";
const points = 10;
const duration = 1; // 1 second
const limiter = new RateLimiterRedis({
storeClient: redisRateLimitClient,
keyPrefix,
points,
duration,
});
const consumePoints = 5;
await limiter.consume("test-prefix:someToken", consumePoints);
await new Promise((resolve) => setTimeout(resolve, duration * 1000 + 100)); // Wait for duration + 100ms
const res = await limiter.consume("test-prefix:someToken", consumePoints);
expect(res.remainingPoints).toBe(points - consumePoints);
});
});

View File

@ -1,6 +1,6 @@
import { RateLimiterRedis } from "rate-limiter-flexible";
import * as redis from "redis";
import { RateLimiterMode } from "../../src/types";
import Redis from "ioredis";
const RATE_LIMITS = {
crawl: {
@ -57,14 +57,13 @@ const RATE_LIMITS = {
},
};
export const redisClient = redis.createClient({
url: process.env.REDIS_RATE_LIMIT_URL,
legacyMode: true,
});
export const redisRateLimitClient = new Redis(
process.env.REDIS_RATE_LIMIT_URL
)
const createRateLimiter = (keyPrefix, points) =>
new RateLimiterRedis({
storeClient: redisClient,
storeClient: redisRateLimitClient,
keyPrefix,
points,
duration: 60, // Duration in seconds
@ -76,7 +75,7 @@ export const serverRateLimiter = createRateLimiter(
);
export const testSuiteRateLimiter = new RateLimiterRedis({
storeClient: redisClient,
storeClient: redisRateLimitClient,
keyPrefix: "test-suite",
points: 10000,
duration: 60, // Duration in seconds

View File

@ -1,10 +1,8 @@
import Redis from "ioredis";
// Initialize Redis client
const redis = new Redis(process.env.REDIS_URL);
import { redisRateLimitClient } from "./rate-limiter";
// Listen to 'error' events to the Redis connection
redis.on("error", (error) => {
redisRateLimitClient.on("error", (error) => {
try {
if (error.message === "ECONNRESET") {
console.log("Connection to Redis Session Store timed out.");
@ -15,16 +13,16 @@ redis.on("error", (error) => {
});
// Listen to 'reconnecting' event to Redis
redis.on("reconnecting", (err) => {
redisRateLimitClient.on("reconnecting", (err) => {
try {
if (redis.status === "reconnecting")
if (redisRateLimitClient.status === "reconnecting")
console.log("Reconnecting to Redis Session Store...");
else console.log("Error reconnecting to Redis Session Store.");
} catch (error) {}
});
// Listen to the 'connect' event to Redis
redis.on("connect", (err) => {
redisRateLimitClient.on("connect", (err) => {
try {
if (!err) console.log("Connected to Redis Session Store!");
} catch (error) {}
@ -38,9 +36,9 @@ redis.on("connect", (err) => {
*/
const setValue = async (key: string, value: string, expire?: number) => {
if (expire) {
await redis.set(key, value, "EX", expire);
await redisRateLimitClient.set(key, value, "EX", expire);
} else {
await redis.set(key, value);
await redisRateLimitClient.set(key, value);
}
};
@ -50,7 +48,7 @@ const setValue = async (key: string, value: string, expire?: number) => {
* @returns {Promise<string|null>} The value, if found, otherwise null.
*/
const getValue = async (key: string): Promise<string | null> => {
const value = await redis.get(key);
const value = await redisRateLimitClient.get(key);
return value;
};
@ -59,7 +57,7 @@ const getValue = async (key: string): Promise<string | null> => {
* @param {string} key The key to delete.
*/
const deleteKey = async (key: string) => {
await redis.del(key);
await redisRateLimitClient.del(key);
};
export { setValue, getValue, deleteKey };