feat: remove webScraperQueue

Gergo Moricz 2024-08-13 21:03:24 +02:00
parent 4a2c37dcf5
commit d7549d4dc5
12 changed files with 151 additions and 163 deletions

View File

@@ -2,7 +2,7 @@ import { Request, Response } from "express";
 import { Job } from "bullmq";
 import { Logger } from "../../lib/logger";
-import { getWebScraperQueue } from "../../services/queue-service";
+import { getScrapeQueue } from "../../services/queue-service";
 import { checkAlerts } from "../../services/alerts";
 
 export async function cleanBefore24hCompleteJobsController(
@@ -11,13 +11,13 @@ export async function cleanBefore24hCompleteJobsController(
 ) {
   Logger.info("🐂 Cleaning jobs older than 24h");
   try {
-    const webScraperQueue = getWebScraperQueue();
+    const scrapeQueue = getScrapeQueue();
     const batchSize = 10;
     const numberOfBatches = 9; // Adjust based on your needs
     const completedJobsPromises: Promise<Job[]>[] = [];
     for (let i = 0; i < numberOfBatches; i++) {
       completedJobsPromises.push(
-        webScraperQueue.getJobs(
+        scrapeQueue.getJobs(
           ["completed"],
           i * batchSize,
           i * batchSize + batchSize,
@@ -68,10 +68,10 @@ export async function checkQueuesController(req: Request, res: Response) {
 // Use this as a "health check" that way we dont destroy the server
 export async function queuesController(req: Request, res: Response) {
   try {
-    const webScraperQueue = getWebScraperQueue();
+    const scrapeQueue = getScrapeQueue();
     const [webScraperActive] = await Promise.all([
-      webScraperQueue.getActiveCount(),
+      scrapeQueue.getActiveCount(),
     ]);
     const noActiveJobs = webScraperActive === 0;

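The hunk above ends before the fetched jobs are actually pruned. As a rough sketch of the surrounding pattern, and assuming the pruning step filters on each job's finishedOn timestamp (that part is not visible in the diff), batched cleanup of completed jobs on a BullMQ queue looks roughly like this:

import { Job, Queue } from "bullmq";

// Rough sketch only: fetch completed jobs in parallel windows, then remove the
// ones that finished more than 24 hours ago. The cutoff and the removal step
// are assumptions; the controller above only shows the batched fetch.
async function cleanCompletedJobsOlderThan24h(queue: Queue): Promise<number> {
  const batchSize = 10;
  const numberOfBatches = 9;
  const cutoff = Date.now() - 24 * 60 * 60 * 1000;

  const batches = await Promise.all(
    Array.from({ length: numberOfBatches }, (_, i) =>
      queue.getJobs(["completed"], i * batchSize, i * batchSize + batchSize)
    )
  );

  const before24h: Job[] = batches
    .flat()
    .filter((job) => job.finishedOn !== undefined && job.finishedOn < cutoff);

  await Promise.all(before24h.map((job) => job.remove()));
  return before24h.length;
}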
View File

@@ -15,12 +15,6 @@ export async function crawlStatusController(req: Request, res: Response) {
     if (!success) {
       return res.status(status).json({ error });
     }
-    // const job = await getWebScraperQueue().getJob(req.params.jobId);
-    // if (!job) {
-    //   return res.status(404).json({ error: "Job not found" });
-    // }
-    // const isCancelled = await (await getWebScraperQueue().client).exists("cancelled:" + req.params.jobId);
 
     const sc = await getCrawl(req.params.jobId);
     if (!sc) {

View File

@@ -1,10 +1,8 @@
 import { Request, Response } from "express";
-import { WebScraperDataProvider } from "../../src/scraper/WebScraper";
-import { billTeam } from "../../src/services/billing/credit_billing";
 import { checkTeamCredits } from "../../src/services/billing/credit_billing";
 import { authenticateUser } from "./auth";
 import { RateLimiterMode } from "../../src/types";
-import { addScrapeJob, addWebScraperJob } from "../../src/services/queue-jobs";
+import { addScrapeJob } from "../../src/services/queue-jobs";
 import { isUrlBlocked } from "../../src/scraper/WebScraper/utils/blocklist";
 import { logCrawl } from "../../src/services/logging/crawl_log";
 import { validateIdempotencyKey } from "../../src/services/idempotency/validate";

View File

@@ -1,44 +1,124 @@
 import { Request, Response } from "express";
 import { authenticateUser } from "./auth";
 import { RateLimiterMode } from "../../src/types";
-import { addWebScraperJob } from "../../src/services/queue-jobs";
 import { isUrlBlocked } from "../../src/scraper/WebScraper/utils/blocklist";
+import { v4 as uuidv4 } from "uuid";
 import { Logger } from "../../src/lib/logger";
+import { addCrawlJob, crawlToCrawler, lockURL, saveCrawl, StoredCrawl } from "../../src/lib/crawl-redis";
+import { addScrapeJob } from "../../src/services/queue-jobs";
 
 export async function crawlPreviewController(req: Request, res: Response) {
   try {
-    const { success, team_id, error, status } = await authenticateUser(
+    const { success, error, status } = await authenticateUser(
       req,
       res,
       RateLimiterMode.Preview
     );
+
+    const team_id = "preview";
+
     if (!success) {
       return res.status(status).json({ error });
     }
-    // authenticate on supabase
+
     const url = req.body.url;
     if (!url) {
       return res.status(400).json({ error: "Url is required" });
     }
 
     if (isUrlBlocked(url)) {
-      return res.status(403).json({ error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it." });
+      return res
+        .status(403)
+        .json({
+          error:
+            "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.",
+        });
     }
 
-    const mode = req.body.mode ?? "crawl";
     const crawlerOptions = req.body.crawlerOptions ?? {};
     const pageOptions = req.body.pageOptions ?? { onlyMainContent: false, includeHtml: false, removeTags: [] };
 
-    const job = await addWebScraperJob({
-      url: url,
-      mode: mode ?? "crawl", // fix for single urls not working
-      crawlerOptions: { ...crawlerOptions, limit: 5, maxCrawledLinks: 5 },
-      team_id: "preview",
-      pageOptions: pageOptions,
-      origin: "website-preview",
-    });
-
-    res.json({ jobId: job.id });
+    // if (mode === "single_urls" && !url.includes(",")) { // NOTE: do we need this?
+    //   try {
+    //     const a = new WebScraperDataProvider();
+    //     await a.setOptions({
+    //       jobId: uuidv4(),
+    //       mode: "single_urls",
+    //       urls: [url],
+    //       crawlerOptions: { ...crawlerOptions, returnOnlyUrls: true },
+    //       pageOptions: pageOptions,
+    //     });
+
+    //     const docs = await a.getDocuments(false, (progress) => {
+    //       job.updateProgress({
+    //         current: progress.current,
+    //         total: progress.total,
+    //         current_step: "SCRAPING",
+    //         current_url: progress.currentDocumentUrl,
+    //       });
+    //     });
+    //     return res.json({
+    //       success: true,
+    //       documents: docs,
+    //     });
+    //   } catch (error) {
+    //     Logger.error(error);
+    //     return res.status(500).json({ error: error.message });
+    //   }
+    // }
+
+    const id = uuidv4();
+
+    let robots;
+
+    try {
+      robots = await this.getRobotsTxt();
+    } catch (_) {}
+
+    const sc: StoredCrawl = {
+      originUrl: url,
+      crawlerOptions,
+      pageOptions,
+      team_id,
+      robots,
+    };
+
+    await saveCrawl(id, sc);
+
+    const crawler = crawlToCrawler(id, sc);
+
+    const sitemap = sc.crawlerOptions?.ignoreSitemap ? null : await crawler.tryGetSitemap();
+
+    if (sitemap !== null) {
+      for (const url of sitemap.map(x => x.url)) {
+        await lockURL(id, sc, url);
+        const job = await addScrapeJob({
+          url,
+          mode: "single_urls",
+          crawlerOptions: crawlerOptions,
+          team_id: team_id,
+          pageOptions: pageOptions,
+          origin: "website-preview",
+          crawl_id: id,
+          sitemapped: true,
+        });
+        await addCrawlJob(id, job.id);
+      }
+    } else {
+      await lockURL(id, sc, url);
+      const job = await addScrapeJob({
+        url,
+        mode: "single_urls",
+        crawlerOptions: crawlerOptions,
+        team_id: team_id,
+        pageOptions: pageOptions,
+        origin: "website-preview",
+        crawl_id: id,
+      });
+      await addCrawlJob(id, job.id);
+    }
+
+    res.json({ jobId: id });
   } catch (error) {
     Logger.error(error);
     return res.status(500).json({ error: error.message });

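The preview controller now fans one request out into per-URL scrape jobs that are tracked under a single crawl id. A condensed sketch of that flow, using the crawl-redis helpers imported in the hunk above and leaving out the robots.txt fetch and request validation:

import { v4 as uuidv4 } from "uuid";
import { addCrawlJob, crawlToCrawler, lockURL, saveCrawl, StoredCrawl } from "../../src/lib/crawl-redis";
import { addScrapeJob } from "../../src/services/queue-jobs";

// Condensed sketch of the flow added above: one crawl record in Redis, then one
// scrape job per discovered URL, each tagged with the crawl id. robots.txt
// handling and error handling are omitted here.
async function startPreviewCrawl(url: string, crawlerOptions: any, pageOptions: any): Promise<string> {
  const id = uuidv4();
  const sc: StoredCrawl = { originUrl: url, crawlerOptions, pageOptions, team_id: "preview" };
  await saveCrawl(id, sc);

  const crawler = crawlToCrawler(id, sc);
  const sitemap = crawlerOptions?.ignoreSitemap ? null : await crawler.tryGetSitemap();
  const urls = sitemap !== null ? sitemap.map((x) => x.url) : [url];

  for (const u of urls) {
    await lockURL(id, sc, u); // de-duplicates URLs within this crawl
    const job = await addScrapeJob({
      url: u,
      mode: "single_urls",
      crawlerOptions,
      team_id: "preview",
      pageOptions,
      origin: "website-preview",
      crawl_id: id,
      sitemapped: sitemap !== null,
    });
    await addCrawlJob(id, job.id); // records the job under the crawl
  }

  return id; // returned to the client as jobId
}

The controller responds with the crawl id, and the preview status route in the next file resolves that id back into per-job states.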
View File

@@ -1,56 +1,38 @@
 import { Request, Response } from "express";
-import { getWebScraperQueue } from "../../src/services/queue-service";
-import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
 import { Logger } from "../../src/lib/logger";
+import { getCrawl, getCrawlJobs } from "../../src/lib/crawl-redis";
+import { getScrapeQueue } from "../../src/services/queue-service";
 
 export async function crawlJobStatusPreviewController(req: Request, res: Response) {
   try {
-    const job = await getWebScraperQueue().getJob(req.params.jobId);
-    if (!job) {
+    const sc = await getCrawl(req.params.jobId);
+    if (!sc) {
       return res.status(404).json({ error: "Job not found" });
     }
 
-    let progress = job.progress;
-    if(typeof progress !== 'object') {
-      progress = {
-        current: 0,
-        current_url: '',
-        total: 0,
-        current_step: '',
-        partialDocs: []
-      }
-    }
-    const {
-      current = 0,
-      current_url = '',
-      total = 0,
-      current_step = '',
-      partialDocs = []
-    } = progress as { current: number, current_url: string, total: number, current_step: string, partialDocs: any[] };
-
-    let data = job.returnvalue;
-    if (process.env.USE_DB_AUTHENTICATION === "true") {
-      const supabaseData = await supabaseGetJobById(req.params.jobId);
-      if (supabaseData) {
-        data = supabaseData.docs;
-      }
-    }
-
-    let jobStatus = await job.getState();
-    if (jobStatus === 'waiting' || jobStatus === 'delayed' || jobStatus === 'waiting-children' || jobStatus === 'unknown' || jobStatus === 'prioritized') {
-      jobStatus = 'active';
-    }
+    const jobIDs = await getCrawlJobs(req.params.jobId);
+
+    // let data = job.returnvalue;
+    // if (process.env.USE_DB_AUTHENTICATION === "true") {
+    //   const supabaseData = await supabaseGetJobById(req.params.jobId);
+    //   if (supabaseData) {
+    //     data = supabaseData.docs;
+    //   }
+    // }
+
+    const jobs = await Promise.all(jobIDs.map(x => getScrapeQueue().getJob(x)));
+    const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
+    const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "active";
+
+    const data = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);
 
     res.json({
       status: jobStatus,
-      // progress: job.progress(),
-      current,
-      current_url,
-      current_step,
-      total,
-      data: data ? data : null,
-      partial_data: jobStatus == 'completed' ? [] : partialDocs,
+      current: jobStatuses.filter(x => x === "completed" || x === "failed").length,
+      total: jobs.length,
+      data: jobStatus === "completed" ? data : null,
+      partial_data: jobStatus === "completed" ? [] : data.filter(x => x !== null),
     });
   } catch (error) {
     Logger.error(error);

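With no single crawl job left to query, the preview status route derives everything from the crawl's member jobs. Restated compactly, with the same logic as the hunk above and the same assumed response shape:

import { getCrawl, getCrawlJobs } from "../../src/lib/crawl-redis";
import { getScrapeQueue } from "../../src/services/queue-service";

// Compact restatement of the aggregation above: the crawl's status is derived
// from the states of its individual scrape jobs.
async function getPreviewCrawlStatus(crawlId: string) {
  const sc = await getCrawl(crawlId);
  if (!sc) return null; // the controller answers 404 here

  const jobIDs = await getCrawlJobs(crawlId);
  const jobs = await Promise.all(jobIDs.map((id) => getScrapeQueue().getJob(id)));
  const states = await Promise.all(jobs.map((job) => job.getState()));

  const status = sc.cancelled
    ? "failed"
    : states.every((s) => s === "completed")
    ? "completed"
    : states.some((s) => s === "failed")
    ? "failed"
    : "active";

  // A job's returnvalue may be an array; only its first element is surfaced.
  const data = jobs.map((job) => (Array.isArray(job.returnvalue) ? job.returnvalue[0] : job.returnvalue));

  return {
    status,
    current: states.filter((s) => s === "completed" || s === "failed").length,
    total: jobs.length,
    data: status === "completed" ? data : null,
    partial_data: status === "completed" ? [] : data.filter((d) => d !== null),
  };
}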
View File

@@ -2,7 +2,7 @@ import express from "express";
 import bodyParser from "body-parser";
 import cors from "cors";
 import "dotenv/config";
-import { getScrapeQueue, getWebScraperQueue } from "./services/queue-service";
+import { getScrapeQueue } from "./services/queue-service";
 import { v0Router } from "./routes/v0";
 import { initSDK } from "@hyperdx/node-opentelemetry";
 import cluster from "cluster";
@@ -58,7 +58,7 @@ if (cluster.isMaster) {
   serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
 
   const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
-    queues: [new BullAdapter(getWebScraperQueue()), new BullAdapter(getScrapeQueue())],
+    queues: [new BullAdapter(getScrapeQueue())],
     serverAdapter: serverAdapter,
   });
@@ -104,9 +104,9 @@ if (cluster.isMaster) {
   app.get(`/serverHealthCheck`, async (req, res) => {
     try {
-      const webScraperQueue = getWebScraperQueue();
+      const scrapeQueue = getScrapeQueue();
       const [waitingJobs] = await Promise.all([
-        webScraperQueue.getWaitingCount(),
+        scrapeQueue.getWaitingCount(),
       ]);
       const noWaitingJobs = waitingJobs === 0;
@@ -126,9 +126,9 @@ if (cluster.isMaster) {
     const timeout = 60000; // 1 minute // The timeout value for the check in milliseconds
     const getWaitingJobsCount = async () => {
-      const webScraperQueue = getWebScraperQueue();
+      const scrapeQueue = getScrapeQueue();
       const [waitingJobsCount] = await Promise.all([
-        webScraperQueue.getWaitingCount(),
+        scrapeQueue.getWaitingCount(),
       ]);
       return waitingJobsCount;
@@ -181,12 +181,12 @@ if (cluster.isMaster) {
   Logger.info(`Worker ${process.pid} started`);
 }
 
-// const wsq = getWebScraperQueue();
-// wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
-// wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
-// wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
-// wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
-// wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
-// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
+// const sq = getScrapeQueue();
+// sq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
+// sq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
+// sq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
+// sq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
+// sq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
+// sq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));

View File

@@ -12,7 +12,7 @@ import { Document } from "../lib/entities";
 import { supabase_service } from "../services/supabase";
 import { Logger } from "../lib/logger";
 import { ScrapeEvents } from "../lib/scrape-events";
-import { getWebScraperQueue } from "../services/queue-service";
+import { getScrapeQueue } from "../services/queue-service";
 
 export async function startWebScraperPipeline({
   job,
@@ -106,9 +106,6 @@ export async function runWebScraper({
         })
       : docs;
 
-    const isCancelled = await (await getWebScraperQueue().client).exists("cancelled:" + bull_job_id);
-
-    if (!isCancelled) {
     const billingResult = await billTeam(team_id, filteredDocs.length);
     if (!billingResult.success) {
@@ -119,7 +116,6 @@ export async function runWebScraper({
         docs: [],
       };
     }
-    }
 
     // This is where the returnvalue from the job is set
     onSuccess(filteredDocs, mode);

View File

@@ -16,7 +16,7 @@ import {
   replacePathsWithAbsolutePaths,
 } from "./utils/replacePaths";
 import { generateCompletions } from "../../lib/LLM-extraction";
-import { getWebScraperQueue } from "../../../src/services/queue-service";
+import { getScrapeQueue } from "../../../src/services/queue-service";
 import { fetchAndProcessDocx } from "./utils/docxProcessor";
 import { getAdjustedMaxDepth, getURLDepth } from "./utils/maxDepthUtils";
 import { Logger } from "../../lib/logger";
@@ -88,21 +88,6 @@ export class WebScraperDataProvider {
           results[i + index] = result;
         })
       );
-      try {
-        if (this.mode === "crawl" && this.bullJobId) {
-          const job = await getWebScraperQueue().getJob(this.bullJobId);
-          const jobStatus = await job.getState();
-          if (jobStatus === "failed") {
-            Logger.info(
-              "Job has failed or has been cancelled by the user. Stopping the job..."
-            );
-            return [] as Document[];
-          }
-        }
-      } catch (error) {
-        Logger.error(error.message);
-        return [] as Document[];
-      }
     }
     return results.filter((result) => result !== null) as Document[];
   }

View File

@@ -1,5 +1,5 @@
 import { Logger } from "../../../src/lib/logger";
-import { getWebScraperQueue } from "../queue-service";
+import { getScrapeQueue } from "../queue-service";
 import { sendSlackWebhook } from "./slack";
 
 export async function checkAlerts() {
@@ -13,8 +13,8 @@ export async function checkAlerts() {
     Logger.info("Initializing alerts");
     const checkActiveJobs = async () => {
       try {
-        const webScraperQueue = getWebScraperQueue();
-        const activeJobs = await webScraperQueue.getActiveCount();
+        const scrapeQueue = getScrapeQueue();
+        const activeJobs = await scrapeQueue.getActiveCount();
         if (activeJobs > Number(process.env.ALERT_NUM_ACTIVE_JOBS)) {
           Logger.warn(
             `Alert: Number of active jobs is over ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}.`
@@ -34,8 +34,8 @@ export async function checkAlerts() {
     };
 
     const checkWaitingQueue = async () => {
-      const webScraperQueue = getWebScraperQueue();
-      const waitingJobs = await webScraperQueue.getWaitingCount();
+      const scrapeQueue = getScrapeQueue();
+      const waitingJobs = await scrapeQueue.getWaitingCount();
       if (waitingJobs > Number(process.env.ALERT_NUM_WAITING_JOBS)) {
         Logger.warn(

View File

@@ -1,22 +1,8 @@
 import { Job, Queue } from "bullmq";
-import {
-  getScrapeQueue,
-  getWebScraperQueue,
-} from "./queue-service";
+import { getScrapeQueue } from "./queue-service";
 import { v4 as uuidv4 } from "uuid";
 import { WebScraperOptions } from "../types";
 
-export async function addWebScraperJob(
-  webScraperOptions: WebScraperOptions,
-  options: any = {},
-  jobId: string = uuidv4(),
-): Promise<Job> {
-  return await getWebScraperQueue().add(jobId, webScraperOptions, {
-    ...options,
-    jobId,
-  });
-}
-
 export async function addScrapeJob(
   webScraperOptions: WebScraperOptions,
   options: any = {},

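The surviving addScrapeJob body sits outside this hunk. Presumably it mirrors the removed addWebScraperJob, just targeting the scrape queue; a hedged sketch under that assumption:

import { Job } from "bullmq";
import { v4 as uuidv4 } from "uuid";
import { getScrapeQueue } from "./queue-service";
import { WebScraperOptions } from "../types";

// Assumption: addScrapeJob follows the same shape as the deleted
// addWebScraperJob, with the generated id reused as the BullMQ jobId.
export async function addScrapeJob(
  webScraperOptions: WebScraperOptions,
  options: any = {},
  jobId: string = uuidv4(),
): Promise<Job> {
  return await getScrapeQueue().add(jobId, webScraperOptions, {
    ...options,
    jobId,
  });
}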
View File

@@ -2,38 +2,13 @@ import { Queue } from "bullmq";
 import { Logger } from "../lib/logger";
 import IORedis from "ioredis";
 
-let webScraperQueue: Queue;
 let scrapeQueue: Queue;
 
 export const redisConnection = new IORedis(process.env.REDIS_URL, {
   maxRetriesPerRequest: null,
 });
 
-export const webScraperQueueName = "{crawlQueue}";
 export const scrapeQueueName = "{scrapeQueue}";
 
-export function getWebScraperQueue() {
-  if (!webScraperQueue) {
-    webScraperQueue = new Queue(
-      webScraperQueueName,
-      {
-        connection: redisConnection,
-      }
-      // {
-      //   settings: {
-      //     lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds,
-      //     lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
-      //     stalledInterval: 30 * 1000,
-      //     maxStalledCount: 10,
-      //   },
-      //   defaultJobOptions:{
-      //     attempts: 5
-      //   }
-      // }
-    );
-    Logger.info("Web scraper queue created");
-  }
-  return webScraperQueue;
-}
-
 export function getScrapeQueue() {
   if (!scrapeQueue) {
@@ -63,4 +38,3 @@ export function getScrapeQueue() {
 
 import { QueueEvents } from 'bullmq';
 export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection });
-export const webScraperQueueEvents = new QueueEvents(webScraperQueueName, { connection: redisConnection });

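Only the scrape queue singleton remains in queue-service after this change. Its body is not visible in the hunk, but assuming it keeps the same lazy-initialization pattern as the deleted getWebScraperQueue, the module now reduces to roughly:

import { Queue } from "bullmq";
import IORedis from "ioredis";
import { Logger } from "../lib/logger";

// Hedged sketch: the exact Queue options for the scrape queue are not shown in
// the diff, so this only assumes the same pattern as the removed function.
let scrapeQueue: Queue;

export const redisConnection = new IORedis(process.env.REDIS_URL, {
  maxRetriesPerRequest: null,
});

export const scrapeQueueName = "{scrapeQueue}";

export function getScrapeQueue() {
  if (!scrapeQueue) {
    scrapeQueue = new Queue(scrapeQueueName, { connection: redisConnection });
    Logger.info("Scrape queue created");
  }
  return scrapeQueue;
}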
View File

@@ -1,9 +1,7 @@
 import { CustomError } from "../lib/custom-error";
 import {
-  getWebScraperQueue,
   getScrapeQueue,
   redisConnection,
-  webScraperQueueName,
   scrapeQueueName,
 } from "./queue-service";
 import "dotenv/config";
@@ -110,7 +108,6 @@ const workerFun = async (queueName: string, processJobInternal: (token: string,
   }
 };
 
-workerFun(webScraperQueueName, processJobInternal);
 workerFun(scrapeQueueName, processJobInternal);
 
 async function processJob(job: Job, token: string) {
@@ -205,10 +202,6 @@ async function processJob(job: Job, token: string) {
     return data;
   } catch (error) {
     Logger.error(`🐂 Job errored ${job.id} - ${error}`);
-    if (await getWebScraperQueue().isPaused()) {
-      Logger.debug("🐂Queue is paused, ignoring");
-      return;
-    }
 
     if (error instanceof CustomError) {
       // Here we handle the error, then save the failed job