From 6287db8492953bab7ee466b1a3b43ae376f7ac36 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?=
Date: Fri, 4 Apr 2025 22:04:19 +0200
Subject: [PATCH] fix: broken on self-host

---
 apps/api/src/__tests__/snips/crawl.test.ts |  2 +-
 apps/api/src/services/queue-worker.ts      | 54 +++++++++++-----------
 2 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/apps/api/src/__tests__/snips/crawl.test.ts b/apps/api/src/__tests__/snips/crawl.test.ts
index 14503be7..87f43bf6 100644
--- a/apps/api/src/__tests__/snips/crawl.test.ts
+++ b/apps/api/src/__tests__/snips/crawl.test.ts
@@ -78,5 +78,5 @@ describe("Crawl tests", () => {
       limit: 3,
       delay: 5,
     });
-  }, 600000);
+  }, 300000);
 });
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 1e4c0573..5c635916 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -674,37 +674,37 @@ const workerFun = async (
         runningJobs.delete(job.id);
       }
 
+      if (job.id && job.data.crawl_id && job.data.crawlerOptions?.delay) {
+        await removeCrawlConcurrencyLimitActiveJob(job.data.crawl_id, job.id);
+        cleanOldCrawlConcurrencyLimitEntries(job.data.crawl_id);
+
+        const delayInSeconds = job.data.crawlerOptions.delay;
+        const delayInMs = delayInSeconds * 1000;
+
+        await new Promise(resolve => setTimeout(resolve, delayInMs));
+
+        const nextCrawlJob = await takeCrawlConcurrencyLimitedJob(job.data.crawl_id);
+        if (nextCrawlJob !== null) {
+          await pushCrawlConcurrencyLimitActiveJob(job.data.crawl_id, nextCrawlJob.id, 60 * 1000);
+
+          await queue.add(
+            nextCrawlJob.id,
+            {
+              ...nextCrawlJob.data,
+            },
+            {
+              ...nextCrawlJob.opts,
+              jobId: nextCrawlJob.id,
+              priority: nextCrawlJob.priority,
+            },
+          );
+        }
+      }
+
       if (job.id && job.data && job.data.team_id && job.data.plan) {
         await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
         cleanOldConcurrencyLimitEntries(job.data.team_id);
 
-        if (job.data.crawl_id && job.data.crawlerOptions?.delay) {
-          await removeCrawlConcurrencyLimitActiveJob(job.data.crawl_id, job.id);
-          cleanOldCrawlConcurrencyLimitEntries(job.data.crawl_id);
-
-          const delayInSeconds = job.data.crawlerOptions.delay;
-          const delayInMs = delayInSeconds * 1000;
-
-          await new Promise(resolve => setTimeout(resolve, delayInMs));
-
-          const nextCrawlJob = await takeCrawlConcurrencyLimitedJob(job.data.crawl_id);
-          if (nextCrawlJob !== null) {
-            await pushCrawlConcurrencyLimitActiveJob(job.data.crawl_id, nextCrawlJob.id, 60 * 1000);
-
-            await queue.add(
-              nextCrawlJob.id,
-              {
-                ...nextCrawlJob.data,
-              },
-              {
-                ...nextCrawlJob.opts,
-                jobId: nextCrawlJob.id,
-                priority: nextCrawlJob.priority,
-              },
-            );
-          }
-        }
-
         // No need to check if we're under the limit here -- if the current job is finished,
         // we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG
         const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
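
For context on what the hoist above changes: the crawl-delay handoff used to sit inside the `job.data.team_id && job.data.plan` guard, which presumably never fires on self-hosted deployments where plan data is unset, so delayed crawls stalled after the first job. Below is a minimal, self-contained TypeScript sketch of that handoff shape. Everything in it is a hypothetical stand-in, not Firecrawl's implementation: `waitingByCrawl`, `takeNextCrawlJob`, `enqueue`, and `handOffDelayedCrawlJob` replace the Redis/BullMQ helpers named in the diff.

// Hedged sketch, assuming in-memory stand-ins for the Redis-backed helpers
// referenced in the diff; real names and signatures differ.

type QueuedJob = { id: string; data: Record<string, unknown> };

// Stand-in for the crawl concurrency-limit queue: crawl_id -> waiting jobs.
const waitingByCrawl = new Map<string, QueuedJob[]>();

// Stand-in for takeCrawlConcurrencyLimitedJob: pop the next waiting job, if any.
function takeNextCrawlJob(crawlId: string): QueuedJob | null {
  return waitingByCrawl.get(crawlId)?.shift() ?? null;
}

// Stand-in for queue.add: put the job back on the main worker queue.
async function enqueue(job: QueuedJob): Promise<void> {
  console.log(`re-enqueued ${job.id}`);
}

// The hoisted logic: after a job from a delayed crawl finishes, wait out the
// crawl's delay, then release the next waiting job -- now unconditionally,
// rather than only when team_id and plan are present.
async function handOffDelayedCrawlJob(
  crawlId: string,
  delaySeconds: number,
): Promise<void> {
  await new Promise((resolve) => setTimeout(resolve, delaySeconds * 1000));
  const next = takeNextCrawlJob(crawlId);
  if (next !== null) {
    await enqueue(next);
  }
}

// Usage: after job A of crawl "c1" (delay: 5) completes, job B is handed off
// roughly five seconds later.
waitingByCrawl.set("c1", [{ id: "B", data: {} }]);
void handOffDelayedCrawlJob("c1", 5);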