diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 2db1b336..971d7d0b 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -10,17 +10,15 @@ import { startWebScraperPipeline } from "../main/runWebScraper";
 import { callWebhook } from "./webhook";
 import { logJob } from "./logging/log_job";
 import { initSDK } from "@hyperdx/node-opentelemetry";
-import { Job, QueueEvents, tryCatch } from "bullmq";
+import { Job } from "bullmq";
 import { Logger } from "../lib/logger";
-import { ScrapeEvents } from "../lib/scrape-events";
 import { Worker } from "bullmq";
 import systemMonitor from "./system-monitor";
 import { v4 as uuidv4 } from "uuid";
-import { WebCrawler } from "../scraper/WebScraper/crawler";
-import { getAdjustedMaxDepth } from "../scraper/WebScraper/utils/maxDepthUtils";
-import { addCrawlJob, addCrawlJobDone, crawlToCrawler, getCrawl, isCrawlFinished, lockURL } from "../lib/crawl-redis";
+import { addCrawlJob, addCrawlJobDone, crawlToCrawler, getCrawl, getCrawlJobs, isCrawlFinished, lockURL } from "../lib/crawl-redis";
 import { StoredCrawl } from "../lib/crawl-redis";
 import { addScrapeJob } from "./queue-jobs";
+import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
 
 if (process.env.ENV === "production") {
   initSDK({
@@ -170,9 +168,9 @@ async function processJob(job: Job, token: string) {
     if (job.data.crawl_id) {
       await addCrawlJobDone(job.data.crawl_id, job.id);
 
-      if (!job.data.sitemapped) {
-        const sc = await getCrawl(job.data.crawl_id) as StoredCrawl;
+      const sc = await getCrawl(job.data.crawl_id) as StoredCrawl;
 
+      if (!job.data.sitemapped) {
         if (!sc.cancelled) {
           const crawler = crawlToCrawler(job.data.crawl_id, sc);
 
@@ -202,6 +200,51 @@ async function processJob(job: Job, token: string) {
       }
 
       if (await isCrawlFinished(job.data.crawl_id)) {
+        const jobIDs = await getCrawlJobs(job.data.crawl_id);
+
+        const jobs = (await Promise.all(jobIDs.map(async x => {
+          // The current job is not yet marked completed in the queue, so
+          // stand in a synthetic entry for it with its fresh return value.
+          if (x === job.id) {
+            return {
+              async getState() {
+                return "completed"
+              },
+              timestamp: Date.now(),
+              returnvalue: docs,
+            }
+          }
+
+          const j = await getScrapeQueue().getJob(x);
+
+          if (process.env.USE_DB_AUTHENTICATION === "true") {
+            const supabaseData = await supabaseGetJobById(j.id);
+
+            if (supabaseData) {
+              j.returnvalue = supabaseData.docs;
+            }
+          }
+
+          return j;
+        }))).sort((a, b) => a.timestamp - b.timestamp);
+        const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
+        const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "active";
+
+        const fullDocs = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);
+
+        const data = {
+          success: jobStatus !== "failed",
+          result: {
+            links: fullDocs.map((doc) => {
+              return {
+                content: doc,
+                source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
+              };
            }),
+          },
+          project_id: job.data.project_id,
+          error: message /* etc... */,
+          docs: fullDocs,
+        };
+        await callWebhook(job.data.team_id, job.id as string, data);
       }
     }
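
Review note: the crawl-level status in the last hunk is derived from the individual BullMQ job states with a nested ternary, and also noting the aggregated docs array was renamed to `fullDocs` so it no longer shadows the pipeline's `docs` inside the `Promise.all` callback (the shadowed `const` would have thrown a temporal-dead-zone error at runtime). A minimal TypeScript sketch of the same status rule for readability; `deriveCrawlStatus` is a hypothetical helper for illustration, not part of this patch:

```ts
// Equivalent of the nested ternary above (illustrative only, not in the patch).
// The state names mirror the BullMQ job states the patch inspects.
type JobState = "completed" | "failed" | "active" | string;

function deriveCrawlStatus(
  cancelled: boolean,
  states: JobState[],
): "completed" | "failed" | "active" {
  if (cancelled) return "failed";                 // a cancelled crawl always reports failure
  if (states.every((s) => s === "completed")) return "completed";
  if (states.some((s) => s === "failed")) return "failed";
  return "active";                                // otherwise something is still in flight
}

// Example: one unfinished job keeps the crawl "active".
// deriveCrawlStatus(false, ["completed", "waiting"]) === "active"
```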
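
For webhook consumers, the payload assembled before `callWebhook` has roughly the following shape. This is an inferred sketch, not a type declared by the patch; `Document` stands in for the scraper's document type, which this diff does not show, and `error` carries the pipeline's `message` per the `/* etc... */` note:

```ts
// Inferred payload shape (assumption; the diff declares no types for this object).
type Document = { url?: string; metadata?: { sourceURL?: string } } & Record<string, unknown>;

interface CrawlFinishedWebhookPayload {
  success: boolean;                                 // false when the derived status is "failed"
  result: {
    links: { content: Document; source: string }[]; // source falls back from metadata.sourceURL to url
  };
  project_id: string;
  error?: string;                                   // the pipeline's `message`, when present
  docs: Document[];                                 // first element of each job's return value
}
```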