diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts
index 88d6b716..9e8a0cf6 100644
--- a/apps/api/src/lib/crawl-redis.ts
+++ b/apps/api/src/lib/crawl-redis.ts
@@ -45,6 +45,30 @@ export async function isCrawlFinished(id: string) {
     return (await redisConnection.scard("crawl:" + id + ":jobs_done")) === (await redisConnection.scard("crawl:" + id + ":jobs"));
 }
 
+/**
+ * Marks a crawl as finished exactly once, so that only one caller runs the
+ * completion logic even if several jobs observe the finished state together.
+ *
+ * @param id - Crawl identifier used to build the `crawl:<id>:*` Redis keys.
+ * @returns `true` only for the caller that created the `crawl:<id>:finish`
+ * marker; `false` when the crawl is not finished yet or the marker already
+ * exists.
+ */
+export async function finishCrawl(id: string): Promise<boolean> {
+    if (!(await isCrawlFinished(id))) {
+        return false;
+    }
+
+    const finishKey = "crawl:" + id + ":finish";
+    // SETNX replies 1 only to the caller that actually created the key.
+    const claimed = (await redisConnection.setnx(finishKey, "yes")) === 1;
+    if (claimed) {
+        // Let the marker expire after 24 hours (TTL is in seconds).
+        await redisConnection.expire(finishKey, 24 * 60 * 60);
+    }
+    return claimed;
+}
+
 export async function getCrawlJobs(id: string): Promise<string[]> {
     return await redisConnection.smembers("crawl:" + id + ":jobs");
 }
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index de53b495..089e0aa7 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -15,7 +15,7 @@ import { Logger } from "../lib/logger";
 import { Worker } from "bullmq";
 import systemMonitor from "./system-monitor";
 import { v4 as uuidv4 } from "uuid";
-import { addCrawlJob, addCrawlJobDone, crawlToCrawler, getCrawl, getCrawlJobs, isCrawlFinished, lockURL } from "../lib/crawl-redis";
+import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, getCrawlJobs, isCrawlFinished, lockURL } from "../lib/crawl-redis";
 import { StoredCrawl } from "../lib/crawl-redis";
 import { addScrapeJob } from "./queue-jobs";
 import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
@@ -199,7 +199,7 @@ async function processJob(job: Job, token: string) {
           }
         }
 
-        if (await isCrawlFinished(job.data.crawl_id)) {
+        if (await finishCrawl(job.data.crawl_id)) {
           const jobIDs = await getCrawlJobs(job.data.crawl_id);
 
           const jobs = (await Promise.all(jobIDs.map(async x => {