From f43d5e78954f9229a08db8ec063f2083e104f66e Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Tue, 30 Jul 2024 14:44:13 -0400
Subject: [PATCH] Nick: scrape queue

---
 apps/api/src/controllers/scrape.ts     | 56 +++++++++++----------
 apps/api/src/index.ts                  |  4 +-
 apps/api/src/main/runWebScraper.ts     | 14 ++++--
 apps/api/src/services/queue-jobs.ts    | 12 +++++
 apps/api/src/services/queue-service.ts | 39 ++++++++++++---
 apps/api/src/services/queue-worker.ts  | 68 +++++++++++++++-----------
 apps/api/src/types.ts                  |  2 +-
 7 files changed, 124 insertions(+), 71 deletions(-)

diff --git a/apps/api/src/controllers/scrape.ts b/apps/api/src/controllers/scrape.ts
index 52ebba31..d23dcc88 100644
--- a/apps/api/src/controllers/scrape.ts
+++ b/apps/api/src/controllers/scrape.ts
@@ -9,8 +9,8 @@ import { Document } from "../lib/entities";
 import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function
 import { numTokensFromString } from '../lib/LLM-extraction/helpers';
 import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../lib/default-values';
-import { addWebScraperJob } from '../services/queue-jobs';
-import { getWebScraperQueue } from '../services/queue-service';
+import { addScrapeJob, addWebScraperJob } from '../services/queue-jobs';
+import { getScrapeQueue, getWebScraperQueue, scrapeQueueEvents } from '../services/queue-service';
 import { supabase_service } from '../services/supabase';
 import { v4 as uuidv4 } from "uuid";
 import { Logger } from '../lib/logger';
@@ -50,7 +50,7 @@ export async function scrapeHelper(
   //   extractorOptions: extractorOptions,
   // });
 
-  const job = await addWebScraperJob({
+  const job = await addScrapeJob({
     url,
     mode: "single_urls",
     crawlerOptions,
@@ -60,39 +60,41 @@
     origin: req.body.origin ?? defaultOrigin,
   });
 
-  const wsq = getWebScraperQueue();
   let promiseResolve;
 
+  // const docsPromise = new Promise((resolve) => {
+  //   promiseResolve = resolve;
+  // });
-  const docsPromise = new Promise((resolve) => {
-    promiseResolve = resolve;
-  });
+  // const listener = (j: string, res: any) => {
+  //   console.log("JOB COMPLETED", j, "vs", job.id, res);
+  //   if (j === job.id) {
+  //     promiseResolve([j, res]);
+  //     sq.removeListener("global:completed", listener);
+  //   }
+  // }
+  const jobResult = await job.waitUntilFinished(scrapeQueueEvents, 60 * 1000); // 60-second timeout
 
-  const listener = (j: string, res: any) => {
-    console.log("JOB COMPLETED", j, "vs", job.id, res);
-    if (j === job.id) {
-      promiseResolve([j, res]);
-      wsq.removeListener("global:completed", listener);
-    }
-  }
   // wsq.on("global:completed", listener);
-  const timeoutPromise = new Promise<{ success: boolean; error?: string; returnCode: number }>((_, reject) =>
-    setTimeout(() => reject({ success: false, error: "Request timed out. Increase the timeout by passing `timeout` param to the request.", returnCode: 408 }), timeout)
-  );
+  // const timeoutPromise = new Promise<{ success: boolean; error?: string; returnCode: number }>((_, reject) =>
+  //   setTimeout(() => reject({ success: false, error: "Request timed out. Increase the timeout by passing `timeout` param to the request.", returnCode: 408 }), timeout)
+  // );
 
-  let j;
-  try {
-    j = await Promise.race([docsPromise, timeoutPromise]);
-  } catch (error) {
-    wsq.removeListener("global:completed", listener);
-    return error;
-  }
+  // let j;
+  // try {
+  //   j = await Promise.race([jobResult, timeoutPromise]);
+  // } catch (error) {
+  //   // sq.removeListener("global:completed", listener);
+  //   return error;
+  // }
+  // console.log("JOB RESULT", j[1]);
 
-  let j1 = typeof j[1] === "string" ? JSON.parse(j[1]) : j[1];
+  // let j1 = typeof j[1] === "string" ? JSON.parse(j[1]) : j[1];
 
-  const doc = j1 !== null ? j1.result.links[0].content : (await supabase_service
+  console.log("JOB RESULT", jobResult);
+
+  const doc = jobResult !== null ? jobResult[0] : (await supabase_service
     .from("firecrawl_jobs")
     .select("docs")
     .eq("job_id", job.id as string)).data[0]?.docs[0];
diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts
index c9bb6589..6550a2fd 100644
--- a/apps/api/src/index.ts
+++ b/apps/api/src/index.ts
@@ -2,7 +2,7 @@ import express from "express";
 import bodyParser from "body-parser";
 import cors from "cors";
 import "dotenv/config";
-import { getWebScraperQueue } from "./services/queue-service";
+import { getScrapeQueue, getWebScraperQueue } from "./services/queue-service";
 import { v0Router } from "./routes/v0";
 import { initSDK } from "@hyperdx/node-opentelemetry";
 import cluster from "cluster";
@@ -58,7 +58,7 @@ if (cluster.isMaster) {
   serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
 
   const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
-    queues: [new BullAdapter(getWebScraperQueue())],
+    queues: [new BullAdapter(getWebScraperQueue()), new BullAdapter(getScrapeQueue())],
     serverAdapter: serverAdapter,
   });
diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts
index 8eb95eb2..3f3a29ef 100644
--- a/apps/api/src/main/runWebScraper.ts
+++ b/apps/api/src/main/runWebScraper.ts
@@ -36,9 +36,9 @@ export async function startWebScraperPipeline({
         job.updateProgress({ ...progress, partialDocs: partialDocs });
       }
     },
-    onSuccess: (result) => {
+    onSuccess: (result, mode) => {
       Logger.debug(`🐂 Job completed ${job.id}`);
-      saveJob(job, result, token);
+      saveJob(job, result, token, mode);
     },
     onError: (error) => {
       Logger.error(`🐂 Job failed ${job.id}`);
@@ -113,7 +113,7 @@ export async function runWebScraper({
     }
 
     // This is where the returnvalue from the job is set
-    onSuccess(filteredDocs);
+    onSuccess(filteredDocs, mode);
 
     // this return doesn't matter too much for the job completion result
     return { success: true, message: "", docs: filteredDocs };
@@ -123,7 +123,7 @@ export async function runWebScraper({
   }
 }
 
-const saveJob = async (job: Job, result: any, token: string) => {
+const saveJob = async (job: Job, result: any, token: string, mode: string) => {
   try {
     if (process.env.USE_DB_AUTHENTICATION === "true") {
       const { data, error } = await supabase_service
@@ -133,7 +133,11 @@ const saveJob = async (job: Job, result: any, token: string) => {
       if (error) throw new Error(error.message);
 
       try {
-        await job.moveToCompleted(null, token, false);
+        if (mode === "crawl") {
+          await job.moveToCompleted(null, token, false);
+        } else {
+          await job.moveToCompleted(result, token, false);
+        }
       } catch (error) {
         // I think the job won't exist here anymore
       }
diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts
index 028cdd77..46c2fb22 100644
--- a/apps/api/src/services/queue-jobs.ts
+++ b/apps/api/src/services/queue-jobs.ts
@@ -1,5 +1,6 @@
 import { Job, Queue } from "bullmq";
 import {
+  getScrapeQueue,
   getWebScraperQueue,
 } from "./queue-service";
 import { v4 as uuidv4 } from "uuid";
@@ -16,3 +17,14 @@ export async function addWebScraperJob(
   });
 }
 
+export async function addScrapeJob(
+  webScraperOptions: WebScraperOptions,
+  options: any = {},
+  jobId: string = uuidv4(),
+): Promise<Job> {
+  return await getScrapeQueue().add(jobId, webScraperOptions, {
+    ...options,
+    jobId,
+  });
+}
+
diff --git a/apps/api/src/services/queue-service.ts b/apps/api/src/services/queue-service.ts
index db62c711..348e3d7c 100644
--- a/apps/api/src/services/queue-service.ts
+++ b/apps/api/src/services/queue-service.ts
@@ -1,19 +1,16 @@
 import { Queue } from "bullmq";
 import { Logger } from "../lib/logger";
 import IORedis from "ioredis";
-import { Worker } from "bullmq";
-import systemMonitor from "./system-monitor";
-import { v4 as uuidv4 } from "uuid";
 
 let webScraperQueue: Queue;
+let scrapeQueue: Queue;
+
 export const redisConnection = new IORedis(process.env.REDIS_URL, {
   maxRetriesPerRequest: null,
 });
 
-
-
-export const webScraperQueueName = "{webscraperQueue}";
+export const webScraperQueueName = "{crawlQueue}";
+export const scrapeQueueName = "{scrapeQueue}";
 
 export function getWebScraperQueue() {
   if (!webScraperQueue) {
     webScraperQueue = new Queue(
@@ -38,4 +35,32 @@ export function getWebScraperQueue() {
   return webScraperQueue;
 }
 
+export function getScrapeQueue() {
+  if (!scrapeQueue) {
+    scrapeQueue = new Queue(
+      scrapeQueueName,
+      {
+        connection: redisConnection,
+      }
+      // {
+      //   settings: {
+      //     lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds,
+      //     lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
+      //     stalledInterval: 30 * 1000,
+      //     maxStalledCount: 10,
+      //   },
+      //   defaultJobOptions: {
+      //     attempts: 5
+      //   }
+      // }
+    );
+    Logger.info("Scrape queue created");
+  }
+  return scrapeQueue;
+}
+
+import { QueueEvents } from 'bullmq';
+
+export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection });
+export const webScraperQueueEvents = new QueueEvents(webScraperQueueName, { connection: redisConnection });
\ No newline at end of file
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 3cd62751..9cd0b089 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -1,19 +1,25 @@
 import { CustomError } from "../lib/custom-error";
-import { getWebScraperQueue, redisConnection, webScraperQueueName } from "./queue-service";
+import {
+  getWebScraperQueue,
+  getScrapeQueue,
+  redisConnection,
+  webScraperQueueName,
+  scrapeQueueName,
+} from "./queue-service";
 import "dotenv/config";
 import { logtail } from "./logtail";
 import { startWebScraperPipeline } from "../main/runWebScraper";
 import { callWebhook } from "./webhook";
 import { logJob } from "./logging/log_job";
-import { initSDK } from '@hyperdx/node-opentelemetry';
+import { initSDK } from "@hyperdx/node-opentelemetry";
 import { Job } from "bullmq";
 import { Logger } from "../lib/logger";
 import { ScrapeEvents } from "../lib/scrape-events";
 import { Worker } from "bullmq";
 import systemMonitor from "./system-monitor";
 import { v4 as uuidv4 } from "uuid";
 
-if (process.env.ENV === 'production') {
+if (process.env.ENV === "production") {
   initSDK({
     consoleCapture: true,
     additionalInstrumentations: [],
@@ -35,21 +41,23 @@
 const connectionMonitorInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10;
 const gotJobInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 20;
 
 const wsq = getWebScraperQueue();
+const sq = getScrapeQueue();
 
 const processJobInternal = async (token: string, job: Job) => {
-
   const extendLockInterval = setInterval(async () => {
     await job.extendLock(token, jobLockExtensionTime);
   }, jobLockExtendInterval);
 
   try {
     const result = await processJob(job, token);
-
-    // await resultQueue.add('resultJob', result,{jobId: job.id});
-    console.log("🐂 Job completed", result);
-    console.log({token})
-    console.log(await job.getState())
-    // await job.moveToCompleted(result, token, false); //3rd arg fetchNext
+    const jobState = await job.getState();
+    if (jobState !== "completed" && jobState !== "failed") {
+      try {
+        await job.moveToCompleted(result.docs, token, false); // 3rd arg: fetchNext
+      } catch (e) {
+        // console.log("Job already completed, error:", e);
+      }
+    }
   } catch (error) {
     console.log("Job failed, error:", error);
@@ -66,12 +74,8 @@ process.on("SIGINT", () => {
   isShuttingDown = true;
 });
 
-
-const workerFun = async () => {
-  // const bullQueueName = queueNames[engine];
-  // const resultQueue = messageQueues[engine];
-
-  const worker = new Worker(webScraperQueueName, null, {
+const workerFun = async (queueName: string, processJobInternal: (token: string, job: Job) => Promise<void>) => {
+  const worker = new Worker(queueName, null, {
     connection: redisConnection,
     lockDuration: 1 * 60 * 1000, // 1 minute
     // lockRenewTime: 15 * 1000, // 15 seconds
@@ -81,8 +85,6 @@ const workerFun = async () => {
 
   worker.startStalledCheckTimer();
 
-  let contextManager;
-
   const monitor = await systemMonitor;
 
   while (true) {
@@ -91,10 +93,7 @@ if (isShuttingDown) {
       break;
     }
     const token = uuidv4();
-    // console.time("acceptConnectionDelay");
     const canAcceptConnection = await monitor.acceptConnection();
-    // console.timeEnd("acceptConnectionDelay");
-    // console.log("canAcceptConnection", canAcceptConnection);
    if (!canAcceptConnection) {
       console.log("Cant accept connection");
       await sleep(cantAcceptConnectionInterval); // more sleep
@@ -102,9 +101,8 @@
     }
     const job = await worker.getNextJob(token);
-    // console.log("job", job);
     if (job) {
-      processJobInternal(token, job);
+      await processJobInternal(token, job);
       await sleep(gotJobInterval);
     } else {
       await sleep(connectionMonitorInterval);
     }
   }
 };
 
-workerFun();
+workerFun(webScraperQueueName, processJobInternal);
+workerFun(scrapeQueueName, processJobInternal);
 
 async function processJob(job: Job, token: string) {
   Logger.debug(`🐂 Worker taking job ${job.id}`);
 
   try {
     console.log("🐂 Updating progress");
-    console.log({job})
+    console.log({ job });
     job.updateProgress({
       current: 1,
       total: 100,
@@ -127,7 +126,10 @@ async function processJob(job: Job, token: string) {
       current_url: "",
     });
     const start = Date.now();
-    const { success, message, docs } = await startWebScraperPipeline({ job, token });
+    const { success, message, docs } = await startWebScraperPipeline({
+      job,
+      token,
+    });
     const end = Date.now();
     const timeTakenInSeconds = (end - start) / 1000;
 
     const data = {
       success: success,
       result: {
         links: docs.map((doc) => {
-          return { content: doc, source: doc?.metadata?.sourceURL ?? doc?.url ?? "" };
+          return {
+            content: doc,
+            source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
+          };
         }),
       },
       project_id: job.data.project_id,
       error: message /* etc... */,
+      docs: docs,
     };
 
     if (job.data.mode === "crawl") {
@@ -189,6 +195,7 @@ async function processJob(job: Job, token: string) {
 
     const data = {
       success: false,
+      docs: [],
       project_id: job.data.project_id,
       error: "Something went wrong... Contact help@mendable.ai or try again." /* etc... */,
@@ -199,7 +206,10 @@ async function processJob(job: Job, token: string) {
     await logJob({
       job_id: job.id as string,
       success: false,
-      message: typeof error === 'string' ? error : (error.message ?? "Something went wrong... Contact help@mendable.ai"),
+      message:
+        typeof error === "string"
+          ? error
+          : error.message ?? "Something went wrong... Contact help@mendable.ai",
       num_docs: 0,
       docs: [],
       time_taken: 0,
diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts
index 03296062..fa746688 100644
--- a/apps/api/src/types.ts
+++ b/apps/api/src/types.ts
@@ -36,7 +36,7 @@ export interface RunWebScraperParams {
   crawlerOptions: any;
   pageOptions?: any;
   inProgress: (progress: any) => void;
-  onSuccess: (result: any) => void;
+  onSuccess: (result: any, mode: string) => void;
   onError: (error: Error) => void;
   team_id: string;
   bull_job_id: string;