Merge branch 'main' into mog/ongoing-crawls

Nicolas 2025-06-03 16:40:40 -03:00
commit 782702d536
31 changed files with 1046 additions and 244 deletions

.github/workflows/deploy-playwright.yml
View File

@ -0,0 +1,34 @@
name: Deploy Playwright to GHCR
env:
DOTNET_VERSION: '6.0.x'
on:
push:
branches:
- main
paths:
- apps/playwright-service-ts/**
workflow_dispatch:
jobs:
push-app-image:
runs-on: ubuntu-latest
defaults:
run:
working-directory: './apps/playwright-service-ts'
steps:
- name: 'Checkout GitHub Action'
uses: actions/checkout@main
- name: 'Login to GitHub Container Registry'
uses: docker/login-action@v1
with:
registry: ghcr.io
username: ${{github.actor}}
password: ${{secrets.GITHUB_TOKEN}}
- name: 'Build Inventory Image'
run: |
docker build . --tag ghcr.io/mendableai/playwright-service:latest
docker push ghcr.io/mendableai/playwright-service:latest

View File

@ -16,6 +16,7 @@ env:
TEST_SUITE_SELF_HOSTED: true
USE_GO_MARKDOWN_PARSER: true
FIRECRAWL_DEBUG_FILTER_LINKS: true
SENTRY_ENVIRONMENT: dev
jobs:
test:

View File

@ -21,6 +21,9 @@ env:
SUPABASE_SERVICE_TOKEN: ${{ secrets.SUPABASE_SERVICE_TOKEN }}
SUPABASE_URL: ${{ secrets.SUPABASE_URL }}
SUPABASE_REPLICA_URL: ${{ secrets.SUPABASE_REPLICA_URL }}
INDEX_SUPABASE_SERVICE_TOKEN: ${{ secrets.INDEX_SUPABASE_SERVICE_TOKEN }}
INDEX_SUPABASE_ANON_TOKEN: ${{ secrets.INDEX_SUPABASE_ANON_TOKEN }}
INDEX_SUPABASE_URL: ${{ secrets.INDEX_SUPABASE_URL }}
TEST_API_KEY: ${{ secrets.TEST_API_KEY }}
FIRE_ENGINE_BETA_URL: ${{ secrets.FIRE_ENGINE_BETA_URL }}
USE_DB_AUTHENTICATION: true
@ -30,6 +33,7 @@ env:
RUNPOD_MU_API_KEY: ${{ secrets.RUNPOD_MU_API_KEY }}
GCS_CREDENTIALS: ${{ secrets.GCS_CREDENTIALS }}
GCS_BUCKET_NAME: ${{ secrets.GCS_BUCKET_NAME }}
GCS_INDEX_BUCKET_NAME: ${{ secrets.GCS_INDEX_BUCKET_NAME }}
GOOGLE_GENERATIVE_AI_API_KEY: ${{ secrets.GOOGLE_GENERATIVE_AI_API_KEY }}
GROQ_API_KEY: ${{ secrets.GROQ_API_KEY }}
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
@ -100,11 +104,11 @@ jobs:
working-directory: ./apps/api
id: start_workers
- name: Start index worker
run: npm run index-worker &
run: npm run index-worker > index-worker.log 2>&1 &
working-directory: ./apps/api
id: start_index_worker
- name: Wait for API
run: pnpx wait-on tcp:3002 -t 15s
run: pnpx wait-on tcp:3002 -t 30s
- name: Run snippet tests
run: |
npm run test:snips
@ -118,4 +122,5 @@ jobs:
# name: Logs
# path: |
# ./apps/api/api.log
# ./apps/api/worker.log
# ./apps/api/worker.log
# ./apps/api/index-worker.log

View File

@ -124,7 +124,7 @@ pub unsafe extern "C" fn extract_metadata(html: *const libc::c_char) -> *mut lib
let meta = meta.as_node().as_element().unwrap();
let attrs = meta.attributes.borrow();
if let Some(name) = attrs.get("name").or_else(|| attrs.get("property")) {
if let Some(name) = attrs.get("name").or_else(|| attrs.get("property")).or_else(|| attrs.get("itemprop")) {
if let Some(content) = attrs.get("content") {
if let Some(v) = out.get(name) {
match v {

View File

@ -64,6 +64,8 @@ describe("Queue Concurrency Integration", () => {
removeBase64Images: true,
fastMode: false,
blockAds: true,
maxAge: 0,
storeInCache: true,
};
beforeEach(() => {

View File

@ -1,197 +1,211 @@
// import { batchScrape, crawl, creditUsage, extract, map, scrape, search, tokenUsage } from "./lib";
import { batchScrape, crawl, creditUsage, extract, map, scrape, search, tokenUsage } from "./lib";
// const sleep = (ms: number) => new Promise(x => setTimeout(() => x(true), ms));
// const sleepForBatchBilling = () => sleep(20000);
const sleep = (ms: number) => new Promise(x => setTimeout(() => x(true), ms));
const sleepForBatchBilling = () => sleep(40000);
// beforeAll(async () => {
// // Wait for previous test runs to stop billing processing
// if (!process.env.TEST_SUITE_SELF_HOSTED) {
// await sleep(40000);
// }
// }, 50000);
beforeAll(async () => {
// Wait for previous test runs to stop billing processing
if (!process.env.TEST_SUITE_SELF_HOSTED) {
await sleep(40000);
}
}, 50000);
// describe("Billing tests", () => {
// if (process.env.TEST_SUITE_SELF_HOSTED) {
// it("dummy", () => {
// expect(true).toBe(true);
// });
// } else {
// it("bills scrape correctly", async () => {
// const rc1 = (await creditUsage()).remaining_credits;
describe("Billing tests", () => {
if (process.env.TEST_SUITE_SELF_HOSTED) {
it("dummy", () => {
expect(true).toBe(true);
});
} else {
it("bills scrape correctly", async () => {
const rc1 = (await creditUsage()).remaining_credits;
// // Run all scrape operations in parallel with Promise.all
// await Promise.all([
// // scrape 1: regular fc.dev scrape (1 credit)
// scrape({
// url: "https://firecrawl.dev"
// }),
// Run all scrape operations in parallel with Promise.all
await Promise.all([
// scrape 1: regular fc.dev scrape (1 credit)
scrape({
url: "https://firecrawl.dev"
}),
// // scrape 1.1: regular fc.dev scrape (1 credit)
// scrape({
// url: "https://firecrawl.dev"
// }),
// scrape 1.1: regular fc.dev scrape (1 credit)
scrape({
url: "https://firecrawl.dev"
}),
// // scrape 2: fc.dev with json (5 credits)
// scrape({
// url: "https://firecrawl.dev",
// formats: ["json"],
// jsonOptions: {
// schema: {
// type: "object",
// properties: {
// is_open_source: { type: "boolean" },
// },
// required: ["is_open_source"],
// },
// },
// })
// ]);
// scrape 2: fc.dev with json (5 credits)
scrape({
url: "https://firecrawl.dev",
formats: ["json"],
jsonOptions: {
schema: {
type: "object",
properties: {
is_open_source: { type: "boolean" },
},
required: ["is_open_source"],
},
},
})
]);
// // sum: 7 credits
// sum: 7 credits
// await sleepForBatchBilling();
await sleepForBatchBilling();
// const rc2 = (await creditUsage()).remaining_credits;
const rc2 = (await creditUsage()).remaining_credits;
// expect(rc1 - rc2).toBe(7);
// }, 120000);
expect(rc1 - rc2).toBe(7);
}, 120000);
// it("bills batch scrape correctly", async () => {
// const rc1 = (await creditUsage()).remaining_credits;
it("bills batch scrape correctly", async () => {
const rc1 = (await creditUsage()).remaining_credits;
// // Run both scrape operations in parallel with Promise.all
// const [scrape1, scrape2] = await Promise.all([
// // scrape 1: regular batch scrape with failing domain (2 credits)
// batchScrape({
// urls: [
// "https://firecrawl.dev",
// "https://mendable.ai",
// "https://thisdomaindoesnotexistandwillfail.fcr",
// ],
// }),
// Run both scrape operations in parallel with Promise.all
const [scrape1, scrape2] = await Promise.all([
// scrape 1: regular batch scrape with failing domain (2 credits)
batchScrape({
urls: [
"https://firecrawl.dev",
"https://mendable.ai",
"https://thisdomaindoesnotexistandwillfail.fcr",
],
}),
// // scrape 2: batch scrape with json (10 credits)
// batchScrape({
// urls: [
// "https://firecrawl.dev",
// "https://mendable.ai",
// "https://thisdomaindoesnotexistandwillfail.fcr",
// ],
// formats: ["json"],
// jsonOptions: {
// schema: {
// type: "object",
// properties: {
// four_word_summary: { type: "string" },
// },
// required: ["four_word_summary"],
// },
// },
// })
// ]);
// scrape 2: batch scrape with json (10 credits)
batchScrape({
urls: [
"https://firecrawl.dev",
"https://mendable.ai",
"https://thisdomaindoesnotexistandwillfail.fcr",
],
formats: ["json"],
jsonOptions: {
schema: {
type: "object",
properties: {
four_word_summary: { type: "string" },
},
required: ["four_word_summary"],
},
},
})
]);
// // sum: 12 credits
// sum: 12 credits
// await sleepForBatchBilling();
await sleepForBatchBilling();
// const rc2 = (await creditUsage()).remaining_credits;
const rc2 = (await creditUsage()).remaining_credits;
// expect(rc1 - rc2).toBe(12);
// }, 600000);
expect(rc1 - rc2).toBe(12);
}, 600000);
// it("bills crawl correctly", async () => {
// const rc1 = (await creditUsage()).remaining_credits;
it("bills crawl correctly", async () => {
const rc1 = (await creditUsage()).remaining_credits;
// // Run both crawl operations in parallel with Promise.all
// const [crawl1, crawl2] = await Promise.all([
// // crawl 1: regular fc.dev crawl (x credits)
// crawl({
// url: "https://firecrawl.dev",
// }),
// Run both crawl operations in parallel with Promise.all
const [crawl1, crawl2] = await Promise.all([
// crawl 1: regular fc.dev crawl (x credits)
crawl({
url: "https://firecrawl.dev",
limit: 10,
}),
// // crawl 2: fc.dev crawl with json (5y credits)
// crawl({
// url: "https://firecrawl.dev",
// scrapeOptions: {
// formats: ["json"],
// jsonOptions: {
// schema: {
// type: "object",
// properties: {
// four_word_summary: { type: "string" },
// },
// required: ["four_word_summary"],
// },
// },
// }
// })
// ]);
// crawl 2: fc.dev crawl with json (5y credits)
crawl({
url: "https://firecrawl.dev",
scrapeOptions: {
formats: ["json"],
jsonOptions: {
schema: {
type: "object",
properties: {
four_word_summary: { type: "string" },
},
required: ["four_word_summary"],
},
},
},
limit: 10,
})
]);
// expect(crawl1.success).toBe(true);
// expect(crawl2.success).toBe(true);
expect(crawl1.success).toBe(true);
expect(crawl2.success).toBe(true);
// // sum: x+5y credits
// sum: x+5y credits
// await sleepForBatchBilling();
await sleepForBatchBilling();
// const rc2 = (await creditUsage()).remaining_credits;
const rc2 = (await creditUsage()).remaining_credits;
// if (crawl1.success && crawl2.success) {
// expect(rc1 - rc2).toBe(crawl1.completed + crawl2.completed * 5);
// }
// }, 600000);
if (crawl1.success && crawl2.success) {
expect(rc1 - rc2).toBe(crawl1.completed + crawl2.completed * 5);
}
}, 600000);
// it("bills map correctly", async () => {
// const rc1 = (await creditUsage()).remaining_credits;
// await map({ url: "https://firecrawl.dev" });
// await sleepForBatchBilling();
// const rc2 = (await creditUsage()).remaining_credits;
// expect(rc1 - rc2).toBe(1);
// }, 60000);
it("bills map correctly", async () => {
const rc1 = (await creditUsage()).remaining_credits;
await map({ url: "https://firecrawl.dev" });
await sleepForBatchBilling();
const rc2 = (await creditUsage()).remaining_credits;
expect(rc1 - rc2).toBe(1);
}, 60000);
// it("bills search correctly", async () => {
// const rc1 = (await creditUsage()).remaining_credits;
it("bills search correctly", async () => {
const rc1 = (await creditUsage()).remaining_credits;
// const results = await search({
// query: "firecrawl"
// });
const results = await search({
query: "firecrawl"
});
// await sleepForBatchBilling();
await sleepForBatchBilling();
// const rc2 = (await creditUsage()).remaining_credits;
const rc2 = (await creditUsage()).remaining_credits;
// expect(rc1 - rc2).toBe(results.length);
// }, 60000);
expect(rc1 - rc2).toBe(results.length);
}, 60000);
// it("bills extract correctly", async () => {
// const rc1 = (await tokenUsage()).remaining_tokens;
it("bills search with scrape correctly", async () => {
const rc1 = (await creditUsage()).remaining_credits;
const results = await search({
query: "firecrawl",
scrapeOptions: {
formats: ["markdown"],
},
});
await sleepForBatchBilling();
const rc2 = (await creditUsage()).remaining_credits;
expect(rc1 - rc2).toBe(results.length);
}, 600000);
it("bills extract correctly", async () => {
const rc1 = (await tokenUsage()).remaining_tokens;
// await extract({
// urls: ["https://firecrawl.dev"],
// schema: {
// "type": "object",
// "properties": {
// "is_open_source": {
// "type": "boolean"
// }
// },
// "required": [
// "is_open_source"
// ]
// },
// origin: "api-sdk",
// });
await extract({
urls: ["https://firecrawl.dev"],
schema: {
"type": "object",
"properties": {
"is_open_source": {
"type": "boolean"
}
},
"required": [
"is_open_source"
]
},
origin: "api-sdk",
});
// await sleepForBatchBilling();
await sleepForBatchBilling();
// const rc2 = (await tokenUsage()).remaining_tokens;
const rc2 = (await tokenUsage()).remaining_tokens;
// expect(rc1 - rc2).toBe(305);
// }, 300000);
// }
// });
// temporarily disabled
it("is mocked", () => {
expect(true).toBe(true);
});
expect(rc1 - rc2).toBe(305);
}, 300000);
}
});

View File

@ -1,4 +1,5 @@
import { scrape, scrapeStatus, scrapeWithFailure } from "./lib";
import crypto from "crypto";
describe("Scrape tests", () => {
it.concurrent("mocking works properly", async () => {
@ -72,28 +73,72 @@ describe("Scrape tests", () => {
});
expect(response.markdown).toContain("Firecrawl");
// Give time to propagate to read replica
await new Promise(resolve => setTimeout(resolve, 1000));
const status = await scrapeStatus(response.metadata.scrapeId!);
expect(JSON.stringify(status)).toBe(JSON.stringify(response));
}, 60000);
describe("Ad blocking (f-e dependant)", () => {
it.concurrent("blocks ads by default", async () => {
const response = await scrape({
url: "https://www.allrecipes.com/recipe/18185/yum/",
// describe("Ad blocking (f-e dependant)", () => {
// it.concurrent("blocks ads by default", async () => {
// const response = await scrape({
// url: "https://www.allrecipes.com/recipe/18185/yum/",
// });
// expect(response.markdown).not.toContain(".g.doubleclick.net/");
// }, 30000);
// it.concurrent("doesn't block ads if explicitly disabled", async () => {
// const response = await scrape({
// url: "https://www.allrecipes.com/recipe/18185/yum/",
// blockAds: false,
// });
// expect(response.markdown).toMatch(/(\.g\.doubleclick\.net|amazon-adsystem\.com)\//);
// }, 30000);
// });
describe("Index", () => {
it.concurrent("caches properly", async () => {
const id = crypto.randomUUID();
const url = "https://firecrawl.dev/?testId=" + id;
const response1 = await scrape({
url,
maxAge: 120000,
storeInCache: false,
});
expect(response.markdown).not.toContain(".g.doubleclick.net/");
}, 30000);
expect(response1.metadata.cacheState).toBe("miss");
it.concurrent("doesn't block ads if explicitly disabled", async () => {
const response = await scrape({
url: "https://www.allrecipes.com/recipe/18185/yum/",
blockAds: false,
await new Promise(resolve => setTimeout(resolve, 17000));
const response2 = await scrape({
url,
maxAge: 120000,
});
expect(response.markdown).toMatch(/(\.g\.doubleclick\.net|amazon-adsystem\.com)\//);
}, 30000);
expect(response2.metadata.cacheState).toBe("miss");
await new Promise(resolve => setTimeout(resolve, 17000));
const response3 = await scrape({
url,
maxAge: 120000,
});
expect(response3.metadata.cacheState).toBe("hit");
expect(response3.metadata.cachedAt).toBeDefined();
const response4 = await scrape({
url,
maxAge: 1,
});
expect(response4.metadata.cacheState).toBe("miss");
}, 150000 + 2 * 17000);
});
describe("Change Tracking format", () => {

View File

@ -0,0 +1,12 @@
import type { Request, Response } from "express";
import { getIndexInsertQueueLength } from "../../../services";
export async function indexQueuePrometheus(req: Request, res: Response) {
const queueLength = await getIndexInsertQueueLength();
res.setHeader("Content-Type", "text/plain");
res.send(`\
# HELP firecrawl_index_queue_length The number of items in the index insert queue
# TYPE firecrawl_index_queue_length gauge
firecrawl_index_queue_length ${queueLength}
`);
}

View File

@ -2,6 +2,7 @@ import { Request, Response } from "express";
import { RequestWithAuth } from "./types";
import { getACUCTeam } from "../auth";
import { logger } from "../../lib/logger";
import { RateLimiterMode } from "../../types";
export async function creditUsageController(
req: RequestWithAuth,
@ -20,7 +21,7 @@ export async function creditUsageController(
}
// Otherwise fetch fresh data
const chunk = await getACUCTeam(req.auth.team_id);
const chunk = await getACUCTeam(req.auth.team_id, false, false, RateLimiterMode.Scrape);
if (!chunk) {
res.status(404).json({
success: false,

View File

@ -25,6 +25,7 @@ import { logger } from "../../lib/logger";
import Redis from "ioredis";
import { querySitemapIndex } from "../../scraper/WebScraper/sitemap-index";
import { getIndexQueue } from "../../services/queue-service";
import { queryIndexAtSplitLevel } from "../../services/index";
configDotenv();
const redis = new Redis(process.env.REDIS_URL!);
@ -43,6 +44,14 @@ interface MapResult {
mapResults: MapDocument[];
}
async function queryIndex(url: string, limit: number, useIndex: boolean): Promise<string[]> {
if (!useIndex) {
return [];
}
return await queryIndexAtSplitLevel(url, limit);
}
export async function getMapResults({
url,
search,
@ -58,6 +67,7 @@ export async function getMapResults({
mock,
filterByPath = true,
flags,
useIndex = true,
}: {
url: string;
search?: string;
@ -73,6 +83,7 @@ export async function getMapResults({
mock?: string;
filterByPath?: boolean;
flags: TeamFlags;
useIndex?: boolean;
}): Promise<MapResult> {
const id = uuidv4();
let links: string[] = [url];
@ -165,11 +176,16 @@ export async function getMapResults({
}
// Parallelize sitemap index query with search results
const [sitemapIndexResult, ...searchResults] = await Promise.all([
const [sitemapIndexResult, indexResults, ...searchResults] = await Promise.all([
querySitemapIndex(url, abort),
queryIndex(url, limit, useIndex),
...(cachedResult ? [] : pagePromises),
]);
if (indexResults.length > 0) {
links.push(...indexResults);
}
const twoDaysAgo = new Date();
twoDaysAgo.setDate(twoDaysAgo.getDate() - 2);
@ -333,6 +349,7 @@ export async function mapController(
mock: req.body.useMock,
filterByPath: req.body.filterByPath !== false,
flags: req.acuc?.flags ?? null,
useIndex: req.body.useIndex,
}),
...(req.body.timeout !== undefined ? [
new Promise((resolve, reject) => setTimeout(() => {

View File

@ -42,6 +42,8 @@ export async function scrapeController(
});
//
const isDirectToBullMQ = process.env.SEARCH_PREVIEW_TOKEN !== undefined && process.env.SEARCH_PREVIEW_TOKEN === req.body.__searchPreviewToken;
await addScrapeJob(
{
url: req.body.url,
@ -52,6 +54,8 @@ export async function scrapeController(
teamId: req.auth.team_id,
saveScrapeResultToGCS: process.env.GCS_FIRE_ENGINE_BUCKET_NAME ? true : false,
unnormalizedSourceURL: preNormalizedBody.url,
useCache: req.body.__experimental_cache ? true : false,
bypassBilling: isDirectToBullMQ,
},
origin: req.body.origin,
startTime,
@ -59,6 +63,7 @@ export async function scrapeController(
{},
jobId,
jobPriority,
isDirectToBullMQ,
);
const totalWait =
@ -130,6 +135,7 @@ export async function scrapeController(
}
}
return res.status(200).json({
success: true,
data: doc,

View File

@ -40,24 +40,24 @@ export async function searchAndScrapeSearchResult(
try {
const searchResults = await search({
query,
num_results: 5
});
num_results: 5,
});
const documents = await Promise.all(
searchResults.map(result =>
scrapeSearchResult(
{
url: result.url,
title: result.title,
description: result.description
},
options,
logger,
costTracking,
flags
)
)
);
const documents = await Promise.all(
searchResults.map((result) =>
scrapeSearchResult(
{
url: result.url,
title: result.title,
description: result.description,
},
options,
logger,
costTracking,
flags,
),
),
);
return documents;
} catch (error) {
@ -77,6 +77,7 @@ async function scrapeSearchResult(
costTracking: CostTracking,
flags: TeamFlags,
directToBullMQ: boolean = false,
isSearchPreview: boolean = false,
): Promise<Document> {
const jobId = uuidv4();
const jobPriority = await getJobPriority({
@ -100,7 +101,7 @@ async function scrapeSearchResult(
mode: "single_urls" as Mode,
team_id: options.teamId,
scrapeOptions: options.scrapeOptions,
internalOptions: { teamId: options.teamId, useCache: true },
internalOptions: { teamId: options.teamId, useCache: true, bypassBilling: true },
origin: options.origin,
is_scrape: true,
startTime: Date.now(),
@ -112,7 +113,7 @@ async function scrapeSearchResult(
);
const doc: Document = await waitForJob(jobId, options.timeout);
logger.info("Scrape job completed", {
scrapeId: jobId,
url: searchResult.url,
@ -171,6 +172,7 @@ export async function searchController(
};
const startTime = new Date().getTime();
const costTracking = new CostTracking();
const isSearchPreview = process.env.SEARCH_PREVIEW_TOKEN !== undefined && process.env.SEARCH_PREVIEW_TOKEN === req.body.__searchPreviewToken;
try {
req.body = searchRequestSchema.parse(req.body);
@ -199,7 +201,9 @@ export async function searchController(
});
if (req.body.ignoreInvalidURLs) {
searchResults = searchResults.filter((result) => !isUrlBlocked(result.url, req.acuc?.flags ?? null));
searchResults = searchResults.filter(
(result) => !isUrlBlocked(result.url, req.acuc?.flags ?? null),
);
}
logger.info("Searching completed", {
@ -226,12 +230,20 @@ export async function searchController(
} else {
logger.info("Scraping search results");
const scrapePromises = searchResults.map((result) =>
scrapeSearchResult(result, {
teamId: req.auth.team_id,
origin: req.body.origin,
timeout: req.body.timeout,
scrapeOptions: req.body.scrapeOptions,
}, logger, costTracking, req.acuc?.flags ?? null, (req.acuc?.price_credits ?? 0) <= 3000),
scrapeSearchResult(
result,
{
teamId: req.auth.team_id,
origin: req.body.origin,
timeout: req.body.timeout,
scrapeOptions: req.body.scrapeOptions,
},
logger,
costTracking,
req.acuc?.flags ?? null,
(req.acuc?.price_credits ?? 0) <= 3000,
isSearchPreview,
),
);
const docs = await Promise.all(scrapePromises);
@ -257,17 +269,23 @@ export async function searchController(
}
// Bill team once for all successful results
billTeam(req.auth.team_id, req.acuc?.sub_id, responseData.data.reduce((a,x) => {
if (x.metadata?.numPages !== undefined && x.metadata.numPages > 0) {
return a + x.metadata.numPages;
} else {
return a + 1;
}
}, 0)).catch((error) => {
logger.error(
`Failed to bill team ${req.auth.team_id} for ${responseData.data.length} credits: ${error}`,
);
});
if (!isSearchPreview) {
billTeam(
req.auth.team_id,
req.acuc?.sub_id,
responseData.data.reduce((a, x) => {
if (x.metadata?.numPages !== undefined && x.metadata.numPages > 0) {
return a + x.metadata.numPages;
} else {
return a + 1;
}
}, 0),
).catch((error) => {
logger.error(
`Failed to bill team ${req.auth.team_id} for ${responseData.data.length} credits: ${error}`,
);
});
}
const endTime = new Date().getTime();
const timeTakenInSeconds = (endTime - startTime) / 1000;
@ -277,22 +295,25 @@ export async function searchController(
time_taken: timeTakenInSeconds,
});
logJob({
job_id: jobId,
success: true,
num_docs: responseData.data.length,
docs: responseData.data,
time_taken: timeTakenInSeconds,
team_id: req.auth.team_id,
mode: "search",
url: req.body.query,
scrapeOptions: req.body.scrapeOptions,
origin: req.body.origin,
cost_tracking: costTracking,
});
logJob(
{
job_id: jobId,
success: true,
num_docs: responseData.data.length,
docs: responseData.data,
time_taken: timeTakenInSeconds,
team_id: req.auth.team_id,
mode: "search",
url: req.body.query,
scrapeOptions: req.body.scrapeOptions,
origin: req.body.origin,
cost_tracking: costTracking,
},
false,
isSearchPreview,
);
return res.status(200).json(responseData);
} catch (error) {
if (
error instanceof Error &&

View File

@ -21,7 +21,7 @@ export async function tokenUsageController(
}
// Otherwise fetch fresh data
const chunk = await getACUCTeam(req.auth.team_id, false, true, RateLimiterMode.Extract);
const chunk = await getACUCTeam(req.auth.team_id, false, false, RateLimiterMode.Extract);
if (!chunk) {
res.status(404).json({
success: false,

View File

@ -309,6 +309,10 @@ const baseScrapeOptions = z
useMock: z.string().optional(),
blockAds: z.boolean().default(true),
proxy: z.enum(["basic", "stealth", "auto"]).optional(),
maxAge: z.number().int().gte(0).safe().default(0),
storeInCache: z.boolean().default(true),
__experimental_cache: z.boolean().default(false).optional(),
__searchPreviewToken: z.string().optional(),
})
.strict(strictMessage);
@ -656,6 +660,7 @@ export const mapRequestSchema = crawlerOptions
timeout: z.number().positive().finite().optional(),
useMock: z.string().optional(),
filterByPath: z.boolean().default(true),
useIndex: z.boolean().default(true),
})
.strict(strictMessage);
@ -752,6 +757,8 @@ export type Document = {
numPages?: number;
contentType?: string;
proxyUsed: "basic" | "stealth";
cacheState?: "hit" | "miss";
cachedAt?: string;
// [key: string]: string | string[] | number | { smartScrape: number; other: number; total: number } | undefined;
};
serpResults?: {
@ -1198,6 +1205,7 @@ export const searchRequestSchema = z
origin: z.string().optional().default("api"),
timeout: z.number().int().positive().finite().safe().default(60000),
ignoreInvalidURLs: z.boolean().optional().default(false),
__searchPreviewToken: z.string().optional(),
scrapeOptions: baseScrapeOptions
.extend({
formats: z
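To make the new schema fields above concrete, here is a hypothetical pair of request bodies checked against the added options (the URL and values are placeholders, not taken from this diff):

// Hypothetical scrape request: accept an indexed copy up to 2 minutes old,
// but don't store this result back into the index.
const scrapeBody = {
  url: "https://example.com",
  maxAge: 120_000,     // milliseconds; 0 (the default) keeps the index engines out entirely
  storeInCache: false, // defaults to true
};
// A successful index hit is then reported back as metadata.cacheState ("hit") and metadata.cachedAt.

// Hypothetical map request: skip index-backed link discovery.
const mapBody = {
  url: "https://example.com",
  useIndex: false, // defaults to true
};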

View File

@ -131,6 +131,9 @@ export async function performDeepResearch(options: DeepResearchServiceOptions) {
removeBase64Images: false,
fastMode: false,
blockAds: false,
maxAge: 0,
storeInCache: true,
__experimental_cache: true,
},
}, logger, costTracking, acuc?.flags ?? null);
return response.length > 0 ? response : [];

View File

@ -10,6 +10,7 @@ import { wrap } from "./v1";
import { acucCacheClearController } from "../controllers/v0/admin/acuc-cache-clear";
import { checkFireEngine } from "../controllers/v0/admin/check-fire-engine";
import { cclogController } from "../controllers/v0/admin/cclog";
import { indexQueuePrometheus } from "../controllers/v0/admin/index-queue-prometheus";
export const adminRouter = express.Router();
@ -49,3 +50,8 @@ adminRouter.get(
`/admin/${process.env.BULL_AUTH_KEY}/cclog`,
wrap(cclogController),
);
adminRouter.get(
`/admin/${process.env.BULL_AUTH_KEY}/index-queue-prometheus`,
wrap(indexQueuePrometheus),
);
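A rough sketch of how the new metrics route could be polled once the API is up; the localhost:3002 address is borrowed from the CI workflow earlier in this diff, and the BULL_AUTH_KEY value is an assumption, not something this commit defines:

// Assumes the API listens on localhost:3002 and BULL_AUTH_KEY is set in the environment.
const res = await fetch(
  `http://localhost:3002/admin/${process.env.BULL_AUTH_KEY}/index-queue-prometheus`,
);
console.log(await res.text());
// Expected body shape, per the controller shown above:
// # HELP firecrawl_index_queue_length The number of items in the index insert queue
// # TYPE firecrawl_index_queue_length gauge
// firecrawl_index_queue_length 0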

View File

@ -1,14 +1,14 @@
import { cacheKey, getEntryFromCache } from "../../../../lib/cache";
import { EngineScrapeResult } from "..";
import { Meta } from "../..";
import { EngineError } from "../../error";
import { EngineError, IndexMissError } from "../../error";
export async function scrapeCache(meta: Meta): Promise<EngineScrapeResult> {
const key = cacheKey(meta.url, meta.options, meta.internalOptions);
if (key === null) throw new EngineError("Scrape not eligible for caching");
const entry = await getEntryFromCache(key);
if (entry === null) throw new EngineError("Cache missed");
if (entry === null) throw new IndexMissError();
if (!entry.html) {
throw new EngineError("Cache hit but HTML is missing");

View File

@ -10,6 +10,8 @@ import { scrapePDF } from "./pdf";
import { scrapeURLWithFetch } from "./fetch";
import { scrapeURLWithPlaywright } from "./playwright";
import { scrapeCache } from "./cache";
import { scrapeURLWithIndex } from "./index/index";
import { useIndex } from "../../../services";
export type Engine =
| "fire-engine;chrome-cdp"
@ -24,7 +26,9 @@ export type Engine =
| "fetch"
| "pdf"
| "docx"
| "cache";
| "cache"
| "index"
| "index;documents";
const useFireEngine =
process.env.FIRE_ENGINE_BETA_URL !== "" &&
@ -38,6 +42,7 @@ const useCache =
export const engines: Engine[] = [
...(useCache ? ["cache" as const] : []),
...(useIndex ? ["index" as const, "index;documents" as const] : []),
...(useFireEngine
? [
"fire-engine;chrome-cdp" as const,
@ -114,6 +119,10 @@ export type EngineScrapeResult = {
numPages?: number;
cacheInfo?: {
created_at: Date;
};
contentType?: string;
};
@ -124,6 +133,8 @@ const engineHandlers: {
) => Promise<EngineScrapeResult>;
} = {
cache: scrapeCache,
index: scrapeURLWithIndex,
"index;documents": scrapeURLWithIndex,
"fire-engine;chrome-cdp": scrapeURLWithFireEngineChromeCDP,
"fire-engine(retry);chrome-cdp": scrapeURLWithFireEngineChromeCDP,
"fire-engine;chrome-cdp;stealth": scrapeURLWithFireEngineChromeCDP,
@ -166,6 +177,24 @@ export const engineOptions: {
},
quality: 1000, // cache should always be tried first
},
index: {
features: {
actions: false,
waitFor: true,
screenshot: true,
"screenshot@fullScreen": true,
pdf: false,
docx: false,
atsv: false,
mobile: true,
location: true,
skipTlsVerification: true,
useFastMode: true,
stealthProxy: false,
disableAdblock: false,
},
quality: 999, // index should always be tried second ? - MG
},
"fire-engine;chrome-cdp": {
features: {
actions: true,
@ -202,6 +231,24 @@ export const engineOptions: {
},
quality: 45,
},
"index;documents": {
features: {
actions: false,
waitFor: true,
screenshot: true,
"screenshot@fullScreen": true,
pdf: true,
docx: true,
atsv: false,
location: true,
mobile: true,
skipTlsVerification: true,
useFastMode: true,
stealthProxy: false,
disableAdblock: false,
},
quality: -1,
},
"fire-engine;chrome-cdp;stealth": {
features: {
actions: true,
@ -218,7 +265,7 @@ export const engineOptions: {
stealthProxy: true,
disableAdblock: false,
},
quality: -1,
quality: -2,
},
"fire-engine(retry);chrome-cdp;stealth": {
features: {
@ -401,6 +448,41 @@ export function buildFallbackList(meta: Meta): {
_engines.splice(cacheIndex, 1);
}
}
const shouldUseIndex =
useIndex
&& process.env.FIRECRAWL_INDEX_WRITE_ONLY !== "true"
&& !meta.options.formats.includes("changeTracking")
&& meta.options.maxAge !== 0
&& (
meta.options.headers === undefined
|| Object.keys(meta.options.headers).length === 0
)
&& (
meta.options.actions === undefined
|| meta.options.actions.length === 0
)
&& meta.options.proxy !== "stealth";
meta.logger.warn("shouldUseIndex", {
shouldUseIndex,
formatsNoChangeTracking: !meta.options.formats.includes("changeTracking"),
maxAge: meta.options.maxAge !== 0,
headers: meta.options.headers === undefined || Object.keys(meta.options.headers).length === 0,
actions: meta.options.actions === undefined || meta.options.actions.length === 0,
proxy: meta.options.proxy !== "stealth",
});
if (!shouldUseIndex) {
const indexIndex = _engines.indexOf("index");
if (indexIndex !== -1) {
_engines.splice(indexIndex, 1);
}
const indexDocumentsIndex = _engines.indexOf("index;documents");
if (indexDocumentsIndex !== -1) {
_engines.splice(indexDocumentsIndex, 1);
}
}
const prioritySum = [...meta.featureFlags].reduce(
(a, x) => a + featureFlagOptions[x].priority,
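The gate above only keeps the index engines for requests plain enough to be served from a shared cache. A few hypothetical option sets, checked against those conditions (values are illustrative only):

// Stays eligible for "index" / "index;documents":
const eligible = { url: "https://example.com", maxAge: 3_600_000, formats: ["markdown"] };

// Each of these knocks the index engines out of the fallback list:
const noCache = { url: "https://example.com", maxAge: 0 };                                  // maxAge must be non-zero
const headers = { url: "https://example.com", maxAge: 3_600_000, headers: { "X-Debug": "1" } };
const stealth = { url: "https://example.com", maxAge: 3_600_000, proxy: "stealth" };
// Non-empty actions or a changeTracking format would have the same effect.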

View File

@ -0,0 +1,165 @@
import { Document } from "../../../../controllers/v1/types";
import { EngineScrapeResult } from "..";
import { Meta } from "../..";
import { getIndexFromGCS, hashURL, index_supabase_service, normalizeURLForIndex, saveIndexToGCS, generateURLSplits, addIndexInsertJob } from "../../../../services";
import { EngineError, IndexMissError } from "../../error";
import crypto from "crypto";
export async function sendDocumentToIndex(meta: Meta, document: Document) {
const shouldCache = meta.options.storeInCache
&& meta.winnerEngine !== "cache"
&& meta.winnerEngine !== "index"
&& meta.winnerEngine !== "index;documents"
&& !meta.featureFlags.has("actions")
&& (
meta.options.headers === undefined
|| Object.keys(meta.options.headers).length === 0
);
if (!shouldCache) {
return document;
}
(async () => {
try {
const normalizedURL = normalizeURLForIndex(meta.url);
const urlHash = await hashURL(normalizedURL);
const urlSplits = generateURLSplits(normalizedURL);
const urlSplitsHash = await Promise.all(urlSplits.map(split => hashURL(split)));
const indexId = crypto.randomUUID();
try {
await saveIndexToGCS(indexId, {
url: normalizedURL,
html: document.rawHtml!,
statusCode: document.metadata.statusCode,
error: document.metadata.error,
screenshot: document.screenshot,
numPages: document.metadata.numPages,
});
} catch (error) {
meta.logger.error("Failed to save document to index", {
error,
});
return document;
}
try {
await addIndexInsertJob({
id: indexId,
url: normalizedURL,
url_hash: urlHash,
url_splits: urlSplits,
url_splits_hash: urlSplitsHash,
original_url: document.metadata.sourceURL ?? meta.url,
resolved_url: document.metadata.url ?? document.metadata.sourceURL ?? meta.url,
has_screenshot: document.screenshot !== undefined && meta.featureFlags.has("screenshot"),
has_screenshot_fullscreen: document.screenshot !== undefined && meta.featureFlags.has("screenshot@fullScreen"),
is_mobile: meta.options.mobile,
block_ads: meta.options.blockAds,
location_country: meta.options.location?.country ?? null,
location_languages: meta.options.location?.languages ?? null,
status: document.metadata.statusCode,
...(urlSplitsHash.slice(0, 10).reduce((a,x,i) => ({
...a,
[`url_split_${i}_hash`]: x,
}), {})),
});
} catch (error) {
meta.logger.error("Failed to add document to index insert queue", {
error,
});
}
} catch (error) {
meta.logger.error("Failed to save document to index (outer)", {
error,
});
}
})();
return document;
}
const errorCountToRegister = 3;
export async function scrapeURLWithIndex(meta: Meta): Promise<EngineScrapeResult> {
const normalizedURL = normalizeURLForIndex(meta.url);
const urlHash = await hashURL(normalizedURL);
let selector = index_supabase_service
.from("index")
.select("id, created_at, status")
.eq("url_hash", urlHash)
.gte("created_at", new Date(Date.now() - meta.options.maxAge).toISOString())
.eq("is_mobile", meta.options.mobile)
.eq("block_ads", meta.options.blockAds);
if (meta.featureFlags.has("screenshot")) {
selector = selector.eq("has_screenshot", true);
}
if (meta.featureFlags.has("screenshot@fullScreen")) {
selector = selector.eq("has_screenshot_fullscreen", true);
}
if (meta.options.location?.country) {
selector = selector.eq("location_country", meta.options.location.country);
} else {
selector = selector.is("location_country", null);
}
if (meta.options.location?.languages) {
selector = selector.eq("location_languages", meta.options.location.languages);
} else {
selector = selector.is("location_languages", null);
}
const { data, error } = await selector
.order("created_at", { ascending: false })
.limit(5);
if (error) {
throw new EngineError("Failed to retrieve URL from DB index", {
cause: error,
});
}
let selectedRow: {
id: string;
created_at: string;
status: number;
} | null = null;
if (data.length > 0) {
const newest200Index = data.findIndex(x => x.status >= 200 && x.status < 300);
// If the newest 200 index is further back than the allowed error count, we should display the errored index entry
if (newest200Index >= errorCountToRegister || newest200Index === -1) {
selectedRow = data[0];
} else {
selectedRow = data[newest200Index];
}
}
if (selectedRow === null || selectedRow === undefined) {
throw new IndexMissError();
}
const id = data[0].id;
const doc = await getIndexFromGCS(id + ".json");
if (!doc) {
throw new EngineError("Document not found in GCS");
}
return {
url: doc.url,
html: doc.html,
statusCode: doc.statusCode,
error: doc.error,
screenshot: doc.screenshot,
numPages: doc.numPages,
cacheInfo: {
created_at: new Date(data[0].created_at),
}
};
}
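To make the selection rule above concrete, a worked illustration with hypothetical result sets (rows ordered newest first, errorCountToRegister = 3):

// statuses [500, 502, 200, 200]      -> newest 2xx is at index 2 (< 3): serve that 200 row
// statuses [500, 500, 500, 200]      -> newest 2xx is at index 3 (>= 3): serve data[0], the newest errored row
// statuses [404, 404]                -> no 2xx at all (index -1): serve data[0]
// no rows within maxAge              -> IndexMissError, and the scrape loop falls through to the next engine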

View File

@ -92,3 +92,9 @@ export class PDFInsufficientTimeError extends Error {
super(`Insufficient time to process PDF of ${pageCount} pages. Please increase the timeout parameter in your scrape request to at least ${minTimeout}ms.`);
}
}
export class IndexMissError extends Error {
constructor() {
super("Index doesn't have the page we're looking for");
}
}

View File

@ -23,6 +23,7 @@ import {
UnsupportedFileError,
SSLError,
PDFInsufficientTimeError,
IndexMissError,
} from "./error";
import { executeTransformers } from "./transformers";
import { LLMRefusalError } from "./transformers/llmExtract";
@ -59,6 +60,7 @@ export type Meta = {
status: number;
} | null | undefined; // undefined: no prefetch yet, null: prefetch came back empty
costTracking: CostTracking;
winnerEngine?: Engine;
};
function buildFeatureFlags(
@ -189,6 +191,7 @@ export type InternalOptions = {
unnormalizedSourceURL?: string;
saveScrapeResultToGCS?: boolean; // Passed along to fire-engine
bypassBilling?: boolean;
};
export type EngineResultsTracker = {
@ -295,11 +298,23 @@ async function scrapeURLLoop(meta: Meta): Promise<ScrapeUrlResponse> {
unsupportedFeatures,
result: engineResult as EngineScrapeResult & { markdown: string },
};
meta.winnerEngine = engine;
break;
}
} catch (error) {
if (error instanceof EngineError) {
meta.logger.info("Engine " + engine + " could not scrape the page.", {
meta.logger.warn("Engine " + engine + " could not scrape the page.", {
error,
});
results[engine] = {
state: "error",
error: safeguardCircularError(error),
unexpected: false,
startedAt,
finishedAt: Date.now(),
};
} else if (error instanceof IndexMissError) {
meta.logger.info("Engine " + engine + " could not find the page in the index.", {
error,
});
results[engine] = {
@ -385,6 +400,14 @@ async function scrapeURLLoop(meta: Meta): Promise<ScrapeUrlResponse> {
numPages: result.result.numPages,
contentType: result.result.contentType,
proxyUsed: meta.featureFlags.has("stealthProxy") ? "stealth" : "basic",
...(results["index"] ? (
result.result.cacheInfo ? {
cacheState: "hit",
cachedAt: result.result.cacheInfo.created_at.toISOString(),
} : {
cacheState: "miss",
}
) : {})
},
};

View File

@ -133,7 +133,7 @@ export async function extractMetadata(
// Extract all meta tags for custom metadata
soup("meta").each((i, elem) => {
try {
const name = soup(elem).attr("name") || soup(elem).attr("property");
const name = soup(elem).attr("name") || soup(elem).attr("property") || soup(elem).attr("itemprop");
const content = soup(elem).attr("content");
if (name && content) {

View File

@ -11,6 +11,9 @@ import { saveToCache } from "./cache";
import { performAgent } from "./agent";
import { deriveDiff } from "./diff";
import { useIndex } from "../../../services/index";
import { sendDocumentToIndex } from "../engines/index/index";
export type Transformer = (
meta: Meta,
document: Document,
@ -205,6 +208,7 @@ export const transformerStack: Transformer[] = [
deriveLinksFromHTML,
deriveMetadataFromRawHTML,
uploadScreenshot,
...(useIndex ? [sendDocumentToIndex] : []),
performLLMExtract,
performAgent,
deriveDiff,

View File

@ -0,0 +1,241 @@
import { createClient, SupabaseClient } from "@supabase/supabase-js";
import { logger } from "../lib/logger";
import { configDotenv } from "dotenv";
import { Storage } from "@google-cloud/storage";
import crypto from "crypto";
import { redisEvictConnection } from "./redis";
configDotenv();
// SupabaseService class initializes the Supabase client conditionally based on environment variables.
class IndexSupabaseService {
private client: SupabaseClient | null = null;
constructor() {
const supabaseUrl = process.env.INDEX_SUPABASE_URL;
const supabaseServiceToken = process.env.INDEX_SUPABASE_SERVICE_TOKEN;
// Only initialize the Supabase client if both URL and Service Token are provided.
if (!supabaseUrl || !supabaseServiceToken) {
// Warn the user that Authentication is disabled by setting the client to null
logger.warn(
"Index supabase client will not be initialized.",
);
this.client = null;
} else {
this.client = createClient(supabaseUrl, supabaseServiceToken);
}
}
// Provides access to the initialized Supabase client, if available.
getClient(): SupabaseClient | null {
return this.client;
}
}
const serv = new IndexSupabaseService();
// Using a Proxy to handle dynamic access to the Supabase client or service methods.
// This approach ensures that if Supabase is not configured, any attempt to use it will result in a clear error.
export const index_supabase_service: SupabaseClient = new Proxy(
serv,
{
get: function (target, prop, receiver) {
const client = target.getClient();
// If the Supabase client is not initialized, intercept property access to provide meaningful error feedback.
if (client === null) {
return () => {
throw new Error("Index supabase client is not configured.");
};
}
// Direct access to SupabaseService properties takes precedence.
if (prop in target) {
return Reflect.get(target, prop, receiver);
}
// Otherwise, delegate access to the Supabase client.
return Reflect.get(client, prop, receiver);
},
},
) as unknown as SupabaseClient;
const credentials = process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined;
export async function getIndexFromGCS(url: string): Promise<any | null> {
// logger.info(`Getting f-engine document from GCS`, {
// url,
// });
try {
if (!process.env.GCS_INDEX_BUCKET_NAME) {
return null;
}
const storage = new Storage({ credentials });
const bucket = storage.bucket(process.env.GCS_INDEX_BUCKET_NAME);
const blob = bucket.file(`${url}`);
const [exists] = await blob.exists();
if (!exists) {
return null;
}
const [blobContent] = await blob.download();
const parsed = JSON.parse(blobContent.toString());
return parsed;
} catch (error) {
logger.error(`Error getting f-engine document from GCS`, {
error,
url,
});
return null;
}
}
export async function saveIndexToGCS(id: string, doc: {
url: string;
html: string;
statusCode: number;
error?: string;
screenshot?: string;
numPages?: number;
}): Promise<void> {
try {
if (!process.env.GCS_INDEX_BUCKET_NAME) {
return;
}
const storage = new Storage({ credentials });
const bucket = storage.bucket(process.env.GCS_INDEX_BUCKET_NAME);
const blob = bucket.file(`${id}.json`);
for (let i = 0; i < 3; i++) {
try {
await blob.save(JSON.stringify(doc), {
contentType: "application/json",
});
break;
} catch (error) {
if (i === 2) {
throw error;
} else {
logger.error(`Error saving index document to GCS, retrying`, {
error,
indexId: id,
i,
});
}
}
}
} catch (error) {
throw new Error("Error saving index document to GCS", {
cause: error,
});
}
}
export const useIndex =
process.env.INDEX_SUPABASE_URL !== "" &&
process.env.INDEX_SUPABASE_URL !== undefined;
export function normalizeURLForIndex(url: string): string {
const urlObj = new URL(url);
urlObj.hash = "";
urlObj.protocol = "https";
if (urlObj.port === "80" || urlObj.port === "443") {
urlObj.port = "";
}
if (urlObj.hostname.startsWith("www.")) {
urlObj.hostname = urlObj.hostname.slice(4);
}
if (urlObj.pathname.endsWith("/index.html")) {
urlObj.pathname = urlObj.pathname.slice(0, -10);
} else if (urlObj.pathname.endsWith("/index.php")) {
urlObj.pathname = urlObj.pathname.slice(0, -9);
} else if (urlObj.pathname.endsWith("/index.htm")) {
urlObj.pathname = urlObj.pathname.slice(0, -9);
} else if (urlObj.pathname.endsWith("/index.shtml")) {
urlObj.pathname = urlObj.pathname.slice(0, -11);
} else if (urlObj.pathname.endsWith("/index.xml")) {
urlObj.pathname = urlObj.pathname.slice(0, -9);
}
if (urlObj.pathname.endsWith("/")) {
urlObj.pathname = urlObj.pathname.slice(0, -1);
}
return urlObj.toString();
}
export async function hashURL(url: string): Promise<string> {
return "\\x" + crypto.createHash("sha256").update(url).digest("hex");
}
export function generateURLSplits(url: string): string[] {
const urls: string[] = [];
const urlObj = new URL(url);
urlObj.hash = "";
urlObj.search = "";
const pathnameParts = urlObj.pathname.split("/");
for (let i = 0; i <= pathnameParts.length; i++) {
urlObj.pathname = pathnameParts.slice(0, i).join("/");
urls.push(urlObj.href);
}
urls.push(url);
return [...new Set(urls.map(x => normalizeURLForIndex(x)))];
}
const INDEX_INSERT_QUEUE_KEY = "index-insert-queue";
const INDEX_INSERT_BATCH_SIZE = 1000;
export async function addIndexInsertJob(data: any) {
await redisEvictConnection.rpush(INDEX_INSERT_QUEUE_KEY, JSON.stringify(data));
}
export async function getIndexInsertJobs(): Promise<any[]> {
const jobs = (await redisEvictConnection.lpop(INDEX_INSERT_QUEUE_KEY, INDEX_INSERT_BATCH_SIZE)) ?? [];
return jobs.map(x => JSON.parse(x));
}
export async function processIndexInsertJobs() {
const jobs = await getIndexInsertJobs();
if (jobs.length === 0) {
return;
}
logger.info(`Index inserter found jobs to insert`, { jobCount: jobs.length });
try {
await index_supabase_service.from("index").insert(jobs);
logger.info(`Index inserter inserted jobs`, { jobCount: jobs.length });
} catch (error) {
logger.error(`Index inserter failed to insert jobs`, { error, jobCount: jobs.length });
}
}
export async function getIndexInsertQueueLength(): Promise<number> {
return await redisEvictConnection.llen(INDEX_INSERT_QUEUE_KEY) ?? 0;
}
export async function queryIndexAtSplitLevel(url: string, limit: number): Promise<string[]> {
if (!useIndex || process.env.FIRECRAWL_INDEX_WRITE_ONLY === "true") {
return [];
}
const urlObj = new URL(url);
urlObj.search = "";
const urlSplitsHash = await Promise.all(generateURLSplits(urlObj.href).map(x => hashURL(x)));

const { data, error } = await index_supabase_service
.from("index")
.select("resolved_url")
.eq("url_split_" + (urlSplitsHash.length - 1) + "_hash", urlSplitsHash[urlSplitsHash.length - 1])
.gte("created_at", new Date(Date.now() - 2 * 24 * 60 * 60 * 1000).toISOString())
.limit(limit)
if (error) {
logger.warn("Error querying index", { error, url, limit });
return [];
}
return [...new Set((data ?? []).map((x) => x.resolved_url))];
}
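As a rough illustration of the helpers above (example URLs are hypothetical):

// normalizeURLForIndex:
//   "https://www.example.com/docs/index.html#intro" -> "https://example.com/docs"
//   "http://example.com/"                           -> "https://example.com"
// generateURLSplits("https://example.com/docs/getting-started") ->
//   ["https://example.com", "https://example.com/docs", "https://example.com/docs/getting-started"]
// queryIndexAtSplitLevel hashes the deepest split and matches it against the corresponding
// url_split_N_hash column, so a query for example.com/docs returns recently indexed
// resolved_urls at or under that prefix.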

View File

@ -14,6 +14,7 @@ import { saveCrawlMap } from "./crawl-maps-index";
import { processBillingBatch, queueBillingOperation, startBillingBatchProcessing } from "../billing/batch_billing";
import systemMonitor from "../system-monitor";
import { v4 as uuidv4 } from "uuid";
import { processIndexInsertJobs } from "..";
const workerLockDuration = Number(process.env.WORKER_LOCK_DURATION) || 60000;
const workerStalledCheckInterval =
@ -226,6 +227,8 @@ const workerFun = async (queue: Queue, jobProcessor: (token: string, job: Job) =
process.exit(0);
};
const INDEX_INSERT_INTERVAL = 15000;
// Start the workers
(async () => {
// Start index worker
@ -234,7 +237,17 @@ const workerFun = async (queue: Queue, jobProcessor: (token: string, job: Job) =
// Start billing worker and batch processing
startBillingBatchProcessing();
const billingWorkerPromise = workerFun(getBillingQueue(), processBillingJobInternal);
const indexInserterInterval = setInterval(async () => {
if (isShuttingDown) {
return;
}
await processIndexInsertJobs();
}, INDEX_INSERT_INTERVAL);
// Wait for both workers to complete (which should only happen on shutdown)
await Promise.all([indexWorkerPromise, billingWorkerPromise]);
clearInterval(indexInserterInterval);
})();
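Taken together, the index write path added in this diff is roughly:

// sendDocumentToIndex()              -> saveIndexToGCS(id, doc) + addIndexInsertJob(row), once per eligible scrape
// every INDEX_INSERT_INTERVAL (15s)  -> processIndexInsertJobs(): lpop up to 1000 queued rows, bulk insert into "index"
// admin index-queue-prometheus route -> getIndexInsertQueueLength() exposed as firecrawl_index_queue_length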

View File

@ -21,12 +21,13 @@ function cleanOfNull<T>(x: T): T {
}
}
export async function logJob(job: FirecrawlJob, force: boolean = false) {
export async function logJob(job: FirecrawlJob, force: boolean = false, bypassLogging: boolean = false) {
try {
const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true";
if (!useDbAuthentication) {
return;
}
// Redact any pages that have an authorization header
// actually, Don't. we use the db to retrieve results now. this breaks authed crawls - mogery
@ -70,6 +71,10 @@ export async function logJob(job: FirecrawlJob, force: boolean = false) {
await saveJobToGCS(job);
}
if (bypassLogging) {
return;
}
if (force) {
let i = 0,
done = false;

View File

@ -194,7 +194,7 @@ export async function addScrapeJob(
},
);
} else {
await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority);
await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority, directToBullMQ);
}
}

View File

@ -86,6 +86,8 @@ import { robustFetch } from "../scraper/scrapeURL/lib/fetch";
import { RateLimiterMode } from "../types";
import { calculateCreditsToBeBilled } from "../lib/scrape-billing";
import { redisEvictConnection } from "./redis";
import { generateURLSplits, queryIndexAtSplitLevel } from "./index";
import { WebCrawler } from "../scraper/WebScraper/crawler";
import type { Logger } from "winston";
configDotenv();
@ -319,7 +321,7 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
scrapeOptions: sc.scrapeOptions,
crawlerOptions: sc.crawlerOptions,
origin: job.data.origin,
});
}, false, job.data.internalOptions?.bypassBilling ?? false);
logger.info("Logged crawl!");
const data = {
@ -371,8 +373,10 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) {
origin: job.data.origin,
},
true,
job.data.internalOptions?.bypassBilling ?? false,
);
// v1 web hooks, call when done with no data, but with event completed
if (job.data.v1 && job.data.webhook) {
callWebhook(
@ -911,6 +915,29 @@ const workerFun = async (
}
};
async function kickoffGetIndexLinks(sc: StoredCrawl, crawler: WebCrawler, url: string) {
if (sc.crawlerOptions.ignoreSitemap) {
return [];
}
const trimmedURL = new URL(url);
trimmedURL.search = "";
const index = await queryIndexAtSplitLevel(
sc.crawlerOptions.allowBackwardCrawling ? generateURLSplits(trimmedURL.href)[0] : trimmedURL.href,
sc.crawlerOptions.limit ?? 100,
);
const validIndexLinks = crawler.filterLinks(
index.filter(x => crawler.filterURL(x, trimmedURL.href) !== null),
sc.crawlerOptions.limit ?? 100,
sc.crawlerOptions.maxDepth ?? 10,
false,
);
return validIndexLinks;
}
async function processKickoffJob(job: Job & { id: string }, token: string) {
const logger = _logger.child({
module: "queue-worker",
@ -1028,6 +1055,61 @@ async function processKickoffJob(job: Job & { id: string }, token: string) {
});
}
const indexLinks = await kickoffGetIndexLinks(sc, crawler, job.data.url);
if (indexLinks.length > 0) {
logger.debug("Using index links of length " + indexLinks.length, {
indexLinksLength: indexLinks.length,
});
let jobPriority = await getJobPriority({
team_id: job.data.team_id,
basePriority: 21,
});
logger.debug("Using job priority " + jobPriority, { jobPriority });
const jobs = indexLinks.map((url) => {
const uuid = uuidv4();
return {
name: uuid,
data: {
url,
mode: "single_urls" as const,
team_id: job.data.team_id,
crawlerOptions: job.data.crawlerOptions,
scrapeOptions: job.data.scrapeOptions,
internalOptions: sc.internalOptions,
origin: job.data.origin,
crawl_id: job.data.crawl_id,
sitemapped: true,
webhook: job.data.webhook,
v1: job.data.v1,
},
opts: {
jobId: uuid,
priority: 20,
},
};
});
logger.debug("Locking URLs...");
const lockedIds = await lockURLsIndividually(
job.data.crawl_id,
sc,
jobs.map((x) => ({ id: x.opts.jobId, url: x.data.url })),
);
const lockedJobs = jobs.filter((x) =>
lockedIds.find((y) => y.id === x.opts.jobId),
);
logger.debug("Adding scrape jobs to Redis...");
await addCrawlJobs(
job.data.crawl_id,
lockedJobs.map((x) => x.opts.jobId),
);
logger.debug("Adding scrape jobs to BullMQ...");
await addScrapeJobs(lockedJobs);
}
logger.debug("Done queueing jobs!");
await finishCrawlKickoff(job.data.crawl_id);
@ -1048,7 +1130,7 @@ async function processKickoffJob(job: Job & { id: string }, token: string) {
async function billScrapeJob(job: Job & { id: string }, document: Document, logger: Logger, costTracking?: CostTracking) {
let creditsToBeBilled: number | null = null;
if (job.data.is_scrape !== true) {
if (job.data.is_scrape !== true && !job.data.internalOptions?.bypassBilling) {
creditsToBeBilled = await calculateCreditsToBeBilled(job.data.scrapeOptions, document, job.id, costTracking);
if (
@ -1378,6 +1460,7 @@ async function processJob(job: Job & { id: string }, token: string) {
credits_billed,
},
true,
job.data.internalOptions?.bypassBilling ?? false,
);
if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
@ -1424,7 +1507,7 @@ async function processJob(job: Job & { id: string }, token: string) {
cost_tracking: costTracking,
pdf_num_pages: doc.metadata.numPages,
credits_billed,
});
}, false, job.data.internalOptions?.bypassBilling ?? false);
}
logger.info(`🐂 Job done ${job.id}`);
@ -1523,6 +1606,7 @@ async function processJob(job: Job & { id: string }, token: string) {
cost_tracking: costTracking,
},
true,
job.data.internalOptions?.bypassBilling ?? false,
);
return data;
}

View File

@ -1,6 +1,6 @@
{
"name": "@mendable/firecrawl-js",
"version": "1.25.2",
"version": "1.25.3",
"description": "JavaScript SDK for Firecrawl API",
"main": "dist/index.js",
"types": "dist/index.d.ts",

View File

@ -120,6 +120,7 @@ export interface CrawlScrapeOptions {
removeBase64Images?: boolean;
blockAds?: boolean;
proxy?: "basic" | "stealth" | "auto";
storeInCache?: boolean;
}
export type Action = {

View File

@ -46,6 +46,9 @@ x-common-env: &common-env
services:
playwright-service:
# NOTE: If you don't want to build the service locally,
# uncomment the image: statement and comment out the build: statement
# image: ghcr.io/mendableai/playwright-service:latest
build: apps/playwright-service-ts
environment:
PORT: 3000