From 03c84a93726786f9c549e56c9679e39f7dbbdb13 Mon Sep 17 00:00:00 2001 From: Gergo Moricz Date: Tue, 6 Aug 2024 16:26:46 +0200 Subject: [PATCH] cleanup and fix cancelling --- apps/api/src/controllers/crawl-cancel.ts | 10 +--- apps/api/src/controllers/crawl-status.ts | 8 ++++ apps/api/src/controllers/scrape.ts | 58 ++---------------------- apps/api/src/main/runWebScraper.ts | 21 +++++---- apps/api/src/services/queue-worker.ts | 29 +++++++----- 5 files changed, 45 insertions(+), 81 deletions(-) diff --git a/apps/api/src/controllers/crawl-cancel.ts b/apps/api/src/controllers/crawl-cancel.ts index eb870f0a..86a4c5b4 100644 --- a/apps/api/src/controllers/crawl-cancel.ts +++ b/apps/api/src/controllers/crawl-cancel.ts @@ -1,7 +1,6 @@ import { Request, Response } from "express"; import { authenticateUser } from "./auth"; import { RateLimiterMode } from "../../src/types"; -import { addWebScraperJob } from "../../src/services/queue-jobs"; import { getWebScraperQueue } from "../../src/services/queue-service"; import { supabase_service } from "../../src/services/supabase"; import { billTeam } from "../../src/services/billing/credit_billing"; @@ -59,17 +58,12 @@ export async function crawlCancelController(req: Request, res: Response) { } try { - // TODO: FIX THIS by doing as a flag on the data? - // await getWebScraperQueue().client.del(job.lockKey()); - // await job.takeLock(); - // await job.discard(); - // await job.moveToFailed(Error("Job cancelled by user"), true); + await (await getWebScraperQueue().client).set("cancelled:" + job.id, "true", "EX", 60 * 60); + await job.discard(); } catch (error) { Logger.error(error); } - const newJobState = await job.getState(); - res.json({ status: "cancelled" }); diff --git a/apps/api/src/controllers/crawl-status.ts b/apps/api/src/controllers/crawl-status.ts index 87f1fca0..13ea20c4 100644 --- a/apps/api/src/controllers/crawl-status.ts +++ b/apps/api/src/controllers/crawl-status.ts @@ -21,6 +21,14 @@ export async function crawlStatusController(req: Request, res: Response) { return res.status(404).json({ error: "Job not found" }); } + const isCancelled = await (await getWebScraperQueue().client).exists("cancelled:" + req.params.jobId); + + if (isCancelled) { + return res.json({ + status: "cancelled", + }); + } + let progress = job.progress; if(typeof progress !== 'object') { progress = { diff --git a/apps/api/src/controllers/scrape.ts b/apps/api/src/controllers/scrape.ts index 36249fa8..7e3840e1 100644 --- a/apps/api/src/controllers/scrape.ts +++ b/apps/api/src/controllers/scrape.ts @@ -1,6 +1,5 @@ import { ExtractorOptions, PageOptions } from './../lib/entities'; import { Request, Response } from "express"; -import { WebScraperDataProvider } from "../scraper/WebScraper"; import { billTeam, checkTeamCredits } from "../services/billing/credit_billing"; import { authenticateUser } from "./auth"; import { RateLimiterMode } from "../types"; @@ -9,9 +8,8 @@ import { Document } from "../lib/entities"; import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function import { numTokensFromString } from '../lib/LLM-extraction/helpers'; import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../lib/default-values'; -import { addScrapeJob, addWebScraperJob } from '../services/queue-jobs'; -import { getScrapeQueue, getWebScraperQueue, scrapeQueueEvents } from '../services/queue-service'; -import { supabase_service } from '../services/supabase'; +import { addScrapeJob } from 
'../services/queue-jobs'; +import { scrapeQueueEvents } from '../services/queue-service'; import { v4 as uuidv4 } from "uuid"; import { Logger } from '../lib/logger'; @@ -39,17 +37,6 @@ export async function scrapeHelper( return { success: false, error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.", returnCode: 403 }; } - // const a = new WebScraperDataProvider(); - // await a.setOptions({ - // mode: "single_urls", - // urls: [url], - // crawlerOptions: { - // ...crawlerOptions, - // }, - // pageOptions: pageOptions, - // extractorOptions: extractorOptions, - // }); - const job = await addScrapeJob({ url, mode: "single_urls", @@ -60,53 +47,16 @@ export async function scrapeHelper( origin: req.body.origin ?? defaultOrigin, }); - - // const docsPromise = new Promise((resolve) => { - // promiseResolve = resolve; - // }); - - // const listener = (j: string, res: any) => { - // console.log("JOB COMPLETED", j, "vs", job.id, res); - // if (j === job.id) { - // promiseResolve([j, res]); - // sq.removeListener("global:completed", listener); - // } - // } - const jobResult = await job.waitUntilFinished(scrapeQueueEvents, 60 * 1000);//60 seconds timeout - - - // wsq.on("global:completed", listener); - - // const timeoutPromise = new Promise<{ success: boolean; error?: string; returnCode: number }>((_, reject) => - // setTimeout(() => reject({ success: false, error: "Request timed out. Increase the timeout by passing `timeout` param to the request.", returnCode: 408 }), timeout) - // ); - - // let j; - // try { - // j = await Promise.race([jobResult, timeoutPromise]); - // } catch (error) { - // // sq.removeListener("global:completed", listener); - // return error; - // } - // console.log("JOB RESULT", j[1]); - - // let j1 = typeof j[1] === "string" ? JSON.parse(j[1]) : j[1]; - - const doc = jobResult !== null ? jobResult[0] : (await supabase_service - .from("firecrawl_jobs") - .select("docs") - .eq("job_id", job.id as string)).data[0]?.docs[0]; + const doc = (await job.waitUntilFinished(scrapeQueueEvents, 60 * 1000))[0]; //60 seconds timeout if (!doc) { + console.error("!!! 
PANIC DOC IS", doc, job); return { success: true, error: "No page found", returnCode: 200, data: doc }; } delete doc.index; delete doc.provider; - // make sure doc.content is not empty - - // Remove rawHtml if pageOptions.rawHtml is false and extractorOptions.mode is llm-extraction-from-raw-html if (!pageOptions.includeRawHtml && extractorOptions.mode == "llm-extraction-from-raw-html") { delete doc.rawHtml; diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts index 3f3a29ef..7af59b45 100644 --- a/apps/api/src/main/runWebScraper.ts +++ b/apps/api/src/main/runWebScraper.ts @@ -12,6 +12,7 @@ import { Document } from "../lib/entities"; import { supabase_service } from "../services/supabase"; import { Logger } from "../lib/logger"; import { ScrapeEvents } from "../lib/scrape-events"; +import { getWebScraperQueue } from "../services/queue-service"; export async function startWebScraperPipeline({ job, @@ -100,16 +101,20 @@ export async function runWebScraper({ } }) : docs.filter((doc) => doc.content.trim().length > 0); + + const isCancelled = await (await getWebScraperQueue().client).exists("cancelled:" + bull_job_id); - const billingResult = await billTeam(team_id, filteredDocs.length); + if (!isCancelled) { + const billingResult = await billTeam(team_id, filteredDocs.length); - if (!billingResult.success) { - // throw new Error("Failed to bill team, no subscription was found"); - return { - success: false, - message: "Failed to bill team, no subscription was found", - docs: [], - }; + if (!billingResult.success) { + // throw new Error("Failed to bill team, no subscription was found"); + return { + success: false, + message: "Failed to bill team, no subscription was found", + docs: [], + }; + } } // This is where the returnvalue from the job is set diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 6776f414..ca9d5977 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -12,7 +12,7 @@ import { startWebScraperPipeline } from "../main/runWebScraper"; import { callWebhook } from "./webhook"; import { logJob } from "./logging/log_job"; import { initSDK } from "@hyperdx/node-opentelemetry"; -import { Job, tryCatch } from "bullmq"; +import { Job, QueueEvents, tryCatch } from "bullmq"; import { Logger } from "../lib/logger"; import { ScrapeEvents } from "../lib/scrape-events"; import { Worker } from "bullmq"; @@ -131,10 +131,18 @@ async function processJob(job: Job, token: string) { const end = Date.now(); const timeTakenInSeconds = (end - start) / 1000; + const isCancelled = await (await getWebScraperQueue().client).exists("cancelled:" + job.id); + + if (isCancelled) { + await job.discard(); + await job.moveToFailed(Error("Job cancelled by user"), job.token); + await job.discard(); + } + const data = { - success: success, + success, result: { - links: docs.map((doc) => { + links: isCancelled ? [] : docs.map((doc) => { return { content: doc, source: doc?.metadata?.sourceURL ?? doc?.url ?? "", @@ -142,20 +150,20 @@ async function processJob(job: Job, token: string) { }), }, project_id: job.data.project_id, - error: message /* etc... */, - docs: docs, + error: isCancelled ? "Job cancelled by user" : message /* etc... */, + docs: isCancelled ? 
[] : docs, }; - if (job.data.mode === "crawl") { + if (job.data.mode === "crawl" && !isCancelled) { await callWebhook(job.data.team_id, job.id as string, data); } await logJob({ job_id: job.id as string, - success: success, - message: message, - num_docs: docs.length, - docs: docs, + success: success && !isCancelled, + message: isCancelled ? "Job cancelled by user" : message, + num_docs: isCancelled ? 0 : docs.length, + docs: isCancelled ? [] : docs, time_taken: timeTakenInSeconds, team_id: job.data.team_id, mode: job.data.mode, @@ -165,7 +173,6 @@ async function processJob(job: Job, token: string) { origin: job.data.origin, }); Logger.debug(`🐂 Job done ${job.id}`); - // done(null, data); return data; } catch (error) { Logger.error(`🐂 Job errored ${job.id} - ${error}`);
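Note (not part of the applied diff): the patch replaces the old lock-and-discard cancellation attempt with a Redis flag that the cancel endpoint sets, and that the status endpoint and the queue worker check before billing, firing webhooks, or returning docs. Below is a minimal sketch of that pattern, assuming BullMQ's `queue.client` promise and an ioredis connection; the names `webScraperQueue`, `markCancelled`, and `isJobCancelled` are illustrative, not identifiers from the repo.

  import { Queue } from "bullmq";

  const webScraperQueue = new Queue("web-scraper", {
    connection: { host: "localhost", port: 6379 },
  });

  // Cancel: write a short-lived flag keyed by job id (expires after one hour),
  // mirroring the set("cancelled:" + job.id, "true", "EX", 60 * 60) call in the diff.
  async function markCancelled(jobId: string): Promise<void> {
    const client = await webScraperQueue.client; // ioredis instance
    await client.set("cancelled:" + jobId, "true", "EX", 60 * 60);
  }

  // Check: crawl-status and the worker both test for the flag and, when it is
  // present, report "cancelled" and skip billing, webhooks, and docs.
  async function isJobCancelled(jobId: string): Promise<boolean> {
    const client = await webScraperQueue.client;
    return (await client.exists("cancelled:" + jobId)) === 1;
  }

Because the flag is set with a one-hour expiry, a status poll after that window no longer reports the job as cancelled; the key simply ages out instead of being cleaned up explicitly.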