diff --git a/apps/api/src/controllers/v1/crawl.ts b/apps/api/src/controllers/v1/crawl.ts index fd72c8cf..c2d5bdca 100644 --- a/apps/api/src/controllers/v1/crawl.ts +++ b/apps/api/src/controllers/v1/crawl.ts @@ -22,6 +22,7 @@ import { getScrapeQueue } from "../../services/queue-service"; import { addScrapeJob } from "../../services/queue-jobs"; import { Logger } from "../../lib/logger"; import { getJobPriority } from "../../lib/job-priority"; +import { callWebhook } from "../../services/webhook"; export async function crawlController( req: RequestWithAuth<{}, CrawlResponse, CrawlRequest>, @@ -150,6 +151,10 @@ export async function crawlController( await addCrawlJob(id, job.id); } + if(req.body.webhook) { + await callWebhook(req.auth.team_id, id, null, req.body.webhook, true, "crawl.started"); + } + return res.status(200).json({ success: true, id, diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index af2ec851..6488759f 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -1,5 +1,5 @@ import "dotenv/config"; -import "./sentry" +import "./sentry"; import * as Sentry from "@sentry/node"; import { CustomError } from "../lib/custom-error"; import { @@ -17,12 +17,25 @@ import { Logger } from "../lib/logger"; import { Worker } from "bullmq"; import systemMonitor from "./system-monitor"; import { v4 as uuidv4 } from "uuid"; -import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, getCrawlJobs, lockURL } from "../lib/crawl-redis"; +import { + addCrawlJob, + addCrawlJobDone, + crawlToCrawler, + finishCrawl, + getCrawl, + getCrawlJobs, + lockURL, +} from "../lib/crawl-redis"; import { StoredCrawl } from "../lib/crawl-redis"; import { addScrapeJob } from "./queue-jobs"; import { supabaseGetJobById } from "../../src/lib/supabase-jobs"; -import { addJobPriority, deleteJobPriority, getJobPriority } from "../../src/lib/job-priority"; +import { + addJobPriority, + deleteJobPriority, + getJobPriority, +} from "../../src/lib/job-priority"; import { PlanType } from "../types"; +import { getJobs } from "../../src/controllers/v1/crawl-status"; if (process.env.ENV === "production") { initSDK({ @@ -52,25 +65,24 @@ const processJobInternal = async (token: string, job: Job) => { await job.extendLock(token, jobLockExtensionTime); }, jobLockExtendInterval); - await addJobPriority(job.data.team_id, job.id ); + await addJobPriority(job.data.team_id, job.id); let err = null; try { const result = await processJob(job, token); - try{ + try { if (job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true") { await job.moveToCompleted(null, token, false); } else { await job.moveToCompleted(result.docs, token, false); } - }catch(e){ - } + } catch (e) {} } catch (error) { console.log("Job failed, error:", error); Sentry.captureException(error); err = error; await job.moveToFailed(error, token, false); } finally { - await deleteJobPriority(job.data.team_id, job.id ); + await deleteJobPriority(job.data.team_id, job.id); clearInterval(extendLockInterval); } @@ -84,7 +96,10 @@ process.on("SIGINT", () => { isShuttingDown = true; }); -const workerFun = async (queueName: string, processJobInternal: (token: string, job: Job) => Promise) => { +const workerFun = async ( + queueName: string, + processJobInternal: (token: string, job: Job) => Promise +) => { const worker = new Worker(queueName, null, { connection: redisConnection, lockDuration: 1 * 60 * 1000, // 1 minute @@ -113,46 +128,62 @@ const workerFun = async 
(queueName: string, processJobInternal: (token: string,
     const job = await worker.getNextJob(token);
     if (job) {
       if (job.data && job.data.sentry && Sentry.isInitialized()) {
-        Sentry.continueTrace({ sentryTrace: job.data.sentry.trace, baggage: job.data.sentry.baggage }, () => {
-          Sentry.startSpan({
+        Sentry.continueTrace(
+          {
+            sentryTrace: job.data.sentry.trace,
+            baggage: job.data.sentry.baggage,
+          },
+          () => {
+            Sentry.startSpan(
+              {
+                name: "Scrape job",
+                attributes: {
+                  job: job.id,
+                  worker: process.env.FLY_MACHINE_ID ?? worker.id,
+                },
+              },
+              async (span) => {
+                await Sentry.startSpan(
+                  {
+                    name: "Process scrape job",
+                    op: "queue.process",
+                    attributes: {
+                      "messaging.message.id": job.id,
+                      "messaging.destination.name": getScrapeQueue().name,
+                      "messaging.message.body.size": job.data.sentry.size,
+                      "messaging.message.receive.latency":
+                        Date.now() - (job.processedOn ?? job.timestamp),
+                      "messaging.message.retry.count": job.attemptsMade,
+                    },
+                  },
+                  async () => {
+                    const res = await processJobInternal(token, job);
+                    if (res !== null) {
+                      span.setStatus({ code: 2 }); // ERROR
+                    } else {
+                      span.setStatus({ code: 1 }); // OK
+                    }
+                  }
+                );
+              }
+            );
+          }
+        );
+      } else {
+        Sentry.startSpan(
+          {
             name: "Scrape job",
             attributes: {
               job: job.id,
               worker: process.env.FLY_MACHINE_ID ?? worker.id,
             },
-          }, async (span) => {
-            await Sentry.startSpan({
-              name: "Process scrape job",
-              op: "queue.process",
-              attributes: {
-                "messaging.message.id": job.id,
-                "messaging.destination.name": getScrapeQueue().name,
-                "messaging.message.body.size": job.data.sentry.size,
-                "messaging.message.receive.latency": Date.now() - (job.processedOn ?? job.timestamp),
-                "messaging.message.retry.count": job.attemptsMade,
-              }
-            }, async () => {
-              const res = await processJobInternal(token, job);
-              if (res !== null) {
-                span.setStatus({ code: 2 }); // ERROR
-              } else {
-                span.setStatus({ code: 1 }); // OK
-              }
-            });
-          });
-        });
-      } else {
-        Sentry.startSpan({
-          name: "Scrape job",
-          attributes: {
-            job: job.id,
-            worker: process.env.FLY_MACHINE_ID ?? worker.id,
           },
-        }, () => {
-          processJobInternal(token, job);
-        });
+          () => {
+            processJobInternal(token, job);
+          }
+        );
       }
-      
+
       await sleep(gotJobInterval);
     } else {
       await sleep(connectionMonitorInterval);
@@ -167,13 +198,20 @@ async function processJob(job: Job, token: string) {
 
   // Check if the job URL is researchhub and block it immediately
   // TODO: remove this once solve the root issue
-  if (job.data.url && (job.data.url.includes("researchhub.com") || job.data.url.includes("ebay.com") || job.data.url.includes("youtube.com") || job.data.url.includes("microsoft.com") )) {
+  if (
+    job.data.url &&
+    (job.data.url.includes("researchhub.com") ||
+      job.data.url.includes("ebay.com") ||
+      job.data.url.includes("youtube.com") ||
+      job.data.url.includes("microsoft.com"))
+  ) {
     Logger.info(`🐂 Blocking job ${job.id} with URL ${job.data.url}`);
     const data = {
       success: false,
       docs: [],
       project_id: job.data.project_id,
-      error: "URL is blocked. Suspecious activity detected. Please contact hello@firecrawl.com if you believe this is an error.",
+      error:
+        "URL is blocked. Suspicious activity detected. 
Please contact hello@firecrawl.com if you believe this is an error.",
     };
     await job.moveToCompleted(data.docs, token, false);
     return data;
   }
@@ -187,14 +225,14 @@ async function processJob(job: Job, token: string) {
       current_url: "",
     });
     const start = Date.now();
-    
+
     const { success, message, docs } = await startWebScraperPipeline({
       job,
       token,
     });
 
     // Better if we throw here so we capture with the correct error
-    if(!success) {
+    if (!success) {
       throw new Error(message);
     }
     const end = Date.now();
@@ -217,8 +255,26 @@ async function processJob(job: Job, token: string) {
       docs,
     };
 
-    if (job.data.mode === "crawl") {
-      await callWebhook(job.data.team_id, job.id as string, data, job.data.webhook, job.data.v1);
+    // v0 webhooks: for non-v1 crawl jobs, send the job result to the team's webhook (legacy behavior).
+    if (job.data.mode === "crawl" && !job.data.v1) {
+      callWebhook(
+        job.data.team_id,
+        job.id as string,
+        data,
+        job.data.webhook,
+        job.data.v1
+      );
+    }
+    if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
+      await callWebhook(
+        job.data.team_id,
+        job.data.crawl_id,
+        data,
+        job.data.webhook,
+        job.data.v1,
+        "crawl.page",
+        true
+      );
     }
 
     if (job.data.crawl_id) {
@@ -240,7 +296,7 @@ async function processJob(job: Job, token: string) {
 
       await addCrawlJobDone(job.data.crawl_id, job.id);
 
-      const sc = await getCrawl(job.data.crawl_id) as StoredCrawl;
+      const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
 
       if (!job.data.sitemapped) {
         if (!sc.cancelled) {
@@ -250,13 +306,16 @@ async function processJob(job: Job, token: string) {
             crawler.extractLinksFromHTML(rawHtml ?? "", sc.originUrl),
             Infinity,
             sc.crawlerOptions?.maxDepth ?? 10
-          )
-          
+          );
+
           for (const link of links) {
             if (await lockURL(job.data.crawl_id, sc, link)) {
-              // This seems to work really welel
-              const jobPriority = await getJobPriority({plan:sc.plan as PlanType, team_id: sc.team_id, basePriority: job.data.crawl_id ? 20 : 10})
+              const jobPriority = await getJobPriority({
+                plan: sc.plan as PlanType,
+                team_id: sc.team_id,
+                basePriority: job.data.crawl_id ? 20 : 10,
+              });
               const jobId = uuidv4();
 
               // console.log("plan: ", sc.plan);
@@ -264,16 +323,21 @@ async function processJob(job: Job, token: string) {
               // console.log("base priority: ", job.data.crawl_id ? 
20 : 10) // console.log("job priority: " , jobPriority, "\n\n\n") - const newJob = await addScrapeJob({ - url: link, - mode: "single_urls", - crawlerOptions: sc.crawlerOptions, - team_id: sc.team_id, - pageOptions: sc.pageOptions, - origin: job.data.origin, - crawl_id: job.data.crawl_id, - v1: job.data.v1, - }, {}, jobId, jobPriority); + const newJob = await addScrapeJob( + { + url: link, + mode: "single_urls", + crawlerOptions: sc.crawlerOptions, + team_id: sc.team_id, + pageOptions: sc.pageOptions, + origin: job.data.origin, + crawl_id: job.data.crawl_id, + v1: job.data.v1, + }, + {}, + jobId, + jobPriority + ); await addCrawlJob(job.data.crawl_id, newJob.id); } @@ -282,67 +346,98 @@ async function processJob(job: Job, token: string) { } if (await finishCrawl(job.data.crawl_id)) { - const jobIDs = await getCrawlJobs(job.data.crawl_id); + - const jobs = (await Promise.all(jobIDs.map(async x => { - if (x === job.id) { - return { - async getState() { - return "completed" - }, - timestamp: Date.now(), - returnvalue: docs, - } + if (!job.data.v1) { + const jobIDs = await getCrawlJobs(job.data.crawl_id); + + const jobs = (await getJobs(jobIDs)).sort((a, b) => a.timestamp - b.timestamp); + const jobStatuses = await Promise.all(jobs.map((x) => x.getState())); + const jobStatus = + sc.cancelled || jobStatuses.some((x) => x === "failed") + ? "failed" + : "completed"; + + const fullDocs = jobs.map((x) => + Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue + ); + + await logJob({ + job_id: job.data.crawl_id, + success: jobStatus === "completed", + message: sc.cancelled ? "Cancelled" : message, + num_docs: fullDocs.length, + docs: [], + time_taken: (Date.now() - sc.createdAt) / 1000, + team_id: job.data.team_id, + mode: "crawl", + url: sc.originUrl, + crawlerOptions: sc.crawlerOptions, + pageOptions: sc.pageOptions, + origin: job.data.origin, + }); + + const data = { + success: jobStatus !== "failed", + result: { + links: fullDocs.map((doc) => { + return { + content: doc, + source: doc?.metadata?.sourceURL ?? doc?.url ?? "", + }; + }), + }, + project_id: job.data.project_id, + error: message /* etc... */, + docs: fullDocs, + }; + + // v0 web hooks, call when done with all the data + if (!job.data.v1) { + callWebhook( + job.data.team_id, + job.data.crawl_id, + data, + job.data.webhook, + job.data.v1, + "crawl.completed" + ); } + } else { + const jobIDs = await getCrawlJobs(job.data.crawl_id); + const jobStatuses = await Promise.all(jobIDs.map((x) => getScrapeQueue().getJobState(x))); + const jobStatus = + sc.cancelled || jobStatuses.some((x) => x === "failed") + ? "failed" + : "completed"; - const j = await getScrapeQueue().getJob(x); - - if (process.env.USE_DB_AUTHENTICATION === "true") { - const supabaseData = await supabaseGetJobById(j.id); - - if (supabaseData) { - j.returnvalue = supabaseData.docs; + // v1 web hooks, call when done with no data, but with event completed + if (job.data.v1 && job.data.webhook) { + callWebhook( + job.data.team_id, + job.data.crawl_id, + [], + job.data.webhook, + job.data.v1, + "crawl.completed" + ); } - } - - return j; - }))).sort((a, b) => a.timestamp - b.timestamp); - const jobStatuses = await Promise.all(jobs.map(x => x.getState())); - const jobStatus = sc.cancelled || jobStatuses.some(x => x === "failed") ? "failed" : "completed"; - - const fullDocs = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue); - await logJob({ - job_id: job.data.crawl_id, - success: jobStatus === "completed", - message: sc.cancelled ? 
"Cancelled" : message, - num_docs: fullDocs.length, - docs: [], - time_taken: (Date.now() - sc.createdAt) / 1000, - team_id: job.data.team_id, - mode: "crawl", - url: sc.originUrl, - crawlerOptions: sc.crawlerOptions, - pageOptions: sc.pageOptions, - origin: job.data.origin, - }); - - const data = { - success: jobStatus !== "failed", - result: { - links: fullDocs.map((doc) => { - return { - content: doc, - source: doc?.metadata?.sourceURL ?? doc?.url ?? "", - }; - }), - }, - project_id: job.data.project_id, - error: message /* etc... */, - docs: fullDocs, - }; - - await callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1); + await logJob({ + job_id: job.data.crawl_id, + success: jobStatus === "completed", + message: sc.cancelled ? "Cancelled" : message, + num_docs: jobIDs.length, + docs: [], + time_taken: (Date.now() - sc.createdAt) / 1000, + team_id: job.data.team_id, + mode: "crawl", + url: sc.originUrl, + crawlerOptions: sc.crawlerOptions, + pageOptions: sc.pageOptions, + origin: job.data.origin, + }); + } } } @@ -353,9 +448,9 @@ async function processJob(job: Job, token: string) { Sentry.captureException(error, { data: { - job: job.id + job: job.id, }, - }) + }); if (error instanceof CustomError) { // Here we handle the error, then save the failed job @@ -384,11 +479,27 @@ async function processJob(job: Job, token: string) { error: "Something went wrong... Contact help@mendable.ai or try again." /* etc... */, }; - - if (job.data.mode === "crawl" || job.data.crawl_id) { - await callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data, job.data.webhook, job.data.v1); + + if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) { + callWebhook( + job.data.team_id, + job.data.crawl_id ?? (job.id as string), + data, + job.data.webhook, + job.data.v1 + ); } - + if (job.data.v1) { + callWebhook( + job.data.team_id, + job.id as string, + [], + job.data.webhook, + job.data.v1, + "crawl.failed" + ); + } + if (job.data.crawl_id) { await logJob({ job_id: job.id as string, @@ -396,7 +507,8 @@ async function processJob(job: Job, token: string) { message: typeof error === "string" ? error - : error.message ?? "Something went wrong... Contact help@mendable.ai", + : error.message ?? + "Something went wrong... Contact help@mendable.ai", num_docs: 0, docs: [], time_taken: 0, @@ -417,7 +529,8 @@ async function processJob(job: Job, token: string) { message: typeof error === "string" ? error - : error.message ?? "Something went wrong... Contact help@mendable.ai", + : error.message ?? + "Something went wrong... 
Contact help@mendable.ai", num_docs: 0, docs: [], time_taken: 0, diff --git a/apps/api/src/services/webhook.ts b/apps/api/src/services/webhook.ts index b60774e0..56dd5c58 100644 --- a/apps/api/src/services/webhook.ts +++ b/apps/api/src/services/webhook.ts @@ -1,11 +1,24 @@ +import axios from "axios"; import { legacyDocumentConverter } from "../../src/controllers/v1/types"; import { Logger } from "../../src/lib/logger"; import { supabase_service } from "./supabase"; +import { WebhookEventType } from "../types"; -export const callWebhook = async (teamId: string, jobId: string, data: any, specified?: string, v1 = false) => { +export const callWebhook = async ( + teamId: string, + id: string, + data: any | null, + specified?: string, + v1 = false, + eventType: WebhookEventType = "crawl.page", + awaitWebhook: boolean = false +) => { try { - const selfHostedUrl = process.env.SELF_HOSTED_WEBHOOK_URL?.replace("{{JOB_ID}}", jobId); - const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === 'true'; + const selfHostedUrl = process.env.SELF_HOSTED_WEBHOOK_URL?.replace( + "{{JOB_ID}}", + id + ); + const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true"; let webhookUrl = specified ?? selfHostedUrl; // Only fetch the webhook URL from the database if the self-hosted webhook URL and specified webhook are not set @@ -17,7 +30,9 @@ export const callWebhook = async (teamId: string, jobId: string, data: any, spec .eq("team_id", teamId) .limit(1); if (error) { - Logger.error(`Error fetching webhook URL for team ID: ${teamId}, error: ${error.message}`); + Logger.error( + `Error fetching webhook URL for team ID: ${teamId}, error: ${error.message}` + ); return null; } @@ -29,10 +44,17 @@ export const callWebhook = async (teamId: string, jobId: string, data: any, spec } let dataToSend = []; - if (data.result.links && data.result.links.length !== 0) { + if ( + data && + data.result && + data.result.links && + data.result.links.length !== 0 + ) { for (let i = 0; i < data.result.links.length; i++) { if (v1) { - dataToSend.push(legacyDocumentConverter(data.result.links[i].content)) + dataToSend.push( + legacyDocumentConverter(data.result.links[i].content) + ); } else { dataToSend.push({ content: data.result.links[i].content.content, @@ -43,19 +65,72 @@ export const callWebhook = async (teamId: string, jobId: string, data: any, spec } } - await fetch(webhookUrl, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - success: data.success, - jobId: jobId, - data: dataToSend, - error: data.error || undefined, - }), - }); + if (awaitWebhook) { + try { + await axios.post( + webhookUrl, + { + success: !v1 + ? data.success + : eventType === "crawl.page" + ? data.success + : true, + type: eventType, + [v1 ? "id" : "jobId"]: id, + data: dataToSend, + error: !v1 + ? data?.error || undefined + : eventType === "crawl.page" + ? data?.error || undefined + : undefined, + }, + { + headers: { + "Content-Type": "application/json", + }, + timeout: v1 ? 10000 : 30000, // 10 seconds timeout (v1) + } + ); + } catch (error) { + Logger.error( + `Axios error (0) sending webhook for team ID: ${teamId}, error: ${error.message}` + ); + } + } else { + axios + .post( + webhookUrl, + { + success: !v1 + ? data.success + : eventType === "crawl.page" + ? data.success + : true, + type: eventType, + [v1 ? "id" : "jobId"]: id, + data: dataToSend, + error: !v1 + ? data?.error || undefined + : eventType === "crawl.page" + ? 
data?.error || undefined + : undefined, + }, + { + headers: { + "Content-Type": "application/json", + }, + timeout: v1 ? 10000 : 30000, // 10 seconds timeout (v1) + } + ) + .catch((error) => { + Logger.error( + `Axios error sending webhook for team ID: ${teamId}, error: ${error.message}` + ); + }); + } } catch (error) { - Logger.debug(`Error sending webhook for team ID: ${teamId}, error: ${error.message}`); + Logger.debug( + `Error sending webhook for team ID: ${teamId}, error: ${error.message}` + ); } }; diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts index 431c0126..50fb6eef 100644 --- a/apps/api/src/types.ts +++ b/apps/api/src/types.ts @@ -153,4 +153,7 @@ export type PlanType = | "growth" | "growthdouble" | "free" - | ""; \ No newline at end of file + | ""; + + +export type WebhookEventType = "crawl.page" | "crawl.started" | "crawl.completed" | "crawl.failed"; \ No newline at end of file diff --git a/apps/js-sdk/firecrawl/package.json b/apps/js-sdk/firecrawl/package.json index 002e10d2..e68b3014 100644 --- a/apps/js-sdk/firecrawl/package.json +++ b/apps/js-sdk/firecrawl/package.json @@ -1,6 +1,6 @@ { "name": "@mendable/firecrawl-js", - "version": "1.2.0", + "version": "1.2.1", "description": "JavaScript SDK for Firecrawl API", "main": "build/cjs/index.js", "types": "types/index.d.ts", diff --git a/apps/js-sdk/firecrawl/src/index.ts b/apps/js-sdk/firecrawl/src/index.ts index ee55343c..1d1bb4ee 100644 --- a/apps/js-sdk/firecrawl/src/index.ts +++ b/apps/js-sdk/firecrawl/src/index.ts @@ -111,6 +111,7 @@ export interface CrawlParams { allowExternalLinks?: boolean; ignoreSitemap?: boolean; scrapeOptions?: ScrapeParams; + webhook?: string; } /** diff --git a/apps/js-sdk/firecrawl/types/index.d.ts b/apps/js-sdk/firecrawl/types/index.d.ts index 8b620f85..36356c4e 100644 --- a/apps/js-sdk/firecrawl/types/index.d.ts +++ b/apps/js-sdk/firecrawl/types/index.d.ts @@ -103,6 +103,7 @@ export interface CrawlParams { allowExternalLinks?: boolean; ignoreSitemap?: boolean; scrapeOptions?: ScrapeParams; + webhook?: string; } /** * Response interface for crawling operations.
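Taken together, the changes above wire webhooks through the v1 crawl flow: the controller fires crawl.started, the queue worker fires crawl.page per scraped page and crawl.completed or crawl.failed at the end, and the JS SDK exposes the target URL via CrawlParams.webhook. A minimal sketch of registering a webhook from the SDK; the crawl target and webhook URL are illustrative, and the call assumes the existing crawlUrl(url, params) signature:

import FirecrawlApp from "@mendable/firecrawl-js";

const firecrawl = new FirecrawlApp({ apiKey: process.env.FIRECRAWL_API_KEY });

async function crawlWithWebhook() {
  // Both URLs are illustrative. Firecrawl will POST crawl.started, crawl.page,
  // crawl.completed, and crawl.failed events for this crawl to the webhook URL.
  const result = await firecrawl.crawlUrl("https://example.com", {
    webhook: "https://example.com/firecrawl/webhook",
  });
  console.log(result);
}

crawlWithWebhook();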
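On the receiving side, the v1 payload posted by callWebhook is { success, type, id, data, error }, where type is one of the WebhookEventType values added in types.ts and data carries the converted documents for crawl.page events (it is empty for the lifecycle events). A sketch of a handler, assuming an Express server; the route and port are arbitrary:

import express from "express";

const server = express();
server.use(express.json());

server.post("/firecrawl/webhook", (req, res) => {
  // v1 payload shape: { success, type, id, data, error }
  const { success, type, id, data, error } = req.body;

  if (type === "crawl.started") {
    console.log(`crawl ${id} started`);
  } else if (type === "crawl.page") {
    // data is an array of scraped documents for this page (empty on failure)
    console.log(`crawl ${id}: received ${data.length} document(s)`, success ? "" : error);
  } else if (type === "crawl.completed") {
    console.log(`crawl ${id} completed`);
  } else if (type === "crawl.failed") {
    console.error(`crawl ${id} failed:`, error);
  }

  // crawl.page posts are awaited with a 10-second timeout, so acknowledge
  // quickly and defer heavy processing.
  res.sendStatus(200);
});

server.listen(3000);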