further fixes

This commit is contained in:
Gergő Móricz 2024-08-23 18:27:00 +02:00
parent eea530e0ad
commit 866e71910c
5 changed files with 21 additions and 30 deletions

View File

@ -8,7 +8,7 @@ import { Document } from "../../lib/entities";
import { isUrlBlocked } from "../../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function import { isUrlBlocked } from "../../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function
import { numTokensFromString } from '../../lib/LLM-extraction/helpers'; import { numTokensFromString } from '../../lib/LLM-extraction/helpers';
import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../../lib/default-values'; import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../../lib/default-values';
import { addScrapeJob } from '../../services/queue-jobs'; import { addScrapeJob, waitForJob } from '../../services/queue-jobs';
import { getScrapeQueue } from '../../services/queue-service'; import { getScrapeQueue } from '../../services/queue-service';
import { v4 as uuidv4 } from "uuid"; import { v4 as uuidv4 } from "uuid";
import { Logger } from '../../lib/logger'; import { Logger } from '../../lib/logger';
@ -52,18 +52,7 @@ export async function scrapeHelper(
const err = await Sentry.startSpan({ name: "Wait for job to finish", op: "bullmq.wait", attributes: { job: jobId } }, async (span) => { const err = await Sentry.startSpan({ name: "Wait for job to finish", op: "bullmq.wait", attributes: { job: jobId } }, async (span) => {
try { try {
doc = (await new Promise((resolve, reject) => { doc = (await waitForJob(job.id, timeout))[0];
const start = Date.now();
const int = setInterval(async () => {
if (Date.now() >= start + timeout) {
clearInterval(int);
reject(new Error("Job wait "));
} else if (await job.getState() === "completed") {
clearInterval(int);
resolve((await getScrapeQueue().getJob(job.id)).returnvalue);
}
}, 1000);
}))[0]
} catch (e) { } catch (e) {
if (e instanceof Error && e.message.startsWith("Job wait")) { if (e instanceof Error && e.message.startsWith("Job wait")) {
span.setAttribute("timedOut", true); span.setAttribute("timedOut", true);

View File

@ -10,7 +10,7 @@ import { isUrlBlocked } from "../../scraper/WebScraper/utils/blocklist";
import { v4 as uuidv4 } from "uuid"; import { v4 as uuidv4 } from "uuid";
import { Logger } from "../../lib/logger"; import { Logger } from "../../lib/logger";
import { getScrapeQueue } from "../../services/queue-service"; import { getScrapeQueue } from "../../services/queue-service";
import { addScrapeJob } from "../../services/queue-jobs"; import { addScrapeJob, waitForJob } from "../../services/queue-jobs";
import * as Sentry from "@sentry/node"; import * as Sentry from "@sentry/node";
export async function searchHelper( export async function searchHelper(
@ -108,18 +108,7 @@ export async function searchHelper(
await getScrapeQueue().addBulk(jobs); await getScrapeQueue().addBulk(jobs);
} }
const docs = (await Promise.all(jobs.map(x => new Promise((resolve, reject) => { const docs = (await Promise.all(jobs.map(x => waitForJob(x.id, 60000)))).map(x => x[0]);
const start = Date.now();
const int = setInterval(async () => {
if (Date.now() >= start + 60000) {
clearInterval(int);
reject(new Error("Job wait "));
} else if (await x.getState() === "completed") {
clearInterval(int);
resolve((await getScrapeQueue().getJob(x.id)).returnvalue);
}
}, 1000);
})))).map(x => x[0]);
if (docs.length === 0) { if (docs.length === 0) {
return { success: true, error: "No search results found", returnCode: 200 }; return { success: true, error: "No search results found", returnCode: 200 };

View File

@ -6,7 +6,7 @@ import { WebSocket } from "ws";
import { v4 as uuidv4 } from "uuid"; import { v4 as uuidv4 } from "uuid";
import { Logger } from "../../lib/logger"; import { Logger } from "../../lib/logger";
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis"; import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis";
import { getScrapeQueue, scrapeQueueEvents } from "../../services/queue-service"; import { getScrapeQueue } from "../../services/queue-service";
import { getJob, getJobs } from "./crawl-status"; import { getJob, getJobs } from "./crawl-status";
type ErrorMessage = { type ErrorMessage = {

View File

@ -4,8 +4,7 @@ import { Document, legacyDocumentConverter, legacyScrapeOptions, RequestWithAuth
import { billTeam } from "../../services/billing/credit_billing"; import { billTeam } from "../../services/billing/credit_billing";
import { v4 as uuidv4 } from 'uuid'; import { v4 as uuidv4 } from 'uuid';
import { numTokensFromString } from "../../lib/LLM-extraction/helpers"; import { numTokensFromString } from "../../lib/LLM-extraction/helpers";
import { addScrapeJob } from "../../services/queue-jobs"; import { addScrapeJob, waitForJob } from "../../services/queue-jobs";
import { scrapeQueueEvents } from '../../services/queue-service';
import { logJob } from "../../services/logging/log_job"; import { logJob } from "../../services/logging/log_job";
export async function scrapeController(req: RequestWithAuth<{}, ScrapeResponse, ScrapeRequest>, res: Response<ScrapeResponse>) { export async function scrapeController(req: RequestWithAuth<{}, ScrapeResponse, ScrapeRequest>, res: Response<ScrapeResponse>) {
@ -30,7 +29,7 @@ export async function scrapeController(req: RequestWithAuth<{}, ScrapeResponse,
let doc: any | undefined; let doc: any | undefined;
try { try {
doc = (await job.waitUntilFinished(scrapeQueueEvents, timeout))[0]; // 60 seconds timeout doc = (await waitForJob(job.id, timeout))[0];
} catch (e) { } catch (e) {
Logger.error(`Error in scrapeController: ${e}`); Logger.error(`Error in scrapeController: ${e}`);
if (e instanceof Error && e.message.startsWith("Job wait")) { if (e instanceof Error && e.message.startsWith("Job wait")) {

View File

@ -46,3 +46,17 @@ export async function addScrapeJob(
} }
} }
/**
 * Polls the scrape queue until the given job settles or the timeout elapses.
 *
 * @param jobId - Identifier of the job to watch on the scrape queue.
 * @param timeout - Maximum time to wait, in milliseconds.
 * @param pollIntervalMs - Delay between state checks, in milliseconds
 *   (defaults to 1000, the previously hard-coded value).
 * @returns A promise resolving with the job's `returnvalue` once the job's
 *   state is `"completed"`.
 * @throws Rejects with `Error("Job wait ")` when the timeout elapses first
 *   (callers detect timeouts via the `"Job wait"` message prefix), with an
 *   `Error` carrying the job's `failedReason` when the job's state is
 *   `"failed"`, or with whatever error the queue lookup itself raised.
 */
export function waitForJob(jobId: string, timeout: number, pollIntervalMs: number = 1000): Promise<unknown> {
  return new Promise((resolve, reject) => {
    const start = Date.now();
    let settled = false;
    let polling = false;

    // Stops the timer before settling so no further ticks are scheduled.
    const finish = (settle: () => void) => {
      settled = true;
      clearInterval(timer);
      settle();
    };

    const timer = setInterval(async () => {
      if (settled) {
        return;
      }

      // The deadline is checked first so a slow in-flight poll cannot delay the timeout.
      if (Date.now() >= start + timeout) {
        finish(() => reject(new Error("Job wait ")));
        return;
      }

      // Skip this tick if the previous poll has not returned yet.
      if (polling) {
        return;
      }
      polling = true;

      try {
        const state = await getScrapeQueue().getJobState(jobId);
        if (settled) {
          return;
        }

        if (state === "completed") {
          const job = await getScrapeQueue().getJob(jobId);
          if (job == null) {
            throw new Error(`Job ${jobId} completed but could not be fetched from the queue`);
          }
          finish(() => resolve(job.returnvalue));
        } else if (state === "failed") {
          // Surface the failure immediately instead of letting it masquerade as a timeout.
          const job = await getScrapeQueue().getJob(jobId);
          finish(() => reject(new Error(job?.failedReason ?? "Job failed")));
        }
      } catch (e) {
        // Errors inside an async timer callback would otherwise be unhandled rejections.
        finish(() => reject(e));
      } finally {
        polling = false;
      }
    }, pollIntervalMs);
  });
}