fix: remove QueueEvents

This commit is contained in:
Gergő Móricz 2024-08-22 22:38:39 +02:00
parent 76c8e9f996
commit e690a6fda7
3 changed files with 17 additions and 6 deletions

View File

@ -9,7 +9,7 @@ import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import
import { numTokensFromString } from '../lib/LLM-extraction/helpers';
import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../lib/default-values';
import { addScrapeJob } from '../services/queue-jobs';
import { getScrapeQueue, scrapeQueueEvents } from '../services/queue-service';
import { getScrapeQueue } from '../services/queue-service';
import { v4 as uuidv4 } from "uuid";
import { Logger } from '../lib/logger';
import * as Sentry from "@sentry/node";

View File

@ -9,7 +9,7 @@ import { search } from "../search";
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
import { v4 as uuidv4 } from "uuid";
import { Logger } from "../lib/logger";
import { getScrapeQueue, scrapeQueueEvents } from "../services/queue-service";
import { getScrapeQueue } from "../services/queue-service";
import * as Sentry from "@sentry/node";
import { addScrapeJob } from "../services/queue-jobs";
@ -108,7 +108,18 @@ export async function searchHelper(
await getScrapeQueue().addBulk(jobs);
}
const docs = (await Promise.all(jobs.map(x => x.waitUntilFinished(scrapeQueueEvents, 60000)))).map(x => x[0]);
const docs = (await Promise.all(jobs.map(x => new Promise((resolve, reject) => {
const start = Date.now();
const int = setInterval(async () => {
if (Date.now() >= start + 60000) {
clearInterval(int);
reject(new Error("Job wait "));
} else if (await x.getState() === "completed") {
clearInterval(int);
resolve((await getScrapeQueue().getJob(x.id)).returnvalue);
}
}, 1000);
})))).map(x => x[0]);
if (docs.length === 0) {
return { success: true, error: "No search results found", returnCode: 200 };

View File

@ -35,6 +35,6 @@ export function getScrapeQueue() {
}
import { QueueEvents } from 'bullmq';
export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });
// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
// import { QueueEvents } from 'bullmq';
// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });