From 2e5e480cc2240a1c0450c45f5f41f0d38c29a8bf Mon Sep 17 00:00:00 2001 From: Gergo Moricz Date: Tue, 13 Aug 2024 22:10:17 +0200 Subject: [PATCH] fix(crawl): call webhooks --- apps/api/src/lib/crawl-redis.ts | 9 +++++++++ apps/api/src/services/queue-worker.ts | 12 +++++++++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts index f77b66a5..040e6a7f 100644 --- a/apps/api/src/lib/crawl-redis.ts +++ b/apps/api/src/lib/crawl-redis.ts @@ -30,6 +30,15 @@ export async function addCrawlJob(id: string, job_id: string) { await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60, "NX"); } +export async function addCrawlJobDone(id: string, job_id: string) { + await redisConnection.sadd("crawl:" + id + ":jobs_done", job_id); + await redisConnection.expire("crawl:" + id + ":jobs_done", 24 * 60 * 60, "NX"); +} + +export async function isCrawlFinished(id: string) { + return (await redisConnection.scard("crawl:" + id + ":jobs_done")) === (await redisConnection.scard("crawl:" + id + ":jobs")); +} + export async function getCrawlJobs(id: string): Promise { return await redisConnection.smembers("crawl:" + id + ":jobs"); } diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 7aa639dd..2db1b336 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -18,7 +18,7 @@ import systemMonitor from "./system-monitor"; import { v4 as uuidv4 } from "uuid"; import { WebCrawler } from "../scraper/WebScraper/crawler"; import { getAdjustedMaxDepth } from "../scraper/WebScraper/utils/maxDepthUtils"; -import { addCrawlJob, crawlToCrawler, getCrawl, lockURL } from "../lib/crawl-redis"; +import { addCrawlJob, addCrawlJobDone, crawlToCrawler, getCrawl, isCrawlFinished, lockURL } from "../lib/crawl-redis"; import { StoredCrawl } from "../lib/crawl-redis"; import { addScrapeJob } from "./queue-jobs"; @@ -168,6 +168,8 @@ async function processJob(job: Job, token: string) { }); if (job.data.crawl_id) { + await addCrawlJobDone(job.data.crawl_id, job.id); + if (!job.data.sitemapped) { const sc = await getCrawl(job.data.crawl_id) as StoredCrawl; @@ -198,6 +200,10 @@ async function processJob(job: Job, token: string) { } } } + + if (await isCrawlFinished(job.data.crawl_id)) { + await callWebhook(job.data.team_id, job.id as string, data); + } } Logger.info(`🐂 Job done ${job.id}`); @@ -229,8 +235,8 @@ async function processJob(job: Job, token: string) { error: "Something went wrong... Contact help@mendable.ai or try again." /* etc... */, }; - if (job.data.mode === "crawl") { - await callWebhook(job.data.team_id, job.id as string, data); + if (job.data.mode === "crawl" || job.data.crawl_id) { + await callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data); } await logJob({ job_id: job.id as string,