feat(apps/api): add support for a separate, non-eviction Redis (#1600)

* feat(apps/api): add support for a separate, non-eviction Redis

* fix: misimport
Gergő Móricz 2025-05-28 09:58:04 +02:00 committed by GitHub
parent 756b452a01
commit a5efff07f9
20 changed files with 140 additions and 139 deletions
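In short: the API previously used its BullMQ connection (redisConnection from queue-service) for all Redis traffic, including ephemeral bookkeeping such as crawl state, concurrency-limiter sets, and notification guards. This commit moves that bookkeeping onto a dedicated redisEvictConnection exported from services/redis, so the BullMQ queues can run against a separate, non-eviction Redis while the ephemeral data stays on an instance that is allowed to evict keys. A minimal sketch of the two connections, assuming the queue connection is built from REDIS_URL with the usual BullMQ-friendly options (only the REDIS_EVICT_URL / REDIS_RATE_LIMIT_URL fallback is taken verbatim from this diff; the rest is illustrative):

import IORedis from "ioredis";

// Queue (BullMQ) connection: must not evict keys, otherwise job data would be lost.
// REDIS_URL and the options shown here are assumptions, not part of this diff.
export const redisConnection = new IORedis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null,
});

// Evictable connection for ephemeral bookkeeping (crawl state, concurrency-limiter
// sets, notification guards). Falls back to the rate-limit Redis when no dedicated
// REDIS_EVICT_URL is configured.
const redisEvictURL = process.env.REDIS_EVICT_URL ?? process.env.REDIS_RATE_LIMIT_URL;
export const redisEvictConnection = new IORedis(redisEvictURL!);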

View File

@ -1,4 +1,4 @@
import { redisConnection } from "../../../services/queue-service";
import { redisEvictConnection } from "../../../services/redis";
import { supabase_service } from "../../../services/supabase";
import { logger as _logger } from "../../../lib/logger";
import { Request, Response } from "express";
@ -10,7 +10,7 @@ async function cclog() {
let cursor = 0;
do {
const result = await redisConnection.scan(cursor, "MATCH", "concurrency-limiter:*", "COUNT", 100000);
const result = await redisEvictConnection.scan(cursor, "MATCH", "concurrency-limiter:*", "COUNT", 100000);
cursor = parseInt(result[0], 10);
const usable = result[1].filter(x => !x.includes("preview_"));
@ -25,7 +25,7 @@ async function cclog() {
for (const x of usable) {
const at = new Date();
const concurrency = await redisConnection.zrangebyscore(x, Date.now(), Infinity);
const concurrency = await redisEvictConnection.zrangebyscore(x, Date.now(), Infinity);
if (concurrency) {
entries.push({
team_id: x.split(":")[1],

View File

@ -6,7 +6,7 @@ import { logger } from "../../../src/lib/logger";
import { getCrawl, saveCrawl } from "../../../src/lib/crawl-redis";
import * as Sentry from "@sentry/node";
import { configDotenv } from "dotenv";
import { redisConnection } from "../../services/queue-service";
import { redisEvictConnection } from "../../../src/services/redis";
configDotenv();
export async function crawlCancelController(req: Request, res: Response) {
@ -20,7 +20,7 @@ export async function crawlCancelController(req: Request, res: Response) {
const { team_id } = auth;
redisConnection.sadd("teams_using_v0", team_id)
redisEvictConnection.sadd("teams_using_v0", team_id)
.catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id }));
const sc = await getCrawl(req.params.jobId);

View File

@ -1,7 +1,8 @@
import { Request, Response } from "express";
import { authenticateUser } from "../auth";
import { RateLimiterMode } from "../../../src/types";
import { getScrapeQueue, redisConnection } from "../../../src/services/queue-service";
import { getScrapeQueue } from "../../../src/services/queue-service";
import { redisEvictConnection } from "../../../src/services/redis";
import { logger } from "../../../src/lib/logger";
import { getCrawl, getCrawlJobs } from "../../../src/lib/crawl-redis";
import { supabaseGetJobsByCrawlId } from "../../../src/lib/supabase-jobs";
@ -80,7 +81,7 @@ export async function crawlStatusController(req: Request, res: Response) {
const { team_id } = auth;
redisConnection.sadd("teams_using_v0", team_id)
redisEvictConnection.sadd("teams_using_v0", team_id)
.catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id }));
const sc = await getCrawl(req.params.jobId);

View File

@ -24,7 +24,7 @@ import {
saveCrawl,
StoredCrawl,
} from "../../../src/lib/crawl-redis";
import { getScrapeQueue, redisConnection } from "../../../src/services/queue-service";
import { redisEvictConnection } from "../../../src/services/redis";
import { checkAndUpdateURL } from "../../../src/lib/validateUrl";
import * as Sentry from "@sentry/node";
import { getJobPriority } from "../../lib/job-priority";
@ -41,7 +41,7 @@ export async function crawlController(req: Request, res: Response) {
const { team_id, chunk } = auth;
redisConnection.sadd("teams_using_v0", team_id)
redisEvictConnection.sadd("teams_using_v0", team_id)
.catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id }));
if (req.headers["x-idempotency-key"]) {

View File

@ -2,7 +2,7 @@ import { AuthResponse, RateLimiterMode } from "../../types";
import { Request, Response } from "express";
import { authenticateUser } from "../auth";
import { redisConnection } from "../../services/queue-service";
import { redisEvictConnection } from "../../../src/services/redis";
import { logger } from "../../lib/logger";
export const keyAuthController = async (req: Request, res: Response) => {
@ -13,7 +13,7 @@ export const keyAuthController = async (req: Request, res: Response) => {
return res.status(auth.status).json({ error: auth.error });
}
redisConnection.sadd("teams_using_v0", auth.team_id)
redisEvictConnection.sadd("teams_using_v0", auth.team_id)
.catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id: auth.team_id }));
// if success, return success: true

View File

@ -21,7 +21,8 @@ import {
defaultOrigin,
} from "../../lib/default-values";
import { addScrapeJob, waitForJob } from "../../services/queue-jobs";
import { getScrapeQueue, redisConnection } from "../../services/queue-service";
import { getScrapeQueue } from "../../services/queue-service";
import { redisEvictConnection } from "../../../src/services/redis";
import { v4 as uuidv4 } from "uuid";
import { logger } from "../../lib/logger";
import * as Sentry from "@sentry/node";
@ -184,7 +185,7 @@ export async function scrapeController(req: Request, res: Response) {
const { team_id, chunk } = auth;
redisConnection.sadd("teams_using_v0", team_id)
redisEvictConnection.sadd("teams_using_v0", team_id)
.catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id }));
const crawlerOptions = req.body.crawlerOptions ?? {};

View File

@ -11,7 +11,8 @@ import { search } from "../../search";
import { isUrlBlocked } from "../../scraper/WebScraper/utils/blocklist";
import { v4 as uuidv4 } from "uuid";
import { logger } from "../../lib/logger";
import { getScrapeQueue, redisConnection } from "../../services/queue-service";
import { getScrapeQueue } from "../../services/queue-service";
import { redisEvictConnection } from "../../../src/services/redis";
import { addScrapeJob, waitForJob } from "../../services/queue-jobs";
import * as Sentry from "@sentry/node";
import { getJobPriority } from "../../lib/job-priority";
@ -167,7 +168,7 @@ export async function searchController(req: Request, res: Response) {
}
const { team_id, chunk } = auth;
redisConnection.sadd("teams_using_v0", team_id)
redisEvictConnection.sadd("teams_using_v0", team_id)
.catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id }));
const crawlerOptions = req.body.crawlerOptions ?? {};

View File

@ -4,7 +4,7 @@ import {
RequestWithAuth,
} from "./types";
import { Response } from "express";
import { redisConnection } from "../../services/queue-service";
import { redisEvictConnection } from "../../../src/services/redis";
// Basically just middleware and error wrapping
export async function concurrencyCheckController(
@ -13,7 +13,7 @@ export async function concurrencyCheckController(
) {
const concurrencyLimiterKey = "concurrency-limiter:" + req.auth.team_id;
const now = Date.now();
const activeJobsOfTeam = await redisConnection.zrangebyscore(
const activeJobsOfTeam = await redisEvictConnection.zrangebyscore(
concurrencyLimiterKey,
now,
Infinity,

View File

@ -8,7 +8,8 @@ import {
getCrawl,
getCrawlJobs,
} from "../../lib/crawl-redis";
import { getScrapeQueue, redisConnection } from "../../services/queue-service";
import { getScrapeQueue } from "../../services/queue-service";
import { redisEvictConnection } from "../../../src/services/redis";
import { configDotenv } from "dotenv";
import { Job } from "bullmq";
configDotenv();
@ -65,7 +66,7 @@ export async function crawlErrorsController(
url: x.data.url,
error: x.failedReason,
})),
robotsBlocked: await redisConnection.smembers(
robotsBlocked: await redisEvictConnection.smembers(
"crawl:" + req.params.jobId + ":robots_blocked",
),
});

View File

@ -1,4 +1,4 @@
import { redisConnection } from "../services/queue-service";
import { redisEvictConnection } from "../services/redis";
import type { JobsOptions } from "bullmq";
const constructKey = (team_id: string) => "concurrency-limiter:" + team_id;
@ -12,14 +12,14 @@ export async function cleanOldConcurrencyLimitEntries(
team_id: string,
now: number = Date.now(),
) {
await redisConnection.zremrangebyscore(constructKey(team_id), -Infinity, now);
await redisEvictConnection.zremrangebyscore(constructKey(team_id), -Infinity, now);
}
export async function getConcurrencyLimitActiveJobs(
team_id: string,
now: number = Date.now(),
): Promise<string[]> {
return await redisConnection.zrangebyscore(
return await redisEvictConnection.zrangebyscore(
constructKey(team_id),
now,
Infinity,
@ -32,7 +32,7 @@ export async function pushConcurrencyLimitActiveJob(
timeout: number,
now: number = Date.now(),
) {
await redisConnection.zadd(
await redisEvictConnection.zadd(
constructKey(team_id),
now + timeout,
id,
@ -43,7 +43,7 @@ export async function removeConcurrencyLimitActiveJob(
team_id: string,
id: string,
) {
await redisConnection.zrem(constructKey(team_id), id);
await redisEvictConnection.zrem(constructKey(team_id), id);
}
export type ConcurrencyLimitedJob = {
@ -56,8 +56,8 @@ export type ConcurrencyLimitedJob = {
export async function takeConcurrencyLimitedJob(
team_id: string,
): Promise<ConcurrencyLimitedJob | null> {
await redisConnection.zremrangebyscore(constructQueueKey(team_id), -Infinity, Date.now());
const res = await redisConnection.zmpop(1, constructQueueKey(team_id), "MIN");
await redisEvictConnection.zremrangebyscore(constructQueueKey(team_id), -Infinity, Date.now());
const res = await redisEvictConnection.zmpop(1, constructQueueKey(team_id), "MIN");
if (res === null || res === undefined) {
return null;
}
@ -70,7 +70,7 @@ export async function pushConcurrencyLimitedJob(
job: ConcurrencyLimitedJob,
timeout: number,
) {
await redisConnection.zadd(
await redisEvictConnection.zadd(
constructQueueKey(team_id),
Date.now() + timeout,
JSON.stringify(job),
@ -80,11 +80,11 @@ export async function pushConcurrencyLimitedJob(
export async function getConcurrencyLimitedJobs(
team_id: string,
) {
return new Set((await redisConnection.zrange(constructQueueKey(team_id), 0, -1)).map(x => JSON.parse(x).id));
return new Set((await redisEvictConnection.zrange(constructQueueKey(team_id), 0, -1)).map(x => JSON.parse(x).id));
}
export async function getConcurrencyQueueJobsCount(team_id: string): Promise<number> {
const count = await redisConnection.zcard(constructQueueKey(team_id));
const count = await redisEvictConnection.zcard(constructQueueKey(team_id));
return count;
}
@ -92,14 +92,14 @@ export async function cleanOldCrawlConcurrencyLimitEntries(
crawl_id: string,
now: number = Date.now(),
) {
await redisConnection.zremrangebyscore(constructCrawlKey(crawl_id), -Infinity, now);
await redisEvictConnection.zremrangebyscore(constructCrawlKey(crawl_id), -Infinity, now);
}
export async function getCrawlConcurrencyLimitActiveJobs(
crawl_id: string,
now: number = Date.now(),
): Promise<string[]> {
return await redisConnection.zrangebyscore(
return await redisEvictConnection.zrangebyscore(
constructCrawlKey(crawl_id),
now,
Infinity,
@ -112,7 +112,7 @@ export async function pushCrawlConcurrencyLimitActiveJob(
timeout: number,
now: number = Date.now(),
) {
await redisConnection.zadd(
await redisEvictConnection.zadd(
constructCrawlKey(crawl_id),
now + timeout,
id,
@ -123,13 +123,13 @@ export async function removeCrawlConcurrencyLimitActiveJob(
crawl_id: string,
id: string,
) {
await redisConnection.zrem(constructCrawlKey(crawl_id), id);
await redisEvictConnection.zrem(constructCrawlKey(crawl_id), id);
}
export async function takeCrawlConcurrencyLimitedJob(
crawl_id: string,
): Promise<ConcurrencyLimitedJob | null> {
const res = await redisConnection.zmpop(1, constructCrawlQueueKey(crawl_id), "MIN");
const res = await redisEvictConnection.zmpop(1, constructCrawlQueueKey(crawl_id), "MIN");
if (res === null || res === undefined) {
return null;
}
@ -140,7 +140,7 @@ export async function pushCrawlConcurrencyLimitedJob(
crawl_id: string,
job: ConcurrencyLimitedJob,
) {
await redisConnection.zadd(
await redisEvictConnection.zadd(
constructCrawlQueueKey(crawl_id),
job.priority ?? 1,
JSON.stringify(job),
@ -150,10 +150,10 @@ export async function pushCrawlConcurrencyLimitedJob(
export async function getCrawlConcurrencyLimitedJobs(
crawl_id: string,
) {
return new Set((await redisConnection.zrange(constructCrawlQueueKey(crawl_id), 0, -1)).map(x => JSON.parse(x).id));
return new Set((await redisEvictConnection.zrange(constructCrawlQueueKey(crawl_id), 0, -1)).map(x => JSON.parse(x).id));
}
export async function getCrawlConcurrencyQueueJobsCount(crawl_id: string): Promise<number> {
const count = await redisConnection.zcard(constructCrawlQueueKey(crawl_id));
const count = await redisEvictConnection.zcard(constructCrawlQueueKey(crawl_id));
return count;
}
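The helpers above implement the concurrency limiter as Redis sorted sets: active job IDs are scored with now + timeout, so expired entries can be swept with zremrangebyscore and the still-live ones read back with zrangebyscore. A hypothetical caller might gate a new job like this (the import path, the concurrency limit, and the 2-minute timeout are illustrative; the helper calls follow the definitions in this file):

import {
  cleanOldConcurrencyLimitEntries,
  getConcurrencyLimitActiveJobs,
  pushConcurrencyLimitActiveJob,
} from "./lib/concurrency-limit"; // path is an assumption

async function tryStartJob(teamId: string, jobId: string, maxConcurrency: number): Promise<boolean> {
  const now = Date.now();
  await cleanOldConcurrencyLimitEntries(teamId, now);              // drop entries whose timeout has passed
  const active = await getConcurrencyLimitActiveJobs(teamId, now); // job IDs still within their window
  if (active.length >= maxConcurrency) return false;               // caller parks the job in the team queue instead
  await pushConcurrencyLimitActiveJob(teamId, jobId, 2 * 60 * 1000, now); // counts against the limit for up to 2 minutes
  return true;
}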

View File

@ -1,7 +1,7 @@
import { InternalOptions } from "../scraper/scrapeURL";
import { ScrapeOptions, TeamFlags } from "../controllers/v1/types";
import { WebCrawler } from "../scraper/WebScraper/crawler";
import { redisConnection } from "../services/queue-service";
import { redisEvictConnection } from "../services/redis";
import { logger as _logger } from "./logger";
import { getAdjustedMaxDepth } from "../scraper/WebScraper/utils/maxDepthUtils";
@ -24,24 +24,24 @@ export async function saveCrawl(id: string, crawl: StoredCrawl) {
crawlId: id,
teamId: crawl.team_id,
});
await redisConnection.set("crawl:" + id, JSON.stringify(crawl));
await redisConnection.expire("crawl:" + id, 24 * 60 * 60);
await redisEvictConnection.set("crawl:" + id, JSON.stringify(crawl));
await redisEvictConnection.expire("crawl:" + id, 24 * 60 * 60);
}
export async function getCrawl(id: string): Promise<StoredCrawl | null> {
const x = await redisConnection.get("crawl:" + id);
const x = await redisEvictConnection.get("crawl:" + id);
if (x === null) {
return null;
}
await redisConnection.expire("crawl:" + id, 24 * 60 * 60);
await redisEvictConnection.expire("crawl:" + id, 24 * 60 * 60);
return JSON.parse(x);
}
export async function getCrawlExpiry(id: string): Promise<Date> {
const d = new Date();
const ttl = await redisConnection.pttl("crawl:" + id);
const ttl = await redisEvictConnection.pttl("crawl:" + id);
d.setMilliseconds(d.getMilliseconds() + ttl);
d.setMilliseconds(0);
return d;
@ -54,8 +54,8 @@ export async function addCrawlJob(id: string, job_id: string) {
method: "addCrawlJob",
crawlId: id,
});
await redisConnection.sadd("crawl:" + id + ":jobs", job_id);
await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60);
await redisEvictConnection.sadd("crawl:" + id + ":jobs", job_id);
await redisEvictConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60);
}
export async function addCrawlJobs(id: string, job_ids: string[]) {
@ -67,8 +67,8 @@ export async function addCrawlJobs(id: string, job_ids: string[]) {
method: "addCrawlJobs",
crawlId: id,
});
await redisConnection.sadd("crawl:" + id + ":jobs", ...job_ids);
await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60);
await redisEvictConnection.sadd("crawl:" + id + ":jobs", ...job_ids);
await redisEvictConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60);
}
export async function addCrawlJobDone(
@ -82,32 +82,32 @@ export async function addCrawlJobDone(
method: "addCrawlJobDone",
crawlId: id,
});
await redisConnection.sadd("crawl:" + id + ":jobs_done", job_id);
await redisConnection.expire(
await redisEvictConnection.sadd("crawl:" + id + ":jobs_done", job_id);
await redisEvictConnection.expire(
"crawl:" + id + ":jobs_done",
24 * 60 * 60,
);
if (success) {
await redisConnection.rpush("crawl:" + id + ":jobs_done_ordered", job_id);
await redisEvictConnection.rpush("crawl:" + id + ":jobs_done_ordered", job_id);
} else {
// in case it's already been pushed, make sure it's removed
await redisConnection.lrem(
await redisEvictConnection.lrem(
"crawl:" + id + ":jobs_done_ordered",
-1,
job_id,
);
}
await redisConnection.expire(
await redisEvictConnection.expire(
"crawl:" + id + ":jobs_done_ordered",
24 * 60 * 60,
);
}
export async function getDoneJobsOrderedLength(id: string): Promise<number> {
await redisConnection.expire("crawl:" + id + ":jobs_done_ordered", 24 * 60 * 60);
return await redisConnection.llen("crawl:" + id + ":jobs_done_ordered");
await redisEvictConnection.expire("crawl:" + id + ":jobs_done_ordered", 24 * 60 * 60);
return await redisEvictConnection.llen("crawl:" + id + ":jobs_done_ordered");
}
export async function getDoneJobsOrdered(
@ -115,8 +115,8 @@ export async function getDoneJobsOrdered(
start = 0,
end = -1,
): Promise<string[]> {
await redisConnection.expire("crawl:" + id + ":jobs_done_ordered", 24 * 60 * 60);
return await redisConnection.lrange(
await redisEvictConnection.expire("crawl:" + id + ":jobs_done_ordered", 24 * 60 * 60);
return await redisEvictConnection.lrange(
"crawl:" + id + ":jobs_done_ordered",
start,
end,
@ -124,27 +124,27 @@ export async function getDoneJobsOrdered(
}
export async function isCrawlFinished(id: string) {
await redisConnection.expire("crawl:" + id + ":kickoff:finish", 24 * 60 * 60);
await redisEvictConnection.expire("crawl:" + id + ":kickoff:finish", 24 * 60 * 60);
return (
(await redisConnection.scard("crawl:" + id + ":jobs_done")) ===
(await redisConnection.scard("crawl:" + id + ":jobs")) &&
(await redisConnection.get("crawl:" + id + ":kickoff:finish")) !== null
(await redisEvictConnection.scard("crawl:" + id + ":jobs_done")) ===
(await redisEvictConnection.scard("crawl:" + id + ":jobs")) &&
(await redisEvictConnection.get("crawl:" + id + ":kickoff:finish")) !== null
);
}
export async function isCrawlKickoffFinished(id: string) {
await redisConnection.expire("crawl:" + id + ":kickoff:finish", 24 * 60 * 60);
await redisEvictConnection.expire("crawl:" + id + ":kickoff:finish", 24 * 60 * 60);
return (
(await redisConnection.get("crawl:" + id + ":kickoff:finish")) !== null
(await redisEvictConnection.get("crawl:" + id + ":kickoff:finish")) !== null
);
}
export async function isCrawlFinishedLocked(id: string) {
return await redisConnection.exists("crawl:" + id + ":finish");
return await redisEvictConnection.exists("crawl:" + id + ":finish");
}
export async function finishCrawlKickoff(id: string) {
await redisConnection.set(
await redisEvictConnection.set(
"crawl:" + id + ":kickoff:finish",
"yes",
"EX",
@ -159,18 +159,18 @@ export async function finishCrawlPre(id: string) {
method: "finishCrawlPre",
crawlId: id,
});
const set = await redisConnection.setnx("crawl:" + id + ":finished_pre", "yes");
await redisConnection.expire("crawl:" + id + ":finished_pre", 24 * 60 * 60);
const set = await redisEvictConnection.setnx("crawl:" + id + ":finished_pre", "yes");
await redisEvictConnection.expire("crawl:" + id + ":finished_pre", 24 * 60 * 60);
return set === 1;
} else {
// _logger.debug("Crawl can not be pre-finished yet, not marking as finished.", {
// module: "crawl-redis",
// method: "finishCrawlPre",
// crawlId: id,
// jobs_done: await redisConnection.scard("crawl:" + id + ":jobs_done"),
// jobs: await redisConnection.scard("crawl:" + id + ":jobs"),
// jobs_done: await redisEvictConnection.scard("crawl:" + id + ":jobs_done"),
// jobs: await redisEvictConnection.scard("crawl:" + id + ":jobs"),
// kickoff_finished:
// (await redisConnection.get("crawl:" + id + ":kickoff:finish")) !== null,
// (await redisEvictConnection.get("crawl:" + id + ":kickoff:finish")) !== null,
// });
}
}
@ -181,16 +181,16 @@ export async function finishCrawl(id: string) {
method: "finishCrawl",
crawlId: id,
});
await redisConnection.set("crawl:" + id + ":finish", "yes");
await redisConnection.expire("crawl:" + id + ":finish", 24 * 60 * 60);
await redisEvictConnection.set("crawl:" + id + ":finish", "yes");
await redisEvictConnection.expire("crawl:" + id + ":finish", 24 * 60 * 60);
}
export async function getCrawlJobs(id: string): Promise<string[]> {
return await redisConnection.smembers("crawl:" + id + ":jobs");
return await redisEvictConnection.smembers("crawl:" + id + ":jobs");
}
export async function getCrawlJobCount(id: string): Promise<number> {
return await redisConnection.scard("crawl:" + id + ":jobs");
return await redisEvictConnection.scard("crawl:" + id + ":jobs");
}
export function normalizeURL(url: string, sc: StoredCrawl): string {
@ -276,7 +276,7 @@ export async function lockURL(
if (typeof sc.crawlerOptions?.limit === "number") {
if (
(await redisConnection.scard("crawl:" + id + ":visited_unique")) >=
(await redisEvictConnection.scard("crawl:" + id + ":visited_unique")) >=
sc.crawlerOptions.limit
) {
// logger.debug(
@ -291,22 +291,22 @@ export async function lockURL(
let res: boolean;
if (!sc.crawlerOptions?.deduplicateSimilarURLs) {
res = (await redisConnection.sadd("crawl:" + id + ":visited", url)) !== 0;
res = (await redisEvictConnection.sadd("crawl:" + id + ":visited", url)) !== 0;
} else {
const permutations = generateURLPermutations(url).map((x) => x.href);
// logger.debug("Adding URL permutations for URL " + JSON.stringify(url) + "...", { permutations });
const x = await redisConnection.sadd(
const x = await redisEvictConnection.sadd(
"crawl:" + id + ":visited",
...permutations,
);
res = x === permutations.length;
}
await redisConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60);
await redisEvictConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60);
if (res) {
await redisConnection.sadd("crawl:" + id + ":visited_unique", url);
await redisConnection.expire(
await redisEvictConnection.sadd("crawl:" + id + ":visited_unique", url);
await redisEvictConnection.expire(
"crawl:" + id + ":visited_unique",
24 * 60 * 60,
);
@ -336,29 +336,29 @@ export async function lockURLs(
// Add to visited_unique set
logger.debug("Locking " + urls.length + " URLs...");
await redisConnection.sadd("crawl:" + id + ":visited_unique", ...urls);
await redisConnection.expire(
await redisEvictConnection.sadd("crawl:" + id + ":visited_unique", ...urls);
await redisEvictConnection.expire(
"crawl:" + id + ":visited_unique",
24 * 60 * 60,
);
let res: boolean;
if (!sc.crawlerOptions?.deduplicateSimilarURLs) {
const x = await redisConnection.sadd("crawl:" + id + ":visited", ...urls);
const x = await redisEvictConnection.sadd("crawl:" + id + ":visited", ...urls);
res = x === urls.length;
} else {
const allPermutations = urls.flatMap((url) =>
generateURLPermutations(url).map((x) => x.href),
);
logger.debug("Adding " + allPermutations.length + " URL permutations...");
const x = await redisConnection.sadd(
const x = await redisEvictConnection.sadd(
"crawl:" + id + ":visited",
...allPermutations,
);
res = x === allPermutations.length;
}
await redisConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60);
await redisEvictConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60);
logger.debug("lockURLs final result: " + res, { res });
return res;
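lockURL and lockURLs lean on sadd's return value (the number of members actually added) to decide whether a URL was newly claimed for this crawl, and every touch refreshes a 24-hour TTL so finished crawls age out of the evictable Redis. A condensed sketch of the single-URL path with deduplicateSimilarURLs off (key names mirror the code above; the helper itself is hypothetical):

import { redisEvictConnection } from "../services/redis"; // path is an assumption

async function lockUrlSketch(crawlId: string, url: string): Promise<boolean> {
  // sadd returns 1 if the URL was not yet in the visited set, 0 if it already was.
  const added = await redisEvictConnection.sadd("crawl:" + crawlId + ":visited", url);
  await redisEvictConnection.expire("crawl:" + crawlId + ":visited", 24 * 60 * 60);
  if (added !== 0) {
    // First time this crawl sees the URL: also count it toward the crawl limit.
    await redisEvictConnection.sadd("crawl:" + crawlId + ":visited_unique", url);
    await redisEvictConnection.expire("crawl:" + crawlId + ":visited_unique", 24 * 60 * 60);
  }
  return added !== 0;
}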

View File

@ -1,4 +1,4 @@
import { redisConnection } from "../../services/queue-service";
import { redisEvictConnection } from "../../services/redis";
import { logger as _logger } from "../logger";
export enum DeepResearchStep {
@ -52,12 +52,12 @@ const DEEP_RESEARCH_TTL = 6 * 60 * 60;
export async function saveDeepResearch(id: string, research: StoredDeepResearch) {
_logger.debug("Saving deep research " + id + " to Redis...");
await redisConnection.set("deep-research:" + id, JSON.stringify(research));
await redisConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL);
await redisEvictConnection.set("deep-research:" + id, JSON.stringify(research));
await redisEvictConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL);
}
export async function getDeepResearch(id: string): Promise<StoredDeepResearch | null> {
const x = await redisConnection.get("deep-research:" + id);
const x = await redisEvictConnection.get("deep-research:" + id);
return x ? JSON.parse(x) : null;
}
@ -91,13 +91,13 @@ export async function updateDeepResearch(
await redisConnection.set("deep-research:" + id, JSON.stringify(updatedResearch));
await redisConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL);
await redisEvictConnection.set("deep-research:" + id, JSON.stringify(updatedResearch));
await redisEvictConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL);
}
export async function getDeepResearchExpiry(id: string): Promise<Date> {
const d = new Date();
const ttl = await redisConnection.pttl("deep-research:" + id);
const ttl = await redisEvictConnection.pttl("deep-research:" + id);
d.setMilliseconds(d.getMilliseconds() + ttl);
d.setMilliseconds(0);
return d;

View File

@ -1,4 +1,4 @@
import { redisConnection } from "../../services/queue-service";
import { redisEvictConnection } from "../../services/redis";
import { logger as _logger } from "../logger";
import { CostTracking } from "./extraction-service";
@ -61,12 +61,12 @@ export async function saveExtract(id: string, extract: StoredExtract) {
discoveredLinks: step.discoveredLinks?.slice(0, STEPS_MAX_DISCOVERED_LINKS)
}))
};
await redisConnection.set("extract:" + id, JSON.stringify(minimalExtract));
await redisConnection.expire("extract:" + id, EXTRACT_TTL);
await redisEvictConnection.set("extract:" + id, JSON.stringify(minimalExtract));
await redisEvictConnection.expire("extract:" + id, EXTRACT_TTL);
}
export async function getExtract(id: string): Promise<StoredExtract | null> {
const x = await redisConnection.get("extract:" + id);
const x = await redisEvictConnection.get("extract:" + id);
return x ? JSON.parse(x) : null;
}
@ -111,13 +111,13 @@ export async function updateExtract(
console.log(minimalExtract.sessionIds)
await redisConnection.set("extract:" + id, JSON.stringify(minimalExtract));
await redisConnection.expire("extract:" + id, EXTRACT_TTL);
await redisEvictConnection.set("extract:" + id, JSON.stringify(minimalExtract));
await redisEvictConnection.expire("extract:" + id, EXTRACT_TTL);
}
export async function getExtractExpiry(id: string): Promise<Date> {
const d = new Date();
const ttl = await redisConnection.pttl("extract:" + id);
const ttl = await redisEvictConnection.pttl("extract:" + id);
d.setMilliseconds(d.getMilliseconds() + ttl);
d.setMilliseconds(0);
return d;

View File

@ -1,4 +1,4 @@
import { redisConnection } from "../../services/queue-service";
import { redisEvictConnection } from "../../services/redis";
import { logger as _logger } from "../logger";
export interface GenerationData {
@ -20,12 +20,12 @@ const GENERATION_TTL = 24 * 60 * 60;
export async function saveGeneratedLlmsTxt(id: string, data: GenerationData): Promise<void> {
_logger.debug("Saving llmstxt generation " + id + " to Redis...");
await redisConnection.set("generation:" + id, JSON.stringify(data));
await redisConnection.expire("generation:" + id, GENERATION_TTL);
await redisEvictConnection.set("generation:" + id, JSON.stringify(data));
await redisEvictConnection.expire("generation:" + id, GENERATION_TTL);
}
export async function getGeneratedLlmsTxt(id: string): Promise<GenerationData | null> {
const x = await redisConnection.get("generation:" + id);
const x = await redisEvictConnection.get("generation:" + id);
return x ? JSON.parse(x) : null;
}
@ -41,13 +41,13 @@ export async function updateGeneratedLlmsTxt(
...data
};
await redisConnection.set("generation:" + id, JSON.stringify(updatedGeneration));
await redisConnection.expire("generation:" + id, GENERATION_TTL);
await redisEvictConnection.set("generation:" + id, JSON.stringify(updatedGeneration));
await redisEvictConnection.expire("generation:" + id, GENERATION_TTL);
}
export async function getGeneratedLlmsTxtExpiry(id: string): Promise<Date> {
const d = new Date();
const ttl = await redisConnection.pttl("generation:" + id);
const ttl = await redisEvictConnection.pttl("generation:" + id);
d.setMilliseconds(d.getMilliseconds() + ttl);
d.setMilliseconds(0);
return d;

View File

@ -1,6 +1,6 @@
import { RateLimiterMode } from "../types";
import { getACUC, getACUCTeam } from "../controllers/auth";
import { redisConnection } from "../services/queue-service";
import { redisEvictConnection } from "../services/redis";
import { logger } from "./logger";
const SET_KEY_PREFIX = "limit_team_id:";
@ -9,10 +9,10 @@ export async function addJobPriority(team_id, job_id) {
const setKey = SET_KEY_PREFIX + team_id;
// Add scrape job id to the set
await redisConnection.sadd(setKey, job_id);
await redisEvictConnection.sadd(setKey, job_id);
// This approach will reset the expiration time to 60 seconds every time a new job is added to the set.
await redisConnection.expire(setKey, 60);
await redisEvictConnection.expire(setKey, 60);
} catch (e) {
logger.error(`Add job priority (sadd) failed: ${team_id}, ${job_id}`);
}
@ -23,7 +23,7 @@ export async function deleteJobPriority(team_id, job_id) {
const setKey = SET_KEY_PREFIX + team_id;
// remove job_id from the set
await redisConnection.srem(setKey, job_id);
await redisEvictConnection.srem(setKey, job_id);
} catch (e) {
logger.error(`Delete job priority (srem) failed: ${team_id}, ${job_id}`);
}
@ -48,7 +48,7 @@ export async function getJobPriority({
const setKey = SET_KEY_PREFIX + team_id;
// Get the length of the set
const setLength = await redisConnection.scard(setKey);
const setLength = await redisEvictConnection.scard(setKey);
// Determine the priority based on the plan and set length
let planModifier = acuc?.plan_priority.planModifier ?? 1;
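The job-priority helpers keep a per-team set of recently enqueued job IDs whose TTL is reset to 60 seconds on every sadd, so scard effectively measures how many jobs a team has added in the last minute and getJobPriority can lower priority as that count grows. A rough sketch of the idea (the bucket math is illustrative; only the key format and the scard call mirror this file):

import { redisEvictConnection } from "../services/redis"; // path is an assumption

async function estimatePriority(teamId: string, basePriority = 10, bucketLimit = 25): Promise<number> {
  const setLength = await redisEvictConnection.scard("limit_team_id:" + teamId); // jobs seen in the last ~60s
  // Busier teams get a larger value, i.e. a lower BullMQ priority.
  return setLength <= bucketLimit
    ? basePriority
    : basePriority + Math.ceil((setLength - bucketLimit) / bucketLimit);
}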

View File

@ -7,7 +7,7 @@ import { getURLDepth } from "./utils/maxDepthUtils";
import { axiosTimeout } from "../../lib/timeout";
import { logger as _logger } from "../../lib/logger";
import https from "https";
import { redisConnection } from "../../services/queue-service";
import { redisEvictConnection } from "../../services/redis";
import { extractLinks } from "../../lib/html-transformer";
import { TimeoutSignal } from "../../controllers/v1/types";
export class WebCrawler {
@ -287,7 +287,7 @@ export class WebCrawler {
let uniqueURLs: string[] = [];
for (const url of filteredLinks) {
if (
await redisConnection.sadd(
await redisEvictConnection.sadd(
"sitemap:" + this.jobId + ":links",
normalizeUrl(url),
)
@ -296,7 +296,7 @@ export class WebCrawler {
}
}
await redisConnection.expire(
await redisEvictConnection.expire(
"sitemap:" + this.jobId + ":links",
3600,
"NX",
@ -324,7 +324,7 @@ export class WebCrawler {
if (count > 0) {
if (
await redisConnection.sadd(
await redisEvictConnection.sadd(
"sitemap:" + this.jobId + ":links",
normalizeUrl(this.initialUrl),
)
@ -334,7 +334,7 @@ export class WebCrawler {
count++;
}
await redisConnection.expire(
await redisEvictConnection.expire(
"sitemap:" + this.jobId + ":links",
3600,
"NX",
@ -390,11 +390,11 @@ export class WebCrawler {
!this.isRobotsAllowed(fullUrl, this.ignoreRobotsTxt)
) {
(async () => {
await redisConnection.sadd(
await redisEvictConnection.sadd(
"crawl:" + this.jobId + ":robots_blocked",
fullUrl,
);
await redisConnection.expire(
await redisEvictConnection.expire(
"crawl:" + this.jobId + ":robots_blocked",
24 * 60 * 60,
);

View File

@ -7,7 +7,7 @@ import { sendSlackWebhook } from "../alerts/slack";
import { getNotificationString } from "./notification_string";
import { AuthCreditUsageChunk } from "../../controllers/v1/types";
import { redlock } from "../redlock";
import { redisConnection } from "../queue-service";
import { redisEvictConnection } from "../redis";
const emailTemplates: Record<
NotificationType,
@ -268,14 +268,14 @@ export async function sendNotificationWithCustomDays(
) => {
const redisKey = "notification_sent:" + notificationType + ":" + team_id;
const didSendRecentNotification = (await redisConnection.get(redisKey)) !== null;
const didSendRecentNotification = (await redisEvictConnection.get(redisKey)) !== null;
if (didSendRecentNotification && !bypassRecentChecks) {
logger.debug(`Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`);
return { success: true };
}
await redisConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60);
await redisEvictConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60);
const now = new Date();
const pastDate = new Date(now.getTime() - daysBetweenEmails * 24 * 60 * 60 * 1000);
@ -289,13 +289,13 @@ export async function sendNotificationWithCustomDays(
if (recentNotificationsError) {
logger.debug(`Error fetching recent notifications: ${recentNotificationsError}`);
await redisConnection.del(redisKey); // free up redis, let it try again
await redisEvictConnection.del(redisKey); // free up redis, let it try again
return { success: false };
}
if (recentNotifications.length > 0 && !bypassRecentChecks) {
logger.debug(`Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`);
await redisConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60);
await redisEvictConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60);
return { success: true };
}
@ -310,7 +310,7 @@ export async function sendNotificationWithCustomDays(
if (emailsError) {
logger.debug(`Error fetching emails: ${emailsError}`);
await redisConnection.del(redisKey); // free up redis, let it try again
await redisEvictConnection.del(redisKey); // free up redis, let it try again
return { success: false };
}
@ -341,7 +341,7 @@ export async function sendNotificationWithCustomDays(
if (insertError) {
logger.debug(`Error inserting notification record: ${insertError}`);
await redisConnection.del(redisKey); // free up redis, let it try again
await redisEvictConnection.del(redisKey); // free up redis, let it try again
return { success: false };
}
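sendNotificationWithCustomDays uses a plain string key as a send guard: the key is set with an EX TTL of daysBetweenEmails up front, and deleted again on any failure so a later attempt can retry. The same pattern, extracted into a generic helper (name and shape are hypothetical; the Redis calls mirror this file):

import { redisEvictConnection } from "../redis"; // path is an assumption

async function onceEvery(key: string, ttlSeconds: number, work: () => Promise<boolean>): Promise<boolean> {
  if ((await redisEvictConnection.get(key)) !== null) return true; // already done within the window
  await redisEvictConnection.set(key, "1", "EX", ttlSeconds);      // claim the window before doing the work
  const ok = await work();
  if (!ok) await redisEvictConnection.del(key);                    // free the key so a retry is possible
  return ok;
}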

View File

@ -131,7 +131,3 @@ export function getBillingQueue() {
}
return billingQueue;
}
// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
// import { QueueEvents } from 'bullmq';
// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });

View File

@ -6,11 +6,8 @@ import {
getScrapeQueue,
getExtractQueue,
getDeepResearchQueue,
redisConnection,
scrapeQueueName,
extractQueueName,
deepResearchQueueName,
getIndexQueue,
redisConnection,
getGenerateLlmsTxtQueue,
getBillingQueue,
} from "./queue-service";
@ -88,6 +85,7 @@ import https from "https";
import { cacheableLookup } from "../scraper/scrapeURL/lib/cacheableLookup";
import { robustFetch } from "../scraper/scrapeURL/lib/fetch";
import { RateLimiterMode } from "../types";
import { redisEvictConnection } from "./redis";
configDotenv();
@ -132,11 +130,11 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
logger.info("Crawl is pre-finished, checking if we need to add more jobs");
if (
job.data.crawlerOptions &&
!(await redisConnection.exists(
!(await redisEvictConnection.exists(
"crawl:" + job.data.crawl_id + ":invisible_urls",
))
) {
await redisConnection.set(
await redisEvictConnection.set(
"crawl:" + job.data.crawl_id + ":invisible_urls",
"done",
"EX",
@ -146,7 +144,7 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
const sc = (await getCrawl(job.data.crawl_id))!;
const visitedUrls = new Set(
await redisConnection.smembers(
await redisEvictConnection.smembers(
"crawl:" + job.data.crawl_id + ":visited_unique",
),
);
@ -261,7 +259,7 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
? normalizeUrlOnlyHostname(sc.originUrl)
: undefined;
// Get all visited unique URLs from Redis
const visitedUrls = await redisConnection.smembers(
const visitedUrls = await redisEvictConnection.smembers(
"crawl:" + job.data.crawl_id + ":visited_unique",
);
// Upload to Supabase if we have URLs and this is a crawl (not a batch scrape)
@ -1230,7 +1228,7 @@ async function processJob(job: Job & { id: string }, token: string) {
// Prevent redirect target from being visited in the crawl again
// See lockURL
const x = await redisConnection.sadd(
const x = await redisEvictConnection.sadd(
"crawl:" + job.data.crawl_id + ":visited",
...p1.map((x) => x.href),
);
@ -1452,7 +1450,7 @@ async function processJob(job: Job & { id: string }, token: string) {
logger.debug("Declaring job as done...");
await addCrawlJobDone(job.data.crawl_id, job.id, false);
await redisConnection.srem(
await redisEvictConnection.srem(
"crawl:" + job.data.crawl_id + ":visited_unique",
normalizeURL(job.data.url, sc),
);

View File

@ -1,4 +1,4 @@
import Redis from "ioredis";
import IORedis from "ioredis";
import { redisRateLimitClient } from "./rate-limiter";
import { logger } from "../lib/logger";
@ -70,3 +70,6 @@ const deleteKey = async (key: string) => {
};
export { setValue, getValue, deleteKey };
const redisEvictURL = process.env.REDIS_EVICT_URL ?? process.env.REDIS_RATE_LIMIT_URL;
export const redisEvictConnection = new IORedis(redisEvictURL!);
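The new module tail is intentionally small: when no dedicated evictable instance is configured, redisEvictConnection falls back to the rate-limit Redis, so single-Redis self-hosted setups keep working unchanged. Presumably the dedicated instance is the one allowed to run with a maxmemory eviction policy (e.g. allkeys-lru), while the queue Redis keeps noeviction so BullMQ job data is never dropped; that split is the point of the change but is not enforced by this code. Call sites only switch imports, for example (helper name is hypothetical; the set/expire pattern mirrors the diffs above):

import { redisEvictConnection } from "./services/redis"; // path depends on the caller

// Store evictable state with a TTL, exactly as the crawl/extract helpers now do.
async function saveEphemeral(key: string, value: unknown, ttlSeconds: number): Promise<void> {
  await redisEvictConnection.set(key, JSON.stringify(value));
  await redisEvictConnection.expire(key, ttlSeconds);
}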