diff --git a/apps/api/src/controllers/scrape.ts b/apps/api/src/controllers/scrape.ts index 3666fc1a..e9bd33b8 100644 --- a/apps/api/src/controllers/scrape.ts +++ b/apps/api/src/controllers/scrape.ts @@ -9,7 +9,7 @@ import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import import { numTokensFromString } from '../lib/LLM-extraction/helpers'; import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../lib/default-values'; import { addScrapeJob } from '../services/queue-jobs'; -import { scrapeQueueEvents } from '../services/queue-service'; +import { getScrapeQueue, scrapeQueueEvents } from '../services/queue-service'; import { v4 as uuidv4 } from "uuid"; import { Logger } from '../lib/logger'; import * as Sentry from "@sentry/node"; @@ -52,7 +52,19 @@ export async function scrapeHelper( const err = await Sentry.startSpan({ name: "Wait for job to finish", op: "bullmq.wait", attributes: { job: jobId } }, async (span) => { try { - doc = (await job.waitUntilFinished(scrapeQueueEvents, timeout))[0] + doc = (await new Promise((resolve, reject) => { + const start = Date.now(); + const int = setInterval(async () => { + if (Date.now() >= start + timeout) { + clearInterval(int); + reject(new Error("Job wait ")); + } else if (await job.getState() === "completed") { + clearInterval(int); + resolve((await getScrapeQueue().getJob(job.id)).returnvalue); + } + }, 1000); + job.waitUntilFinished(scrapeQueueEvents, timeout) + }))[0] } catch (e) { if (e instanceof Error && e.message.startsWith("Job wait")) { span.setAttribute("timedOut", true);