diff --git a/apps/api/src/controllers/v1/crawl-status-ws.ts b/apps/api/src/controllers/v1/crawl-status-ws.ts
index 89769416..f7dd1b77 100644
--- a/apps/api/src/controllers/v1/crawl-status-ws.ts
+++ b/apps/api/src/controllers/v1/crawl-status-ws.ts
@@ -8,6 +8,7 @@ import { Logger } from "../../lib/logger";
 import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis";
 import { getScrapeQueue } from "../../services/queue-service";
 import { getJob, getJobs } from "./crawl-status";
+import * as Sentry from "@sentry/node";
 
 type ErrorMessage = {
   type: "error",
@@ -56,31 +57,38 @@ async function crawlStatusWS(ws: WebSocket, req: RequestWithAuth<CrawlStatusParams, undefined, undefined>) {
-  const completedListener = async (e) => {
-    const job = await getScrapeQueue().getJob(e.jobId)
-    if (job.data.crawl_id === req.params.jobId) {
-      if (doneJobIDs.includes(job.id)) return;
-      const j = await getJob(job.id);
-      if (j.returnvalue) {
+  const loop = async () => {
+    if (finished) return;
+
+    const jobIDs = await getCrawlJobs(req.params.jobId);
+
+    if (jobIDs.length === doneJobIDs.length) {
+      return close(ws, 1000, { type: "done" });
+    }
+
+    const notDoneJobIDs = jobIDs.filter(x => !doneJobIDs.includes(x));
+    const jobStatuses = await Promise.all(notDoneJobIDs.map(async x => [x, await getScrapeQueue().getJobState(x)]));
+    const newlyDoneJobIDs = jobStatuses.filter(x => x[1] === "completed" || x[1] === "failed").map(x => x[0]);
+
+    for (const jobID of newlyDoneJobIDs) {
+      const job = await getJob(jobID);
+
+      if (job.returnvalue) {
         send(ws, {
           type: "document",
-          data: legacyDocumentConverter(j.returnvalue),
-        });
-        if (await isCrawlFinishedLocked(req.params.jobId)) {
-          await new Promise((resolve) => setTimeout(() => resolve(true), 5000)) // wait for last events to pour in
-          scrapeQueueEvents.removeListener("completed", completedListener);
-          close(ws, 1000, { type: "done" })
-        }
+          data: legacyDocumentConverter(job.returnvalue),
+        })
       } else {
-        // FAILED
+        return close(ws, 3000, { type: "error", error: job.failedReason });
       }
     }
+
+    setTimeout(loop, 1000);
   };
 
-  // TODO: handle failed jobs
-
-  scrapeQueueEvents.addListener("completed", completedListener);
+  setTimeout(loop, 1000);
 
   doneJobIDs = await getDoneJobsOrdered(req.params.jobId);
@@ -102,7 +110,7 @@ async function crawlStatusWS(ws: WebSocket, req: RequestWithAuth<CrawlStatusParams, undefined, undefined>) {
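
For reference, below is a self-contained sketch of the polling pattern this patch switches to: instead of subscribing to BullMQ queue events (which required per-connection listener bookkeeping and, per the removed `// TODO`, never handled failed jobs), a timer-driven loop re-checks job states until everything is completed or failed. The queue name, the `pollCrawl` helper, and the callback shapes are illustrative assumptions, not code from this repository.

```ts
import { Queue } from "bullmq";

// Hypothetical queue handle; the patched file obtains it via getScrapeQueue().
const queue = new Queue("scrapeQueue");

// Poll all jobs of a crawl until every one is completed or failed.
// Mirrors the diff's loop: diff the full job ID list against the IDs already
// seen, batch-check states, emit results, then re-arm the timer only after
// the current pass finishes.
function pollCrawl(
  getJobIDs: () => Promise<string[]>, // e.g. getCrawlJobs(crawlId)
  onDocument: (doc: unknown) => void, // e.g. send(ws, { type: "document", ... })
  onDone: () => void,                 // e.g. close(ws, 1000, { type: "done" })
  onError: (reason: string) => void,  // e.g. close(ws, 3000, { type: "error", ... })
  intervalMs = 1000,
) {
  const doneJobIDs = new Set<string>();

  const loop = async () => {
    const jobIDs = await getJobIDs();

    // Every known job has been reported: finish and stop polling.
    if (jobIDs.length === doneJobIDs.size) {
      return onDone();
    }

    const notDone = jobIDs.filter((id) => !doneJobIDs.has(id));

    // Queue#getJobState resolves to "completed", "failed", "active", etc.
    const states = await Promise.all(
      notDone.map(async (id) => [id, await queue.getJobState(id)] as const),
    );

    for (const [id, state] of states) {
      if (state !== "completed" && state !== "failed") continue;
      doneJobIDs.add(id);

      const job = await queue.getJob(id);
      if (job?.returnvalue) {
        onDocument(job.returnvalue);
      } else {
        // A finished job with no return value failed; surface the reason
        // and stop (no new timer is scheduled on this path).
        return onError(job?.failedReason ?? "unknown failure");
      }
    }

    setTimeout(loop, intervalMs);
  };

  setTimeout(loop, intervalMs);
}
```

One consequence of re-arming the timer inside `loop` rather than using `setInterval`, as the diff does, is that a slow Redis round-trip can never cause two passes to overlap; the early `return`s on the done and error branches are also what terminate polling, since no further timer is scheduled there.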