diff --git a/apps/api/src/controllers/v0/scrape.ts b/apps/api/src/controllers/v0/scrape.ts
index 61a7cb32..6c131727 100644
--- a/apps/api/src/controllers/v0/scrape.ts
+++ b/apps/api/src/controllers/v0/scrape.ts
@@ -8,7 +8,7 @@ import { Document } from "../../lib/entities";
 import { isUrlBlocked } from "../../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function
 import { numTokensFromString } from '../../lib/LLM-extraction/helpers';
 import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../../lib/default-values';
-import { addScrapeJob } from '../../services/queue-jobs';
+import { addScrapeJob, waitForJob } from '../../services/queue-jobs';
 import { getScrapeQueue } from '../../services/queue-service';
 import { v4 as uuidv4 } from "uuid";
 import { Logger } from '../../lib/logger';
@@ -52,18 +52,7 @@ export async function scrapeHelper(
 
   const err = await Sentry.startSpan({ name: "Wait for job to finish", op: "bullmq.wait", attributes: { job: jobId } }, async (span) => {
     try {
-      doc = (await new Promise((resolve, reject) => {
-        const start = Date.now();
-        const int = setInterval(async () => {
-          if (Date.now() >= start + timeout) {
-            clearInterval(int);
-            reject(new Error("Job wait "));
-          } else if (await job.getState() === "completed") {
-            clearInterval(int);
-            resolve((await getScrapeQueue().getJob(job.id)).returnvalue);
-          }
-        }, 1000);
-      }))[0]
+      doc = (await waitForJob(job.id, timeout))[0];
     } catch (e) {
       if (e instanceof Error && e.message.startsWith("Job wait")) {
         span.setAttribute("timedOut", true);
diff --git a/apps/api/src/controllers/v0/search.ts b/apps/api/src/controllers/v0/search.ts
index fdb0c6fb..619aa984 100644
--- a/apps/api/src/controllers/v0/search.ts
+++ b/apps/api/src/controllers/v0/search.ts
@@ -10,7 +10,7 @@ import { isUrlBlocked } from "../../scraper/WebScraper/utils/blocklist";
 import { v4 as uuidv4 } from "uuid";
 import { Logger } from "../../lib/logger";
 import { getScrapeQueue } from "../../services/queue-service";
-import { addScrapeJob } from "../../services/queue-jobs";
+import { addScrapeJob, waitForJob } from "../../services/queue-jobs";
 import * as Sentry from "@sentry/node";
 
 export async function searchHelper(
@@ -108,18 +108,7 @@ export async function searchHelper(
     await getScrapeQueue().addBulk(jobs);
   }
 
-  const docs = (await Promise.all(jobs.map(x => new Promise((resolve, reject) => {
-    const start = Date.now();
-    const int = setInterval(async () => {
-      if (Date.now() >= start + 60000) {
-        clearInterval(int);
-        reject(new Error("Job wait "));
-      } else if (await x.getState() === "completed") {
-        clearInterval(int);
-        resolve((await getScrapeQueue().getJob(x.id)).returnvalue);
-      }
-    }, 1000);
-  })))).map(x => x[0]);
+  const docs = (await Promise.all(jobs.map(x => waitForJob(x.id, 60000)))).map(x => x[0]);
 
   if (docs.length === 0) {
     return { success: true, error: "No search results found", returnCode: 200 };
diff --git a/apps/api/src/controllers/v1/crawl-status-ws.ts b/apps/api/src/controllers/v1/crawl-status-ws.ts
index 6e2e2eaf..89769416 100644
--- a/apps/api/src/controllers/v1/crawl-status-ws.ts
+++ b/apps/api/src/controllers/v1/crawl-status-ws.ts
@@ -6,7 +6,7 @@ import { WebSocket } from "ws";
 import { v4 as uuidv4 } from "uuid";
 import { Logger } from "../../lib/logger";
 import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis";
-import { getScrapeQueue, scrapeQueueEvents } from "../../services/queue-service";
+import { getScrapeQueue } from "../../services/queue-service";
 import { getJob, getJobs } from "./crawl-status";
 
 type ErrorMessage = {
diff --git a/apps/api/src/controllers/v1/scrape.ts b/apps/api/src/controllers/v1/scrape.ts
index ffa04b82..cbd7fe2b 100644
--- a/apps/api/src/controllers/v1/scrape.ts
+++ b/apps/api/src/controllers/v1/scrape.ts
@@ -4,8 +4,7 @@ import { Document, legacyDocumentConverter, legacyScrapeOptions, RequestWithAuth
 import { billTeam } from "../../services/billing/credit_billing";
 import { v4 as uuidv4 } from 'uuid';
 import { numTokensFromString } from "../../lib/LLM-extraction/helpers";
-import { addScrapeJob } from "../../services/queue-jobs";
-import { scrapeQueueEvents } from '../../services/queue-service';
+import { addScrapeJob, waitForJob } from "../../services/queue-jobs";
 import { logJob } from "../../services/logging/log_job";
 
 export async function scrapeController(req: RequestWithAuth<{}, ScrapeResponse, ScrapeRequest>, res: Response) {
@@ -30,7 +29,7 @@ export async function scrapeController(req: RequestWithAuth<{}, ScrapeResponse,
 
   let doc: any | undefined;
   try {
-    doc = (await job.waitUntilFinished(scrapeQueueEvents, timeout))[0]; // 60 seconds timeout
+    doc = (await waitForJob(job.id, timeout))[0];
   } catch (e) {
     Logger.error(`Error in scrapeController: ${e}`);
     if (e instanceof Error && e.message.startsWith("Job wait")) {
diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts
index 888cdefc..671fe59f 100644
--- a/apps/api/src/services/queue-jobs.ts
+++ b/apps/api/src/services/queue-jobs.ts
@@ -46,3 +46,17 @@ export async function addScrapeJob(
   }
 }
 
+export function waitForJob(jobId: string, timeout: number) {
+  return new Promise((resolve, reject) => {
+    const start = Date.now();
+    const int = setInterval(async () => {
+      if (Date.now() >= start + timeout) {
+        clearInterval(int);
+        reject(new Error("Job wait "));
+      } else if (await getScrapeQueue().getJobState(jobId) === "completed") {
+        clearInterval(int);
+        resolve((await getScrapeQueue().getJob(jobId)).returnvalue);
+      }
+    }, 1000);
+  })
+}