From 44fe741c3551590e28e147d3a00254a47b718586 Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Sun, 1 Sep 2024 14:19:43 -0300
Subject: [PATCH] Update queue-worker.ts

---
 apps/api/src/services/queue-worker.ts | 371 ++++++++++++++++----------
 1 file changed, 232 insertions(+), 139 deletions(-)

diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 477b3185..627185e9 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -1,5 +1,5 @@
 import "dotenv/config";
-import "./sentry"
+import "./sentry";
 import * as Sentry from "@sentry/node";
 import { CustomError } from "../lib/custom-error";
 import {
@@ -17,11 +17,23 @@ import { Logger } from "../lib/logger";
 import { Worker } from "bullmq";
 import systemMonitor from "./system-monitor";
 import { v4 as uuidv4 } from "uuid";
-import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, getCrawlJobs, lockURL } from "../lib/crawl-redis";
+import {
+  addCrawlJob,
+  addCrawlJobDone,
+  crawlToCrawler,
+  finishCrawl,
+  getCrawl,
+  getCrawlJobs,
+  lockURL,
+} from "../lib/crawl-redis";
 import { StoredCrawl } from "../lib/crawl-redis";
 import { addScrapeJob } from "./queue-jobs";
 import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
-import { addJobPriority, deleteJobPriority, getJobPriority } from "../../src/lib/job-priority";
+import {
+  addJobPriority,
+  deleteJobPriority,
+  getJobPriority,
+} from "../../src/lib/job-priority";
 import { PlanType } from "../types";
 
 if (process.env.ENV === "production") {
@@ -52,25 +64,24 @@ const processJobInternal = async (token: string, job: Job) => {
     await job.extendLock(token, jobLockExtensionTime);
   }, jobLockExtendInterval);
 
-  await addJobPriority(job.data.team_id, job.id );
+  await addJobPriority(job.data.team_id, job.id);
   let err = null;
 
   try {
     const result = await processJob(job, token);
-    try{
+    try {
       if (job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true") {
         await job.moveToCompleted(null, token, false);
       } else {
         await job.moveToCompleted(result.docs, token, false);
       }
-    }catch(e){
-    }
+    } catch (e) {}
   } catch (error) {
     console.log("Job failed, error:", error);
     Sentry.captureException(error);
     err = error;
     await job.moveToFailed(error, token, false);
   } finally {
-    await deleteJobPriority(job.data.team_id, job.id );
+    await deleteJobPriority(job.data.team_id, job.id);
     clearInterval(extendLockInterval);
   }
 
@@ -84,7 +95,10 @@ process.on("SIGINT", () => {
   isShuttingDown = true;
});

-const workerFun = async (queueName: string, processJobInternal: (token: string, job: Job) => Promise) => {
+const workerFun = async (
+  queueName: string,
+  processJobInternal: (token: string, job: Job) => Promise
+) => {
   const worker = new Worker(queueName, null, {
     connection: redisConnection,
     lockDuration: 1 * 60 * 1000, // 1 minute
@@ -113,46 +127,62 @@ const workerFun = async (queueName: string, processJobInternal: (token: string,
     const job = await worker.getNextJob(token);
     if (job) {
       if (job.data && job.data.sentry && Sentry.isInitialized()) {
-        Sentry.continueTrace({ sentryTrace: job.data.sentry.trace, baggage: job.data.sentry.baggage }, () => {
-          Sentry.startSpan({
+        Sentry.continueTrace(
+          {
+            sentryTrace: job.data.sentry.trace,
+            baggage: job.data.sentry.baggage,
+          },
+          () => {
+            Sentry.startSpan(
+              {
+                name: "Scrape job",
+                attributes: {
+                  job: job.id,
+                  worker: process.env.FLY_MACHINE_ID ??
+                    worker.id,
+                },
+              },
+              async (span) => {
+                await Sentry.startSpan(
+                  {
+                    name: "Process scrape job",
+                    op: "queue.process",
+                    attributes: {
+                      "messaging.message.id": job.id,
+                      "messaging.destination.name": getScrapeQueue().name,
+                      "messaging.message.body.size": job.data.sentry.size,
+                      "messaging.message.receive.latency":
+                        Date.now() - (job.processedOn ?? job.timestamp),
+                      "messaging.message.retry.count": job.attemptsMade,
+                    },
+                  },
+                  async () => {
+                    const res = await processJobInternal(token, job);
+                    if (res !== null) {
+                      span.setStatus({ code: 2 }); // ERROR
+                    } else {
+                      span.setStatus({ code: 1 }); // OK
+                    }
+                  }
+                );
+              }
+            );
+          }
+        );
+      } else {
+        Sentry.startSpan(
+          {
             name: "Scrape job",
             attributes: {
               job: job.id,
               worker: process.env.FLY_MACHINE_ID ?? worker.id,
             },
-          }, async (span) => {
-            await Sentry.startSpan({
-              name: "Process scrape job",
-              op: "queue.process",
-              attributes: {
-                "messaging.message.id": job.id,
-                "messaging.destination.name": getScrapeQueue().name,
-                "messaging.message.body.size": job.data.sentry.size,
-                "messaging.message.receive.latency": Date.now() - (job.processedOn ?? job.timestamp),
-                "messaging.message.retry.count": job.attemptsMade,
-              }
-            }, async () => {
-              const res = await processJobInternal(token, job);
-              if (res !== null) {
-                span.setStatus({ code: 2 }); // ERROR
-              } else {
-                span.setStatus({ code: 1 }); // OK
-              }
-            });
-          });
-        });
-      } else {
-        Sentry.startSpan({
-          name: "Scrape job",
-          attributes: {
-            job: job.id,
-            worker: process.env.FLY_MACHINE_ID ?? worker.id,
           },
-        }, () => {
-          processJobInternal(token, job);
-        });
+          () => {
+            processJobInternal(token, job);
+          }
+        );
       }
-      
+
       await sleep(gotJobInterval);
     } else {
       await sleep(connectionMonitorInterval);
@@ -167,13 +197,20 @@ async function processJob(job: Job, token: string) {
   // Check if the job URL is researchhub and block it immediately
   // TODO: remove this once solve the root issue
-  if (job.data.url && (job.data.url.includes("researchhub.com") || job.data.url.includes("ebay.com") || job.data.url.includes("youtube.com") || job.data.url.includes("microsoft.com") )) {
+  if (
+    job.data.url &&
+    (job.data.url.includes("researchhub.com") ||
+      job.data.url.includes("ebay.com") ||
+      job.data.url.includes("youtube.com") ||
+      job.data.url.includes("microsoft.com"))
+  ) {
     Logger.info(`🐂 Blocking job ${job.id} with URL ${job.data.url}`);
     const data = {
       success: false,
       docs: [],
       project_id: job.data.project_id,
-      error: "URL is blocked. Suspecious activity detected. Please contact hello@firecrawl.com if you believe this is an error.",
+      error:
+        "URL is blocked. Suspecious activity detected. Please contact hello@firecrawl.com if you believe this is an error.",
     };
     await job.moveToCompleted(data.docs, token, false);
     return data;
   }
@@ -187,14 +224,14 @@ async function processJob(job: Job, token: string) {
       current_url: "",
     });
     const start = Date.now();
-    
+
     const { success, message, docs } = await startWebScraperPipeline({
       job,
       token,
     });

     // Better if we throw here so we capture with the correct error
-    if(!success) {
+    if (!success) {
       throw new Error(message);
     }
     const end = Date.now();
@@ -217,14 +254,24 @@ async function processJob(job: Job, token: string) {
       docs,
     };

-
-    // No idea what this does and when it is called.
     if (job.data.mode === "crawl" && !job.data.v1) {
-      callWebhook(job.data.team_id, job.id as string, data, job.data.webhook, job.data.v1);
+      callWebhook(
+        job.data.team_id,
+        job.id as string,
+        data,
+        job.data.webhook,
+        job.data.v1
+      );
     }

     if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
-      callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1);
+      callWebhook(
+        job.data.team_id,
+        job.data.crawl_id,
+        data,
+        job.data.webhook,
+        job.data.v1
+      );
     }

     if (job.data.crawl_id) {
@@ -246,7 +293,7 @@ async function processJob(job: Job, token: string) {

       await addCrawlJobDone(job.data.crawl_id, job.id);

-      const sc = await getCrawl(job.data.crawl_id) as StoredCrawl;
+      const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;

       if (!job.data.sitemapped) {
         if (!sc.cancelled) {
@@ -256,13 +303,16 @@ async function processJob(job: Job, token: string) {
             crawler.extractLinksFromHTML(rawHtml ?? "", sc.originUrl),
             Infinity,
             sc.crawlerOptions?.maxDepth ?? 10
-          )
-          
+          );
+
           for (const link of links) {
             if (await lockURL(job.data.crawl_id, sc, link)) {
-              // This seems to work really welel
-              const jobPriority = await getJobPriority({plan:sc.plan as PlanType, team_id: sc.team_id, basePriority: job.data.crawl_id ? 20 : 10})
+              const jobPriority = await getJobPriority({
+                plan: sc.plan as PlanType,
+                team_id: sc.team_id,
+                basePriority: job.data.crawl_id ? 20 : 10,
+              });

               const jobId = uuidv4();
               // console.log("plan: ", sc.plan);
@@ -270,16 +320,21 @@ async function processJob(job: Job, token: string) {
               // console.log("base priority: ", job.data.crawl_id ? 20 : 10)
               // console.log("job priority: " , jobPriority, "\n\n\n")

-              const newJob = await addScrapeJob({
-                url: link,
-                mode: "single_urls",
-                crawlerOptions: sc.crawlerOptions,
-                team_id: sc.team_id,
-                pageOptions: sc.pageOptions,
-                origin: job.data.origin,
-                crawl_id: job.data.crawl_id,
-                v1: job.data.v1,
-              }, {}, jobId, jobPriority);
+              const newJob = await addScrapeJob(
+                {
+                  url: link,
+                  mode: "single_urls",
+                  crawlerOptions: sc.crawlerOptions,
+                  team_id: sc.team_id,
+                  pageOptions: sc.pageOptions,
+                  origin: job.data.origin,
+                  crawl_id: job.data.crawl_id,
+                  v1: job.data.v1,
+                },
+                {},
+                jobId,
+                jobPriority
+              );

               await addCrawlJob(job.data.crawl_id, newJob.id);
             }
@@ -290,79 +345,102 @@ async function processJob(job: Job, token: string) {
       if (await finishCrawl(job.data.crawl_id)) {
         // v1 web hooks, call when done with no data, but with event completed
         if (job.data.v1 && job.data.webhook) {
-          callWebhook(job.data.team_id, job.data.crawl_id, [], job.data.webhook, job.data.v1, "crawl.completed");
+          callWebhook(
+            job.data.team_id,
+            job.data.crawl_id,
+            [],
+            job.data.webhook,
+            job.data.v1,
+            "crawl.completed"
+          );
         }
-
-        const jobIDs = await getCrawlJobs(job.data.crawl_id);
-        const jobs = (await Promise.all(jobIDs.map(async x => {
-          if (x === job.id) {
-            return {
-              async getState() {
-                return "completed"
-              },
-              timestamp: Date.now(),
-              returnvalue: docs,
-            }
-          }
-
-          const j = await getScrapeQueue().getJob(x);
-
-          if (process.env.USE_DB_AUTHENTICATION === "true") {
-            const supabaseData = await supabaseGetJobById(j.id);
-
-            if (supabaseData) {
-              j.returnvalue = supabaseData.docs;
-            }
-          }
-
-          return j;
-        }))).sort((a, b) => a.timestamp - b.timestamp);
-        const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
-        const jobStatus = sc.cancelled || jobStatuses.some(x => x === "failed") ? "failed" : "completed";
-
-        const fullDocs = jobs.map(x => Array.isArray(x.returnvalue) ?
-          x.returnvalue[0] : x.returnvalue);
-
-        await logJob({
-          job_id: job.data.crawl_id,
-          success: jobStatus === "completed",
-          message: sc.cancelled ? "Cancelled" : message,
-          num_docs: fullDocs.length,
-          docs: [],
-          time_taken: (Date.now() - sc.createdAt) / 1000,
-          team_id: job.data.team_id,
-          mode: "crawl",
-          url: sc.originUrl,
-          crawlerOptions: sc.crawlerOptions,
-          pageOptions: sc.pageOptions,
-          origin: job.data.origin,
-        });
-
-        const data = {
-          success: jobStatus !== "failed",
-          result: {
-            links: fullDocs.map((doc) => {
-              return {
-                content: doc,
-                source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
-              };
-            }),
-          },
-          project_id: job.data.project_id,
-          error: message /* etc... */,
-          docs: fullDocs,
-        };
-
         // v0 web hooks, call when done with all the data
         if (!job.data.v1) {
-          callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1, "crawl.completed");
+          const jobIDs = await getCrawlJobs(job.data.crawl_id);
+
+          const jobs = (
+            await Promise.all(
+              jobIDs.map(async (x) => {
+                if (x === job.id) {
+                  return {
+                    async getState() {
+                      return "completed";
+                    },
+                    timestamp: Date.now(),
+                    returnvalue: docs,
+                  };
+                }
+
+                const j = await getScrapeQueue().getJob(x);
+
+                if (process.env.USE_DB_AUTHENTICATION === "true") {
+                  const supabaseData = await supabaseGetJobById(j.id);
+
+                  if (supabaseData) {
+                    j.returnvalue = supabaseData.docs;
+                  }
+                }
+
+                return j;
+              })
+            )
+          ).sort((a, b) => a.timestamp - b.timestamp);
+          const jobStatuses = await Promise.all(jobs.map((x) => x.getState()));
+          const jobStatus =
+            sc.cancelled || jobStatuses.some((x) => x === "failed")
+              ? "failed"
+              : "completed";
+
+          const fullDocs = jobs.map((x) =>
+            Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue
+          );
+
+          await logJob({
+            job_id: job.data.crawl_id,
+            success: jobStatus === "completed",
+            message: sc.cancelled ? "Cancelled" : message,
+            num_docs: fullDocs.length,
+            docs: [],
+            time_taken: (Date.now() - sc.createdAt) / 1000,
+            team_id: job.data.team_id,
+            mode: "crawl",
+            url: sc.originUrl,
+            crawlerOptions: sc.crawlerOptions,
+            pageOptions: sc.pageOptions,
+            origin: job.data.origin,
+          });
+
+          const data = {
+            success: jobStatus !== "failed",
+            result: {
+              links: fullDocs.map((doc) => {
+                return {
+                  content: doc,
+                  source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
+                };
+              }),
+            },
+            project_id: job.data.project_id,
+            error: message /* etc... */,
+            docs: fullDocs,
+          };
+
+          console.log(fullDocs.length);
+          // v0 web hooks, call when done with all the data
+          if (!job.data.v1) {
+            callWebhook(
+              job.data.team_id,
+              job.data.crawl_id,
+              data,
+              job.data.webhook,
+              job.data.v1,
+              "crawl.completed"
+            );
+          }
         }
       }
     }
-
-
-
     Logger.info(`🐂 Job done ${job.id}`);
     return data;
   } catch (error) {
@@ -370,9 +448,9 @@ async function processJob(job: Job, token: string) {

     Sentry.captureException(error, {
       data: {
-        job: job.id
+        job: job.id,
       },
-    })
+    });

     if (error instanceof CustomError) {
       // Here we handle the error, then save the failed job
@@ -403,12 +481,25 @@ async function processJob(job: Job, token: string) {
     };

     if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) {
-      callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data, job.data.webhook, job.data.v1);
+      callWebhook(
+        job.data.team_id,
+        job.data.crawl_id ??
+          (job.id as string),
+        data,
+        job.data.webhook,
+        job.data.v1
+      );
     }
-    if(job.data.v1) {
-      callWebhook(job.data.team_id, job.id as string, [], job.data.webhook, job.data.v1, "crawl.failed");
+    if (job.data.v1) {
+      callWebhook(
+        job.data.team_id,
+        job.id as string,
+        [],
+        job.data.webhook,
+        job.data.v1,
+        "crawl.failed"
+      );
     }
-    
+
     if (job.data.crawl_id) {
       await logJob({
         job_id: job.id as string,
         success: false,
@@ -416,7 +507,8 @@ async function processJob(job: Job, token: string) {
         message:
           typeof error === "string"
             ? error
-            : error.message ?? "Something went wrong... Contact help@mendable.ai",
+            : error.message ??
+              "Something went wrong... Contact help@mendable.ai",
         num_docs: 0,
         docs: [],
         time_taken: 0,
@@ -437,7 +529,8 @@ async function processJob(job: Job, token: string) {
         message:
           typeof error === "string"
             ? error
-            : error.message ?? "Something went wrong... Contact help@mendable.ai",
+            : error.message ??
+              "Something went wrong... Contact help@mendable.ai",
         num_docs: 0,
         docs: [],
         time_taken: 0,