From d0a8382a5be1b4815aef2050776a53c54de95e6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Fri, 16 Aug 2024 18:48:52 +0200 Subject: [PATCH] fix(queue-worker): crawl finishing race condition --- apps/api/src/lib/crawl-redis.ts | 10 ++++++++++ apps/api/src/services/queue-worker.ts | 4 ++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts index 88d6b716..9e8a0cf6 100644 --- a/apps/api/src/lib/crawl-redis.ts +++ b/apps/api/src/lib/crawl-redis.ts @@ -45,6 +45,16 @@ export async function isCrawlFinished(id: string) { return (await redisConnection.scard("crawl:" + id + ":jobs_done")) === (await redisConnection.scard("crawl:" + id + ":jobs")); } +export async function finishCrawl(id: string) { + if (await isCrawlFinished(id)) { + const set = await redisConnection.setnx("crawl:" + id + ":finish", "yes"); + if (set === 1) { + await redisConnection.expire("crawl:" + id + ":finish", 24 * 60 * 60); + } + return set === 1 + } +} + export async function getCrawlJobs(id: string): Promise<string[]> { return await redisConnection.smembers("crawl:" + id + ":jobs"); } diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index de53b495..089e0aa7 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -15,7 +15,7 @@ import { Logger } from "../lib/logger"; import { Worker } from "bullmq"; import systemMonitor from "./system-monitor"; import { v4 as uuidv4 } from "uuid"; -import { addCrawlJob, addCrawlJobDone, crawlToCrawler, getCrawl, getCrawlJobs, isCrawlFinished, lockURL } from "../lib/crawl-redis"; +import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, getCrawlJobs, isCrawlFinished, lockURL } from "../lib/crawl-redis"; import { StoredCrawl } from "../lib/crawl-redis"; import { addScrapeJob } from "./queue-jobs"; import { supabaseGetJobById } from "../../src/lib/supabase-jobs"; @@ 
-199,7 +199,7 @@ async function processJob(job: Job, token: string) { } } - if (await isCrawlFinished(job.data.crawl_id)) { + if (await finishCrawl(job.data.crawl_id)) { const jobIDs = await getCrawlJobs(job.data.crawl_id); const jobs = (await Promise.all(jobIDs.map(async x => {