From cbe67d89a5dff6abef9db752ee07518df5fff8a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Wed, 15 Jan 2025 19:02:20 +0100 Subject: [PATCH] feat(queue-worker): proactive job cancel --- apps/api/src/services/queue-worker.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 81ed955a..d13cb198 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -739,6 +739,13 @@ async function processJob(job: Job & { id: string }, token: string) { }); const start = Date.now(); + if (job.data.crawl_id) { + const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl; + if (sc && sc.cancelled) { + throw new Error("Parent crawl/batch scrape was cancelled"); + } + } + const pipeline = await Promise.race([ startWebScraperPipeline({ job, @@ -983,11 +990,15 @@ async function processJob(job: Job & { id: string }, token: string) { } catch (error) { const isEarlyTimeout = error instanceof Error && error.message === "timeout"; + const isCancelled = + error instanceof Error && error.message === "Parent crawl/batch scrape was cancelled"; if (isEarlyTimeout) { logger.error(`🐂 Job timed out ${job.id}`); } else if (error instanceof RacedRedirectError) { logger.warn(`🐂 Job got redirect raced ${job.id}, silently failing`); + } else if (isCancelled) { + logger.warn(`🐂 Job got cancelled, silently failing`); } else { logger.error(`🐂 Job errored ${job.id} - ${error}`, { error });