Merge pull request #459 from mendableai/feat/queue-scrapes

feat: Move scraper to queue
Commit 81b2479db3 by Nicolas, 2024-08-15 14:19:55 -04:00, committed by GitHub.
35 changed files with 8092 additions and 458 deletions

View File

@ -24,8 +24,8 @@ kill_timeout = '30s'
[http_service.concurrency]
type = "requests"
hard_limit = 100
soft_limit = 50
# hard_limit = 100
soft_limit = 100
[[http_service.checks]]
grace_period = "10s"
@ -51,12 +51,13 @@ kill_timeout = '30s'
[services.concurrency]
type = 'connections'
hard_limit = 25
soft_limit = 20
# hard_limit = 25
soft_limit = 100
[[vm]]
size = 'performance-1x'
size = 'performance-2x'
processes = ['app','worker']
memory = 8192

View File

@ -24,8 +24,8 @@ kill_timeout = '30s'
[http_service.concurrency]
type = "requests"
hard_limit = 200
soft_limit = 75
# hard_limit = 200
soft_limit = 200
[[http_service.checks]]
grace_period = "20s"
@ -50,8 +50,8 @@ kill_timeout = '30s'
[services.concurrency]
type = 'connections'
hard_limit = 30
soft_limit = 12
# hard_limit = 30
soft_limit = 200
[[vm]]
size = 'performance-4x'

View File

@ -29,7 +29,6 @@
"@jest/globals": "^29.7.0",
"@tsconfig/recommended": "^1.0.3",
"@types/body-parser": "^1.19.2",
"@types/bull": "^4.10.0",
"@types/cors": "^2.8.13",
"@types/express": "^4.17.17",
"@types/jest": "^29.5.12",
@ -63,7 +62,7 @@
"async-mutex": "^0.5.0",
"axios": "^1.3.4",
"bottleneck": "^2.19.5",
"bull": "^4.15.0",
"bullmq": "^5.11.0",
"cacheable-lookup": "^6.1.0",
"cheerio": "^1.0.0-rc.12",
"cohere": "^1.1.1",
@ -99,6 +98,7 @@
"robots-parser": "^3.0.1",
"scrapingbee": "^1.7.4",
"stripe": "^16.1.0",
"systeminformation": "^5.22.11",
"turndown": "^7.1.3",
"turndown-plugin-gfm": "^1.0.2",
"typesense": "^1.5.4",

View File

@ -56,9 +56,9 @@ importers:
bottleneck:
specifier: ^2.19.5
version: 2.19.5
bull:
specifier: ^4.15.0
version: 4.15.0
bullmq:
specifier: ^5.11.0
version: 5.11.0
cacheable-lookup:
specifier: ^6.1.0
version: 6.1.0
@ -164,6 +164,9 @@ importers:
stripe:
specifier: ^16.1.0
version: 16.1.0
systeminformation:
specifier: ^5.22.11
version: 5.22.11
turndown:
specifier: ^7.1.3
version: 7.2.0
@ -204,9 +207,6 @@ importers:
'@types/body-parser':
specifier: ^1.19.2
version: 1.19.5
'@types/bull':
specifier: ^4.10.0
version: 4.10.0
'@types/cors':
specifier: ^2.8.13
version: 2.8.17
@ -1538,10 +1538,6 @@ packages:
'@types/body-parser@1.19.5':
resolution: {integrity: sha512-fB3Zu92ucau0iQ0JMCFQE7b/dv8Ot07NI3KaZIkIUNXq82k4eBAqUaneXfleGY9JWskeS9y+u0nXMyspcuQrCg==}
'@types/bull@4.10.0':
resolution: {integrity: sha512-RkYW8K2H3J76HT6twmHYbzJ0GtLDDotpLP9ah9gtiA7zfF6peBH1l5fEiK0oeIZ3/642M7Jcb9sPmor8Vf4w6g==}
deprecated: This is a stub types definition. bull provides its own type definitions, so you do not need this installed.
'@types/bunyan@1.8.9':
resolution: {integrity: sha512-ZqS9JGpBxVOvsawzmVt30sP++gSQMTejCkIAQ3VdadOcRE8izTyW66hufvwLeH+YEGP6Js2AW7Gz+RMyvrEbmw==}
@ -1938,9 +1934,8 @@ packages:
buffer@6.0.3:
resolution: {integrity: sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==}
bull@4.15.0:
resolution: {integrity: sha512-nOEAfUXwUXtFbRPQP3bWCwpQ/NAerAu2Nym/ucv5C1E+Qh2x6RGdKKsYIfZam4mYncayTynTUN/HLhRgGi2N8w==}
engines: {node: '>=12'}
bullmq@5.11.0:
resolution: {integrity: sha512-qVzyWGZqie3VHaYEgRXhId/j8ebfmj6MExEJyUByMsUJA5pVciVle3hKLer5fyMwtQ8lTMP7GwhXV/NZ+HzlRA==}
bytes@3.1.2:
resolution: {integrity: sha512-/Nf7TyzTx6S3yRJObOAV7956r8cr2+Oj8AC5dt8wSP3BQAoeX58NoHyCU8P8zGkNXStjTSi6fzO6F0pBdcYbEg==}
@ -2562,10 +2557,6 @@ packages:
resolution: {integrity: sha512-pjzuKtY64GYfWizNAJ0fr9VqttZkNiK2iS430LtIHzjBEr6bX8Am2zm4sW4Ro5wjWW5cAlRL1qAMTcXbjNAO2Q==}
engines: {node: '>=8.0.0'}
get-port@5.1.1:
resolution: {integrity: sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==}
engines: {node: '>=8'}
get-stream@5.2.0:
resolution: {integrity: sha512-nBF+F1rAZVCu/p7rjzgA+Yb4lfYXrpl7a6VmJrU8wF9I1CKvP/QwPNZHnOlwbTkY6dvtFIzFMSyQXbLoTQPRpA==}
engines: {node: '>=8'}
@ -4268,6 +4259,12 @@ packages:
resolution: {integrity: sha512-SzRP5LQ6Ts2G5NyAa/jg16s8e3R7rfdFjizy1zeoecYWw+nGL+YA1xZvW/+iJmidBGSdLkuvdwTYEyJEb+EiUw==}
engines: {node: '>=0.2.6'}
systeminformation@5.22.11:
resolution: {integrity: sha512-aLws5yi4KCHTb0BVvbodQY5bY8eW4asMRDTxTW46hqw9lGjACX6TlLdJrkdoHYRB0qs+MekqEq1zG7WDnWE8Ug==}
engines: {node: '>=8.0.0'}
os: [darwin, linux, win32, freebsd, openbsd, netbsd, sunos, android]
hasBin: true
tar-fs@3.0.5:
resolution: {integrity: sha512-JOgGAmZyMgbqpLwct7ZV8VzkEB6pxXFBVErLtb+XCOqzc6w1xiWKI9GVd6bwk68EX7eJ4DWmfXVmq8K2ziZTGg==}
@ -4460,10 +4457,6 @@ packages:
resolution: {integrity: sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ==}
hasBin: true
uuid@8.3.2:
resolution: {integrity: sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==}
hasBin: true
uuid@9.0.1:
resolution: {integrity: sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==}
hasBin: true
@ -6447,12 +6440,6 @@ snapshots:
'@types/connect': 3.4.38
'@types/node': 20.14.1
'@types/bull@4.10.0':
dependencies:
bull: 4.15.0
transitivePeerDependencies:
- supports-color
'@types/bunyan@1.8.9':
dependencies:
'@types/node': 20.14.1
@ -6923,15 +6910,15 @@ snapshots:
base64-js: 1.5.1
ieee754: 1.2.1
bull@4.15.0:
bullmq@5.11.0:
dependencies:
cron-parser: 4.9.0
get-port: 5.1.1
ioredis: 5.4.1
lodash: 4.17.21
msgpackr: 1.10.2
node-abort-controller: 3.1.1
semver: 7.6.2
uuid: 8.3.2
tslib: 2.6.3
uuid: 9.0.1
transitivePeerDependencies:
- supports-color
@ -7532,8 +7519,6 @@ snapshots:
get-package-type@0.1.0: {}
get-port@5.1.1: {}
get-stream@5.2.0:
dependencies:
pump: 3.0.0
@ -9433,6 +9418,8 @@ snapshots:
sylvester@0.0.12: {}
systeminformation@5.22.11: {}
tar-fs@3.0.5:
dependencies:
pump: 3.0.0
@ -9605,8 +9592,6 @@ snapshots:
uuid@10.0.0: {}
uuid@8.3.2: {}
uuid@9.0.1: {}
v8-compile-cache-lib@3.0.1: {}

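Taken together, these manifest and lockfile changes replace bull with bullmq (dropping the @types/bull stub, which bull itself made redundant) and add systeminformation for the new system monitor. As a rough, self-contained sketch of what the bull-to-BullMQ move means at the call sites, the queue name, Redis URL, and job payload below are placeholders, not values from this PR:

import { Queue, Worker, QueueEvents } from "bullmq";
import IORedis from "ioredis";

// BullMQ takes an explicit ioredis connection instead of a URL string,
// and blocking commands require maxRetriesPerRequest: null.
const connection = new IORedis(process.env.REDIS_URL ?? "redis://localhost:6379", {
  maxRetriesPerRequest: null,
});

// Jobs are added with an explicit name (first argument), unlike bull's add(data, opts).
const queue = new Queue("example-queue", { connection });

// Processing moves from queue.process(fn) to a dedicated Worker instance.
const worker = new Worker(
  "example-queue",
  async (job) => {
    // job.updateProgress replaces bull's job.progress(value)
    await job.updateProgress(50);
    return { echoed: job.data };
  },
  { connection }
);

// QueueEvents is what waitUntilFinished and completion listeners hang off.
const events = new QueueEvents("example-queue", { connection });

async function demo() {
  const job = await queue.add("demo", { hello: "world" }, { priority: 10 });
  const result = await job.waitUntilFinished(events, 30_000);
  console.log(result);
  await worker.close();
  await queue.close();
  await events.close();
  await connection.quit();
}

demo().catch(console.error);

The notable differences, the explicit ioredis connection, the named add(), the separate Worker class, and QueueEvents for waitUntilFinished, all show up in the service changes further down.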
View File

@ -217,7 +217,6 @@ describe("E2E Tests for API Routes", () => {
expect(response.body.data).toHaveProperty('content');
expect(response.body.data).toHaveProperty('metadata');
expect(response.body.data.metadata.pageStatusCode).toBe(404);
expect(response.body.data.metadata.pageError.toLowerCase()).toContain("not found");
}, 60000); // 60 seconds
it.concurrent('should return a successful response for a scrape with 405 page', async () => {
@ -233,7 +232,6 @@ describe("E2E Tests for API Routes", () => {
expect(response.body.data).toHaveProperty('content');
expect(response.body.data).toHaveProperty('metadata');
expect(response.body.data.metadata.pageStatusCode).toBe(405);
expect(response.body.data.metadata.pageError.toLowerCase()).toContain("method not allowed");
}, 60000); // 60 seconds
it.concurrent('should return a successful response for a scrape with 500 page', async () => {
@ -249,7 +247,6 @@ describe("E2E Tests for API Routes", () => {
expect(response.body.data).toHaveProperty('content');
expect(response.body.data).toHaveProperty('metadata');
expect(response.body.data.metadata.pageStatusCode).toBe(500);
expect(response.body.data.metadata.pageError.toLowerCase()).toContain("internal server error");
}, 60000); // 60 seconds
});

View File

@ -1,8 +1,8 @@
import { Request, Response } from "express";
import { Job } from "bull";
import { Job } from "bullmq";
import { Logger } from "../../lib/logger";
import { getWebScraperQueue } from "../../services/queue-service";
import { getScrapeQueue } from "../../services/queue-service";
import { checkAlerts } from "../../services/alerts";
export async function cleanBefore24hCompleteJobsController(
@ -11,13 +11,13 @@ export async function cleanBefore24hCompleteJobsController(
) {
Logger.info("🐂 Cleaning jobs older than 24h");
try {
const webScraperQueue = getWebScraperQueue();
const scrapeQueue = getScrapeQueue();
const batchSize = 10;
const numberOfBatches = 9; // Adjust based on your needs
const completedJobsPromises: Promise<Job[]>[] = [];
for (let i = 0; i < numberOfBatches; i++) {
completedJobsPromises.push(
webScraperQueue.getJobs(
scrapeQueue.getJobs(
["completed"],
i * batchSize,
i * batchSize + batchSize,
@ -68,10 +68,10 @@ export async function checkQueuesController(req: Request, res: Response) {
// Use this as a "health check" that way we dont destroy the server
export async function queuesController(req: Request, res: Response) {
try {
const webScraperQueue = getWebScraperQueue();
const scrapeQueue = getScrapeQueue();
const [webScraperActive] = await Promise.all([
webScraperQueue.getActiveCount(),
scrapeQueue.getActiveCount(),
]);
const noActiveJobs = webScraperActive === 0;

View File

@ -1,11 +1,9 @@
import { Request, Response } from "express";
import { authenticateUser } from "./auth";
import { RateLimiterMode } from "../../src/types";
import { addWebScraperJob } from "../../src/services/queue-jobs";
import { getWebScraperQueue } from "../../src/services/queue-service";
import { supabase_service } from "../../src/services/supabase";
import { billTeam } from "../../src/services/billing/credit_billing";
import { Logger } from "../../src/lib/logger";
import { getCrawl, saveCrawl } from "../../src/lib/crawl-redis";
export async function crawlCancelController(req: Request, res: Response) {
try {
@ -19,8 +17,9 @@ export async function crawlCancelController(req: Request, res: Response) {
if (!success) {
return res.status(status).json({ error });
}
const job = await getWebScraperQueue().getJob(req.params.jobId);
if (!job) {
const sc = await getCrawl(req.params.jobId);
if (!sc) {
return res.status(404).json({ error: "Job not found" });
}
@ -40,27 +39,13 @@ export async function crawlCancelController(req: Request, res: Response) {
}
}
const jobState = await job.getState();
const { partialDocs } = await job.progress();
if (partialDocs && partialDocs.length > 0 && jobState === "active") {
Logger.info("Billing team for partial docs...");
// Note: the credits that we will bill them here might be lower than the actual
// due to promises that are not yet resolved
await billTeam(team_id, partialDocs.length);
}
try {
await getWebScraperQueue().client.del(job.lockKey());
await job.takeLock();
await job.discard();
await job.moveToFailed(Error("Job cancelled by user"), true);
sc.cancelled = true;
await saveCrawl(req.params.jobId, sc);
} catch (error) {
Logger.error(error);
}
const newJobState = await job.getState();
res.json({
status: "cancelled"
});

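With crawl state now stored in Redis, cancelling a crawl no longer needs to take the Bull job's lock and force-fail it; the controller simply flips a cancelled flag on the stored crawl. How running jobs observe that flag is not shown in this excerpt, so the worker-side check below is an assumption sketched only for illustration:

import { getCrawl, saveCrawl } from "../../src/lib/crawl-redis";

// Mark a crawl as cancelled (mirrors the controller above).
export async function cancelCrawl(crawlId: string): Promise<boolean> {
  const sc = await getCrawl(crawlId);
  if (!sc) return false;
  sc.cancelled = true;
  await saveCrawl(crawlId, sc);
  return true;
}

// Hypothetical worker-side check: before scraping the next URL, a job that
// belongs to a crawl could re-read the stored crawl and bail out early.
export async function shouldAbort(crawlId?: string): Promise<boolean> {
  if (!crawlId) return false;
  const sc = await getCrawl(crawlId);
  return sc?.cancelled === true;
}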
View File

@ -1,10 +1,10 @@
import { Request, Response } from "express";
import { authenticateUser } from "./auth";
import { RateLimiterMode } from "../../src/types";
import { addWebScraperJob } from "../../src/services/queue-jobs";
import { getWebScraperQueue } from "../../src/services/queue-service";
import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
import { getScrapeQueue } from "../../src/services/queue-service";
import { Logger } from "../../src/lib/logger";
import { getCrawl, getCrawlJobs } from "../../src/lib/crawl-redis";
import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
export async function crawlStatusController(req: Request, res: Response) {
try {
@ -16,33 +16,42 @@ export async function crawlStatusController(req: Request, res: Response) {
if (!success) {
return res.status(status).json({ error });
}
const job = await getWebScraperQueue().getJob(req.params.jobId);
if (!job) {
const sc = await getCrawl(req.params.jobId);
if (!sc) {
return res.status(404).json({ error: "Job not found" });
}
const { current, current_url, total, current_step, partialDocs } = await job.progress();
let data = job.returnvalue;
if (process.env.USE_DB_AUTHENTICATION === "true") {
const supabaseData = await supabaseGetJobById(req.params.jobId);
if (supabaseData) {
data = supabaseData.docs;
}
if (sc.team_id !== team_id) {
return res.status(403).json({ error: "Forbidden" });
}
const jobStatus = await job.getState();
const jobIDs = await getCrawlJobs(req.params.jobId);
const jobs = (await Promise.all(jobIDs.map(async x => {
const job = await getScrapeQueue().getJob(x);
if (process.env.USE_DB_AUTHENTICATION === "true") {
const supabaseData = await supabaseGetJobById(job.id);
if (supabaseData) {
job.returnvalue = supabaseData.docs;
}
}
return job;
}))).sort((a, b) => a.timestamp - b.timestamp);
const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "active";
const data = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);
res.json({
status: jobStatus,
// progress: job.progress(),
current,
current_url,
current_step,
total,
data: data ? data : null,
partial_data: jobStatus == 'completed' ? [] : partialDocs,
current: jobStatuses.filter(x => x === "completed" || x === "failed").length,
total: jobs.length,
data: jobStatus === "completed" ? data : null,
partial_data: jobStatus === "completed" ? [] : data.filter(x => x !== null),
});
} catch (error) {
Logger.error(error);

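A crawl's status is now derived from the states of its child scrape jobs instead of a single Bull job. The reduction used above can be read as a small pure helper (a restatement of the controller logic, not an API added by this PR):

type DerivedStatus = "completed" | "failed" | "active";

// cancelled wins, then all-completed, then any-failed, otherwise still active.
export function deriveCrawlStatus(
  jobStates: string[],
  cancelled: boolean | undefined
): DerivedStatus {
  if (cancelled) return "failed";
  if (jobStates.every((s) => s === "completed")) return "completed";
  if (jobStates.some((s) => s === "failed")) return "failed";
  return "active";
}

// e.g. deriveCrawlStatus(["completed", "active"], false) === "active"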
View File

@ -1,10 +1,8 @@
import { Request, Response } from "express";
import { WebScraperDataProvider } from "../../src/scraper/WebScraper";
import { billTeam } from "../../src/services/billing/credit_billing";
import { checkTeamCredits } from "../../src/services/billing/credit_billing";
import { authenticateUser } from "./auth";
import { RateLimiterMode } from "../../src/types";
import { addWebScraperJob } from "../../src/services/queue-jobs";
import { addScrapeJob } from "../../src/services/queue-jobs";
import { isUrlBlocked } from "../../src/scraper/WebScraper/utils/blocklist";
import { logCrawl } from "../../src/services/logging/crawl_log";
import { validateIdempotencyKey } from "../../src/services/idempotency/validate";
@ -12,6 +10,9 @@ import { createIdempotencyKey } from "../../src/services/idempotency/create";
import { defaultCrawlPageOptions, defaultCrawlerOptions, defaultOrigin } from "../../src/lib/default-values";
import { v4 as uuidv4 } from "uuid";
import { Logger } from "../../src/lib/logger";
import { addCrawlJob, addCrawlJobs, crawlToCrawler, lockURL, lockURLs, saveCrawl, StoredCrawl } from "../../src/lib/crawl-redis";
import { getScrapeQueue } from "../../src/services/queue-service";
import { checkAndUpdateURL } from "../../src/lib/validateUrl";
export async function crawlController(req: Request, res: Response) {
try {
@ -43,10 +44,17 @@ export async function crawlController(req: Request, res: Response) {
return res.status(402).json({ error: "Insufficient credits" });
}
const url = req.body.url;
let url = req.body.url;
if (!url) {
return res.status(400).json({ error: "Url is required" });
}
try {
url = checkAndUpdateURL(url).url;
} catch (e) {
return res
.status(e instanceof Error && e.message === "Invalid URL" ? 400 : 500)
.json({ error: e.message ?? e });
}
if (isUrlBlocked(url)) {
return res
@ -62,47 +70,100 @@ export async function crawlController(req: Request, res: Response) {
const crawlerOptions = { ...defaultCrawlerOptions, ...req.body.crawlerOptions };
const pageOptions = { ...defaultCrawlPageOptions, ...req.body.pageOptions };
if (mode === "single_urls" && !url.includes(",")) { // NOTE: do we need this?
try {
const a = new WebScraperDataProvider();
await a.setOptions({
jobId: uuidv4(),
mode: "single_urls",
urls: [url],
crawlerOptions: { ...crawlerOptions, returnOnlyUrls: true },
pageOptions: pageOptions,
});
// if (mode === "single_urls" && !url.includes(",")) { // NOTE: do we need this?
// try {
// const a = new WebScraperDataProvider();
// await a.setOptions({
// jobId: uuidv4(),
// mode: "single_urls",
// urls: [url],
// crawlerOptions: { ...crawlerOptions, returnOnlyUrls: true },
// pageOptions: pageOptions,
// });
const docs = await a.getDocuments(false, (progress) => {
job.progress({
current: progress.current,
total: progress.total,
current_step: "SCRAPING",
current_url: progress.currentDocumentUrl,
});
});
return res.json({
success: true,
documents: docs,
});
} catch (error) {
Logger.error(error);
return res.status(500).json({ error: error.message });
}
// const docs = await a.getDocuments(false, (progress) => {
// job.updateProgress({
// current: progress.current,
// total: progress.total,
// current_step: "SCRAPING",
// current_url: progress.currentDocumentUrl,
// });
// });
// return res.json({
// success: true,
// documents: docs,
// });
// } catch (error) {
// Logger.error(error);
// return res.status(500).json({ error: error.message });
// }
// }
const id = uuidv4();
await logCrawl(id, team_id);
const sc: StoredCrawl = {
originUrl: url,
crawlerOptions,
pageOptions,
team_id,
createdAt: Date.now(),
};
const crawler = crawlToCrawler(id, sc);
try {
sc.robots = await crawler.getRobotsTxt();
} catch (_) {}
await saveCrawl(id, sc);
const sitemap = sc.crawlerOptions?.ignoreSitemap ? null : await crawler.tryGetSitemap();
if (sitemap !== null) {
const jobs = sitemap.map(x => {
const url = x.url;
const uuid = uuidv4();
return {
name: uuid,
data: {
url,
mode: "single_urls",
crawlerOptions: crawlerOptions,
team_id: team_id,
pageOptions: pageOptions,
origin: req.body.origin ?? defaultOrigin,
crawl_id: id,
sitemapped: true,
},
opts: {
jobId: uuid,
priority: 20,
}
};
})
await lockURLs(id, jobs.map(x => x.data.url));
await addCrawlJobs(id, jobs.map(x => x.opts.jobId));
await getScrapeQueue().addBulk(jobs);
} else {
await lockURL(id, sc, url);
const job = await addScrapeJob({
url,
mode: "single_urls",
crawlerOptions: crawlerOptions,
team_id: team_id,
pageOptions: pageOptions,
origin: req.body.origin ?? defaultOrigin,
crawl_id: id,
}, {
priority: 15, // prioritize request 0 of crawl jobs same as scrape jobs
});
await addCrawlJob(id, job.id);
}
const job = await addWebScraperJob({
url: url,
mode: mode ?? "crawl", // fix for single urls not working
crawlerOptions: crawlerOptions,
team_id: team_id,
pageOptions: pageOptions,
origin: req.body.origin ?? defaultOrigin,
});
await logCrawl(job.id.toString(), team_id);
res.json({ jobId: job.id });
res.json({ jobId: id });
} catch (error) {
Logger.error(error);
return res.status(500).json({ error: error.message });

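The crawl endpoint therefore only seeds the queue: it stores the crawl, pre-fetches robots.txt, locks the starting URLs, and enqueues one scrape job per URL, with priority 20 for sitemapped children and 15 for the single kickoff job. A condensed sketch of that flow, with authentication, validation, and error handling stripped out:

import { v4 as uuidv4 } from "uuid";
import { addCrawlJob, addCrawlJobs, crawlToCrawler, lockURL, lockURLs, saveCrawl, StoredCrawl } from "../../src/lib/crawl-redis";
import { getScrapeQueue } from "../../src/services/queue-service";
import { addScrapeJob } from "../../src/services/queue-jobs";

export async function kickOffCrawl(url: string, team_id: string, crawlerOptions: any, pageOptions: any): Promise<string> {
  const id = uuidv4();
  const sc: StoredCrawl = { originUrl: url, crawlerOptions, pageOptions, team_id, createdAt: Date.now() };

  const crawler = crawlToCrawler(id, sc);
  try {
    sc.robots = await crawler.getRobotsTxt(); // best effort; a missing robots.txt is fine
  } catch (_) {}
  await saveCrawl(id, sc);

  const sitemap = crawlerOptions?.ignoreSitemap ? null : await crawler.tryGetSitemap();
  if (sitemap !== null) {
    // One queued job per sitemap URL, all sharing crawl_id; 20 is a lower BullMQ priority.
    const jobs = sitemap.map((x) => {
      const jobId = uuidv4();
      return {
        name: jobId,
        data: { url: x.url, mode: "single_urls", crawlerOptions, team_id, pageOptions, crawl_id: id, sitemapped: true },
        opts: { jobId, priority: 20 },
      };
    });
    await lockURLs(id, jobs.map((j) => j.data.url));
    await addCrawlJobs(id, jobs.map((j) => j.opts.jobId));
    await getScrapeQueue().addBulk(jobs);
  } else {
    await lockURL(id, sc, url);
    const job = await addScrapeJob(
      { url, mode: "single_urls", crawlerOptions, team_id, pageOptions, crawl_id: id },
      { priority: 15 }
    );
    await addCrawlJob(id, job.id);
  }
  return id; // the crawl id, not a Bull job id, is what clients poll on
}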
View File

@ -1,44 +1,125 @@
import { Request, Response } from "express";
import { authenticateUser } from "./auth";
import { RateLimiterMode } from "../../src/types";
import { addWebScraperJob } from "../../src/services/queue-jobs";
import { isUrlBlocked } from "../../src/scraper/WebScraper/utils/blocklist";
import { v4 as uuidv4 } from "uuid";
import { Logger } from "../../src/lib/logger";
import { addCrawlJob, crawlToCrawler, lockURL, saveCrawl, StoredCrawl } from "../../src/lib/crawl-redis";
import { addScrapeJob } from "../../src/services/queue-jobs";
export async function crawlPreviewController(req: Request, res: Response) {
try {
const { success, team_id, error, status } = await authenticateUser(
const { success, error, status } = await authenticateUser(
req,
res,
RateLimiterMode.Preview
);
const team_id = "preview";
if (!success) {
return res.status(status).json({ error });
}
// authenticate on supabase
const url = req.body.url;
if (!url) {
return res.status(400).json({ error: "Url is required" });
}
if (isUrlBlocked(url)) {
return res.status(403).json({ error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it." });
return res
.status(403)
.json({
error:
"Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.",
});
}
const mode = req.body.mode ?? "crawl";
const crawlerOptions = req.body.crawlerOptions ?? {};
const pageOptions = req.body.pageOptions ?? { onlyMainContent: false, includeHtml: false, removeTags: [] };
const job = await addWebScraperJob({
url: url,
mode: mode ?? "crawl", // fix for single urls not working
crawlerOptions: { ...crawlerOptions, limit: 5, maxCrawledLinks: 5 },
team_id: "preview",
pageOptions: pageOptions,
origin: "website-preview",
});
// if (mode === "single_urls" && !url.includes(",")) { // NOTE: do we need this?
// try {
// const a = new WebScraperDataProvider();
// await a.setOptions({
// jobId: uuidv4(),
// mode: "single_urls",
// urls: [url],
// crawlerOptions: { ...crawlerOptions, returnOnlyUrls: true },
// pageOptions: pageOptions,
// });
res.json({ jobId: job.id });
// const docs = await a.getDocuments(false, (progress) => {
// job.updateProgress({
// current: progress.current,
// total: progress.total,
// current_step: "SCRAPING",
// current_url: progress.currentDocumentUrl,
// });
// });
// return res.json({
// success: true,
// documents: docs,
// });
// } catch (error) {
// Logger.error(error);
// return res.status(500).json({ error: error.message });
// }
// }
const id = uuidv4();
let robots;
try {
robots = await this.getRobotsTxt();
} catch (_) {}
const sc: StoredCrawl = {
originUrl: url,
crawlerOptions,
pageOptions,
team_id,
robots,
createdAt: Date.now(),
};
await saveCrawl(id, sc);
const crawler = crawlToCrawler(id, sc);
const sitemap = sc.crawlerOptions?.ignoreSitemap ? null : await crawler.tryGetSitemap();
if (sitemap !== null) {
for (const url of sitemap.map(x => x.url)) {
await lockURL(id, sc, url);
const job = await addScrapeJob({
url,
mode: "single_urls",
crawlerOptions: crawlerOptions,
team_id: team_id,
pageOptions: pageOptions,
origin: "website-preview",
crawl_id: id,
sitemapped: true,
});
await addCrawlJob(id, job.id);
}
} else {
await lockURL(id, sc, url);
const job = await addScrapeJob({
url,
mode: "single_urls",
crawlerOptions: crawlerOptions,
team_id: team_id,
pageOptions: pageOptions,
origin: "website-preview",
crawl_id: id,
});
await addCrawlJob(id, job.id);
}
res.json({ jobId: id });
} catch (error) {
Logger.error(error);
return res.status(500).json({ error: error.message });

View File

@ -1,6 +1,5 @@
import { ExtractorOptions, PageOptions } from './../lib/entities';
import { Request, Response } from "express";
import { WebScraperDataProvider } from "../scraper/WebScraper";
import { billTeam, checkTeamCredits } from "../services/billing/credit_billing";
import { authenticateUser } from "./auth";
import { RateLimiterMode } from "../types";
@ -9,6 +8,8 @@ import { Document } from "../lib/entities";
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; // Import the isUrlBlocked function
import { numTokensFromString } from '../lib/LLM-extraction/helpers';
import { defaultPageOptions, defaultExtractorOptions, defaultTimeout, defaultOrigin } from '../lib/default-values';
import { addScrapeJob } from '../services/queue-jobs';
import { scrapeQueueEvents } from '../services/queue-service';
import { v4 as uuidv4 } from "uuid";
import { Logger } from '../lib/logger';
@ -36,50 +37,49 @@ export async function scrapeHelper(
return { success: false, error: "Firecrawl currently does not support social media scraping due to policy restrictions. We're actively working on building support for it.", returnCode: 403 };
}
const a = new WebScraperDataProvider();
await a.setOptions({
jobId,
const job = await addScrapeJob({
url,
mode: "single_urls",
urls: [url],
crawlerOptions: {
...crawlerOptions,
},
pageOptions: pageOptions,
extractorOptions: extractorOptions,
});
crawlerOptions,
team_id,
pageOptions,
extractorOptions,
origin: req.body.origin ?? defaultOrigin,
}, {}, jobId);
const timeoutPromise = new Promise<{ success: boolean; error?: string; returnCode: number }>((_, reject) =>
setTimeout(() => reject({ success: false, error: "Request timed out. Increase the timeout by passing `timeout` param to the request.", returnCode: 408 }), timeout)
);
const docsPromise = a.getDocuments(false);
let docs;
let doc;
try {
docs = await Promise.race([docsPromise, timeoutPromise]);
} catch (error) {
return error;
doc = (await job.waitUntilFinished(scrapeQueueEvents, timeout))[0]; //60 seconds timeout
} catch (e) {
if (e instanceof Error && e.message.startsWith("Job wait")) {
return {
success: false,
error: "Request timed out",
returnCode: 408,
}
} else {
throw e;
}
}
// make sure doc.content is not empty
let filteredDocs = docs.filter(
(doc: { content?: string }) => doc.content && doc.content.trim().length > 0
);
if (filteredDocs.length === 0) {
return { success: true, error: "No page found", returnCode: 200, data: docs[0] };
await job.remove();
if (!doc) {
console.error("!!! PANIC DOC IS", doc, job);
return { success: true, error: "No page found", returnCode: 200, data: doc };
}
delete doc.index;
delete doc.provider;
// Remove rawHtml if pageOptions.rawHtml is false and extractorOptions.mode is llm-extraction-from-raw-html
if (!pageOptions.includeRawHtml && extractorOptions.mode == "llm-extraction-from-raw-html") {
filteredDocs.forEach(doc => {
delete doc.rawHtml;
});
delete doc.rawHtml;
}
return {
success: true,
data: filteredDocs[0],
data: doc,
returnCode: 200,
};
}
@ -143,7 +143,7 @@ export async function scrapeController(req: Request, res: Response) {
const numTokens = (result.data && result.data.markdown) ? numTokensFromString(result.data.markdown, "gpt-3.5-turbo") : 0;
if (result.success) {
let creditsToBeBilled = 1; // Assuming 1 credit per document
let creditsToBeBilled = 0; // billing for doc done on queue end
const creditsPerLLMExtract = 50;
if (extractorOptions.mode.includes("llm-extraction")) {

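scrapeHelper also goes through the queue now: it enqueues one job and blocks on waitUntilFinished via the shared QueueEvents instance, translating BullMQ's wait timeout into the API's 408 response. A reduced sketch of that pattern, with import paths as used in the controller above and payload fields trimmed:

import { addScrapeJob } from "../services/queue-jobs";
import { scrapeQueueEvents } from "../services/queue-service";
import { v4 as uuidv4 } from "uuid";

// Enqueue one scrape and wait synchronously for its result, or time out.
export async function scrapeAndWait(url: string, team_id: string, timeoutMs = 30_000) {
  const jobId = uuidv4();
  const job = await addScrapeJob(
    { url, mode: "single_urls", crawlerOptions: {}, team_id, pageOptions: {} },
    {},
    jobId
  );

  try {
    // waitUntilFinished resolves with the worker's return value (an array of docs here).
    const docs = await job.waitUntilFinished(scrapeQueueEvents, timeoutMs);
    await job.remove(); // mirror the controller: clean up finished jobs
    return { success: true as const, data: docs?.[0] ?? null };
  } catch (e) {
    // BullMQ rejects with a "Job wait ..." error when the ttl elapses.
    if (e instanceof Error && e.message.startsWith("Job wait")) {
      return { success: false as const, error: "Request timed out", returnCode: 408 };
    }
    throw e;
  }
}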
View File

@ -9,6 +9,7 @@ import { search } from "../search";
import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
import { v4 as uuidv4 } from "uuid";
import { Logger } from "../lib/logger";
import { getScrapeQueue, scrapeQueueEvents } from "../services/queue-service";
export async function searchHelper(
jobId: string,
@ -75,26 +76,28 @@ export async function searchHelper(
// filter out social media links
const jobDatas = res.map(x => {
const url = x.url;
const uuid = uuidv4();
return {
name: uuid,
data: {
url,
mode: "single_urls",
crawlerOptions: crawlerOptions,
team_id: team_id,
pageOptions: pageOptions,
},
opts: {
jobId: uuid,
priority: 10,
}
};
})
const jobs = await getScrapeQueue().addBulk(jobDatas);
const a = new WebScraperDataProvider();
await a.setOptions({
jobId,
mode: "single_urls",
urls: res.map((r) => r.url).slice(0, Math.min(searchOptions.limit ?? 5, 5)),
crawlerOptions: {
...crawlerOptions,
},
pageOptions: {
...pageOptions,
onlyMainContent: pageOptions?.onlyMainContent ?? true,
fetchPageContent: pageOptions?.fetchPageContent ?? true,
includeHtml: pageOptions?.includeHtml ?? false,
removeTags: pageOptions?.removeTags ?? [],
fallback: false,
},
});
const docs = await a.getDocuments(false);
const docs = (await Promise.all(jobs.map(x => x.waitUntilFinished(scrapeQueueEvents, 60000)))).map(x => x[0]);
if (docs.length === 0) {
return { success: true, error: "No search results found", returnCode: 200 };
@ -109,19 +112,6 @@ export async function searchHelper(
return { success: true, error: "No page found", returnCode: 200, data: docs };
}
const billingResult = await billTeam(
team_id,
filteredDocs.length
);
if (!billingResult.success) {
return {
success: false,
error:
"Failed to bill team. Insufficient credits or subscription not found.",
returnCode: 402,
};
}
return {
success: true,
data: filteredDocs,

View File

@ -1,39 +1,51 @@
import { Request, Response } from "express";
import { getWebScraperQueue } from "../../src/services/queue-service";
import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
import { Logger } from "../../src/lib/logger";
import { getCrawl, getCrawlJobs } from "../../src/lib/crawl-redis";
import { getScrapeQueue } from "../../src/services/queue-service";
import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
export async function crawlJobStatusPreviewController(req: Request, res: Response) {
try {
const job = await getWebScraperQueue().getJob(req.params.jobId);
if (!job) {
const sc = await getCrawl(req.params.jobId);
if (!sc) {
return res.status(404).json({ error: "Job not found" });
}
const { current, current_url, total, current_step, partialDocs } = await job.progress();
let data = job.returnvalue;
if (process.env.USE_DB_AUTHENTICATION === "true") {
const supabaseData = await supabaseGetJobById(req.params.jobId);
const jobIDs = await getCrawlJobs(req.params.jobId);
if (supabaseData) {
data = supabaseData.docs;
// let data = job.returnvalue;
// if (process.env.USE_DB_AUTHENTICATION === "true") {
// const supabaseData = await supabaseGetJobById(req.params.jobId);
// if (supabaseData) {
// data = supabaseData.docs;
// }
// }
const jobs = (await Promise.all(jobIDs.map(async x => {
const job = await getScrapeQueue().getJob(x);
if (process.env.USE_DB_AUTHENTICATION === "true") {
const supabaseData = await supabaseGetJobById(job.id);
if (supabaseData) {
job.returnvalue = supabaseData.docs;
}
}
}
let jobStatus = await job.getState();
if (jobStatus === 'waiting' || jobStatus === 'stuck') {
jobStatus = 'active';
}
return job;
}))).sort((a, b) => a.timestamp - b.timestamp);
const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
const jobStatus = sc.cancelled ? "failed" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "active";
const data = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);
res.json({
status: jobStatus,
// progress: job.progress(),
current,
current_url,
current_step,
total,
data: data ? data : null,
partial_data: jobStatus == 'completed' ? [] : partialDocs,
current: jobStatuses.filter(x => x === "completed" || x === "failed").length,
total: jobs.length,
data: jobStatus === "completed" ? data : null,
partial_data: jobStatus === "completed" ? [] : data.filter(x => x !== null),
});
} catch (error) {
Logger.error(error);

View File

@ -2,7 +2,7 @@ import express from "express";
import bodyParser from "body-parser";
import cors from "cors";
import "dotenv/config";
import { getWebScraperQueue } from "./services/queue-service";
import { getScrapeQueue } from "./services/queue-service";
import { v0Router } from "./routes/v0";
import { initSDK } from "@hyperdx/node-opentelemetry";
import cluster from "cluster";
@ -58,7 +58,7 @@ if (cluster.isMaster) {
serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
queues: [new BullAdapter(getWebScraperQueue())],
queues: [new BullAdapter(getScrapeQueue())],
serverAdapter: serverAdapter,
});
@ -104,9 +104,9 @@ if (cluster.isMaster) {
app.get(`/serverHealthCheck`, async (req, res) => {
try {
const webScraperQueue = getWebScraperQueue();
const scrapeQueue = getScrapeQueue();
const [waitingJobs] = await Promise.all([
webScraperQueue.getWaitingCount(),
scrapeQueue.getWaitingCount(),
]);
const noWaitingJobs = waitingJobs === 0;
@ -126,9 +126,9 @@ if (cluster.isMaster) {
const timeout = 60000; // 1 minute // The timeout value for the check in milliseconds
const getWaitingJobsCount = async () => {
const webScraperQueue = getWebScraperQueue();
const scrapeQueue = getScrapeQueue();
const [waitingJobsCount] = await Promise.all([
webScraperQueue.getWaitingCount(),
scrapeQueue.getWaitingCount(),
]);
return waitingJobsCount;
@ -181,13 +181,12 @@ if (cluster.isMaster) {
Logger.info(`Worker ${process.pid} started`);
}
const wsq = getWebScraperQueue();
wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
// const sq = getScrapeQueue();
// sq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
// sq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
// sq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
// sq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
// sq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
// sq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));

View File

@ -0,0 +1,91 @@
import { WebCrawler } from "../scraper/WebScraper/crawler";
import { redisConnection } from "../services/queue-service";
export type StoredCrawl = {
originUrl: string;
crawlerOptions: any;
pageOptions: any;
team_id: string;
robots?: string;
cancelled?: boolean;
createdAt: number;
};
export async function saveCrawl(id: string, crawl: StoredCrawl) {
await redisConnection.set("crawl:" + id, JSON.stringify(crawl));
await redisConnection.expire("crawl:" + id, 24 * 60 * 60, "NX");
}
export async function getCrawl(id: string): Promise<StoredCrawl | null> {
const x = await redisConnection.get("crawl:" + id);
if (x === null) {
return null;
}
return JSON.parse(x);
}
export async function addCrawlJob(id: string, job_id: string) {
await redisConnection.sadd("crawl:" + id + ":jobs", job_id);
await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60, "NX");
}
export async function addCrawlJobs(id: string, job_ids: string[]) {
await redisConnection.sadd("crawl:" + id + ":jobs", ...job_ids);
await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60, "NX");
}
export async function addCrawlJobDone(id: string, job_id: string) {
await redisConnection.sadd("crawl:" + id + ":jobs_done", job_id);
await redisConnection.expire("crawl:" + id + ":jobs_done", 24 * 60 * 60, "NX");
}
export async function isCrawlFinished(id: string) {
return (await redisConnection.scard("crawl:" + id + ":jobs_done")) === (await redisConnection.scard("crawl:" + id + ":jobs"));
}
export async function getCrawlJobs(id: string): Promise<string[]> {
return await redisConnection.smembers("crawl:" + id + ":jobs");
}
export async function lockURL(id: string, sc: StoredCrawl, url: string): Promise<boolean> {
if (typeof sc.crawlerOptions?.limit === "number") {
if (await redisConnection.scard("crawl:" + id + ":visited") >= sc.crawlerOptions.limit) {
return false;
}
}
const res = (await redisConnection.sadd("crawl:" + id + ":visited", url)) !== 0
await redisConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60, "NX");
return res;
}
/// NOTE: does not check limit. only use if limit is checked beforehand e.g. with sitemap
export async function lockURLs(id: string, urls: string[]): Promise<boolean> {
const res = (await redisConnection.sadd("crawl:" + id + ":visited", ...urls)) !== 0
await redisConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60, "NX");
return res;
}
export function crawlToCrawler(id: string, sc: StoredCrawl): WebCrawler {
const crawler = new WebCrawler({
jobId: id,
initialUrl: sc.originUrl,
includes: sc.crawlerOptions?.includes ?? [],
excludes: sc.crawlerOptions?.excludes ?? [],
maxCrawledLinks: sc.crawlerOptions?.maxCrawledLinks ?? 1000,
maxCrawledDepth: sc.crawlerOptions?.maxDepth ?? 10,
limit: sc.crawlerOptions?.limit ?? 10000,
generateImgAltText: sc.crawlerOptions?.generateImgAltText ?? false,
allowBackwardCrawling: sc.crawlerOptions?.allowBackwardCrawling ?? false,
allowExternalContentLinks: sc.crawlerOptions?.allowExternalContentLinks ?? false,
});
if (sc.robots !== undefined) {
try {
crawler.importRobotsTxt(sc.robots);
} catch (_) {}
}
return crawler;
}

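The new crawl-redis module keeps all per-crawl bookkeeping in plain Redis strings and sets with a 24-hour TTL. A minimal usage sketch of the API defined above; the IDs, URLs, and relative import path are illustrative only:

import { v4 as uuidv4 } from "uuid";
import {
  saveCrawl, getCrawl, addCrawlJob, addCrawlJobDone,
  isCrawlFinished, getCrawlJobs, lockURL, StoredCrawl,
} from "./crawl-redis";

async function demo() {
  const crawlId = uuidv4();
  const sc: StoredCrawl = {
    originUrl: "https://example.com",
    crawlerOptions: { limit: 100 },
    pageOptions: {},
    team_id: "team_123", // placeholder
    createdAt: Date.now(),
  };
  await saveCrawl(crawlId, sc);

  // lockURL returns false if the URL was already visited or the limit is hit,
  // so concurrent workers do not enqueue the same page twice.
  if (await lockURL(crawlId, sc, "https://example.com/docs")) {
    const jobId = uuidv4();
    await addCrawlJob(crawlId, jobId);     // tracked under crawl:<id>:jobs
    // ... enqueue the scrape job with this jobId, then once it completes:
    await addCrawlJobDone(crawlId, jobId); // tracked under crawl:<id>:jobs_done
  }

  console.log(await getCrawlJobs(crawlId));    // all job ids belonging to the crawl
  console.log(await isCrawlFinished(crawlId)); // true once jobs_done catches up with jobs
  console.log((await getCrawl(crawlId))?.cancelled ?? false);
}

demo().catch(console.error);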
View File

@ -65,6 +65,7 @@ export type WebScraperOptions = {
extractorOptions?: ExtractorOptions;
concurrentRequests?: number;
bullJobId?: string;
priority?: number;
};
export interface DocumentUrl {

View File

@ -1,4 +1,4 @@
import { Job, JobId } from "bull";
import { Job } from "bullmq";
import type { baseScrapers } from "../scraper/WebScraper/single_url";
import { supabase_service as supabase } from "../services/supabase";
import { Logger } from "./logger";
@ -70,7 +70,7 @@ export class ScrapeEvents {
}
}
static async logJobEvent(job: Job | JobId, event: ScrapeQueueEvent["event"]) {
static async logJobEvent(job: Job | any, event: ScrapeQueueEvent["event"]) {
try {
await this.insert(((job as any).id ? (job as any).id : job) as string, {
type: "queue",

View File

@ -0,0 +1,38 @@
const protocolIncluded = (url: string) => {
// if :// not in the start of the url assume http (maybe https?)
// regex checks if :// appears before any .
return(/^([^.:]+:\/\/)/.test(url));
}
const getURLobj = (s: string) => {
// URL fails if we dont include the protocol ie google.com
let error = false;
let urlObj = {};
try {
urlObj = new URL(s);
} catch (err) {
error = true;
}
return { error, urlObj };
};
export const checkAndUpdateURL = (url: string) => {
if (!protocolIncluded(url)) {
url = `http://${url}`;
}
const { error, urlObj } = getURLobj(url);
if (error) {
throw new Error("Invalid URL");
}
const typedUrlObj = urlObj as URL;
if(typedUrlObj.protocol !== "http:" && typedUrlObj.protocol !== "https:") {
throw new Error("Invalid URL");
}
return { urlObj: typedUrlObj, url: url };
}

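checkAndUpdateURL normalises a bare hostname to http:// and rejects anything that is not http or https. A few illustrative calls, with results inferred from the implementation above:

import { checkAndUpdateURL } from "./validateUrl";

checkAndUpdateURL("example.com");
// -> { url: "http://example.com", urlObj: URL { href: "http://example.com/", ... } }

checkAndUpdateURL("https://example.com/page");
// -> { url: "https://example.com/page", urlObj: ... }

checkAndUpdateURL("ftp://example.com"); // throws Error("Invalid URL"): protocol is not http/https
checkAndUpdateURL("not a url");         // throws Error("Invalid URL"): URL constructor rejects it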
View File

@ -1,4 +1,4 @@
import { Job } from "bull";
import { Job } from "bullmq";
import {
CrawlResult,
WebScraperOptions,
@ -12,17 +12,21 @@ import { Document } from "../lib/entities";
import { supabase_service } from "../services/supabase";
import { Logger } from "../lib/logger";
import { ScrapeEvents } from "../lib/scrape-events";
import { getScrapeQueue } from "../services/queue-service";
export async function startWebScraperPipeline({
job,
token,
}: {
job: Job<WebScraperOptions>;
token: string;
}) {
let partialDocs: Document[] = [];
return (await runWebScraper({
url: job.data.url,
mode: job.data.mode,
crawlerOptions: job.data.crawlerOptions,
extractorOptions: job.data.extractorOptions,
pageOptions: job.data.pageOptions,
inProgress: (progress) => {
Logger.debug(`🐂 Job in progress ${job.id}`);
@ -31,20 +35,21 @@ export async function startWebScraperPipeline({
if (partialDocs.length > 50) {
partialDocs = partialDocs.slice(-50);
}
job.progress({ ...progress, partialDocs: partialDocs });
// job.updateProgress({ ...progress, partialDocs: partialDocs });
}
},
onSuccess: (result) => {
onSuccess: (result, mode) => {
Logger.debug(`🐂 Job completed ${job.id}`);
saveJob(job, result);
saveJob(job, result, token, mode);
},
onError: (error) => {
Logger.error(`🐂 Job failed ${job.id}`);
ScrapeEvents.logJobEvent(job, "failed");
job.moveToFailed(error);
job.moveToFailed(error, token, false);
},
team_id: job.data.team_id,
bull_job_id: job.id.toString(),
priority: job.opts.priority,
})) as { success: boolean; message: string; docs: Document[] };
}
export async function runWebScraper({
@ -52,11 +57,13 @@ export async function runWebScraper({
mode,
crawlerOptions,
pageOptions,
extractorOptions,
inProgress,
onSuccess,
onError,
team_id,
bull_job_id,
priority,
}: RunWebScraperParams): Promise<RunWebScraperResult> {
try {
const provider = new WebScraperDataProvider();
@ -65,17 +72,21 @@ export async function runWebScraper({
jobId: bull_job_id,
mode: mode,
urls: [url],
extractorOptions,
crawlerOptions: crawlerOptions,
pageOptions: pageOptions,
bullJobId: bull_job_id,
priority,
});
} else {
await provider.setOptions({
jobId: bull_job_id,
mode: mode,
urls: url.split(","),
extractorOptions,
crawlerOptions: crawlerOptions,
pageOptions: pageOptions,
priority,
});
}
const docs = (await provider.getDocuments(false, (progress: Progress) => {
@ -97,8 +108,8 @@ export async function runWebScraper({
return { url: doc.metadata.sourceURL };
}
})
: docs.filter((doc) => doc.content.trim().length > 0);
: docs;
const billingResult = await billTeam(team_id, filteredDocs.length);
if (!billingResult.success) {
@ -111,7 +122,7 @@ export async function runWebScraper({
}
// This is where the returnvalue from the job is set
onSuccess(filteredDocs);
onSuccess(filteredDocs, mode);
// this return doesn't matter too much for the job completion result
return { success: true, message: "", docs: filteredDocs };
@ -121,7 +132,7 @@ export async function runWebScraper({
}
}
const saveJob = async (job: Job, result: any) => {
const saveJob = async (job: Job, result: any, token: string, mode: string) => {
try {
if (process.env.USE_DB_AUTHENTICATION === "true") {
const { data, error } = await supabase_service
@ -130,17 +141,21 @@ const saveJob = async (job: Job, result: any) => {
.eq("job_id", job.id);
if (error) throw new Error(error.message);
try {
await job.moveToCompleted(null, false, false);
} catch (error) {
// I think the job won't exist here anymore
}
} else {
try {
await job.moveToCompleted(result, false, false);
} catch (error) {
// I think the job won't exist here anymore
}
// try {
// if (mode === "crawl") {
// await job.moveToCompleted(null, token, false);
// } else {
// await job.moveToCompleted(result, token, false);
// }
// } catch (error) {
// // I think the job won't exist here anymore
// }
// } else {
// try {
// await job.moveToCompleted(result, token, false);
// } catch (error) {
// // I think the job won't exist here anymore
// }
}
ScrapeEvents.logJobEvent(job, "completed");
} catch (error) {

apps/api/src/run-req.ts (new file, 175 lines)
View File

@ -0,0 +1,175 @@
import axios from "axios";
import { promises as fs } from "fs";
import { v4 as uuidV4 } from "uuid";
interface Result {
start_url: string;
job_id?: string;
idempotency_key?: string;
result_data_jsonb?: any;
}
async function sendCrawl(result: Result): Promise<string | undefined> {
const idempotencyKey = uuidV4();
const url = result.start_url;
try {
const response = await axios.post(
"https://staging-firecrawl-scraper-js.fly.dev/v0/crawl",
{
url: url,
crawlerOptions: {
limit: 75,
},
pageOptions: {
includeHtml: true,
replaceAllPathsWithAbsolutePaths: true,
waitFor: 1000,
},
},
{
headers: {
"Content-Type": "application/json",
Authorization: `Bearer `,
},
}
);
result.idempotency_key = idempotencyKey;
return response.data.jobId;
} catch (error) {
console.error("Error sending crawl:", error);
return undefined;
}
}
async function getContent(result: Result): Promise<boolean> {
let attempts = 0;
while (attempts < 120) {
// Reduce the number of attempts to speed up
try {
const response = await axios.get(
`https://staging-firecrawl-scraper-js.fly.dev/v0/crawl/status/${result.job_id}`,
{
headers: {
"Content-Type": "application/json",
Authorization: `Bearer `,
},
}
);
if (response.data.status === "completed") {
result.result_data_jsonb = response.data.data;
// Job actually completed
return true;
}
} catch (error) {
console.error("Error getting content:", error);
}
const randomSleep = Math.floor(Math.random() * 15000) + 5000;
await new Promise((resolve) => setTimeout(resolve, randomSleep)); // Reduce sleep time to 1.5 seconds
attempts++;
}
// Set result as null if timed out
result.result_data_jsonb = null;
return false;
}
async function processResults(results: Result[]): Promise<void> {
let processedCount = 0;
let starterCount = 0;
const queue: Result[] = [];
const processedUrls = new Set<string>();
// Initialize the queue with the first 1000 results
for (let i = 0; i < Math.min(100, results.length); i++) {
queue.push(results[i]);
processedUrls.add(results[i].start_url);
}
// Function to process a single result
const processSingleResult = async (result: Result) => {
const jobId = await sendCrawl(result);
if (jobId) {
console.log(`Job requested count: ${starterCount}`);
starterCount++;
result.job_id = jobId;
processedCount++;
// Save the result to the file
try {
// Save job id along with the start_url
const resultWithJobId = results.map(r => ({
start_url: r.start_url,
job_id: r.job_id,
}));
await fs.writeFile(
"results_with_job_id_4000_6000.json",
JSON.stringify(resultWithJobId, null, 4)
);
} catch (error) {
console.error("Error writing to results_with_content.json:", error);
}
// Add a new result to the queue if there are more results to process
// if (processedCount < results.length) {
// for (let i = queue.length; i < results.length; i++) {
// if (!processedUrls.has(results[i].start_url)) {
// const nextResult = results[i];
// console.log("Next result:", nextResult.start_url);
// queue.push(nextResult);
// processedUrls.add(nextResult.start_url);
// console.log(`Queue length: ${queue.length}`);
// processSingleResult(nextResult);
// break;
// }
// }
// }
}
};
// Start processing the initial queue concurrently
// for (let i = 0; i < queue.length; i++) {
// processSingleResult(queue[i]);
// if ((i + 1) % 500 === 0) {
// console.log(`Processed ${i + 1} results, waiting for 1 minute before adding the next batch...`);
// await new Promise(resolve => setTimeout(resolve, 60 * 1000)); // Wait for 1 minute
// }
// }
// Start processing the initial queue concurrently
// await Promise.all(queue.map(result => processSingleResult(result)));
for (let i = 0; i < results.length; i += 100) {
const batch = results.slice(i, i + 100);
Promise.all(batch.map((result) => processSingleResult(result)))
.then(() => {
console.log(`Processed ${i + 100} results.`);
})
.catch((error) => {
console.error(`Error processing batch starting at index ${i}:`, error);
});
await new Promise((resolve) => setTimeout(resolve, 60 * 1000)); // Wait for 1 minute
}
}
// Example call
async function getStartUrls(): Promise<Result[]> {
try {
const data = await fs.readFile("starturls.json", "utf-8");
return JSON.parse(data);
} catch (error) {
console.error("Error reading starturls.json:", error);
return [];
}
}
async function main() {
const results: Result[] = (await getStartUrls()).slice(3999, 6000);
// console.log(results.map((r) => r.start_url).slice(0, 3));
processResults(results)
.then(() => {
console.log("All results processed.");
})
.catch((error) => {
console.error("Error processing results:", error);
});
}
main();

View File

@ -24,7 +24,7 @@ describe('scrapSingleUrl', () => {
});
it('should return a list of links on the firecrawl.ai page', async () => {
const url = 'https://example.com';
const url = 'https://flutterbricks.com';
const pageOptions: PageOptions = { includeHtml: true };
const result = await scrapSingleUrl("TEST", url, pageOptions);
@ -33,5 +33,5 @@ it('should return a list of links on the firecrawl.ai page', async () => {
expect(result.linksOnPage).toBeDefined();
expect(Array.isArray(result.linksOnPage)).toBe(true);
expect(result.linksOnPage.length).toBeGreaterThan(0);
expect(result.linksOnPage).toContain('https://www.iana.org/domains/example')
expect(result.linksOnPage).toContain('https://flutterbricks.com/features')
}, 10000);

View File

@ -1,4 +1,4 @@
import axios from "axios";
import axios, { AxiosError } from "axios";
import cheerio, { load } from "cheerio";
import { URL } from "url";
import { getLinksFromSitemap } from "./sitemap";
@ -22,7 +22,7 @@ export class WebCrawler {
private crawledUrls: Map<string, string> = new Map();
private limit: number;
private robotsTxtUrl: string;
private robots: any;
public robots: any;
private generateImgAltText: boolean;
private allowBackwardCrawling: boolean;
private allowExternalContentLinks: boolean;
@ -66,7 +66,7 @@ export class WebCrawler {
this.allowExternalContentLinks = allowExternalContentLinks ?? false;
}
private filterLinks(sitemapLinks: string[], limit: number, maxDepth: number): string[] {
public filterLinks(sitemapLinks: string[], limit: number, maxDepth: number): string[] {
return sitemapLinks
.filter((link) => {
const url = new URL(link.trim(), this.baseUrl);
@ -130,6 +130,25 @@ export class WebCrawler {
.slice(0, limit);
}
public async getRobotsTxt(): Promise<string> {
const response = await axios.get(this.robotsTxtUrl, { timeout: axiosTimeout });
return response.data;
}
public importRobotsTxt(txt: string) {
this.robots = robotsParser(this.robotsTxtUrl, txt);
}
public async tryGetSitemap(): Promise<{ url: string; html: string; }[] | null> {
Logger.debug(`Fetching sitemap links from ${this.initialUrl}`);
const sitemapLinks = await this.tryFetchSitemapLinks(this.initialUrl);
if (sitemapLinks.length > 0) {
let filteredLinks = this.filterLinks(sitemapLinks, this.limit, this.maxCrawledDepth);
return filteredLinks.map(link => ({ url: link, html: "" }));
}
return null;
}
public async start(
inProgress?: (progress: Progress) => void,
pageOptions?: PageOptions,
@ -142,19 +161,17 @@ export class WebCrawler {
Logger.debug(`Crawler starting with ${this.initialUrl}`);
// Fetch and parse robots.txt
try {
const response = await axios.get(this.robotsTxtUrl, { timeout: axiosTimeout });
this.robots = robotsParser(this.robotsTxtUrl, response.data);
const txt = await this.getRobotsTxt();
this.importRobotsTxt(txt);
Logger.debug(`Crawler robots.txt fetched with ${this.robotsTxtUrl}`);
} catch (error) {
Logger.debug(`Failed to fetch robots.txt from ${this.robotsTxtUrl}`);
}
if (!crawlerOptions?.ignoreSitemap){
Logger.debug(`Fetching sitemap links from ${this.initialUrl}`);
const sitemapLinks = await this.tryFetchSitemapLinks(this.initialUrl);
if (sitemapLinks.length > 0) {
let filteredLinks = this.filterLinks(sitemapLinks, limit, maxDepth);
return filteredLinks.map(link => ({ url: link, html: "" }));
const sm = await this.tryGetSitemap();
if (sm !== null) {
return sm;
}
}
@ -241,6 +258,37 @@ export class WebCrawler {
return Array.from(this.crawledUrls.entries()).map(([url, html]) => ({ url, html }));
}
public filterURL(href: string, url: string): string | null {
let fullUrl = href;
if (!href.startsWith("http")) {
fullUrl = new URL(href, this.baseUrl).toString();
}
const urlObj = new URL(fullUrl);
const path = urlObj.pathname;
if (this.isInternalLink(fullUrl)) { // INTERNAL LINKS
if (this.isInternalLink(fullUrl) &&
this.noSections(fullUrl) &&
!this.matchesExcludes(path) &&
this.isRobotsAllowed(fullUrl)
) {
return fullUrl;
}
} else { // EXTERNAL LINKS
if (
this.isInternalLink(url) &&
this.allowExternalContentLinks &&
!this.isSocialMediaOrEmail(fullUrl) &&
!this.matchesExcludes(fullUrl, true) &&
!this.isExternalMainPage(fullUrl)
) {
return fullUrl;
}
}
return null;
}
async crawl(url: string, pageOptions: PageOptions): Promise<{url: string, html: string, pageStatusCode?: number, pageError?: string}[]> {
if (this.visited.has(url) || !this.robots.isAllowed(url, "FireCrawlAgent")) {
return [];
@ -287,31 +335,9 @@ export class WebCrawler {
$("a").each((_, element) => {
const href = $(element).attr("href");
if (href) {
let fullUrl = href;
if (!href.startsWith("http")) {
fullUrl = new URL(href, this.baseUrl).toString();
}
const urlObj = new URL(fullUrl);
const path = urlObj.pathname;
if (this.isInternalLink(fullUrl)) { // INTERNAL LINKS
if (this.isInternalLink(fullUrl) &&
this.noSections(fullUrl) &&
!this.matchesExcludes(path) &&
this.isRobotsAllowed(fullUrl)
) {
links.push({ url: fullUrl, html: content, pageStatusCode, pageError });
}
} else { // EXTERNAL LINKS
if (
this.isInternalLink(url) &&
this.allowExternalContentLinks &&
!this.isSocialMediaOrEmail(fullUrl) &&
!this.matchesExcludes(fullUrl, true) &&
!this.isExternalMainPage(fullUrl)
) {
links.push({ url: fullUrl, html: content, pageStatusCode, pageError });
}
const u = this.filterURL(href, url);
if (u !== null) {
links.push({ url: u, html: content, pageStatusCode, pageError });
}
}
});
@ -465,9 +491,13 @@ export class WebCrawler {
}
} catch (error) {
Logger.debug(`Failed to fetch sitemap with axios from ${sitemapUrl}: ${error}`);
const response = await getLinksFromSitemap({ sitemapUrl, mode: 'fire-engine' });
if (response) {
sitemapLinks = response;
if (error instanceof AxiosError && error.response?.status === 404) {
// ignore 404
} else {
const response = await getLinksFromSitemap({ sitemapUrl, mode: 'fire-engine' });
if (response) {
sitemapLinks = response;
}
}
}
@ -480,7 +510,11 @@ export class WebCrawler {
}
} catch (error) {
Logger.debug(`Failed to fetch sitemap from ${baseUrlSitemap}: ${error}`);
sitemapLinks = await getLinksFromSitemap({ sitemapUrl: baseUrlSitemap, mode: 'fire-engine' });
if (error instanceof AxiosError && error.response?.status === 404) {
// ignore 404
} else {
sitemapLinks = await getLinksFromSitemap({ sitemapUrl: baseUrlSitemap, mode: 'fire-engine' });
}
}
}

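The crawler now exposes getRobotsTxt, importRobotsTxt, and tryGetSitemap as separate steps, so controllers can fetch robots.txt once, cache it on the stored crawl, and decide up front whether a sitemap exists before enqueueing anything. A sketch of how those pieces compose, mirroring crawlToCrawler and the crawl controller (constructor values are the same defaults used there):

import { WebCrawler } from "./crawler";

async function planCrawl(initialUrl: string) {
  const crawler = new WebCrawler({
    jobId: "example", // placeholder id
    initialUrl,
    includes: [],
    excludes: [],
    maxCrawledLinks: 1000,
    maxCrawledDepth: 10,
    limit: 10000,
    generateImgAltText: false,
    allowBackwardCrawling: false,
    allowExternalContentLinks: false,
  });

  // Fetch robots.txt once; the raw text can be stored (e.g. in StoredCrawl.robots)
  // and re-imported later without another network call.
  try {
    const robotsTxt = await crawler.getRobotsTxt();
    crawler.importRobotsTxt(robotsTxt);
  } catch (_) {
    // a missing robots.txt is not fatal
  }

  // tryGetSitemap returns filtered sitemap entries, or null if no sitemap was found,
  // which is what decides between addBulk and a single kickoff job upstream.
  const sitemap = await crawler.tryGetSitemap();
  return sitemap?.map((s) => s.url) ?? [initialUrl];
}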
View File

@ -16,7 +16,7 @@ import {
replacePathsWithAbsolutePaths,
} from "./utils/replacePaths";
import { generateCompletions } from "../../lib/LLM-extraction";
import { getWebScraperQueue } from "../../../src/services/queue-service";
import { getScrapeQueue } from "../../../src/services/queue-service";
import { fetchAndProcessDocx } from "./utils/docxProcessor";
import { getAdjustedMaxDepth, getURLDepth } from "./utils/maxDepthUtils";
import { Logger } from "../../lib/logger";
@ -44,6 +44,7 @@ export class WebScraperDataProvider {
private crawlerMode: string = "default";
private allowBackwardCrawling: boolean = false;
private allowExternalContentLinks: boolean = false;
private priority?: number;
authorize(): void {
throw new Error("Method not implemented.");
@ -72,7 +73,8 @@ export class WebScraperDataProvider {
url,
this.pageOptions,
this.extractorOptions,
existingHTML
existingHTML,
this.priority,
);
processedUrls++;
if (inProgress) {
@ -88,21 +90,6 @@ export class WebScraperDataProvider {
results[i + index] = result;
})
);
try {
if (this.mode === "crawl" && this.bullJobId) {
const job = await getWebScraperQueue().getJob(this.bullJobId);
const jobStatus = await job.getState();
if (jobStatus === "failed") {
Logger.info(
"Job has failed or has been cancelled by the user. Stopping the job..."
);
return [] as Document[];
}
}
} catch (error) {
Logger.error(error.message);
return [] as Document[];
}
}
return results.filter((result) => result !== null) as Document[];
}
@ -608,6 +595,7 @@ export class WebScraperDataProvider {
options.crawlerOptions?.allowBackwardCrawling ?? false;
this.allowExternalContentLinks =
options.crawlerOptions?.allowExternalContentLinks ?? false;
this.priority = options.priority;
// make sure all urls start with https://
this.urls = this.urls.map((url) => {

View File

@ -26,6 +26,7 @@ export async function scrapWithFireEngine({
fireEngineOptions = {},
headers,
options,
priority,
}: {
url: string;
waitFor?: number;
@ -35,6 +36,7 @@ export async function scrapWithFireEngine({
fireEngineOptions?: FireEngineOptions;
headers?: Record<string, string>;
options?: any;
priority?: number;
}): Promise<FireEngineResponse> {
const logParams = {
url,
@ -78,6 +80,7 @@ export async function scrapWithFireEngine({
fullPageScreenshot: fullPageScreenshotParam,
headers: headers,
pageOptions: pageOptions,
priority,
...fireEngineOptionsParam,
},
{

View File

@ -134,7 +134,8 @@ export async function scrapSingleUrl(
extractorOptions: ExtractorOptions = {
mode: "llm-extraction-from-markdown",
},
existingHtml: string = ""
existingHtml: string = "",
priority?: number,
): Promise<Document> {
urlToScrap = urlToScrap.trim();
@ -177,7 +178,8 @@ export async function scrapSingleUrl(
headers: pageOptions.headers,
fireEngineOptions: {
engine: engine,
}
},
priority,
});
scraperResponse.text = response.html;
scraperResponse.screenshot = response.screenshot;
@ -340,7 +342,7 @@ export async function scrapSingleUrl(
Logger.debug(`⛏️ ${scraper}: Successfully scraped ${urlToScrap} with text length >= 100, breaking`);
break;
}
if (pageStatusCode && pageStatusCode == 404) {
if (pageStatusCode && (pageStatusCode == 404 || pageStatusCode == 500)) {
Logger.debug(`⛏️ ${scraper}: Successfully scraped ${urlToScrap} with status code 404, breaking`);
break;
}

View File

@ -41,10 +41,10 @@ export function extractLinks(html: string, baseUrl: string): string[] {
links.push(href);
} else if (href.startsWith('/')) {
// Relative URL starting with '/', append to origin
links.push(`${origin}${href}`);
links.push(new URL(href, baseUrl).href);
} else if (!href.startsWith('#') && !href.startsWith('mailto:')) {
// Relative URL not starting with '/', append to base URL
links.push(`${baseUrl}/${href}`);
links.push(new URL(href, baseUrl).href);
} else if (href.startsWith('mailto:')) {
// mailto: links, add as is
links.push(href);

View File

@ -1,5 +1,5 @@
import { Logger } from "../../../src/lib/logger";
import { getWebScraperQueue } from "../queue-service";
import { getScrapeQueue } from "../queue-service";
import { sendSlackWebhook } from "./slack";
export async function checkAlerts() {
@ -13,8 +13,8 @@ export async function checkAlerts() {
Logger.info("Initializing alerts");
const checkActiveJobs = async () => {
try {
const webScraperQueue = getWebScraperQueue();
const activeJobs = await webScraperQueue.getActiveCount();
const scrapeQueue = getScrapeQueue();
const activeJobs = await scrapeQueue.getActiveCount();
if (activeJobs > Number(process.env.ALERT_NUM_ACTIVE_JOBS)) {
Logger.warn(
`Alert: Number of active jobs is over ${process.env.ALERT_NUM_ACTIVE_JOBS}. Current active jobs: ${activeJobs}.`
@ -34,11 +34,10 @@ export async function checkAlerts() {
};
const checkWaitingQueue = async () => {
const webScraperQueue = getWebScraperQueue();
const waitingJobs = await webScraperQueue.getWaitingCount();
const paused = await webScraperQueue.getPausedCount();
const scrapeQueue = getScrapeQueue();
const waitingJobs = await scrapeQueue.getWaitingCount();
if (waitingJobs !== paused && waitingJobs > Number(process.env.ALERT_NUM_WAITING_JOBS)) {
if (waitingJobs > Number(process.env.ALERT_NUM_WAITING_JOBS)) {
Logger.warn(
`Alert: Number of waiting jobs is over ${process.env.ALERT_NUM_WAITING_JOBS}. Current waiting jobs: ${waitingJobs}.`
);

View File

@ -40,10 +40,11 @@ export async function logJob(job: FirecrawlJob) {
extractor_options: job.extractor_options,
num_tokens: job.num_tokens,
retry: !!job.retry,
crawl_id: job.crawl_id,
},
]);
if (process.env.POSTHOG_API_KEY) {
if (process.env.POSTHOG_API_KEY && !job.crawl_id) {
let phLog = {
distinctId: "from-api", //* To identify this on the group level, setting distinctid to a static string per posthog docs: https://posthog.com/docs/product-analytics/group-analytics#advanced-server-side-only-capturing-group-events-without-a-user
...(job.team_id !== "preview" && {

View File

@ -1,17 +1,17 @@
import { Job, Queue } from "bull";
import {
getWebScraperQueue,
} from "./queue-service";
import { Job, Queue } from "bullmq";
import { getScrapeQueue } from "./queue-service";
import { v4 as uuidv4 } from "uuid";
import { WebScraperOptions } from "../types";
export async function addWebScraperJob(
export async function addScrapeJob(
webScraperOptions: WebScraperOptions,
options: any = {}
options: any = {},
jobId: string = uuidv4(),
): Promise<Job> {
return await getWebScraperQueue().add(webScraperOptions, {
return await getScrapeQueue().add(jobId, webScraperOptions, {
priority: webScraperOptions.crawl_id ? 20 : 10,
...options,
jobId: uuidv4(),
jobId,
});
}
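
A minimal usage sketch for the renamed helper, assuming the WebScraperOptions shape shown later in this diff; the import paths mirror this file's own imports and all concrete values are hypothetical:

import { addScrapeJob } from "./queue-jobs";
import { WebScraperOptions } from "../types";

async function enqueueExample() {
  const options: WebScraperOptions = {
    url: "https://example.com",
    mode: "single_urls",
    crawlerOptions: {},
    pageOptions: {},
    team_id: "team-123",   // hypothetical team id
    crawl_id: "crawl-abc", // optional; when present the job is enqueued with BullMQ priority 20, otherwise 10
    // any further required WebScraperOptions fields not visible in this hunk would go here
  };
  const job = await addScrapeJob(options, {}, "my-job-id"); // the job id defaults to a fresh uuid if omitted
  return job.id;
}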

View File

@ -1,23 +1,40 @@
import Queue from "bull";
import { Queue as BullQueue } from "bull";
import { Queue } from "bullmq";
import { Logger } from "../lib/logger";
import IORedis from "ioredis";
let webScraperQueue: BullQueue;
let scrapeQueue: Queue;
export function getWebScraperQueue() {
if (!webScraperQueue) {
webScraperQueue = new Queue("web-scraper", process.env.REDIS_URL, {
settings: {
lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds,
lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
stalledInterval: 30 * 1000,
maxStalledCount: 10,
},
defaultJobOptions:{
attempts: 2
export const redisConnection = new IORedis(process.env.REDIS_URL, {
maxRetriesPerRequest: null,
});
export const scrapeQueueName = "{scrapeQueue}";
export function getScrapeQueue() {
if (!scrapeQueue) {
scrapeQueue = new Queue(
scrapeQueueName,
{
connection: redisConnection,
}
});
// {
// settings: {
// lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds,
// lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
// stalledInterval: 30 * 1000,
// maxStalledCount: 10,
// },
// defaultJobOptions:{
// attempts: 5
// }
// }
);
Logger.info("Web scraper queue created");
}
return webScraperQueue;
return scrapeQueue;
}
import { QueueEvents } from 'bullmq';
export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection });
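
The exported queue and QueueEvents handle can be combined to await a job's result; a hedged sketch using standard BullMQ calls (the import path is assumed, and the braces in the queue name presumably act as a Redis Cluster hash tag so all of the queue's keys land on the same slot):

import { getScrapeQueue, scrapeQueueEvents } from "./queue-service";

async function waitForScrapeResult(jobId: string, timeoutMs = 60_000) {
  const job = await getScrapeQueue().getJob(jobId);
  if (!job) {
    throw new Error(`Job ${jobId} not found`);
  }
  // Resolves with the job's return value, or rejects if the job fails or the timeout elapses.
  return await job.waitUntilFinished(scrapeQueueEvents, timeoutMs);
}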

View File

@ -1,74 +1,273 @@
import { CustomError } from "../lib/custom-error";
import { getWebScraperQueue } from "./queue-service";
import {
getScrapeQueue,
redisConnection,
scrapeQueueName,
} from "./queue-service";
import "dotenv/config";
import { logtail } from "./logtail";
import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook";
import { logJob } from "./logging/log_job";
import { initSDK } from '@hyperdx/node-opentelemetry';
import { Job } from "bull";
import { initSDK } from "@hyperdx/node-opentelemetry";
import { Job } from "bullmq";
import { Logger } from "../lib/logger";
import { ScrapeEvents } from "../lib/scrape-events";
import { Worker } from "bullmq";
import systemMonitor from "./system-monitor";
import { v4 as uuidv4 } from "uuid";
import { addCrawlJob, addCrawlJobDone, crawlToCrawler, getCrawl, getCrawlJobs, isCrawlFinished, lockURL } from "../lib/crawl-redis";
import { StoredCrawl } from "../lib/crawl-redis";
import { addScrapeJob } from "./queue-jobs";
import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
if (process.env.ENV === 'production') {
if (process.env.ENV === "production") {
initSDK({
consoleCapture: true,
additionalInstrumentations: [],
});
}
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
const wsq = getWebScraperQueue();
const workerLockDuration = Number(process.env.WORKER_LOCK_DURATION) || 60000;
const workerStalledCheckInterval =
Number(process.env.WORKER_STALLED_CHECK_INTERVAL) || 30000;
const jobLockExtendInterval =
Number(process.env.JOB_LOCK_EXTEND_INTERVAL) || 15000;
const jobLockExtensionTime =
Number(process.env.JOB_LOCK_EXTENSION_TIME) || 60000;
async function processJob(job: Job, done) {
const cantAcceptConnectionInterval =
Number(process.env.CANT_ACCEPT_CONNECTION_INTERVAL) || 2000;
const connectionMonitorInterval =
Number(process.env.CONNECTION_MONITOR_INTERVAL) || 10;
const gotJobInterval = Number(process.env.CONNECTION_MONITOR_INTERVAL) || 20;
const processJobInternal = async (token: string, job: Job) => {
const extendLockInterval = setInterval(async () => {
Logger.info(`🐂 Worker extending lock on job ${job.id}`);
await job.extendLock(token, jobLockExtensionTime);
}, jobLockExtendInterval);
try {
const result = await processJob(job, token);
try {
if (job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true") {
await job.moveToCompleted(null, token, false);
} else {
await job.moveToCompleted(result.docs, token, false);
}
} catch (e) {
}
} catch (error) {
console.log("Job failed, error:", error);
await job.moveToFailed(error, token, false);
} finally {
clearInterval(extendLockInterval);
}
};
let isShuttingDown = false;
process.on("SIGINT", () => {
console.log("Received SIGINT. Shutting down gracefully...");
isShuttingDown = true;
});
const workerFun = async (queueName: string, processJobInternal: (token: string, job: Job) => Promise<void>) => {
const worker = new Worker(queueName, null, {
connection: redisConnection,
lockDuration: 1 * 60 * 1000, // 1 minute
// lockRenewTime: 15 * 1000, // 15 seconds
stalledInterval: 30 * 1000, // 30 seconds
maxStalledCount: 10, // 10 times
});
worker.startStalledCheckTimer();
const monitor = await systemMonitor;
while (true) {
if (isShuttingDown) {
console.log("No longer accepting new jobs. SIGINT");
break;
}
const token = uuidv4();
const canAcceptConnection = await monitor.acceptConnection();
if (!canAcceptConnection) {
console.log("Cant accept connection");
await sleep(cantAcceptConnectionInterval); // more sleep
continue;
}
const job = await worker.getNextJob(token);
if (job) {
processJobInternal(token, job);
await sleep(gotJobInterval);
} else {
await sleep(connectionMonitorInterval);
}
}
};
workerFun(scrapeQueueName, processJobInternal);
async function processJob(job: Job, token: string) {
Logger.info(`🐂 Worker taking job ${job.id}`);
try {
job.progress({
job.updateProgress({
current: 1,
total: 100,
current_step: "SCRAPING",
current_url: "",
});
const start = Date.now();
const { success, message, docs } = await startWebScraperPipeline({ job });
const { success, message, docs } = await startWebScraperPipeline({
job,
token,
});
const end = Date.now();
const timeTakenInSeconds = (end - start) / 1000;
const data = {
success: success,
success,
result: {
links: docs.map((doc) => {
return { content: doc, source: doc?.metadata?.sourceURL ?? doc?.url ?? "" };
return {
content: doc,
source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
};
}),
},
project_id: job.data.project_id,
error: message /* etc... */,
docs,
};
await callWebhook(job.data.team_id, job.id as string, data);
if (job.data.mode === "crawl") {
await callWebhook(job.data.team_id, job.id as string, data);
}
if (job.data.crawl_id) {
await logJob({
job_id: job.id as string,
success: success,
message: message,
num_docs: docs.length,
docs: docs,
time_taken: timeTakenInSeconds,
team_id: job.data.team_id,
mode: job.data.mode,
url: job.data.url,
crawlerOptions: job.data.crawlerOptions,
pageOptions: job.data.pageOptions,
origin: job.data.origin,
crawl_id: job.data.crawl_id,
});
await addCrawlJobDone(job.data.crawl_id, job.id);
const sc = await getCrawl(job.data.crawl_id) as StoredCrawl;
if (!job.data.sitemapped) {
if (!sc.cancelled) {
const crawler = crawlToCrawler(job.data.crawl_id, sc);
const links = crawler.filterLinks((data.docs[0].linksOnPage as string[])
.map(href => crawler.filterURL(href.trim(), sc.originUrl))
.filter(x => x !== null),
Infinity,
sc.crawlerOptions?.maxDepth ?? 10
)
for (const link of links) {
if (await lockURL(job.data.crawl_id, sc, link)) {
const newJob = await addScrapeJob({
url: link,
mode: "single_urls",
crawlerOptions: sc.crawlerOptions,
team_id: sc.team_id,
pageOptions: sc.pageOptions,
origin: job.data.origin,
crawl_id: job.data.crawl_id,
});
await addCrawlJob(job.data.crawl_id, newJob.id);
}
}
}
}
if (await isCrawlFinished(job.data.crawl_id)) {
const jobIDs = await getCrawlJobs(job.data.crawl_id);
const jobs = (await Promise.all(jobIDs.map(async x => {
if (x === job.id) {
return {
async getState() {
return "completed"
},
timestamp: Date.now(),
returnvalue: docs,
}
}
const j = await getScrapeQueue().getJob(x);
if (process.env.USE_DB_AUTHENTICATION === "true") {
const supabaseData = await supabaseGetJobById(j.id);
if (supabaseData) {
j.returnvalue = supabaseData.docs;
}
}
return j;
}))).sort((a, b) => a.timestamp - b.timestamp);
const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
const jobStatus = sc.cancelled
  ? "failed"
  : jobStatuses.every(x => x === "completed")
  ? "completed"
  : jobStatuses.some(x => x === "failed")
  ? "failed"
  : "active";
const fullDocs = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);
await logJob({
job_id: job.data.crawl_id,
success: jobStatus === "completed",
message: message,
num_docs: fullDocs.length,
docs: [],
time_taken: (Date.now() - sc.createdAt) / 1000,
team_id: job.data.team_id,
mode: "crawl",
url: sc.originUrl,
crawlerOptions: sc.crawlerOptions,
pageOptions: sc.pageOptions,
origin: job.data.origin,
});
const data = {
success: jobStatus !== "failed",
result: {
links: fullDocs.map((doc) => {
return {
content: doc,
source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
};
}),
},
project_id: job.data.project_id,
error: message /* etc... */,
docs: fullDocs,
};
await callWebhook(job.data.team_id, job.id as string, data);
}
}
await logJob({
job_id: job.id as string,
success: success,
message: message,
num_docs: docs.length,
docs: docs,
time_taken: timeTakenInSeconds,
team_id: job.data.team_id,
mode: "crawl",
url: job.data.url,
crawlerOptions: job.data.crawlerOptions,
pageOptions: job.data.pageOptions,
origin: job.data.origin,
});
Logger.info(`🐂 Job done ${job.id}`);
done(null, data);
return data;
} catch (error) {
Logger.error(`🐂 Job errored ${job.id} - ${error}`);
if (await getWebScraperQueue().isPaused(false)) {
Logger.debug("🐂Queue is paused, ignoring");
return;
}
if (error instanceof CustomError) {
// Here we handle the error, then save the failed job
@ -81,6 +280,9 @@ async function processJob(job: Job, done) {
});
}
Logger.error(error);
if (error.stack) {
Logger.error(error.stack);
}
logtail.error("Overall error ingesting", {
job_id: job.id,
@ -89,37 +291,69 @@ async function processJob(job: Job, done) {
const data = {
success: false,
docs: [],
project_id: job.data.project_id,
error:
"Something went wrong... Contact help@mendable.ai or try again." /* etc... */,
};
await callWebhook(job.data.team_id, job.id as string, data);
await logJob({
job_id: job.id as string,
success: false,
message: typeof error === 'string' ? error : (error.message ?? "Something went wrong... Contact help@mendable.ai"),
num_docs: 0,
docs: [],
time_taken: 0,
team_id: job.data.team_id,
mode: "crawl",
url: job.data.url,
crawlerOptions: job.data.crawlerOptions,
pageOptions: job.data.pageOptions,
origin: job.data.origin,
});
done(null, data);
if (job.data.mode === "crawl" || job.data.crawl_id) {
await callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data);
}
if (job.data.crawl_id) {
await logJob({
job_id: job.id as string,
success: false,
message:
typeof error === "string"
? error
: error.message ?? "Something went wrong... Contact help@mendable.ai",
num_docs: 0,
docs: [],
time_taken: 0,
team_id: job.data.team_id,
mode: job.data.mode,
url: job.data.url,
crawlerOptions: job.data.crawlerOptions,
pageOptions: job.data.pageOptions,
origin: job.data.origin,
crawl_id: job.data.crawl_id,
});
const sc = await getCrawl(job.data.crawl_id);
await logJob({
job_id: job.data.crawl_id,
success: false,
message:
typeof error === "string"
? error
: error.message ?? "Something went wrong... Contact help@mendable.ai",
num_docs: 0,
docs: [],
time_taken: 0,
team_id: job.data.team_id,
mode: "crawl",
url: sc ? sc.originUrl : job.data.url,
crawlerOptions: sc ? sc.crawlerOptions : job.data.crawlerOptions,
pageOptions: sc ? sc.pageOptions : job.data.pageOptions,
origin: job.data.origin,
});
}
// done(null, data);
return data;
}
}
wsq.process(
Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
processJob
);
// wsq.process(
// Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
// processJob
// );
wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
// wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
// wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
// wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
// wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
// wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));

View File

@ -0,0 +1,81 @@
import si from 'systeminformation';
import { Mutex } from "async-mutex";
const MAX_CPU = process.env.MAX_CPU ? parseFloat(process.env.MAX_CPU) : 0.8;
const MAX_RAM = process.env.MAX_RAM ? parseFloat(process.env.MAX_RAM) : 0.8;
const CACHE_DURATION = process.env.SYS_INFO_MAX_CACHE_DURATION ? parseFloat(process.env.SYS_INFO_MAX_CACHE_DURATION) : 150;
class SystemMonitor {
private static instance: SystemMonitor;
private static instanceMutex = new Mutex();
private cpuUsageCache: number | null = null;
private memoryUsageCache: number | null = null;
private lastCpuCheck: number = 0;
private lastMemoryCheck: number = 0;
private constructor() {}
public static async getInstance(): Promise<SystemMonitor> {
if (SystemMonitor.instance) {
return SystemMonitor.instance;
}
await this.instanceMutex.runExclusive(async () => {
if (!SystemMonitor.instance) {
SystemMonitor.instance = new SystemMonitor();
}
});
return SystemMonitor.instance;
}
private async checkMemoryUsage() {
const now = Date.now();
if (this.memoryUsageCache !== null && (now - this.lastMemoryCheck) < CACHE_DURATION) {
return this.memoryUsageCache;
}
const memoryData = await si.mem();
const totalMemory = memoryData.total;
const availableMemory = memoryData.available;
const usedMemory = totalMemory - availableMemory;
const usedMemoryPercentage = (usedMemory / totalMemory);
this.memoryUsageCache = usedMemoryPercentage;
this.lastMemoryCheck = now;
return usedMemoryPercentage;
}
private async checkCpuUsage() {
const now = Date.now();
if (this.cpuUsageCache !== null && (now - this.lastCpuCheck) < CACHE_DURATION) {
return this.cpuUsageCache;
}
const cpuData = await si.currentLoad();
const cpuLoad = cpuData.currentLoad / 100;
this.cpuUsageCache = cpuLoad;
this.lastCpuCheck = now;
return cpuLoad;
}
public async acceptConnection() {
const cpuUsage = await this.checkCpuUsage();
const memoryUsage = await this.checkMemoryUsage();
return cpuUsage < MAX_CPU && memoryUsage < MAX_RAM;
}
public clearCache() {
this.cpuUsageCache = null;
this.memoryUsageCache = null;
this.lastCpuCheck = 0;
this.lastMemoryCheck = 0;
}
}
export default SystemMonitor.getInstance();
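
A minimal usage sketch for the new monitor, mirroring how the queue worker consumes it (the import path is assumed; the module exports a Promise of the singleton, so it is awaited once):

import systemMonitor from "./system-monitor";

async function canTakeMoreWork(): Promise<boolean> {
  const monitor = await systemMonitor;
  // True only while CPU load < MAX_CPU (default 0.8) and used RAM < MAX_RAM (default 0.8);
  // readings are cached for SYS_INFO_MAX_CACHE_DURATION ms (default 150) between checks.
  return monitor.acceptConnection();
}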

View File

@ -25,8 +25,11 @@ export interface WebScraperOptions {
mode: Mode;
crawlerOptions: any;
pageOptions: any;
extractorOptions?: any;
team_id: string;
origin?: string;
crawl_id?: string;
sitemapped?: boolean;
}
export interface RunWebScraperParams {
@ -34,11 +37,13 @@ export interface RunWebScraperParams {
mode: Mode;
crawlerOptions: any;
pageOptions?: any;
extractorOptions?: any;
inProgress: (progress: any) => void;
onSuccess: (result: any) => void;
onSuccess: (result: any, mode: string) => void;
onError: (error: Error) => void;
team_id: string;
bull_job_id: string;
priority?: number;
}
export interface RunWebScraperResult {
@ -63,6 +68,7 @@ export interface FirecrawlJob {
extractor_options?: ExtractorOptions,
num_tokens?: number,
retry?: boolean,
crawl_id?: string;
}
export interface FirecrawlScrapeResponse {

View File

@ -24,6 +24,7 @@
"devDependencies": {
"@types/jest": "^29.5.12",
"@types/supertest": "^6.0.2",
"artillery": "^2.0.19",
"typescript": "^5.4.5"
}
}

File diff suppressed because it is too large