From a5efff07f909759978ad585695ce05ecb9ca3a1a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?=
Date: Wed, 28 May 2025 09:58:04 +0200
Subject: [PATCH] feat(apps/api): add support for a separate, non-eviction Redis (#1600)

* feat(apps/api): add support for a separate, non-eviction Redis

* fix: misimport
---
 apps/api/src/controllers/v0/admin/cclog.ts    |  6 +-
 apps/api/src/controllers/v0/crawl-cancel.ts   |  4 +-
 apps/api/src/controllers/v0/crawl-status.ts   |  5 +-
 apps/api/src/controllers/v0/crawl.ts          |  4 +-
 apps/api/src/controllers/v0/keyAuth.ts        |  4 +-
 apps/api/src/controllers/v0/scrape.ts         |  5 +-
 apps/api/src/controllers/v0/search.ts         |  5 +-
 .../src/controllers/v1/concurrency-check.ts   |  4 +-
 apps/api/src/controllers/v1/crawl-errors.ts   |  5 +-
 apps/api/src/lib/concurrency-limit.ts         | 36 +++----
 apps/api/src/lib/crawl-redis.ts               | 94 +++++++++----------
 .../lib/deep-research/deep-research-redis.ts  | 14 +--
 apps/api/src/lib/extract/extract-redis.ts     | 14 +--
 .../generate-llmstxt-redis.ts                 | 14 +--
 apps/api/src/lib/job-priority.ts              | 10 +-
 apps/api/src/scraper/WebScraper/crawler.ts    | 14 +--
 .../notification/email_notification.ts        | 14 +--
 apps/api/src/services/queue-service.ts        |  4 -
 apps/api/src/services/queue-worker.ts         | 18 ++--
 apps/api/src/services/redis.ts                |  5 +-
 20 files changed, 140 insertions(+), 139 deletions(-)

diff --git a/apps/api/src/controllers/v0/admin/cclog.ts b/apps/api/src/controllers/v0/admin/cclog.ts
index f9654285..7d81201e 100644
--- a/apps/api/src/controllers/v0/admin/cclog.ts
+++ b/apps/api/src/controllers/v0/admin/cclog.ts
@@ -1,4 +1,4 @@
-import { redisConnection } from "../../../services/queue-service";
+import { redisEvictConnection } from "../../../services/redis";
 import { supabase_service } from "../../../services/supabase";
 import { logger as _logger } from "../../../lib/logger";
 import { Request, Response } from "express";
@@ -10,7 +10,7 @@ async function cclog() {
   let cursor = 0;
 
   do {
-    const result = await redisConnection.scan(cursor, "MATCH", "concurrency-limiter:*", "COUNT", 100000);
+    const result = await redisEvictConnection.scan(cursor, "MATCH", "concurrency-limiter:*", "COUNT", 100000);
     cursor = parseInt(result[0], 10);
 
     const usable = result[1].filter(x => !x.includes("preview_"));
@@ -25,7 +25,7 @@ async function cclog() {
 
     for (const x of usable) {
       const at = new Date();
-      const concurrency = await redisConnection.zrangebyscore(x, Date.now(), Infinity);
+      const concurrency = await redisEvictConnection.zrangebyscore(x, Date.now(), Infinity);
       if (concurrency) {
         entries.push({
           team_id: x.split(":")[1],
diff --git a/apps/api/src/controllers/v0/crawl-cancel.ts b/apps/api/src/controllers/v0/crawl-cancel.ts
index 16f6cc87..a9c07b8d 100644
--- a/apps/api/src/controllers/v0/crawl-cancel.ts
+++ b/apps/api/src/controllers/v0/crawl-cancel.ts
@@ -6,7 +6,7 @@ import { logger } from "../../../src/lib/logger";
 import { getCrawl, saveCrawl } from "../../../src/lib/crawl-redis";
 import * as Sentry from "@sentry/node";
 import { configDotenv } from "dotenv";
-import { redisConnection } from "../../services/queue-service";
+import { redisEvictConnection } from "../../../src/services/redis";
 configDotenv();
 
 export async function crawlCancelController(req: Request, res: Response) {
@@ -20,7 +20,7 @@ export async function crawlCancelController(req: Request, res: Response) {
 
   const { team_id } = auth;
 
-  redisConnection.sadd("teams_using_v0", team_id)
+  redisEvictConnection.sadd("teams_using_v0", team_id)
     .catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id }));
 
   const sc = await getCrawl(req.params.jobId);
diff --git a/apps/api/src/controllers/v0/crawl-status.ts b/apps/api/src/controllers/v0/crawl-status.ts
index f9ed7917..92482817 100644
--- a/apps/api/src/controllers/v0/crawl-status.ts
+++ b/apps/api/src/controllers/v0/crawl-status.ts
@@ -1,7 +1,8 @@
 import { Request, Response } from "express";
 import { authenticateUser } from "../auth";
 import { RateLimiterMode } from "../../../src/types";
-import { getScrapeQueue, redisConnection } from "../../../src/services/queue-service";
+import { getScrapeQueue } from "../../../src/services/queue-service";
+import { redisEvictConnection } from "../../../src/services/redis";
 import { logger } from "../../../src/lib/logger";
 import { getCrawl, getCrawlJobs } from "../../../src/lib/crawl-redis";
 import { supabaseGetJobsByCrawlId } from "../../../src/lib/supabase-jobs";
@@ -80,7 +81,7 @@ export async function crawlStatusController(req: Request, res: Response) {
 
     const { team_id } = auth;
 
-    redisConnection.sadd("teams_using_v0", team_id)
+    redisEvictConnection.sadd("teams_using_v0", team_id)
       .catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id }));
 
     const sc = await getCrawl(req.params.jobId);
diff --git a/apps/api/src/controllers/v0/crawl.ts b/apps/api/src/controllers/v0/crawl.ts
index 5755b3bb..e70c79d9 100644
--- a/apps/api/src/controllers/v0/crawl.ts
+++ b/apps/api/src/controllers/v0/crawl.ts
@@ -24,7 +24,7 @@ import {
   saveCrawl,
   StoredCrawl,
 } from "../../../src/lib/crawl-redis";
-import { getScrapeQueue, redisConnection } from "../../../src/services/queue-service";
+import { redisEvictConnection } from "../../../src/services/redis";
 import { checkAndUpdateURL } from "../../../src/lib/validateUrl";
 import * as Sentry from "@sentry/node";
 import { getJobPriority } from "../../lib/job-priority";
@@ -41,7 +41,7 @@ export async function crawlController(req: Request, res: Response) {
 
   const { team_id, chunk } = auth;
 
-  redisConnection.sadd("teams_using_v0", team_id)
+  redisEvictConnection.sadd("teams_using_v0", team_id)
     .catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id }));
 
   if (req.headers["x-idempotency-key"]) {
diff --git a/apps/api/src/controllers/v0/keyAuth.ts b/apps/api/src/controllers/v0/keyAuth.ts
index ce8dc7ce..baf4427e 100644
--- a/apps/api/src/controllers/v0/keyAuth.ts
+++ b/apps/api/src/controllers/v0/keyAuth.ts
@@ -2,7 +2,7 @@ import { AuthResponse, RateLimiterMode } from "../../types";
 
 import { Request, Response } from "express";
 import { authenticateUser } from "../auth";
-import { redisConnection } from "../../services/queue-service";
+import { redisEvictConnection } from "../../../src/services/redis";
 import { logger } from "../../lib/logger";
 
 export const keyAuthController = async (req: Request, res: Response) => {
@@ -13,7 +13,7 @@ export const keyAuthController = async (req: Request, res: Response) => {
     return res.status(auth.status).json({ error: auth.error });
   }
 
-  redisConnection.sadd("teams_using_v0", auth.team_id)
+  redisEvictConnection.sadd("teams_using_v0", auth.team_id)
     .catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id: auth.team_id }));
 
   // if success, return success: true
diff --git a/apps/api/src/controllers/v0/scrape.ts b/apps/api/src/controllers/v0/scrape.ts
index 8eb38f81..f7a9fbdd 100644
--- a/apps/api/src/controllers/v0/scrape.ts
+++ b/apps/api/src/controllers/v0/scrape.ts
@@ -21,7 +21,8 @@ import {
   defaultOrigin,
 } from "../../lib/default-values";
 import { addScrapeJob, waitForJob } from "../../services/queue-jobs";
-import { getScrapeQueue, redisConnection } from "../../services/queue-service";
+import { getScrapeQueue } from "../../services/queue-service";
+import { redisEvictConnection } from "../../../src/services/redis";
 import { v4 as uuidv4 } from "uuid";
 import { logger } from "../../lib/logger";
 import * as Sentry from "@sentry/node";
@@ -184,7 +185,7 @@ export async function scrapeController(req: Request, res: Response) {
 
   const { team_id, chunk } = auth;
 
-  redisConnection.sadd("teams_using_v0", team_id)
+  redisEvictConnection.sadd("teams_using_v0", team_id)
     .catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id }));
 
   const crawlerOptions = req.body.crawlerOptions ?? {};
diff --git a/apps/api/src/controllers/v0/search.ts b/apps/api/src/controllers/v0/search.ts
index 1d5d16fb..15172e37 100644
--- a/apps/api/src/controllers/v0/search.ts
+++ b/apps/api/src/controllers/v0/search.ts
@@ -11,7 +11,8 @@ import { search } from "../../search";
 import { isUrlBlocked } from "../../scraper/WebScraper/utils/blocklist";
 import { v4 as uuidv4 } from "uuid";
 import { logger } from "../../lib/logger";
-import { getScrapeQueue, redisConnection } from "../../services/queue-service";
+import { getScrapeQueue } from "../../services/queue-service";
+import { redisEvictConnection } from "../../../src/services/redis";
 import { addScrapeJob, waitForJob } from "../../services/queue-jobs";
 import * as Sentry from "@sentry/node";
 import { getJobPriority } from "../../lib/job-priority";
@@ -167,7 +168,7 @@ export async function searchController(req: Request, res: Response) {
   }
 
   const { team_id, chunk } = auth;
-  redisConnection.sadd("teams_using_v0", team_id)
+  redisEvictConnection.sadd("teams_using_v0", team_id)
     .catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id }));
 
   const crawlerOptions = req.body.crawlerOptions ?? {};
diff --git a/apps/api/src/controllers/v1/concurrency-check.ts b/apps/api/src/controllers/v1/concurrency-check.ts
index 1aa69363..a6313c72 100644
--- a/apps/api/src/controllers/v1/concurrency-check.ts
+++ b/apps/api/src/controllers/v1/concurrency-check.ts
@@ -4,7 +4,7 @@ import {
   RequestWithAuth,
 } from "./types";
 import { Response } from "express";
-import { redisConnection } from "../../services/queue-service";
+import { redisEvictConnection } from "../../../src/services/redis";
 
 // Basically just middleware and error wrapping
 export async function concurrencyCheckController(
@@ -13,7 +13,7 @@ export async function concurrencyCheckController(
 ) {
   const concurrencyLimiterKey = "concurrency-limiter:" + req.auth.team_id;
   const now = Date.now();
-  const activeJobsOfTeam = await redisConnection.zrangebyscore(
+  const activeJobsOfTeam = await redisEvictConnection.zrangebyscore(
     concurrencyLimiterKey,
     now,
     Infinity,
diff --git a/apps/api/src/controllers/v1/crawl-errors.ts b/apps/api/src/controllers/v1/crawl-errors.ts
index 979a6d7a..d8d029ee 100644
--- a/apps/api/src/controllers/v1/crawl-errors.ts
+++ b/apps/api/src/controllers/v1/crawl-errors.ts
@@ -8,7 +8,8 @@ import {
   getCrawl,
   getCrawlJobs,
 } from "../../lib/crawl-redis";
-import { getScrapeQueue, redisConnection } from "../../services/queue-service";
+import { getScrapeQueue } from "../../services/queue-service";
+import { redisEvictConnection } from "../../../src/services/redis";
 import { configDotenv } from "dotenv";
 import { Job } from "bullmq";
 configDotenv();
@@ -65,7 +66,7 @@ export async function crawlErrorsController(
       url: x.data.url,
       error: x.failedReason,
     })),
-    robotsBlocked: await redisConnection.smembers(
+    robotsBlocked: await redisEvictConnection.smembers(
       "crawl:" + req.params.jobId + ":robots_blocked",
     ),
   });
diff --git a/apps/api/src/lib/concurrency-limit.ts b/apps/api/src/lib/concurrency-limit.ts
index 8901413f..59634e0b 100644
--- a/apps/api/src/lib/concurrency-limit.ts
+++ b/apps/api/src/lib/concurrency-limit.ts
@@ -1,4 +1,4 @@
-import { redisConnection } from "../services/queue-service";
+import { redisEvictConnection } from "../services/redis";
 import type { JobsOptions } from "bullmq";
 
 const constructKey = (team_id: string) => "concurrency-limiter:" + team_id;
@@ -12,14 +12,14 @@ export async function cleanOldConcurrencyLimitEntries(
   team_id: string,
   now: number = Date.now(),
 ) {
-  await redisConnection.zremrangebyscore(constructKey(team_id), -Infinity, now);
+  await redisEvictConnection.zremrangebyscore(constructKey(team_id), -Infinity, now);
 }
 
 export async function getConcurrencyLimitActiveJobs(
   team_id: string,
   now: number = Date.now(),
 ): Promise<string[]> {
-  return await redisConnection.zrangebyscore(
+  return await redisEvictConnection.zrangebyscore(
     constructKey(team_id),
     now,
     Infinity,
@@ -32,7 +32,7 @@ export async function pushConcurrencyLimitActiveJob(
   timeout: number,
   now: number = Date.now(),
 ) {
-  await redisConnection.zadd(
+  await redisEvictConnection.zadd(
     constructKey(team_id),
     now + timeout,
     id,
@@ -43,7 +43,7 @@ export async function removeConcurrencyLimitActiveJob(
   team_id: string,
   id: string,
 ) {
-  await redisConnection.zrem(constructKey(team_id), id);
+  await redisEvictConnection.zrem(constructKey(team_id), id);
 }
 
 export type ConcurrencyLimitedJob = {
@@ -56,8 +56,8 @@ export type ConcurrencyLimitedJob = {
 export async function takeConcurrencyLimitedJob(
   team_id: string,
 ): Promise<ConcurrencyLimitedJob | null> {
-  await redisConnection.zremrangebyscore(constructQueueKey(team_id), -Infinity, Date.now());
-  const res = await redisConnection.zmpop(1, constructQueueKey(team_id), "MIN");
+  await redisEvictConnection.zremrangebyscore(constructQueueKey(team_id), -Infinity, Date.now());
+  const res = await redisEvictConnection.zmpop(1, constructQueueKey(team_id), "MIN");
   if (res === null || res === undefined) {
     return null;
   }
@@ -70,7 +70,7 @@ export async function pushConcurrencyLimitedJob(
   job: ConcurrencyLimitedJob,
   timeout: number,
 ) {
-  await redisConnection.zadd(
+  await redisEvictConnection.zadd(
     constructQueueKey(team_id),
     Date.now() + timeout,
     JSON.stringify(job),
@@ -80,11 +80,11 @@ export async function pushConcurrencyLimitedJob(
 export async function getConcurrencyLimitedJobs(
   team_id: string,
 ) {
-  return new Set((await redisConnection.zrange(constructQueueKey(team_id), 0, -1)).map(x => JSON.parse(x).id));
+  return new Set((await redisEvictConnection.zrange(constructQueueKey(team_id), 0, -1)).map(x => JSON.parse(x).id));
 }
 
 export async function getConcurrencyQueueJobsCount(team_id: string): Promise<number> {
-  const count = await redisConnection.zcard(constructQueueKey(team_id));
+  const count = await redisEvictConnection.zcard(constructQueueKey(team_id));
   return count;
 }
 
@@ -92,14 +92,14 @@ export async function cleanOldCrawlConcurrencyLimitEntries(
   crawl_id: string,
   now: number = Date.now(),
 ) {
-  await redisConnection.zremrangebyscore(constructCrawlKey(crawl_id), -Infinity, now);
+  await redisEvictConnection.zremrangebyscore(constructCrawlKey(crawl_id), -Infinity, now);
 }
 
 export async function getCrawlConcurrencyLimitActiveJobs(
   crawl_id: string,
   now: number = Date.now(),
 ): Promise<string[]> {
-  return await redisConnection.zrangebyscore(
+  return await redisEvictConnection.zrangebyscore(
     constructCrawlKey(crawl_id),
     now,
     Infinity,
@@ -112,7 +112,7 @@ export async function pushCrawlConcurrencyLimitActiveJob(
   timeout: number,
   now: number = Date.now(),
 ) {
-  await redisConnection.zadd(
+  await redisEvictConnection.zadd(
     constructCrawlKey(crawl_id),
     now + timeout,
     id,
@@ -123,13 +123,13 @@ export async function removeCrawlConcurrencyLimitActiveJob(
   crawl_id: string,
   id: string,
 ) {
-  await redisConnection.zrem(constructCrawlKey(crawl_id), id);
+  await redisEvictConnection.zrem(constructCrawlKey(crawl_id), id);
 }
 
 export async function takeCrawlConcurrencyLimitedJob(
   crawl_id: string,
 ): Promise<ConcurrencyLimitedJob | null> {
-  const res = await redisConnection.zmpop(1, constructCrawlQueueKey(crawl_id), "MIN");
+  const res = await redisEvictConnection.zmpop(1, constructCrawlQueueKey(crawl_id), "MIN");
   if (res === null || res === undefined) {
     return null;
   }
@@ -140,7 +140,7 @@ export async function pushCrawlConcurrencyLimitedJob(
   crawl_id: string,
   job: ConcurrencyLimitedJob,
 ) {
-  await redisConnection.zadd(
+  await redisEvictConnection.zadd(
     constructCrawlQueueKey(crawl_id),
     job.priority ?? 1,
     JSON.stringify(job),
@@ -150,10 +150,10 @@ export async function pushCrawlConcurrencyLimitedJob(
 export async function getCrawlConcurrencyLimitedJobs(
   crawl_id: string,
 ) {
-  return new Set((await redisConnection.zrange(constructCrawlQueueKey(crawl_id), 0, -1)).map(x => JSON.parse(x).id));
+  return new Set((await redisEvictConnection.zrange(constructCrawlQueueKey(crawl_id), 0, -1)).map(x => JSON.parse(x).id));
 }
 
 export async function getCrawlConcurrencyQueueJobsCount(crawl_id: string): Promise<number> {
-  const count = await redisConnection.zcard(constructCrawlQueueKey(crawl_id));
+  const count = await redisEvictConnection.zcard(constructCrawlQueueKey(crawl_id));
   return count;
 }
diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts
index 96de520d..0984a628 100644
--- a/apps/api/src/lib/crawl-redis.ts
+++ b/apps/api/src/lib/crawl-redis.ts
@@ -1,7 +1,7 @@
 import { InternalOptions } from "../scraper/scrapeURL";
 import { ScrapeOptions, TeamFlags } from "../controllers/v1/types";
 import { WebCrawler } from "../scraper/WebScraper/crawler";
-import { redisConnection } from "../services/queue-service";
+import { redisEvictConnection } from "../services/redis";
 import { logger as _logger } from "./logger";
 import { getAdjustedMaxDepth } from "../scraper/WebScraper/utils/maxDepthUtils";
 
@@ -24,24 +24,24 @@ export async function saveCrawl(id: string, crawl: StoredCrawl) {
     crawlId: id,
     teamId: crawl.team_id,
   });
-  await redisConnection.set("crawl:" + id, JSON.stringify(crawl));
-  await redisConnection.expire("crawl:" + id, 24 * 60 * 60);
+  await redisEvictConnection.set("crawl:" + id, JSON.stringify(crawl));
+  await redisEvictConnection.expire("crawl:" + id, 24 * 60 * 60);
 }
 
 export async function getCrawl(id: string): Promise<StoredCrawl | null> {
-  const x = await redisConnection.get("crawl:" + id);
+  const x = await redisEvictConnection.get("crawl:" + id);
 
   if (x === null) {
     return null;
   }
 
-  await redisConnection.expire("crawl:" + id, 24 * 60 * 60);
+  await redisEvictConnection.expire("crawl:" + id, 24 * 60 * 60);
   return JSON.parse(x);
 }
 
 export async function getCrawlExpiry(id: string): Promise<Date> {
   const d = new Date();
-  const ttl = await redisConnection.pttl("crawl:" + id);
+  const ttl = await redisEvictConnection.pttl("crawl:" + id);
   d.setMilliseconds(d.getMilliseconds() + ttl);
   d.setMilliseconds(0);
   return d;
@@ -54,8 +54,8 @@ export async function addCrawlJob(id: string, job_id: string) {
     method: "addCrawlJob",
     crawlId: id,
   });
-  await redisConnection.sadd("crawl:" + id + ":jobs", job_id);
-  await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60);
+  await redisEvictConnection.sadd("crawl:" + id + ":jobs", job_id);
+  await redisEvictConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60);
 }
 
 export async function addCrawlJobs(id: string, job_ids: string[]) {
@@ -67,8 +67,8 @@ export async function addCrawlJobs(id: string, job_ids: string[]) {
     method: "addCrawlJobs",
     crawlId: id,
   });
-  await redisConnection.sadd("crawl:" + id + ":jobs", ...job_ids);
-  await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60);
+  await redisEvictConnection.sadd("crawl:" + id + ":jobs", ...job_ids);
+  await redisEvictConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60);
 }
 
 export async function addCrawlJobDone(
@@ -82,32 +82,32 @@ export async function addCrawlJobDone(
     method: "addCrawlJobDone",
     crawlId: id,
   });
-  await redisConnection.sadd("crawl:" + id + ":jobs_done", job_id);
-  await redisConnection.expire(
+  await redisEvictConnection.sadd("crawl:" + id + ":jobs_done", job_id);
+  await redisEvictConnection.expire(
     "crawl:" + id + ":jobs_done",
     24 * 60 * 60,
   );
 
   if (success) {
-    await redisConnection.rpush("crawl:" + id + ":jobs_done_ordered", job_id);
+    await redisEvictConnection.rpush("crawl:" + id + ":jobs_done_ordered", job_id);
   } else {
     // in case it's already been pushed, make sure it's removed
-    await redisConnection.lrem(
+    await redisEvictConnection.lrem(
      "crawl:" + id + ":jobs_done_ordered",
      -1,
      job_id,
     );
   }
 
-  await redisConnection.expire(
+  await redisEvictConnection.expire(
    "crawl:" + id + ":jobs_done_ordered",
    24 * 60 * 60,
   );
 }
 
 export async function getDoneJobsOrderedLength(id: string): Promise<number> {
-  await redisConnection.expire("crawl:" + id + ":jobs_done_ordered", 24 * 60 * 60);
-  return await redisConnection.llen("crawl:" + id + ":jobs_done_ordered");
+  await redisEvictConnection.expire("crawl:" + id + ":jobs_done_ordered", 24 * 60 * 60);
+  return await redisEvictConnection.llen("crawl:" + id + ":jobs_done_ordered");
 }
 
 export async function getDoneJobsOrdered(
@@ -115,8 +115,8 @@ export async function getDoneJobsOrdered(
   id: string,
   start = 0,
   end = -1,
 ): Promise<string[]> {
-  await redisConnection.expire("crawl:" + id + ":jobs_done_ordered", 24 * 60 * 60);
-  return await redisConnection.lrange(
+  await redisEvictConnection.expire("crawl:" + id + ":jobs_done_ordered", 24 * 60 * 60);
+  return await redisEvictConnection.lrange(
     "crawl:" + id + ":jobs_done_ordered",
     start,
@@ -124,27 +124,27 @@ export async function getDoneJobsOrdered(
     end,
   );
 }
 
 export async function isCrawlFinished(id: string) {
-  await redisConnection.expire("crawl:" + id + ":kickoff:finish", 24 * 60 * 60);
+  await redisEvictConnection.expire("crawl:" + id + ":kickoff:finish", 24 * 60 * 60);
   return (
-    (await redisConnection.scard("crawl:" + id + ":jobs_done")) ===
-    (await redisConnection.scard("crawl:" + id + ":jobs")) &&
-    (await redisConnection.get("crawl:" + id + ":kickoff:finish")) !== null
+    (await redisEvictConnection.scard("crawl:" + id + ":jobs_done")) ===
+    (await redisEvictConnection.scard("crawl:" + id + ":jobs")) &&
+    (await redisEvictConnection.get("crawl:" + id + ":kickoff:finish")) !== null
   );
 }
 
 export async function isCrawlKickoffFinished(id: string) {
-  await redisConnection.expire("crawl:" + id + ":kickoff:finish", 24 * 60 * 60);
+  await redisEvictConnection.expire("crawl:" + id + ":kickoff:finish", 24 * 60 * 60);
   return (
-    (await redisConnection.get("crawl:" + id + ":kickoff:finish")) !== null
+    (await redisEvictConnection.get("crawl:" + id + ":kickoff:finish")) !== null
   );
 }
 
 export async function isCrawlFinishedLocked(id: string) {
-  return await redisConnection.exists("crawl:" + id + ":finish");
+  return await redisEvictConnection.exists("crawl:" + id + ":finish");
 }
 
 export async function finishCrawlKickoff(id: string) {
-  await redisConnection.set(
+  await redisEvictConnection.set(
     "crawl:" + id + ":kickoff:finish",
     "yes",
     "EX",
@@ -159,18 +159,18 @@ export async function finishCrawlPre(id: string) {
       method: "finishCrawlPre",
       crawlId: id,
     });
-    const set = await redisConnection.setnx("crawl:" + id + ":finished_pre", "yes");
-    await redisConnection.expire("crawl:" + id + ":finished_pre", 24 * 60 * 60);
+    const set = await redisEvictConnection.setnx("crawl:" + id + ":finished_pre", "yes");
+    await redisEvictConnection.expire("crawl:" + id + ":finished_pre", 24 * 60 * 60);
     return set === 1;
   } else {
     // _logger.debug("Crawl can not be pre-finished yet, not marking as finished.", {
    //   module: "crawl-redis",
    //   method: "finishCrawlPre",
    //   crawlId: id,
-    //   jobs_done: await redisConnection.scard("crawl:" + id + ":jobs_done"),
-    //   jobs: await redisConnection.scard("crawl:" + id + ":jobs"),
+    //   jobs_done: await redisEvictConnection.scard("crawl:" + id + ":jobs_done"),
+    //   jobs: await redisEvictConnection.scard("crawl:" + id + ":jobs"),
    //   kickoff_finished:
-    //     (await redisConnection.get("crawl:" + id + ":kickoff:finish")) !== null,
+    //     (await redisEvictConnection.get("crawl:" + id + ":kickoff:finish")) !== null,
    //   });
   }
 }
@@ -181,16 +181,16 @@ export async function finishCrawl(id: string) {
     method: "finishCrawl",
     crawlId: id,
   });
-  await redisConnection.set("crawl:" + id + ":finish", "yes");
-  await redisConnection.expire("crawl:" + id + ":finish", 24 * 60 * 60);
+  await redisEvictConnection.set("crawl:" + id + ":finish", "yes");
+  await redisEvictConnection.expire("crawl:" + id + ":finish", 24 * 60 * 60);
 }
 
 export async function getCrawlJobs(id: string): Promise<string[]> {
-  return await redisConnection.smembers("crawl:" + id + ":jobs");
+  return await redisEvictConnection.smembers("crawl:" + id + ":jobs");
 }
 
 export async function getCrawlJobCount(id: string): Promise<number> {
-  return await redisConnection.scard("crawl:" + id + ":jobs");
+  return await redisEvictConnection.scard("crawl:" + id + ":jobs");
 }
 
 export function normalizeURL(url: string, sc: StoredCrawl): string {
@@ -276,7 +276,7 @@ export async function lockURL(
   if (typeof sc.crawlerOptions?.limit === "number") {
     if (
-      (await redisConnection.scard("crawl:" + id + ":visited_unique")) >=
+      (await redisEvictConnection.scard("crawl:" + id + ":visited_unique")) >=
       sc.crawlerOptions.limit
     ) {
       // logger.debug(
@@ -291,22 +291,22 @@ export async function lockURL(
   let res: boolean;
   if (!sc.crawlerOptions?.deduplicateSimilarURLs) {
-    res = (await redisConnection.sadd("crawl:" + id + ":visited", url)) !== 0;
+    res = (await redisEvictConnection.sadd("crawl:" + id + ":visited", url)) !== 0;
   } else {
     const permutations = generateURLPermutations(url).map((x) => x.href);
     // logger.debug("Adding URL permutations for URL " + JSON.stringify(url) + "...", { permutations });
-    const x = await redisConnection.sadd(
+    const x = await redisEvictConnection.sadd(
       "crawl:" + id + ":visited",
       ...permutations,
     );
     res = x === permutations.length;
   }
 
-  await redisConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60);
+  await redisEvictConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60);
 
   if (res) {
-    await redisConnection.sadd("crawl:" + id + ":visited_unique", url);
-    await redisConnection.expire(
+    await redisEvictConnection.sadd("crawl:" + id + ":visited_unique", url);
+    await redisEvictConnection.expire(
       "crawl:" + id + ":visited_unique",
       24 * 60 * 60,
     );
@@ -336,29 +336,29 @@ export async function lockURLs(
 
   // Add to visited_unique set
   logger.debug("Locking " + urls.length + " URLs...");
-  await redisConnection.sadd("crawl:" + id + ":visited_unique", ...urls);
-  await redisConnection.expire(
+  await redisEvictConnection.sadd("crawl:" + id + ":visited_unique", ...urls);
+  await redisEvictConnection.expire(
    "crawl:" + id + ":visited_unique",
    24 * 60 * 60,
   );
 
   let res: boolean;
   if (!sc.crawlerOptions?.deduplicateSimilarURLs) {
-    const x = await redisConnection.sadd("crawl:" + id + ":visited", ...urls);
+    const x = await redisEvictConnection.sadd("crawl:" + id + ":visited", ...urls);
     res = x === urls.length;
   } else {
     const allPermutations = urls.flatMap((url) =>
       generateURLPermutations(url).map((x) => x.href),
     );
     logger.debug("Adding " + allPermutations.length + " URL permutations...");
-    const x = await redisConnection.sadd(
+    const x = await redisEvictConnection.sadd(
      "crawl:" + id + ":visited",
      ...allPermutations,
    );
    res = x === allPermutations.length;
   }
 
-  await redisConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60);
+  await redisEvictConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60);
 
   logger.debug("lockURLs final result: " + res, { res });
   return res;
diff --git a/apps/api/src/lib/deep-research/deep-research-redis.ts b/apps/api/src/lib/deep-research/deep-research-redis.ts
index 3e846b49..b2a0b8c8 100644
--- a/apps/api/src/lib/deep-research/deep-research-redis.ts
+++ b/apps/api/src/lib/deep-research/deep-research-redis.ts
@@ -1,4 +1,4 @@
-import { redisConnection } from "../../services/queue-service";
+import { redisEvictConnection } from "../../services/redis";
 import { logger as _logger } from "../logger";
 
 export enum DeepResearchStep {
@@ -52,12 +52,12 @@ const DEEP_RESEARCH_TTL = 6 * 60 * 60;
 
 export async function saveDeepResearch(id: string, research: StoredDeepResearch) {
   _logger.debug("Saving deep research " + id + " to Redis...");
-  await redisConnection.set("deep-research:" + id, JSON.stringify(research));
-  await redisConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL);
+  await redisEvictConnection.set("deep-research:" + id, JSON.stringify(research));
+  await redisEvictConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL);
 }
 
 export async function getDeepResearch(id: string): Promise<StoredDeepResearch | null> {
-  const x = await redisConnection.get("deep-research:" + id);
+  const x = await redisEvictConnection.get("deep-research:" + id);
   return x ? JSON.parse(x) : null;
 }
 
@@ -91,13 +91,13 @@ export async function updateDeepResearch(
 
-  await redisConnection.set("deep-research:" + id, JSON.stringify(updatedResearch));
-  await redisConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL);
+  await redisEvictConnection.set("deep-research:" + id, JSON.stringify(updatedResearch));
+  await redisEvictConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL);
 }
 
 export async function getDeepResearchExpiry(id: string): Promise<Date> {
   const d = new Date();
-  const ttl = await redisConnection.pttl("deep-research:" + id);
+  const ttl = await redisEvictConnection.pttl("deep-research:" + id);
   d.setMilliseconds(d.getMilliseconds() + ttl);
   d.setMilliseconds(0);
   return d;
diff --git a/apps/api/src/lib/extract/extract-redis.ts b/apps/api/src/lib/extract/extract-redis.ts
index d256c582..397a8a60 100644
--- a/apps/api/src/lib/extract/extract-redis.ts
+++ b/apps/api/src/lib/extract/extract-redis.ts
@@ -1,4 +1,4 @@
-import { redisConnection } from "../../services/queue-service";
+import { redisEvictConnection } from "../../services/redis";
 import { logger as _logger } from "../logger";
 import { CostTracking } from "./extraction-service";
 
@@ -61,12 +61,12 @@ export async function saveExtract(id: string, extract: StoredExtract) {
       discoveredLinks: step.discoveredLinks?.slice(0, STEPS_MAX_DISCOVERED_LINKS)
     }))
   };
-  await redisConnection.set("extract:" + id, JSON.stringify(minimalExtract));
-  await redisConnection.expire("extract:" + id, EXTRACT_TTL);
+  await redisEvictConnection.set("extract:" + id, JSON.stringify(minimalExtract));
+  await redisEvictConnection.expire("extract:" + id, EXTRACT_TTL);
 }
 
 export async function getExtract(id: string): Promise<StoredExtract | null> {
-  const x = await redisConnection.get("extract:" + id);
+  const x = await redisEvictConnection.get("extract:" + id);
   return x ? JSON.parse(x) : null;
 }
 
@@ -111,13 +111,13 @@ export async function updateExtract(
 
   console.log(minimalExtract.sessionIds)
 
-  await redisConnection.set("extract:" + id, JSON.stringify(minimalExtract));
-  await redisConnection.expire("extract:" + id, EXTRACT_TTL);
+  await redisEvictConnection.set("extract:" + id, JSON.stringify(minimalExtract));
+  await redisEvictConnection.expire("extract:" + id, EXTRACT_TTL);
 }
 
 export async function getExtractExpiry(id: string): Promise<Date> {
   const d = new Date();
-  const ttl = await redisConnection.pttl("extract:" + id);
+  const ttl = await redisEvictConnection.pttl("extract:" + id);
   d.setMilliseconds(d.getMilliseconds() + ttl);
   d.setMilliseconds(0);
   return d;
diff --git a/apps/api/src/lib/generate-llmstxt/generate-llmstxt-redis.ts b/apps/api/src/lib/generate-llmstxt/generate-llmstxt-redis.ts
index c5bf6479..f44148cf 100644
--- a/apps/api/src/lib/generate-llmstxt/generate-llmstxt-redis.ts
+++ b/apps/api/src/lib/generate-llmstxt/generate-llmstxt-redis.ts
@@ -1,4 +1,4 @@
-import { redisConnection } from "../../services/queue-service";
+import { redisEvictConnection } from "../../services/redis";
 import { logger as _logger } from "../logger";
 
 export interface GenerationData {
@@ -20,12 +20,12 @@ const GENERATION_TTL = 24 * 60 * 60;
 
 export async function saveGeneratedLlmsTxt(id: string, data: GenerationData): Promise<void> {
   _logger.debug("Saving llmstxt generation " + id + " to Redis...");
-  await redisConnection.set("generation:" + id, JSON.stringify(data));
-  await redisConnection.expire("generation:" + id, GENERATION_TTL);
+  await redisEvictConnection.set("generation:" + id, JSON.stringify(data));
+  await redisEvictConnection.expire("generation:" + id, GENERATION_TTL);
 }
 
 export async function getGeneratedLlmsTxt(id: string): Promise<GenerationData | null> {
-  const x = await redisConnection.get("generation:" + id);
+  const x = await redisEvictConnection.get("generation:" + id);
   return x ? JSON.parse(x) : null;
 }
 
@@ -41,13 +41,13 @@ export async function updateGeneratedLlmsTxt(
     ...data
   };
 
-  await redisConnection.set("generation:" + id, JSON.stringify(updatedGeneration));
-  await redisConnection.expire("generation:" + id, GENERATION_TTL);
+  await redisEvictConnection.set("generation:" + id, JSON.stringify(updatedGeneration));
+  await redisEvictConnection.expire("generation:" + id, GENERATION_TTL);
 }
 
 export async function getGeneratedLlmsTxtExpiry(id: string): Promise<Date> {
   const d = new Date();
-  const ttl = await redisConnection.pttl("generation:" + id);
+  const ttl = await redisEvictConnection.pttl("generation:" + id);
   d.setMilliseconds(d.getMilliseconds() + ttl);
   d.setMilliseconds(0);
   return d;
diff --git a/apps/api/src/lib/job-priority.ts b/apps/api/src/lib/job-priority.ts
index 02356c21..3e225db5 100644
--- a/apps/api/src/lib/job-priority.ts
+++ b/apps/api/src/lib/job-priority.ts
@@ -1,6 +1,6 @@
 import { RateLimiterMode } from "../types";
 import { getACUC, getACUCTeam } from "../controllers/auth";
-import { redisConnection } from "../services/queue-service";
+import { redisEvictConnection } from "../services/redis";
 import { logger } from "./logger";
 
 const SET_KEY_PREFIX = "limit_team_id:";
@@ -9,10 +9,10 @@ export async function addJobPriority(team_id, job_id) {
     const setKey = SET_KEY_PREFIX + team_id;
 
     // Add scrape job id to the set
-    await redisConnection.sadd(setKey, job_id);
+    await redisEvictConnection.sadd(setKey, job_id);
 
     // This approach will reset the expiration time to 60 seconds every time a new job is added to the set.
-    await redisConnection.expire(setKey, 60);
+    await redisEvictConnection.expire(setKey, 60);
   } catch (e) {
     logger.error(`Add job priority (sadd) failed: ${team_id}, ${job_id}`);
   }
@@ -23,7 +23,7 @@ export async function deleteJobPriority(team_id, job_id) {
     const setKey = SET_KEY_PREFIX + team_id;
 
     // remove job_id from the set
-    await redisConnection.srem(setKey, job_id);
+    await redisEvictConnection.srem(setKey, job_id);
   } catch (e) {
     logger.error(`Delete job priority (srem) failed: ${team_id}, ${job_id}`);
   }
@@ -48,7 +48,7 @@ export async function getJobPriority({
     const setKey = SET_KEY_PREFIX + team_id;
 
     // Get the length of the set
-    const setLength = await redisConnection.scard(setKey);
+    const setLength = await redisEvictConnection.scard(setKey);
 
     // Determine the priority based on the plan and set length
     let planModifier = acuc?.plan_priority.planModifier ?? 1;
diff --git a/apps/api/src/scraper/WebScraper/crawler.ts b/apps/api/src/scraper/WebScraper/crawler.ts
index 56c46faf..899e5157 100644
--- a/apps/api/src/scraper/WebScraper/crawler.ts
+++ b/apps/api/src/scraper/WebScraper/crawler.ts
@@ -7,7 +7,7 @@ import { getURLDepth } from "./utils/maxDepthUtils";
 import { axiosTimeout } from "../../lib/timeout";
 import { logger as _logger } from "../../lib/logger";
 import https from "https";
-import { redisConnection } from "../../services/queue-service";
+import { redisEvictConnection } from "../../services/redis";
 import { extractLinks } from "../../lib/html-transformer";
 import { TimeoutSignal } from "../../controllers/v1/types";
 
 export class WebCrawler {
@@ -287,7 +287,7 @@ export class WebCrawler {
       let uniqueURLs: string[] = [];
       for (const url of filteredLinks) {
         if (
-          await redisConnection.sadd(
+          await redisEvictConnection.sadd(
             "sitemap:" + this.jobId + ":links",
             normalizeUrl(url),
           )
@@ -296,7 +296,7 @@ export class WebCrawler {
         }
       }
 
-      await redisConnection.expire(
+      await redisEvictConnection.expire(
         "sitemap:" + this.jobId + ":links",
         3600,
         "NX",
@@ -324,7 +324,7 @@ export class WebCrawler {
 
       if (count > 0) {
         if (
-          await redisConnection.sadd(
+          await redisEvictConnection.sadd(
             "sitemap:" + this.jobId + ":links",
             normalizeUrl(this.initialUrl),
           )
@@ -334,7 +334,7 @@ export class WebCrawler {
          count++;
         }
 
-        await redisConnection.expire(
+        await redisEvictConnection.expire(
           "sitemap:" + this.jobId + ":links",
           3600,
           "NX",
@@ -390,11 +390,11 @@ export class WebCrawler {
           !this.isRobotsAllowed(fullUrl, this.ignoreRobotsTxt)
         ) {
           (async () => {
-            await redisConnection.sadd(
+            await redisEvictConnection.sadd(
               "crawl:" + this.jobId + ":robots_blocked",
               fullUrl,
             );
-            await redisConnection.expire(
+            await redisEvictConnection.expire(
               "crawl:" + this.jobId + ":robots_blocked",
               24 * 60 * 60,
             );
diff --git a/apps/api/src/services/notification/email_notification.ts b/apps/api/src/services/notification/email_notification.ts
index cf238b06..e85109f0 100644
--- a/apps/api/src/services/notification/email_notification.ts
+++ b/apps/api/src/services/notification/email_notification.ts
@@ -7,7 +7,7 @@ import { sendSlackWebhook } from "../alerts/slack";
 import { getNotificationString } from "./notification_string";
 import { AuthCreditUsageChunk } from "../../controllers/v1/types";
 import { redlock } from "../redlock";
-import { redisConnection } from "../queue-service";
+import { redisEvictConnection } from "../redis";
 
 const emailTemplates: Record<
   NotificationType,
@@ -268,14 +268,14 @@ export async function sendNotificationWithCustomDays(
     ) => {
       const redisKey = "notification_sent:" + notificationType + ":" + team_id;
 
-      const didSendRecentNotification = (await redisConnection.get(redisKey)) !== null;
+      const didSendRecentNotification = (await redisEvictConnection.get(redisKey)) !== null;
 
       if (didSendRecentNotification && !bypassRecentChecks) {
         logger.debug(`Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`);
         return { success: true };
       }
 
-      await redisConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60);
+      await redisEvictConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60);
 
       const now = new Date();
       const pastDate = new Date(now.getTime() - daysBetweenEmails * 24 * 60 * 60 * 1000);
@@ -289,13 +289,13 @@ export async function sendNotificationWithCustomDays(
 
       if (recentNotificationsError) {
         logger.debug(`Error fetching recent notifications: ${recentNotificationsError}`);
-        await redisConnection.del(redisKey); // free up redis, let it try again
+        await redisEvictConnection.del(redisKey); // free up redis, let it try again
         return { success: false };
       }
 
       if (recentNotifications.length > 0 && !bypassRecentChecks) {
         logger.debug(`Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`);
-        await redisConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60);
+        await redisEvictConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60);
         return { success: true };
       }
 
@@ -310,7 +310,7 @@ export async function sendNotificationWithCustomDays(
 
       if (emailsError) {
         logger.debug(`Error fetching emails: ${emailsError}`);
-        await redisConnection.del(redisKey); // free up redis, let it try again
+        await redisEvictConnection.del(redisKey); // free up redis, let it try again
         return { success: false };
       }
 
@@ -341,7 +341,7 @@ export async function sendNotificationWithCustomDays(
 
       if (insertError) {
         logger.debug(`Error inserting notification record: ${insertError}`);
-        await redisConnection.del(redisKey); // free up redis, let it try again
+        await redisEvictConnection.del(redisKey); // free up redis, let it try again
         return { success: false };
       }
diff --git a/apps/api/src/services/queue-service.ts b/apps/api/src/services/queue-service.ts
index e84c006c..b2353296 100644
--- a/apps/api/src/services/queue-service.ts
+++ b/apps/api/src/services/queue-service.ts
@@ -131,7 +131,3 @@ export function getBillingQueue() {
   }
   return billingQueue;
 }
-
-// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
-// import { QueueEvents } from 'bullmq';
-// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 2738b8ee..601e319e 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -6,11 +6,8 @@ import {
   getScrapeQueue,
   getExtractQueue,
   getDeepResearchQueue,
-  redisConnection,
-  scrapeQueueName,
-  extractQueueName,
-  deepResearchQueueName,
   getIndexQueue,
+  redisConnection,
   getGenerateLlmsTxtQueue,
   getBillingQueue,
 } from "./queue-service";
@@ -88,6 +85,7 @@ import https from "https";
 import { cacheableLookup } from "../scraper/scrapeURL/lib/cacheableLookup";
 import { robustFetch } from "../scraper/scrapeURL/lib/fetch";
 import { RateLimiterMode } from "../types";
+import { redisEvictConnection } from "./redis";
 
 configDotenv();
@@ -132,11 +130,11 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
     logger.info("Crawl is pre-finished, checking if we need to add more jobs");
     if (
       job.data.crawlerOptions &&
-      !(await redisConnection.exists(
+      !(await redisEvictConnection.exists(
         "crawl:" + job.data.crawl_id + ":invisible_urls",
       ))
     ) {
-      await redisConnection.set(
+      await redisEvictConnection.set(
         "crawl:" + job.data.crawl_id + ":invisible_urls",
         "done",
         "EX",
@@ -146,7 +144,7 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
 
       const sc = (await getCrawl(job.data.crawl_id))!;
 
      const visitedUrls = new Set(
-        await redisConnection.smembers(
+        await redisEvictConnection.smembers(
          "crawl:" + job.data.crawl_id + ":visited_unique",
        ),
      );
@@ -261,7 +259,7 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
           ? normalizeUrlOnlyHostname(sc.originUrl)
           : undefined;
         // Get all visited unique URLs from Redis
-        const visitedUrls = await redisConnection.smembers(
+        const visitedUrls = await redisEvictConnection.smembers(
           "crawl:" + job.data.crawl_id + ":visited_unique",
         );
         // Upload to Supabase if we have URLs and this is a crawl (not a batch scrape)
@@ -1230,7 +1228,7 @@ async function processJob(job: Job & { id: string }, token: string) {
 
           // Prevent redirect target from being visited in the crawl again
           // See lockURL
-          const x = await redisConnection.sadd(
+          const x = await redisEvictConnection.sadd(
             "crawl:" + job.data.crawl_id + ":visited",
             ...p1.map((x) => x.href),
           );
@@ -1452,7 +1450,7 @@ async function processJob(job: Job & { id: string }, token: string) {
 
       logger.debug("Declaring job as done...");
       await addCrawlJobDone(job.data.crawl_id, job.id, false);
-      await redisConnection.srem(
+      await redisEvictConnection.srem(
         "crawl:" + job.data.crawl_id + ":visited_unique",
         normalizeURL(job.data.url, sc),
       );
diff --git a/apps/api/src/services/redis.ts b/apps/api/src/services/redis.ts
index d2c7dd3a..4076aab8 100644
--- a/apps/api/src/services/redis.ts
+++ b/apps/api/src/services/redis.ts
@@ -1,4 +1,4 @@
-import Redis from "ioredis";
+import IORedis from "ioredis";
 import { redisRateLimitClient } from "./rate-limiter";
 import { logger } from "../lib/logger";
 
@@ -70,3 +70,6 @@ const deleteKey = async (key: string) => {
 };
 
 export { setValue, getValue, deleteKey };
+
+const redisEvictURL = process.env.REDIS_EVICT_URL ?? process.env.REDIS_RATE_LIMIT_URL;
+export const redisEvictConnection = new IORedis(redisEvictURL!);