diff --git a/.github/workflows/deploy-playwright.yml b/.github/workflows/deploy-playwright.yml new file mode 100644 index 00000000..0c7bf465 --- /dev/null +++ b/.github/workflows/deploy-playwright.yml @@ -0,0 +1,34 @@ +name: Deploy Playwright to GHCR + +env: + DOTNET_VERSION: '6.0.x' + +on: + push: + branches: + - main + paths: + - apps/playwright-service-ts/** + workflow_dispatch: + +jobs: + push-app-image: + runs-on: ubuntu-latest + defaults: + run: + working-directory: './apps/playwright-service-ts' + steps: + - name: 'Checkout GitHub Action' + uses: actions/checkout@main + + - name: 'Login to GitHub Container Registry' + uses: docker/login-action@v1 + with: + registry: ghcr.io + username: ${{github.actor}} + password: ${{secrets.GITHUB_TOKEN}} + + - name: 'Build Inventory Image' + run: | + docker build . --tag ghcr.io/mendableai/playwright-service:latest + docker push ghcr.io/mendableai/playwright-service:latest \ No newline at end of file diff --git a/.github/workflows/test-server-self-host.yml b/.github/workflows/test-server-self-host.yml index 39a260ab..56232fb8 100644 --- a/.github/workflows/test-server-self-host.yml +++ b/.github/workflows/test-server-self-host.yml @@ -16,6 +16,7 @@ env: TEST_SUITE_SELF_HOSTED: true USE_GO_MARKDOWN_PARSER: true FIRECRAWL_DEBUG_FILTER_LINKS: true + SENTRY_ENVIRONMENT: dev jobs: test: diff --git a/.github/workflows/test-server.yml b/.github/workflows/test-server.yml index 8d043ba3..c0b5cb84 100644 --- a/.github/workflows/test-server.yml +++ b/.github/workflows/test-server.yml @@ -21,6 +21,9 @@ env: SUPABASE_SERVICE_TOKEN: ${{ secrets.SUPABASE_SERVICE_TOKEN }} SUPABASE_URL: ${{ secrets.SUPABASE_URL }} SUPABASE_REPLICA_URL: ${{ secrets.SUPABASE_REPLICA_URL }} + INDEX_SUPABASE_SERVICE_TOKEN: ${{ secrets.INDEX_SUPABASE_SERVICE_TOKEN }} + INDEX_SUPABASE_ANON_TOKEN: ${{ secrets.INDEX_SUPABASE_ANON_TOKEN }} + INDEX_SUPABASE_URL: ${{ secrets.INDEX_SUPABASE_URL }} TEST_API_KEY: ${{ secrets.TEST_API_KEY }} FIRE_ENGINE_BETA_URL: ${{ secrets.FIRE_ENGINE_BETA_URL }} USE_DB_AUTHENTICATION: true @@ -30,6 +33,7 @@ env: RUNPOD_MU_API_KEY: ${{ secrets.RUNPOD_MU_API_KEY }} GCS_CREDENTIALS: ${{ secrets.GCS_CREDENTIALS }} GCS_BUCKET_NAME: ${{ secrets.GCS_BUCKET_NAME }} + GCS_INDEX_BUCKET_NAME: ${{ secrets.GCS_INDEX_BUCKET_NAME }} GOOGLE_GENERATIVE_AI_API_KEY: ${{ secrets.GOOGLE_GENERATIVE_AI_API_KEY }} GROQ_API_KEY: ${{ secrets.GROQ_API_KEY }} ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }} @@ -100,11 +104,11 @@ jobs: working-directory: ./apps/api id: start_workers - name: Start index worker - run: npm run index-worker & + run: npm run index-worker > index-worker.log 2>&1 & working-directory: ./apps/api id: start_index_worker - name: Wait for API - run: pnpx wait-on tcp:3002 -t 15s + run: pnpx wait-on tcp:3002 -t 30s - name: Run snippet tests run: | npm run test:snips @@ -118,4 +122,5 @@ jobs: # name: Logs # path: | # ./apps/api/api.log - # ./apps/api/worker.log \ No newline at end of file + # ./apps/api/worker.log + # ./apps/api/index-worker.log \ No newline at end of file diff --git a/apps/api/sharedLibs/html-transformer/src/lib.rs b/apps/api/sharedLibs/html-transformer/src/lib.rs index 1e4f8302..4018746b 100644 --- a/apps/api/sharedLibs/html-transformer/src/lib.rs +++ b/apps/api/sharedLibs/html-transformer/src/lib.rs @@ -124,7 +124,7 @@ pub unsafe extern "C" fn extract_metadata(html: *const libc::c_char) -> *mut lib let meta = meta.as_node().as_element().unwrap(); let attrs = meta.attributes.borrow(); - if let Some(name) = attrs.get("name").or_else(|| 
attrs.get("property")) { + if let Some(name) = attrs.get("name").or_else(|| attrs.get("property")).or_else(|| attrs.get("itemprop")) { if let Some(content) = attrs.get("content") { if let Some(v) = out.get(name) { match v { diff --git a/apps/api/src/__tests__/queue-concurrency-integration.test.ts b/apps/api/src/__tests__/queue-concurrency-integration.test.ts index 940efaaa..0f427e48 100644 --- a/apps/api/src/__tests__/queue-concurrency-integration.test.ts +++ b/apps/api/src/__tests__/queue-concurrency-integration.test.ts @@ -64,6 +64,8 @@ describe("Queue Concurrency Integration", () => { removeBase64Images: true, fastMode: false, blockAds: true, + maxAge: 0, + storeInCache: true, }; beforeEach(() => { diff --git a/apps/api/src/__tests__/snips/billing.test.ts b/apps/api/src/__tests__/snips/billing.test.ts index 9d183e6d..ce037fc9 100644 --- a/apps/api/src/__tests__/snips/billing.test.ts +++ b/apps/api/src/__tests__/snips/billing.test.ts @@ -1,197 +1,211 @@ -// import { batchScrape, crawl, creditUsage, extract, map, scrape, search, tokenUsage } from "./lib"; +import { batchScrape, crawl, creditUsage, extract, map, scrape, search, tokenUsage } from "./lib"; -// const sleep = (ms: number) => new Promise(x => setTimeout(() => x(true), ms)); -// const sleepForBatchBilling = () => sleep(20000); +const sleep = (ms: number) => new Promise(x => setTimeout(() => x(true), ms)); +const sleepForBatchBilling = () => sleep(40000); -// beforeAll(async () => { -// // Wait for previous test runs to stop billing processing -// if (!process.env.TEST_SUITE_SELF_HOSTED) { -// await sleep(40000); -// } -// }, 50000); +beforeAll(async () => { + // Wait for previous test runs to stop billing processing + if (!process.env.TEST_SUITE_SELF_HOSTED) { + await sleep(40000); + } +}, 50000); -// describe("Billing tests", () => { -// if (process.env.TEST_SUITE_SELF_HOSTED) { -// it("dummy", () => { -// expect(true).toBe(true); -// }); -// } else { -// it("bills scrape correctly", async () => { -// const rc1 = (await creditUsage()).remaining_credits; +describe("Billing tests", () => { + if (process.env.TEST_SUITE_SELF_HOSTED) { + it("dummy", () => { + expect(true).toBe(true); + }); + } else { + it("bills scrape correctly", async () => { + const rc1 = (await creditUsage()).remaining_credits; -// // Run all scrape operations in parallel with Promise.all -// await Promise.all([ -// // scrape 1: regular fc.dev scrape (1 credit) -// scrape({ -// url: "https://firecrawl.dev" -// }), + // Run all scrape operations in parallel with Promise.all + await Promise.all([ + // scrape 1: regular fc.dev scrape (1 credit) + scrape({ + url: "https://firecrawl.dev" + }), -// // scrape 1.1: regular fc.dev scrape (1 credit) -// scrape({ -// url: "https://firecrawl.dev" -// }), + // scrape 1.1: regular fc.dev scrape (1 credit) + scrape({ + url: "https://firecrawl.dev" + }), -// // scrape 2: fc.dev with json (5 credits) -// scrape({ -// url: "https://firecrawl.dev", -// formats: ["json"], -// jsonOptions: { -// schema: { -// type: "object", -// properties: { -// is_open_source: { type: "boolean" }, -// }, -// required: ["is_open_source"], -// }, -// }, -// }) -// ]); + // scrape 2: fc.dev with json (5 credits) + scrape({ + url: "https://firecrawl.dev", + formats: ["json"], + jsonOptions: { + schema: { + type: "object", + properties: { + is_open_source: { type: "boolean" }, + }, + required: ["is_open_source"], + }, + }, + }) + ]); -// // sum: 7 credits + // sum: 7 credits -// await sleepForBatchBilling(); + await sleepForBatchBilling(); -// const 
rc2 = (await creditUsage()).remaining_credits; + const rc2 = (await creditUsage()).remaining_credits; -// expect(rc1 - rc2).toBe(7); -// }, 120000); + expect(rc1 - rc2).toBe(7); + }, 120000); -// it("bills batch scrape correctly", async () => { -// const rc1 = (await creditUsage()).remaining_credits; + it("bills batch scrape correctly", async () => { + const rc1 = (await creditUsage()).remaining_credits; -// // Run both scrape operations in parallel with Promise.all -// const [scrape1, scrape2] = await Promise.all([ -// // scrape 1: regular batch scrape with failing domain (2 credits) -// batchScrape({ -// urls: [ -// "https://firecrawl.dev", -// "https://mendable.ai", -// "https://thisdomaindoesnotexistandwillfail.fcr", -// ], -// }), + // Run both scrape operations in parallel with Promise.all + const [scrape1, scrape2] = await Promise.all([ + // scrape 1: regular batch scrape with failing domain (2 credits) + batchScrape({ + urls: [ + "https://firecrawl.dev", + "https://mendable.ai", + "https://thisdomaindoesnotexistandwillfail.fcr", + ], + }), -// // scrape 2: batch scrape with json (10 credits) -// batchScrape({ -// urls: [ -// "https://firecrawl.dev", -// "https://mendable.ai", -// "https://thisdomaindoesnotexistandwillfail.fcr", -// ], -// formats: ["json"], -// jsonOptions: { -// schema: { -// type: "object", -// properties: { -// four_word_summary: { type: "string" }, -// }, -// required: ["four_word_summary"], -// }, -// }, -// }) -// ]); + // scrape 2: batch scrape with json (10 credits) + batchScrape({ + urls: [ + "https://firecrawl.dev", + "https://mendable.ai", + "https://thisdomaindoesnotexistandwillfail.fcr", + ], + formats: ["json"], + jsonOptions: { + schema: { + type: "object", + properties: { + four_word_summary: { type: "string" }, + }, + required: ["four_word_summary"], + }, + }, + }) + ]); -// // sum: 12 credits + // sum: 12 credits -// await sleepForBatchBilling(); + await sleepForBatchBilling(); -// const rc2 = (await creditUsage()).remaining_credits; + const rc2 = (await creditUsage()).remaining_credits; -// expect(rc1 - rc2).toBe(12); -// }, 600000); + expect(rc1 - rc2).toBe(12); + }, 600000); -// it("bills crawl correctly", async () => { -// const rc1 = (await creditUsage()).remaining_credits; + it("bills crawl correctly", async () => { + const rc1 = (await creditUsage()).remaining_credits; -// // Run both crawl operations in parallel with Promise.all -// const [crawl1, crawl2] = await Promise.all([ -// // crawl 1: regular fc.dev crawl (x credits) -// crawl({ -// url: "https://firecrawl.dev", -// }), + // Run both crawl operations in parallel with Promise.all + const [crawl1, crawl2] = await Promise.all([ + // crawl 1: regular fc.dev crawl (x credits) + crawl({ + url: "https://firecrawl.dev", + limit: 10, + }), -// // crawl 2: fc.dev crawl with json (5y credits) -// crawl({ -// url: "https://firecrawl.dev", -// scrapeOptions: { -// formats: ["json"], -// jsonOptions: { -// schema: { -// type: "object", -// properties: { -// four_word_summary: { type: "string" }, -// }, -// required: ["four_word_summary"], -// }, -// }, -// } -// }) -// ]); + // crawl 2: fc.dev crawl with json (5y credits) + crawl({ + url: "https://firecrawl.dev", + scrapeOptions: { + formats: ["json"], + jsonOptions: { + schema: { + type: "object", + properties: { + four_word_summary: { type: "string" }, + }, + required: ["four_word_summary"], + }, + }, + }, + limit: 10, + }) + ]); -// expect(crawl1.success).toBe(true); -// expect(crawl2.success).toBe(true); + expect(crawl1.success).toBe(true); 
+ expect(crawl2.success).toBe(true); -// // sum: x+5y credits + // sum: x+5y credits -// await sleepForBatchBilling(); + await sleepForBatchBilling(); -// const rc2 = (await creditUsage()).remaining_credits; + const rc2 = (await creditUsage()).remaining_credits; -// if (crawl1.success && crawl2.success) { -// expect(rc1 - rc2).toBe(crawl1.completed + crawl2.completed * 5); -// } -// }, 600000); + if (crawl1.success && crawl2.success) { + expect(rc1 - rc2).toBe(crawl1.completed + crawl2.completed * 5); + } + }, 600000); -// it("bills map correctly", async () => { -// const rc1 = (await creditUsage()).remaining_credits; -// await map({ url: "https://firecrawl.dev" }); -// await sleepForBatchBilling(); -// const rc2 = (await creditUsage()).remaining_credits; -// expect(rc1 - rc2).toBe(1); -// }, 60000); + it("bills map correctly", async () => { + const rc1 = (await creditUsage()).remaining_credits; + await map({ url: "https://firecrawl.dev" }); + await sleepForBatchBilling(); + const rc2 = (await creditUsage()).remaining_credits; + expect(rc1 - rc2).toBe(1); + }, 60000); -// it("bills search correctly", async () => { -// const rc1 = (await creditUsage()).remaining_credits; + it("bills search correctly", async () => { + const rc1 = (await creditUsage()).remaining_credits; -// const results = await search({ -// query: "firecrawl" -// }); + const results = await search({ + query: "firecrawl" + }); -// await sleepForBatchBilling(); + await sleepForBatchBilling(); -// const rc2 = (await creditUsage()).remaining_credits; + const rc2 = (await creditUsage()).remaining_credits; -// expect(rc1 - rc2).toBe(results.length); -// }, 60000); + expect(rc1 - rc2).toBe(results.length); + }, 60000); -// it("bills extract correctly", async () => { -// const rc1 = (await tokenUsage()).remaining_tokens; + it("bills search with scrape correctly", async () => { + const rc1 = (await creditUsage()).remaining_credits; + + const results = await search({ + query: "firecrawl", + scrapeOptions: { + formats: ["markdown"], + }, + }); + + await sleepForBatchBilling(); + + const rc2 = (await creditUsage()).remaining_credits; + + expect(rc1 - rc2).toBe(results.length); + }, 600000); + + it("bills extract correctly", async () => { + const rc1 = (await tokenUsage()).remaining_tokens; -// await extract({ -// urls: ["https://firecrawl.dev"], -// schema: { -// "type": "object", -// "properties": { -// "is_open_source": { -// "type": "boolean" -// } -// }, -// "required": [ -// "is_open_source" -// ] -// }, -// origin: "api-sdk", -// }); + await extract({ + urls: ["https://firecrawl.dev"], + schema: { + "type": "object", + "properties": { + "is_open_source": { + "type": "boolean" + } + }, + "required": [ + "is_open_source" + ] + }, + origin: "api-sdk", + }); -// await sleepForBatchBilling(); + await sleepForBatchBilling(); -// const rc2 = (await tokenUsage()).remaining_tokens; + const rc2 = (await tokenUsage()).remaining_tokens; -// expect(rc1 - rc2).toBe(305); -// }, 300000); -// } -// }); - -// temporarily disabled -it("is mocked", () => { - expect(true).toBe(true); -}); \ No newline at end of file + expect(rc1 - rc2).toBe(305); + }, 300000); + } +}); diff --git a/apps/api/src/__tests__/snips/scrape.test.ts b/apps/api/src/__tests__/snips/scrape.test.ts index 156e8e15..ccb38712 100644 --- a/apps/api/src/__tests__/snips/scrape.test.ts +++ b/apps/api/src/__tests__/snips/scrape.test.ts @@ -1,4 +1,5 @@ import { scrape, scrapeStatus, scrapeWithFailure } from "./lib"; +import crypto from "crypto"; describe("Scrape tests", () => { 
it.concurrent("mocking works properly", async () => { @@ -72,28 +73,72 @@ describe("Scrape tests", () => { }); expect(response.markdown).toContain("Firecrawl"); + + // Give time to propagate to read replica + await new Promise(resolve => setTimeout(resolve, 1000)); const status = await scrapeStatus(response.metadata.scrapeId!); expect(JSON.stringify(status)).toBe(JSON.stringify(response)); }, 60000); - describe("Ad blocking (f-e dependant)", () => { - it.concurrent("blocks ads by default", async () => { - const response = await scrape({ - url: "https://www.allrecipes.com/recipe/18185/yum/", + // describe("Ad blocking (f-e dependant)", () => { + // it.concurrent("blocks ads by default", async () => { + // const response = await scrape({ + // url: "https://www.allrecipes.com/recipe/18185/yum/", + // }); + + // expect(response.markdown).not.toContain(".g.doubleclick.net/"); + // }, 30000); + + // it.concurrent("doesn't block ads if explicitly disabled", async () => { + // const response = await scrape({ + // url: "https://www.allrecipes.com/recipe/18185/yum/", + // blockAds: false, + // }); + + // expect(response.markdown).toMatch(/(\.g\.doubleclick\.net|amazon-adsystem\.com)\//); + // }, 30000); + // }); + + describe("Index", () => { + it.concurrent("caches properly", async () => { + const id = crypto.randomUUID(); + const url = "https://firecrawl.dev/?testId=" + id; + + const response1 = await scrape({ + url, + maxAge: 120000, + storeInCache: false, }); - expect(response.markdown).not.toContain(".g.doubleclick.net/"); - }, 30000); + expect(response1.metadata.cacheState).toBe("miss"); - it.concurrent("doesn't block ads if explicitly disabled", async () => { - const response = await scrape({ - url: "https://www.allrecipes.com/recipe/18185/yum/", - blockAds: false, + await new Promise(resolve => setTimeout(resolve, 17000)); + + const response2 = await scrape({ + url, + maxAge: 120000, }); - expect(response.markdown).toMatch(/(\.g\.doubleclick\.net|amazon-adsystem\.com)\//); - }, 30000); + expect(response2.metadata.cacheState).toBe("miss"); + + await new Promise(resolve => setTimeout(resolve, 17000)); + + const response3 = await scrape({ + url, + maxAge: 120000, + }); + + expect(response3.metadata.cacheState).toBe("hit"); + expect(response3.metadata.cachedAt).toBeDefined(); + + const response4 = await scrape({ + url, + maxAge: 1, + }); + + expect(response4.metadata.cacheState).toBe("miss"); + }, 150000 + 2 * 17000); }); describe("Change Tracking format", () => { diff --git a/apps/api/src/controllers/v0/admin/index-queue-prometheus.ts b/apps/api/src/controllers/v0/admin/index-queue-prometheus.ts new file mode 100644 index 00000000..d1476869 --- /dev/null +++ b/apps/api/src/controllers/v0/admin/index-queue-prometheus.ts @@ -0,0 +1,12 @@ +import type { Request, Response } from "express"; +import { getIndexInsertQueueLength } from "../../../services"; + +export async function indexQueuePrometheus(req: Request, res: Response) { + const queueLength = await getIndexInsertQueueLength(); + res.setHeader("Content-Type", "text/plain"); + res.send(`\ +# HELP firecrawl_index_queue_length The number of items in the index insert queue +# TYPE firecrawl_index_queue_length gauge +firecrawl_index_queue_length ${queueLength} +`); +} \ No newline at end of file diff --git a/apps/api/src/controllers/v1/credit-usage.ts b/apps/api/src/controllers/v1/credit-usage.ts index fc070b24..794861a5 100644 --- a/apps/api/src/controllers/v1/credit-usage.ts +++ b/apps/api/src/controllers/v1/credit-usage.ts @@ -2,6 +2,7 @@ import 
{ Request, Response } from "express"; import { RequestWithAuth } from "./types"; import { getACUCTeam } from "../auth"; import { logger } from "../../lib/logger"; +import { RateLimiterMode } from "../../types"; export async function creditUsageController( req: RequestWithAuth, @@ -20,7 +21,7 @@ export async function creditUsageController( } // Otherwise fetch fresh data - const chunk = await getACUCTeam(req.auth.team_id); + const chunk = await getACUCTeam(req.auth.team_id, false, false, RateLimiterMode.Scrape); if (!chunk) { res.status(404).json({ success: false, diff --git a/apps/api/src/controllers/v1/map.ts b/apps/api/src/controllers/v1/map.ts index 15490385..a7a47d05 100644 --- a/apps/api/src/controllers/v1/map.ts +++ b/apps/api/src/controllers/v1/map.ts @@ -25,6 +25,7 @@ import { logger } from "../../lib/logger"; import Redis from "ioredis"; import { querySitemapIndex } from "../../scraper/WebScraper/sitemap-index"; import { getIndexQueue } from "../../services/queue-service"; +import { queryIndexAtSplitLevel } from "../../services/index"; configDotenv(); const redis = new Redis(process.env.REDIS_URL!); @@ -43,6 +44,14 @@ interface MapResult { mapResults: MapDocument[]; } +async function queryIndex(url: string, limit: number, useIndex: boolean): Promise { + if (!useIndex) { + return []; + } + + return await queryIndexAtSplitLevel(url, limit); +} + export async function getMapResults({ url, search, @@ -58,6 +67,7 @@ export async function getMapResults({ mock, filterByPath = true, flags, + useIndex = true, }: { url: string; search?: string; @@ -73,6 +83,7 @@ export async function getMapResults({ mock?: string; filterByPath?: boolean; flags: TeamFlags; + useIndex?: boolean; }): Promise { const id = uuidv4(); let links: string[] = [url]; @@ -165,11 +176,16 @@ export async function getMapResults({ } // Parallelize sitemap index query with search results - const [sitemapIndexResult, ...searchResults] = await Promise.all([ + const [sitemapIndexResult, indexResults, ...searchResults] = await Promise.all([ querySitemapIndex(url, abort), + queryIndex(url, limit, useIndex), ...(cachedResult ? [] : pagePromises), ]); + if (indexResults.length > 0) { + links.push(...indexResults); + } + const twoDaysAgo = new Date(); twoDaysAgo.setDate(twoDaysAgo.getDate() - 2); @@ -333,6 +349,7 @@ export async function mapController( mock: req.body.useMock, filterByPath: req.body.filterByPath !== false, flags: req.acuc?.flags ?? null, + useIndex: req.body.useIndex, }), ...(req.body.timeout !== undefined ? [ new Promise((resolve, reject) => setTimeout(() => { diff --git a/apps/api/src/controllers/v1/scrape.ts b/apps/api/src/controllers/v1/scrape.ts index 4b681438..a97d0e63 100644 --- a/apps/api/src/controllers/v1/scrape.ts +++ b/apps/api/src/controllers/v1/scrape.ts @@ -42,6 +42,8 @@ export async function scrapeController( }); // + const isDirectToBullMQ = process.env.SEARCH_PREVIEW_TOKEN !== undefined && process.env.SEARCH_PREVIEW_TOKEN === req.body.__searchPreviewToken; + await addScrapeJob( { url: req.body.url, @@ -52,6 +54,8 @@ export async function scrapeController( teamId: req.auth.team_id, saveScrapeResultToGCS: process.env.GCS_FIRE_ENGINE_BUCKET_NAME ? true : false, unnormalizedSourceURL: preNormalizedBody.url, + useCache: req.body.__experimental_cache ? 
true : false, + bypassBilling: isDirectToBullMQ, }, origin: req.body.origin, startTime, @@ -59,6 +63,7 @@ export async function scrapeController( {}, jobId, jobPriority, + isDirectToBullMQ, ); const totalWait = @@ -130,6 +135,7 @@ export async function scrapeController( } } + return res.status(200).json({ success: true, data: doc, diff --git a/apps/api/src/controllers/v1/search.ts b/apps/api/src/controllers/v1/search.ts index 68e426da..ceacce4c 100644 --- a/apps/api/src/controllers/v1/search.ts +++ b/apps/api/src/controllers/v1/search.ts @@ -40,24 +40,24 @@ export async function searchAndScrapeSearchResult( try { const searchResults = await search({ query, - num_results: 5 - }); + num_results: 5, + }); - const documents = await Promise.all( - searchResults.map(result => - scrapeSearchResult( - { - url: result.url, - title: result.title, - description: result.description - }, - options, - logger, - costTracking, - flags - ) - ) - ); + const documents = await Promise.all( + searchResults.map((result) => + scrapeSearchResult( + { + url: result.url, + title: result.title, + description: result.description, + }, + options, + logger, + costTracking, + flags, + ), + ), + ); return documents; } catch (error) { @@ -77,6 +77,7 @@ async function scrapeSearchResult( costTracking: CostTracking, flags: TeamFlags, directToBullMQ: boolean = false, + isSearchPreview: boolean = false, ): Promise { const jobId = uuidv4(); const jobPriority = await getJobPriority({ @@ -100,7 +101,7 @@ async function scrapeSearchResult( mode: "single_urls" as Mode, team_id: options.teamId, scrapeOptions: options.scrapeOptions, - internalOptions: { teamId: options.teamId, useCache: true }, + internalOptions: { teamId: options.teamId, useCache: true, bypassBilling: true }, origin: options.origin, is_scrape: true, startTime: Date.now(), @@ -112,7 +113,7 @@ async function scrapeSearchResult( ); const doc: Document = await waitForJob(jobId, options.timeout); - + logger.info("Scrape job completed", { scrapeId: jobId, url: searchResult.url, @@ -171,6 +172,7 @@ export async function searchController( }; const startTime = new Date().getTime(); const costTracking = new CostTracking(); + const isSearchPreview = process.env.SEARCH_PREVIEW_TOKEN !== undefined && process.env.SEARCH_PREVIEW_TOKEN === req.body.__searchPreviewToken; try { req.body = searchRequestSchema.parse(req.body); @@ -199,7 +201,9 @@ export async function searchController( }); if (req.body.ignoreInvalidURLs) { - searchResults = searchResults.filter((result) => !isUrlBlocked(result.url, req.acuc?.flags ?? null)); + searchResults = searchResults.filter( + (result) => !isUrlBlocked(result.url, req.acuc?.flags ?? null), + ); } logger.info("Searching completed", { @@ -226,12 +230,20 @@ export async function searchController( } else { logger.info("Scraping search results"); const scrapePromises = searchResults.map((result) => - scrapeSearchResult(result, { - teamId: req.auth.team_id, - origin: req.body.origin, - timeout: req.body.timeout, - scrapeOptions: req.body.scrapeOptions, - }, logger, costTracking, req.acuc?.flags ?? null, (req.acuc?.price_credits ?? 0) <= 3000), + scrapeSearchResult( + result, + { + teamId: req.auth.team_id, + origin: req.body.origin, + timeout: req.body.timeout, + scrapeOptions: req.body.scrapeOptions, + }, + logger, + costTracking, + req.acuc?.flags ?? null, + (req.acuc?.price_credits ?? 
0) <= 3000, + isSearchPreview, + ), ); const docs = await Promise.all(scrapePromises); @@ -257,17 +269,23 @@ export async function searchController( } // Bill team once for all successful results - billTeam(req.auth.team_id, req.acuc?.sub_id, responseData.data.reduce((a,x) => { - if (x.metadata?.numPages !== undefined && x.metadata.numPages > 0) { - return a + x.metadata.numPages; - } else { - return a + 1; - } - }, 0)).catch((error) => { - logger.error( - `Failed to bill team ${req.auth.team_id} for ${responseData.data.length} credits: ${error}`, - ); - }); + if (!isSearchPreview) { + billTeam( + req.auth.team_id, + req.acuc?.sub_id, + responseData.data.reduce((a, x) => { + if (x.metadata?.numPages !== undefined && x.metadata.numPages > 0) { + return a + x.metadata.numPages; + } else { + return a + 1; + } + }, 0), + ).catch((error) => { + logger.error( + `Failed to bill team ${req.auth.team_id} for ${responseData.data.length} credits: ${error}`, + ); + }); + } const endTime = new Date().getTime(); const timeTakenInSeconds = (endTime - startTime) / 1000; @@ -277,22 +295,25 @@ export async function searchController( time_taken: timeTakenInSeconds, }); - logJob({ - job_id: jobId, - success: true, - num_docs: responseData.data.length, - docs: responseData.data, - time_taken: timeTakenInSeconds, - team_id: req.auth.team_id, - mode: "search", - url: req.body.query, - scrapeOptions: req.body.scrapeOptions, - origin: req.body.origin, - cost_tracking: costTracking, - }); + logJob( + { + job_id: jobId, + success: true, + num_docs: responseData.data.length, + docs: responseData.data, + time_taken: timeTakenInSeconds, + team_id: req.auth.team_id, + mode: "search", + url: req.body.query, + scrapeOptions: req.body.scrapeOptions, + origin: req.body.origin, + cost_tracking: costTracking, + }, + false, + isSearchPreview, + ); return res.status(200).json(responseData); - } catch (error) { if ( error instanceof Error && diff --git a/apps/api/src/controllers/v1/token-usage.ts b/apps/api/src/controllers/v1/token-usage.ts index 74c36289..1e4f6116 100644 --- a/apps/api/src/controllers/v1/token-usage.ts +++ b/apps/api/src/controllers/v1/token-usage.ts @@ -21,7 +21,7 @@ export async function tokenUsageController( } // Otherwise fetch fresh data - const chunk = await getACUCTeam(req.auth.team_id, false, true, RateLimiterMode.Extract); + const chunk = await getACUCTeam(req.auth.team_id, false, false, RateLimiterMode.Extract); if (!chunk) { res.status(404).json({ success: false, diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts index fb4b72c8..c36821e4 100644 --- a/apps/api/src/controllers/v1/types.ts +++ b/apps/api/src/controllers/v1/types.ts @@ -309,6 +309,10 @@ const baseScrapeOptions = z useMock: z.string().optional(), blockAds: z.boolean().default(true), proxy: z.enum(["basic", "stealth", "auto"]).optional(), + maxAge: z.number().int().gte(0).safe().default(0), + storeInCache: z.boolean().default(true), + __experimental_cache: z.boolean().default(false).optional(), + __searchPreviewToken: z.string().optional(), }) .strict(strictMessage); @@ -656,6 +660,7 @@ export const mapRequestSchema = crawlerOptions timeout: z.number().positive().finite().optional(), useMock: z.string().optional(), filterByPath: z.boolean().default(true), + useIndex: z.boolean().default(true), }) .strict(strictMessage); @@ -752,6 +757,8 @@ export type Document = { numPages?: number; contentType?: string; proxyUsed: "basic" | "stealth"; + cacheState?: "hit" | "miss"; + cachedAt?: string; // [key: 
string]: string | string[] | number | { smartScrape: number; other: number; total: number } | undefined; }; serpResults?: { @@ -1198,6 +1205,7 @@ export const searchRequestSchema = z origin: z.string().optional().default("api"), timeout: z.number().int().positive().finite().safe().default(60000), ignoreInvalidURLs: z.boolean().optional().default(false), + __searchPreviewToken: z.string().optional(), scrapeOptions: baseScrapeOptions .extend({ formats: z diff --git a/apps/api/src/lib/deep-research/deep-research-service.ts b/apps/api/src/lib/deep-research/deep-research-service.ts index 36d83eda..4c788fe7 100644 --- a/apps/api/src/lib/deep-research/deep-research-service.ts +++ b/apps/api/src/lib/deep-research/deep-research-service.ts @@ -131,6 +131,9 @@ export async function performDeepResearch(options: DeepResearchServiceOptions) { removeBase64Images: false, fastMode: false, blockAds: false, + maxAge: 0, + storeInCache: true, + __experimental_cache: true, }, }, logger, costTracking, acuc?.flags ?? null); return response.length > 0 ? response : []; diff --git a/apps/api/src/routes/admin.ts b/apps/api/src/routes/admin.ts index 27545486..ea1fcf0f 100644 --- a/apps/api/src/routes/admin.ts +++ b/apps/api/src/routes/admin.ts @@ -10,6 +10,7 @@ import { wrap } from "./v1"; import { acucCacheClearController } from "../controllers/v0/admin/acuc-cache-clear"; import { checkFireEngine } from "../controllers/v0/admin/check-fire-engine"; import { cclogController } from "../controllers/v0/admin/cclog"; +import { indexQueuePrometheus } from "../controllers/v0/admin/index-queue-prometheus"; export const adminRouter = express.Router(); @@ -49,3 +50,8 @@ adminRouter.get( `/admin/${process.env.BULL_AUTH_KEY}/cclog`, wrap(cclogController), ); + +adminRouter.get( + `/admin/${process.env.BULL_AUTH_KEY}/index-queue-prometheus`, + wrap(indexQueuePrometheus), +); diff --git a/apps/api/src/scraper/scrapeURL/engines/cache/index.ts b/apps/api/src/scraper/scrapeURL/engines/cache/index.ts index ec1db99c..cdff5134 100644 --- a/apps/api/src/scraper/scrapeURL/engines/cache/index.ts +++ b/apps/api/src/scraper/scrapeURL/engines/cache/index.ts @@ -1,14 +1,14 @@ import { cacheKey, getEntryFromCache } from "../../../../lib/cache"; import { EngineScrapeResult } from ".."; import { Meta } from "../.."; -import { EngineError } from "../../error"; +import { EngineError, IndexMissError } from "../../error"; export async function scrapeCache(meta: Meta): Promise { const key = cacheKey(meta.url, meta.options, meta.internalOptions); if (key === null) throw new EngineError("Scrape not eligible for caching"); const entry = await getEntryFromCache(key); - if (entry === null) throw new EngineError("Cache missed"); + if (entry === null) throw new IndexMissError(); if (!entry.html) { throw new EngineError("Cache hit but HTML is missing"); diff --git a/apps/api/src/scraper/scrapeURL/engines/index.ts b/apps/api/src/scraper/scrapeURL/engines/index.ts index c690ac7b..68468258 100644 --- a/apps/api/src/scraper/scrapeURL/engines/index.ts +++ b/apps/api/src/scraper/scrapeURL/engines/index.ts @@ -10,6 +10,8 @@ import { scrapePDF } from "./pdf"; import { scrapeURLWithFetch } from "./fetch"; import { scrapeURLWithPlaywright } from "./playwright"; import { scrapeCache } from "./cache"; +import { scrapeURLWithIndex } from "./index/index"; +import { useIndex } from "../../../services"; export type Engine = | "fire-engine;chrome-cdp" @@ -24,7 +26,9 @@ export type Engine = | "fetch" | "pdf" | "docx" - | "cache"; + | "cache" + | "index" + | "index;documents"; 
const useFireEngine = process.env.FIRE_ENGINE_BETA_URL !== "" && @@ -38,6 +42,7 @@ const useCache = export const engines: Engine[] = [ ...(useCache ? ["cache" as const] : []), + ...(useIndex ? ["index" as const, "index;documents" as const] : []), ...(useFireEngine ? [ "fire-engine;chrome-cdp" as const, @@ -114,6 +119,10 @@ export type EngineScrapeResult = { numPages?: number; + cacheInfo?: { + created_at: Date; + }; + contentType?: string; }; @@ -124,6 +133,8 @@ const engineHandlers: { ) => Promise; } = { cache: scrapeCache, + index: scrapeURLWithIndex, + "index;documents": scrapeURLWithIndex, "fire-engine;chrome-cdp": scrapeURLWithFireEngineChromeCDP, "fire-engine(retry);chrome-cdp": scrapeURLWithFireEngineChromeCDP, "fire-engine;chrome-cdp;stealth": scrapeURLWithFireEngineChromeCDP, @@ -166,6 +177,24 @@ export const engineOptions: { }, quality: 1000, // cache should always be tried first }, + index: { + features: { + actions: false, + waitFor: true, + screenshot: true, + "screenshot@fullScreen": true, + pdf: false, + docx: false, + atsv: false, + mobile: true, + location: true, + skipTlsVerification: true, + useFastMode: true, + stealthProxy: false, + disableAdblock: false, + }, + quality: 999, // index should always be tried second ? - MG + }, "fire-engine;chrome-cdp": { features: { actions: true, @@ -202,6 +231,24 @@ export const engineOptions: { }, quality: 45, }, + "index;documents": { + features: { + actions: false, + waitFor: true, + screenshot: true, + "screenshot@fullScreen": true, + pdf: true, + docx: true, + atsv: false, + location: true, + mobile: true, + skipTlsVerification: true, + useFastMode: true, + stealthProxy: false, + disableAdblock: false, + }, + quality: -1, + }, "fire-engine;chrome-cdp;stealth": { features: { actions: true, @@ -218,7 +265,7 @@ export const engineOptions: { stealthProxy: true, disableAdblock: false, }, - quality: -1, + quality: -2, }, "fire-engine(retry);chrome-cdp;stealth": { features: { @@ -401,6 +448,41 @@ export function buildFallbackList(meta: Meta): { _engines.splice(cacheIndex, 1); } } + + const shouldUseIndex = + useIndex + && process.env.FIRECRAWL_INDEX_WRITE_ONLY !== "true" + && !meta.options.formats.includes("changeTracking") + && meta.options.maxAge !== 0 + && ( + meta.options.headers === undefined + || Object.keys(meta.options.headers).length === 0 + ) + && ( + meta.options.actions === undefined + || meta.options.actions.length === 0 + ) + && meta.options.proxy !== "stealth"; + + meta.logger.warn("shouldUseIndex", { + shouldUseIndex, + formatsNoChangeTracking: !meta.options.formats.includes("changeTracking"), + maxAge: meta.options.maxAge !== 0, + headers: meta.options.headers === undefined || Object.keys(meta.options.headers).length === 0, + actions: meta.options.actions === undefined || meta.options.actions.length === 0, + proxy: meta.options.proxy !== "stealth", + }); + + if (!shouldUseIndex) { + const indexIndex = _engines.indexOf("index"); + if (indexIndex !== -1) { + _engines.splice(indexIndex, 1); + } + const indexDocumentsIndex = _engines.indexOf("index;documents"); + if (indexDocumentsIndex !== -1) { + _engines.splice(indexDocumentsIndex, 1); + } + } const prioritySum = [...meta.featureFlags].reduce( (a, x) => a + featureFlagOptions[x].priority, diff --git a/apps/api/src/scraper/scrapeURL/engines/index/index.ts b/apps/api/src/scraper/scrapeURL/engines/index/index.ts new file mode 100644 index 00000000..8231f78d --- /dev/null +++ b/apps/api/src/scraper/scrapeURL/engines/index/index.ts @@ -0,0 +1,165 @@ +import { Document } from 
"../../../../controllers/v1/types"; +import { EngineScrapeResult } from ".."; +import { Meta } from "../.."; +import { getIndexFromGCS, hashURL, index_supabase_service, normalizeURLForIndex, saveIndexToGCS, generateURLSplits, addIndexInsertJob } from "../../../../services"; +import { EngineError, IndexMissError } from "../../error"; +import crypto from "crypto"; + +export async function sendDocumentToIndex(meta: Meta, document: Document) { + const shouldCache = meta.options.storeInCache + && meta.winnerEngine !== "cache" + && meta.winnerEngine !== "index" + && meta.winnerEngine !== "index;documents" + && !meta.featureFlags.has("actions") + && ( + meta.options.headers === undefined + || Object.keys(meta.options.headers).length === 0 + ); + + if (!shouldCache) { + return document; + } + + (async () => { + try { + const normalizedURL = normalizeURLForIndex(meta.url); + const urlHash = await hashURL(normalizedURL); + + const urlSplits = generateURLSplits(normalizedURL); + const urlSplitsHash = await Promise.all(urlSplits.map(split => hashURL(split))); + + const indexId = crypto.randomUUID(); + + try { + await saveIndexToGCS(indexId, { + url: normalizedURL, + html: document.rawHtml!, + statusCode: document.metadata.statusCode, + error: document.metadata.error, + screenshot: document.screenshot, + numPages: document.metadata.numPages, + }); + } catch (error) { + meta.logger.error("Failed to save document to index", { + error, + }); + return document; + } + + try { + await addIndexInsertJob({ + id: indexId, + url: normalizedURL, + url_hash: urlHash, + url_splits: urlSplits, + url_splits_hash: urlSplitsHash, + original_url: document.metadata.sourceURL ?? meta.url, + resolved_url: document.metadata.url ?? document.metadata.sourceURL ?? meta.url, + has_screenshot: document.screenshot !== undefined && meta.featureFlags.has("screenshot"), + has_screenshot_fullscreen: document.screenshot !== undefined && meta.featureFlags.has("screenshot@fullScreen"), + is_mobile: meta.options.mobile, + block_ads: meta.options.blockAds, + location_country: meta.options.location?.country ?? null, + location_languages: meta.options.location?.languages ?? 
null, + status: document.metadata.statusCode, + ...(urlSplitsHash.slice(0, 10).reduce((a,x,i) => ({ + ...a, + [`url_split_${i}_hash`]: x, + }), {})), + }); + } catch (error) { + meta.logger.error("Failed to add document to index insert queue", { + error, + }); + } + } catch (error) { + meta.logger.error("Failed to save document to index (outer)", { + error, + }); + } + })(); + + return document; +} + +const errorCountToRegister = 3; + +export async function scrapeURLWithIndex(meta: Meta): Promise<EngineScrapeResult> { + const normalizedURL = normalizeURLForIndex(meta.url); + const urlHash = await hashURL(normalizedURL); + + let selector = index_supabase_service + .from("index") + .select("id, created_at, status") + .eq("url_hash", urlHash) + .gte("created_at", new Date(Date.now() - meta.options.maxAge).toISOString()) + .eq("is_mobile", meta.options.mobile) + .eq("block_ads", meta.options.blockAds); + + if (meta.featureFlags.has("screenshot")) { + selector = selector.eq("has_screenshot", true); + } + if (meta.featureFlags.has("screenshot@fullScreen")) { + selector = selector.eq("has_screenshot_fullscreen", true); + } + if (meta.options.location?.country) { + selector = selector.eq("location_country", meta.options.location.country); + } else { + selector = selector.is("location_country", null); + } + if (meta.options.location?.languages) { + selector = selector.eq("location_languages", meta.options.location.languages); + } else { + selector = selector.is("location_languages", null); + } + + const { data, error } = await selector + .order("created_at", { ascending: false }) + .limit(5); + + if (error) { + throw new EngineError("Failed to retrieve URL from DB index", { + cause: error, + }); + } + + let selectedRow: { + id: string; + created_at: string; + status: number; + } | null = null; + + if (data.length > 0) { + const newest200Index = data.findIndex(x => x.status >= 200 && x.status < 300); + // If the newest 200 index is further back than the allowed error count, we should display the errored index entry + if (newest200Index >= errorCountToRegister || newest200Index === -1) { + selectedRow = data[0]; + } else { + selectedRow = data[newest200Index]; + } + } + + if (selectedRow === null || selectedRow === undefined) { + throw new IndexMissError(); + } + + const id = selectedRow.id; + + const doc = await getIndexFromGCS(id + ".json"); + if (!doc) { + throw new EngineError("Document not found in GCS"); + } + + return { + url: doc.url, + html: doc.html, + statusCode: doc.statusCode, + error: doc.error, + screenshot: doc.screenshot, + numPages: doc.numPages, + + cacheInfo: { + created_at: new Date(selectedRow.created_at), + } + }; +} diff --git a/apps/api/src/scraper/scrapeURL/error.ts b/apps/api/src/scraper/scrapeURL/error.ts index 33f59c1d..16a9212f 100644 --- a/apps/api/src/scraper/scrapeURL/error.ts +++ b/apps/api/src/scraper/scrapeURL/error.ts @@ -92,3 +92,9 @@ export class PDFInsufficientTimeError extends Error { super(`Insufficient time to process PDF of ${pageCount} pages.
Please increase the timeout parameter in your scrape request to at least ${minTimeout}ms.`); } } + +export class IndexMissError extends Error { + constructor() { + super("Index doesn't have the page we're looking for"); + } +} diff --git a/apps/api/src/scraper/scrapeURL/index.ts b/apps/api/src/scraper/scrapeURL/index.ts index 2f926a96..7fc043b7 100644 --- a/apps/api/src/scraper/scrapeURL/index.ts +++ b/apps/api/src/scraper/scrapeURL/index.ts @@ -23,6 +23,7 @@ import { UnsupportedFileError, SSLError, PDFInsufficientTimeError, + IndexMissError, } from "./error"; import { executeTransformers } from "./transformers"; import { LLMRefusalError } from "./transformers/llmExtract"; @@ -59,6 +60,7 @@ export type Meta = { status: number; } | null | undefined; // undefined: no prefetch yet, null: prefetch came back empty costTracking: CostTracking; + winnerEngine?: Engine; }; function buildFeatureFlags( @@ -189,6 +191,7 @@ export type InternalOptions = { unnormalizedSourceURL?: string; saveScrapeResultToGCS?: boolean; // Passed along to fire-engine + bypassBilling?: boolean; }; export type EngineResultsTracker = { @@ -295,11 +298,23 @@ async function scrapeURLLoop(meta: Meta): Promise { unsupportedFeatures, result: engineResult as EngineScrapeResult & { markdown: string }, }; + meta.winnerEngine = engine; break; } } catch (error) { if (error instanceof EngineError) { - meta.logger.info("Engine " + engine + " could not scrape the page.", { + meta.logger.warn("Engine " + engine + " could not scrape the page.", { + error, + }); + results[engine] = { + state: "error", + error: safeguardCircularError(error), + unexpected: false, + startedAt, + finishedAt: Date.now(), + }; + } else if (error instanceof IndexMissError) { + meta.logger.info("Engine " + engine + " could not find the page in the index.", { error, }); results[engine] = { @@ -385,6 +400,14 @@ async function scrapeURLLoop(meta: Meta): Promise { numPages: result.result.numPages, contentType: result.result.contentType, proxyUsed: meta.featureFlags.has("stealthProxy") ? "stealth" : "basic", + ...(results["index"] ? ( + result.result.cacheInfo ? 
{ + cacheState: "hit", + cachedAt: result.result.cacheInfo.created_at.toISOString(), + } : { + cacheState: "miss", + } + ) : {}) }, }; diff --git a/apps/api/src/scraper/scrapeURL/lib/extractMetadata.ts b/apps/api/src/scraper/scrapeURL/lib/extractMetadata.ts index 61d5ab04..20a6dd7b 100644 --- a/apps/api/src/scraper/scrapeURL/lib/extractMetadata.ts +++ b/apps/api/src/scraper/scrapeURL/lib/extractMetadata.ts @@ -133,7 +133,7 @@ export async function extractMetadata( // Extract all meta tags for custom metadata soup("meta").each((i, elem) => { try { - const name = soup(elem).attr("name") || soup(elem).attr("property"); + const name = soup(elem).attr("name") || soup(elem).attr("property") || soup(elem).attr("itemprop"); const content = soup(elem).attr("content"); if (name && content) { diff --git a/apps/api/src/scraper/scrapeURL/transformers/index.ts b/apps/api/src/scraper/scrapeURL/transformers/index.ts index 5186d6b4..d91a9c36 100644 --- a/apps/api/src/scraper/scrapeURL/transformers/index.ts +++ b/apps/api/src/scraper/scrapeURL/transformers/index.ts @@ -11,6 +11,9 @@ import { saveToCache } from "./cache"; import { performAgent } from "./agent"; import { deriveDiff } from "./diff"; +import { useIndex } from "../../../services/index"; +import { sendDocumentToIndex } from "../engines/index/index"; + export type Transformer = ( meta: Meta, document: Document, @@ -205,6 +208,7 @@ export const transformerStack: Transformer[] = [ deriveLinksFromHTML, deriveMetadataFromRawHTML, uploadScreenshot, + ...(useIndex ? [sendDocumentToIndex] : []), performLLMExtract, performAgent, deriveDiff, diff --git a/apps/api/src/services/index.ts b/apps/api/src/services/index.ts new file mode 100644 index 00000000..df7969de --- /dev/null +++ b/apps/api/src/services/index.ts @@ -0,0 +1,241 @@ +import { createClient, SupabaseClient } from "@supabase/supabase-js"; +import { logger } from "../lib/logger"; +import { configDotenv } from "dotenv"; +import { Storage } from "@google-cloud/storage"; +import crypto from "crypto"; +import { redisEvictConnection } from "./redis"; +configDotenv(); + +// SupabaseService class initializes the Supabase client conditionally based on environment variables. +class IndexSupabaseService { + private client: SupabaseClient | null = null; + + constructor() { + const supabaseUrl = process.env.INDEX_SUPABASE_URL; + const supabaseServiceToken = process.env.INDEX_SUPABASE_SERVICE_TOKEN; + // Only initialize the Supabase client if both URL and Service Token are provided. + if (!supabaseUrl || !supabaseServiceToken) { + // Warn the user that Authentication is disabled by setting the client to null + logger.warn( + "Index supabase client will not be initialized.", + ); + this.client = null; + } else { + this.client = createClient(supabaseUrl, supabaseServiceToken); + } + } + + // Provides access to the initialized Supabase client, if available. + getClient(): SupabaseClient | null { + return this.client; + } +} + +const serv = new IndexSupabaseService(); + +// Using a Proxy to handle dynamic access to the Supabase client or service methods. +// This approach ensures that if Supabase is not configured, any attempt to use it will result in a clear error. +export const index_supabase_service: SupabaseClient = new Proxy( + serv, + { + get: function (target, prop, receiver) { + const client = target.getClient(); + // If the Supabase client is not initialized, intercept property access to provide meaningful error feedback. 
+ if (client === null) { + return () => { + throw new Error("Index supabase client is not configured."); + }; + } + // Direct access to SupabaseService properties takes precedence. + if (prop in target) { + return Reflect.get(target, prop, receiver); + } + // Otherwise, delegate access to the Supabase client. + return Reflect.get(client, prop, receiver); + }, + }, +) as unknown as SupabaseClient; + +const credentials = process.env.GCS_CREDENTIALS ? JSON.parse(atob(process.env.GCS_CREDENTIALS)) : undefined; + +export async function getIndexFromGCS(url: string): Promise<any | null> { + // logger.info(`Getting f-engine document from GCS`, { + // url, + // }); + try { + if (!process.env.GCS_INDEX_BUCKET_NAME) { + return null; + } + + const storage = new Storage({ credentials }); + const bucket = storage.bucket(process.env.GCS_INDEX_BUCKET_NAME); + const blob = bucket.file(`${url}`); + const [exists] = await blob.exists(); + if (!exists) { + return null; + } + const [blobContent] = await blob.download(); + const parsed = JSON.parse(blobContent.toString()); + return parsed; + } catch (error) { + logger.error(`Error getting f-engine document from GCS`, { + error, + url, + }); + return null; + } +} + + +export async function saveIndexToGCS(id: string, doc: { + url: string; + html: string; + statusCode: number; + error?: string; + screenshot?: string; + numPages?: number; +}): Promise<void> { + try { + if (!process.env.GCS_INDEX_BUCKET_NAME) { + return; + } + + const storage = new Storage({ credentials }); + const bucket = storage.bucket(process.env.GCS_INDEX_BUCKET_NAME); + const blob = bucket.file(`${id}.json`); + for (let i = 0; i < 3; i++) { + try { + await blob.save(JSON.stringify(doc), { + contentType: "application/json", + }); + break; + } catch (error) { + if (i === 2) { + throw error; + } else { + logger.error(`Error saving index document to GCS, retrying`, { + error, + indexId: id, + i, + }); + } + } + } + } catch (error) { + throw new Error("Error saving index document to GCS", { + cause: error, + }); + } +} + +export const useIndex = + process.env.INDEX_SUPABASE_URL !== "" && + process.env.INDEX_SUPABASE_URL !== undefined; + +export function normalizeURLForIndex(url: string): string { + const urlObj = new URL(url); + urlObj.hash = ""; + urlObj.protocol = "https"; + + if (urlObj.port === "80" || urlObj.port === "443") { + urlObj.port = ""; + } + + if (urlObj.hostname.startsWith("www.")) { + urlObj.hostname = urlObj.hostname.slice(4); + } + + if (urlObj.pathname.endsWith("/index.html")) { + urlObj.pathname = urlObj.pathname.slice(0, -10); + } else if (urlObj.pathname.endsWith("/index.php")) { + urlObj.pathname = urlObj.pathname.slice(0, -9); + } else if (urlObj.pathname.endsWith("/index.htm")) { + urlObj.pathname = urlObj.pathname.slice(0, -9); + } else if (urlObj.pathname.endsWith("/index.shtml")) { + urlObj.pathname = urlObj.pathname.slice(0, -11); + } else if (urlObj.pathname.endsWith("/index.xml")) { + urlObj.pathname = urlObj.pathname.slice(0, -9); + } + + if (urlObj.pathname.endsWith("/")) { + urlObj.pathname = urlObj.pathname.slice(0, -1); + } + + return urlObj.toString(); +} + +export async function hashURL(url: string): Promise<string> { + return "\\x" + crypto.createHash("sha256").update(url).digest("hex"); +} + +export function generateURLSplits(url: string): string[] { + const urls: string[] = []; + const urlObj = new URL(url); + urlObj.hash = ""; + urlObj.search = ""; + const pathnameParts = urlObj.pathname.split("/"); + + for (let i = 0; i <= pathnameParts.length; i++) { + urlObj.pathname =
pathnameParts.slice(0, i).join("/"); + urls.push(urlObj.href); + } + + urls.push(url); + + return [...new Set(urls.map(x => normalizeURLForIndex(x)))]; +} + +const INDEX_INSERT_QUEUE_KEY = "index-insert-queue"; +const INDEX_INSERT_BATCH_SIZE = 1000; + +export async function addIndexInsertJob(data: any) { + await redisEvictConnection.rpush(INDEX_INSERT_QUEUE_KEY, JSON.stringify(data)); +} + +export async function getIndexInsertJobs(): Promise<any[]> { + const jobs = (await redisEvictConnection.lpop(INDEX_INSERT_QUEUE_KEY, INDEX_INSERT_BATCH_SIZE)) ?? []; + return jobs.map(x => JSON.parse(x)); +} + +export async function processIndexInsertJobs() { + const jobs = await getIndexInsertJobs(); + if (jobs.length === 0) { + return; + } + logger.info(`Index inserter found jobs to insert`, { jobCount: jobs.length }); + try { + await index_supabase_service.from("index").insert(jobs); + logger.info(`Index inserter inserted jobs`, { jobCount: jobs.length }); + } catch (error) { + logger.error(`Index inserter failed to insert jobs`, { error, jobCount: jobs.length }); + } +} + +export async function getIndexInsertQueueLength(): Promise<number> { + return await redisEvictConnection.llen(INDEX_INSERT_QUEUE_KEY) ?? 0; +} + +export async function queryIndexAtSplitLevel(url: string, limit: number): Promise<string[]> { + if (!useIndex || process.env.FIRECRAWL_INDEX_WRITE_ONLY === "true") { + return []; + } + + const urlObj = new URL(url); + urlObj.search = ""; + + const urlSplitsHash = await Promise.all(generateURLSplits(urlObj.href).map(x => hashURL(x))); + + const { data, error } = await index_supabase_service + .from("index") + .select("resolved_url") + .eq("url_split_" + (urlSplitsHash.length - 1) + "_hash", urlSplitsHash[urlSplitsHash.length - 1]) + .gte("created_at", new Date(Date.now() - 2 * 24 * 60 * 60 * 1000).toISOString()) + .limit(limit) + + if (error) { + logger.warn("Error querying index", { error, url, limit }); + return []; + } + + return [...new Set((data ??
[]).map((x) => x.resolved_url))]; +} diff --git a/apps/api/src/services/indexing/index-worker.ts b/apps/api/src/services/indexing/index-worker.ts index 30285978..677d33fa 100644 --- a/apps/api/src/services/indexing/index-worker.ts +++ b/apps/api/src/services/indexing/index-worker.ts @@ -14,6 +14,7 @@ import { saveCrawlMap } from "./crawl-maps-index"; import { processBillingBatch, queueBillingOperation, startBillingBatchProcessing } from "../billing/batch_billing"; import systemMonitor from "../system-monitor"; import { v4 as uuidv4 } from "uuid"; +import { processIndexInsertJobs } from ".."; const workerLockDuration = Number(process.env.WORKER_LOCK_DURATION) || 60000; const workerStalledCheckInterval = @@ -226,6 +227,8 @@ const workerFun = async (queue: Queue, jobProcessor: (token: string, job: Job) = process.exit(0); }; +const INDEX_INSERT_INTERVAL = 15000; + // Start the workers (async () => { // Start index worker @@ -234,7 +237,17 @@ const workerFun = async (queue: Queue, jobProcessor: (token: string, job: Job) = // Start billing worker and batch processing startBillingBatchProcessing(); const billingWorkerPromise = workerFun(getBillingQueue(), processBillingJobInternal); - + + const indexInserterInterval = setInterval(async () => { + if (isShuttingDown) { + return; + } + + await processIndexInsertJobs(); + }, INDEX_INSERT_INTERVAL); + // Wait for both workers to complete (which should only happen on shutdown) await Promise.all([indexWorkerPromise, billingWorkerPromise]); + + clearInterval(indexInserterInterval); })(); diff --git a/apps/api/src/services/logging/log_job.ts b/apps/api/src/services/logging/log_job.ts index efa6b158..262d4015 100644 --- a/apps/api/src/services/logging/log_job.ts +++ b/apps/api/src/services/logging/log_job.ts @@ -21,12 +21,13 @@ function cleanOfNull(x: T): T { } } -export async function logJob(job: FirecrawlJob, force: boolean = false) { +export async function logJob(job: FirecrawlJob, force: boolean = false, bypassLogging: boolean = false) { try { const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true"; if (!useDbAuthentication) { return; } + // Redact any pages that have an authorization header // actually, Don't. we use the db to retrieve results now. 
this breaks authed crawls - mogery @@ -70,6 +71,10 @@ export async function logJob(job: FirecrawlJob, force: boolean = false) { await saveJobToGCS(job); } + if (bypassLogging) { + return; + } + if (force) { let i = 0, done = false; diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 3ea7de22..c368d121 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -194,7 +194,7 @@ export async function addScrapeJob( }, ); } else { - await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority); + await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority, directToBullMQ); } } diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index afc93b6a..2a1bdf66 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -86,6 +86,8 @@ import { robustFetch } from "../scraper/scrapeURL/lib/fetch"; import { RateLimiterMode } from "../types"; import { calculateCreditsToBeBilled } from "../lib/scrape-billing"; import { redisEvictConnection } from "./redis"; +import { generateURLSplits, queryIndexAtSplitLevel } from "./index"; +import { WebCrawler } from "../scraper/WebScraper/crawler"; import type { Logger } from "winston"; configDotenv(); @@ -319,7 +321,7 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) { scrapeOptions: sc.scrapeOptions, crawlerOptions: sc.crawlerOptions, origin: job.data.origin, - }); + }, false, job.data.internalOptions?.bypassBilling ?? false); logger.info("Logged crawl!"); const data = { @@ -371,8 +373,10 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) { origin: job.data.origin, }, true, + job.data.internalOptions?.bypassBilling ?? false, ); + // v1 web hooks, call when done with no data, but with event completed if (job.data.v1 && job.data.webhook) { callWebhook( @@ -911,6 +915,29 @@ const workerFun = async ( } }; +async function kickoffGetIndexLinks(sc: StoredCrawl, crawler: WebCrawler, url: string) { + if (sc.crawlerOptions.ignoreSitemap) { + return []; + } + + const trimmedURL = new URL(url); + trimmedURL.search = ""; + + const index = await queryIndexAtSplitLevel( + sc.crawlerOptions.allowBackwardCrawling ? generateURLSplits(trimmedURL.href)[0] : trimmedURL.href, + sc.crawlerOptions.limit ?? 100, + ); + + const validIndexLinks = crawler.filterLinks( + index.filter(x => crawler.filterURL(x, trimmedURL.href) !== null), + sc.crawlerOptions.limit ?? 100, + sc.crawlerOptions.maxDepth ?? 
10, + false, + ); + + return validIndexLinks; +} + async function processKickoffJob(job: Job & { id: string }, token: string) { const logger = _logger.child({ module: "queue-worker", @@ -1028,6 +1055,61 @@ async function processKickoffJob(job: Job & { id: string }, token: string) { }); } + const indexLinks = await kickoffGetIndexLinks(sc, crawler, job.data.url); + + if (indexLinks.length > 0) { + logger.debug("Using index links of length " + indexLinks.length, { + indexLinksLength: indexLinks.length, + }); + + let jobPriority = await getJobPriority({ + team_id: job.data.team_id, + basePriority: 21, + }); + logger.debug("Using job priority " + jobPriority, { jobPriority }); + + const jobs = indexLinks.map((url) => { + const uuid = uuidv4(); + return { + name: uuid, + data: { + url, + mode: "single_urls" as const, + team_id: job.data.team_id, + crawlerOptions: job.data.crawlerOptions, + scrapeOptions: job.data.scrapeOptions, + internalOptions: sc.internalOptions, + origin: job.data.origin, + crawl_id: job.data.crawl_id, + sitemapped: true, + webhook: job.data.webhook, + v1: job.data.v1, + }, + opts: { + jobId: uuid, + priority: 20, + }, + }; + }); + + logger.debug("Locking URLs..."); + const lockedIds = await lockURLsIndividually( + job.data.crawl_id, + sc, + jobs.map((x) => ({ id: x.opts.jobId, url: x.data.url })), + ); + const lockedJobs = jobs.filter((x) => + lockedIds.find((y) => y.id === x.opts.jobId), + ); + logger.debug("Adding scrape jobs to Redis..."); + await addCrawlJobs( + job.data.crawl_id, + lockedJobs.map((x) => x.opts.jobId), + ); + logger.debug("Adding scrape jobs to BullMQ..."); + await addScrapeJobs(lockedJobs); + } + logger.debug("Done queueing jobs!"); await finishCrawlKickoff(job.data.crawl_id); @@ -1048,7 +1130,7 @@ async function processKickoffJob(job: Job & { id: string }, token: string) { async function billScrapeJob(job: Job & { id: string }, document: Document, logger: Logger, costTracking?: CostTracking) { let creditsToBeBilled: number | null = null; - if (job.data.is_scrape !== true) { + if (job.data.is_scrape !== true && !job.data.internalOptions?.bypassBilling) { creditsToBeBilled = await calculateCreditsToBeBilled(job.data.scrapeOptions, document, job.id, costTracking); if ( @@ -1378,6 +1460,7 @@ async function processJob(job: Job & { id: string }, token: string) { credits_billed, }, true, + job.data.internalOptions?.bypassBilling ?? false, ); if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) { @@ -1424,7 +1507,7 @@ async function processJob(job: Job & { id: string }, token: string) { cost_tracking: costTracking, pdf_num_pages: doc.metadata.numPages, credits_billed, - }); + }, false, job.data.internalOptions?.bypassBilling ?? false); } logger.info(`🐂 Job done ${job.id}`); @@ -1523,6 +1606,7 @@ async function processJob(job: Job & { id: string }, token: string) { cost_tracking: costTracking, }, true, + job.data.internalOptions?.bypassBilling ?? 
false, ); return data; } } diff --git a/apps/js-sdk/firecrawl/package.json b/apps/js-sdk/firecrawl/package.json index 54851cd4..47f8c3db 100644 --- a/apps/js-sdk/firecrawl/package.json +++ b/apps/js-sdk/firecrawl/package.json @@ -1,6 +1,6 @@ { "name": "@mendable/firecrawl-js", - "version": "1.25.2", + "version": "1.25.3", "description": "JavaScript SDK for Firecrawl API", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/apps/js-sdk/firecrawl/src/index.ts b/apps/js-sdk/firecrawl/src/index.ts index 265bd6c5..86e5a91c 100644 --- a/apps/js-sdk/firecrawl/src/index.ts +++ b/apps/js-sdk/firecrawl/src/index.ts @@ -120,6 +120,7 @@ export interface CrawlScrapeOptions { removeBase64Images?: boolean; blockAds?: boolean; proxy?: "basic" | "stealth" | "auto"; + storeInCache?: boolean; } export type Action = { diff --git a/docker-compose.yaml b/docker-compose.yaml index d3cbb3ee..f218e990 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -46,6 +46,9 @@ x-common-env: &common-env services: playwright-service: + # NOTE: If you don't want to build the service locally, + # comment out the build: statement and uncomment the image: statement + # image: ghcr.io/mendableai/playwright-service:latest build: apps/playwright-service-ts environment: PORT: 3000