From a3145ccacc80e999c987e83143dc0f4c0f492960 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Fri, 23 May 2025 22:33:09 +0200 Subject: [PATCH 01/12] fix(extract-status): be able to get extract status even after TTL lapses (#1599) --- apps/api/src/controllers/v1/crawl-status.ts | 3 +- apps/api/src/controllers/v1/extract-status.ts | 43 +++++++++++++------ 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/apps/api/src/controllers/v1/crawl-status.ts b/apps/api/src/controllers/v1/crawl-status.ts index b1f97471..4d996935 100644 --- a/apps/api/src/controllers/v1/crawl-status.ts +++ b/apps/api/src/controllers/v1/crawl-status.ts @@ -33,11 +33,12 @@ export type PseudoJob = { timestamp: number, data: { scrapeOptions: any, + teamId?: string, }, failedReason?: string, } -export type DBJob = { docs: any, success: boolean, page_options: any, date_added: any, message: string | null } +export type DBJob = { docs: any, success: boolean, page_options: any, date_added: any, message: string | null, team_id: string} export async function getJob(id: string): Promise | null> { const [bullJob, dbJob, gcsJob] = await Promise.all([ diff --git a/apps/api/src/controllers/v1/extract-status.ts b/apps/api/src/controllers/v1/extract-status.ts index 76a611f8..1795363e 100644 --- a/apps/api/src/controllers/v1/extract-status.ts +++ b/apps/api/src/controllers/v1/extract-status.ts @@ -18,10 +18,11 @@ export async function getExtractJob(id: string): Promise = { id, - getState: bullJob ? bullJob.getState : (() => dbJob!.success ? "completed" : "failed"), + getState: bullJob ? bullJob.getState.bind(bullJob) : (() => dbJob!.success ? "completed" : "failed"), returnvalue: data, data: { scrapeOptions: bullJob ? bullJob.data.scrapeOptions : dbJob!.page_options, + teamId: bullJob ? bullJob.data.teamId : dbJob!.team_id, }, timestamp: bullJob ? bullJob.timestamp : new Date(dbJob!.date_added).valueOf(), failedReason: (bullJob ? bullJob.failedReason : dbJob!.message) || undefined, @@ -36,7 +37,9 @@ export async function extractStatusController( ) { const extract = await getExtract(req.params.jobId); - if (!extract) { + let status = extract?.status; + + if (extract && extract.team_id !== req.auth.team_id) { return res.status(404).json({ success: false, error: "Extract job not found", @@ -45,34 +48,46 @@ export async function extractStatusController( let data: ExtractResult | [] = []; - if (extract.status === "completed") { + if (!extract || extract.status === "completed") { const jobData = await getExtractJob(req.params.jobId); - if (!jobData) { + if ((!jobData && !extract) || (jobData && jobData.data.teamId !== req.auth.team_id)) { return res.status(404).json({ success: false, - error: "Job not found", + error: "Extract job not found", }); } - if (!jobData.returnvalue) { + if (jobData) { + const jobStatus = await jobData.getState(); + + if (jobStatus === "completed") { + status = "completed"; + } else if (jobStatus === "failed") { + status = "failed"; + } else { + status = "processing"; + } + } + + if (!jobData?.returnvalue) { // if we got in the split-second where the redis is updated but the bull isn't // just pretend it's still processing - MG - extract.status = "processing"; + status = "processing"; } else { data = jobData.returnvalue ?? []; } } return res.status(200).json({ - success: extract.status === "failed" ? false : true, + success: status === "failed" ? false : true, data, - status: extract.status, + status, error: extract?.error ?? 
undefined, expiresAt: (await getExtractExpiry(req.params.jobId)).toISOString(), - steps: extract.showSteps ? extract.steps : undefined, - llmUsage: extract.showLLMUsage ? extract.llmUsage : undefined, - sources: extract.showSources ? extract.sources : undefined, - costTracking: extract.showCostTracking ? extract.costTracking : undefined, - sessionIds: extract.sessionIds ? extract.sessionIds : undefined, + steps: extract?.showSteps ? extract.steps : undefined, + llmUsage: extract?.showLLMUsage ? extract.llmUsage : undefined, + sources: extract?.showSources ? extract.sources : undefined, + costTracking: extract?.showCostTracking ? extract.costTracking : undefined, + sessionIds: extract?.sessionIds ? extract.sessionIds : undefined, }); } From 492d97e88903d39b9c91d8697b1efae8517ab0c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Sat, 24 May 2025 00:09:05 +0200 Subject: [PATCH 02/12] reduce logging --- apps/api/src/lib/crawl-redis.ts | 24 +++++++++--------- apps/api/src/lib/gcs-jobs.ts | 6 ++--- .../src/scraper/scrapeURL/engines/index.ts | 25 ++++--------------- apps/api/src/services/logging/log_job.ts | 18 ++++++------- apps/api/src/services/queue-jobs.ts | 8 +++--- 5 files changed, 33 insertions(+), 48 deletions(-) diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts index caa95429..96de520d 100644 --- a/apps/api/src/lib/crawl-redis.ts +++ b/apps/api/src/lib/crawl-redis.ts @@ -163,15 +163,15 @@ export async function finishCrawlPre(id: string) { await redisConnection.expire("crawl:" + id + ":finished_pre", 24 * 60 * 60); return set === 1; } else { - _logger.debug("Crawl can not be pre-finished yet, not marking as finished.", { - module: "crawl-redis", - method: "finishCrawlPre", - crawlId: id, - jobs_done: await redisConnection.scard("crawl:" + id + ":jobs_done"), - jobs: await redisConnection.scard("crawl:" + id + ":jobs"), - kickoff_finished: - (await redisConnection.get("crawl:" + id + ":kickoff:finish")) !== null, - }); + // _logger.debug("Crawl can not be pre-finished yet, not marking as finished.", { + // module: "crawl-redis", + // method: "finishCrawlPre", + // crawlId: id, + // jobs_done: await redisConnection.scard("crawl:" + id + ":jobs_done"), + // jobs: await redisConnection.scard("crawl:" + id + ":jobs"), + // kickoff_finished: + // (await redisConnection.get("crawl:" + id + ":kickoff:finish")) !== null, + // }); } } @@ -279,9 +279,9 @@ export async function lockURL( (await redisConnection.scard("crawl:" + id + ":visited_unique")) >= sc.crawlerOptions.limit ) { - logger.debug( - "Crawl has already hit visited_unique limit, not locking URL.", - ); + // logger.debug( + // "Crawl has already hit visited_unique limit, not locking URL.", + // ); return false; } } diff --git a/apps/api/src/lib/gcs-jobs.ts b/apps/api/src/lib/gcs-jobs.ts index 278e6e19..3e780457 100644 --- a/apps/api/src/lib/gcs-jobs.ts +++ b/apps/api/src/lib/gcs-jobs.ts @@ -105,9 +105,9 @@ export async function getJobFromGCS(jobId: string): Promise { // TODO: fix the any type (we have multiple Document types in the codebase) export async function getDocFromGCS(url: string): Promise { - logger.info(`Getting f-engine document from GCS`, { - url, - }); +// logger.info(`Getting f-engine document from GCS`, { +// url, +// }); try { if (!process.env.GCS_FIRE_ENGINE_BUCKET_NAME) { return null; diff --git a/apps/api/src/scraper/scrapeURL/engines/index.ts b/apps/api/src/scraper/scrapeURL/engines/index.ts index b59313e3..06c5e072 100644 --- 
a/apps/api/src/scraper/scrapeURL/engines/index.ts +++ b/apps/api/src/scraper/scrapeURL/engines/index.ts @@ -383,9 +383,8 @@ export function buildFallbackList(meta: Meta): { if (cacheIndex !== -1) { _engines.splice(cacheIndex, 1); } - } else { - meta.logger.debug("Cache engine enabled by useCache option"); } + const prioritySum = [...meta.featureFlags].reduce( (a, x) => a + featureFlagOptions[x].priority, 0, @@ -424,24 +423,6 @@ export function buildFallbackList(meta: Meta): { if (supportScore >= priorityThreshold) { selectedEngines.push({ engine, supportScore, unsupportedFeatures }); - meta.logger.debug(`Engine ${engine} meets feature priority threshold`, { - supportScore, - prioritySum, - priorityThreshold, - featureFlags: [...meta.featureFlags], - unsupportedFeatures, - }); - } else { - meta.logger.debug( - `Engine ${engine} does not meet feature priority threshold`, - { - supportScore, - prioritySum, - priorityThreshold, - featureFlags: [...meta.featureFlags], - unsupportedFeatures, - }, - ); } } @@ -459,6 +440,10 @@ export function buildFallbackList(meta: Meta): { ); } + meta.logger.info("Selected engines", { + selectedEngines, + }); + return selectedEngines; } diff --git a/apps/api/src/services/logging/log_job.ts b/apps/api/src/services/logging/log_job.ts index a1e0a8d5..e73afeb5 100644 --- a/apps/api/src/services/logging/log_job.ts +++ b/apps/api/src/services/logging/log_job.ts @@ -47,18 +47,18 @@ async function indexJob(job: FirecrawlJob): Promise { if (!response.ok) { const errorData = await response.json(); - logger.error(`Failed to send job to external server: ${response.status} ${response.statusText}`, { - error: errorData, - scrapeId: job.job_id, - }); + // logger.error(`Failed to send job to external server: ${response.status} ${response.statusText}`, { + // error: errorData, + // scrapeId: job.job_id, + // }); } else { - logger.debug("Job sent to external server successfully!", { scrapeId: job.job_id }); + // logger.debug("Job sent to external server successfully!", { scrapeId: job.job_id }); } } catch (error) { - logger.error(`Error sending job to external server: ${error.message}`, { - error, - scrapeId: job.job_id, - }); + // logger.error(`Error sending job to external server: ${error.message}`, { + // error, + // scrapeId: job.job_id, + // }); } } diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 2a08de6c..3456c82c 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -132,13 +132,13 @@ async function addScrapeJobRaw( // If above by 2x, send them an email // No need to 2x as if there are more than the max concurrency in the concurrency queue, it is already 2x if(concurrencyQueueJobs > maxConcurrency) { - logger.info("Concurrency limited 2x (single) - ", "Concurrency queue jobs: ", concurrencyQueueJobs, "Max concurrency: ", maxConcurrency, "Team ID: ", webScraperOptions.team_id); + // logger.info("Concurrency limited 2x (single) - ", "Concurrency queue jobs: ", concurrencyQueueJobs, "Max concurrency: ", maxConcurrency, "Team ID: ", webScraperOptions.team_id); // Only send notification if it's not a crawl or batch scrape const shouldSendNotification = await shouldSendConcurrencyLimitNotification(webScraperOptions.team_id); if (shouldSendNotification) { sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => { - logger.error("Error sending notification (concurrency limit reached): ", error); + logger.error("Error sending 
notification (concurrency limit reached)", { error }); }); } } @@ -231,13 +231,13 @@ export async function addScrapeJobs( // equals 2x the max concurrency if(addToCQ.length > maxConcurrency) { - logger.info(`Concurrency limited 2x (multiple) - Concurrency queue jobs: ${addToCQ.length} Max concurrency: ${maxConcurrency} Team ID: ${jobs[0].data.team_id}`); + // logger.info(`Concurrency limited 2x (multiple) - Concurrency queue jobs: ${addToCQ.length} Max concurrency: ${maxConcurrency} Team ID: ${jobs[0].data.team_id}`); // Only send notification if it's not a crawl or batch scrape if (!isCrawlOrBatchScrape(dontAddToCCQ[0].data)) { const shouldSendNotification = await shouldSendConcurrencyLimitNotification(dontAddToCCQ[0].data.team_id); if (shouldSendNotification) { sendNotificationWithCustomDays(dontAddToCCQ[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => { - logger.error("Error sending notification (concurrency limit reached): ", error); + logger.error("Error sending notification (concurrency limit reached)", { error }); }); } } From c3738063cf0890a57052007611888a7794407999 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Sun, 25 May 2025 15:50:20 +0200 Subject: [PATCH 03/12] less logs even more --- .../scrapeURL/engines/fire-engine/index.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/api/src/scraper/scrapeURL/engines/fire-engine/index.ts b/apps/api/src/scraper/scrapeURL/engines/fire-engine/index.ts index e69df48b..6c1648bf 100644 --- a/apps/api/src/scraper/scrapeURL/engines/fire-engine/index.ts +++ b/apps/api/src/scraper/scrapeURL/engines/fire-engine/index.ts @@ -245,18 +245,18 @@ export async function scrapeURLWithFireEngineChromeCDP( meta.options.formats.includes("screenshot") || meta.options.formats.includes("screenshot@fullPage") ) { - meta.logger.debug( - "Transforming screenshots from actions into screenshot field", - { screenshots: response.screenshots }, - ); + // meta.logger.debug( + // "Transforming screenshots from actions into screenshot field", + // { screenshots: response.screenshots }, + // ); if (response.screenshots) { response.screenshot = response.screenshots.slice(-1)[0]; response.screenshots = response.screenshots.slice(0, -1); } - meta.logger.debug("Screenshot transformation done", { - screenshots: response.screenshots, - screenshot: response.screenshot, - }); + // meta.logger.debug("Screenshot transformation done", { + // screenshots: response.screenshots, + // screenshot: response.screenshot, + // }); } if (!response.url) { From 474e5a0543737a697cc78bcee4dccb06450db3b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Tue, 27 May 2025 15:39:31 +0200 Subject: [PATCH 04/12] fix(crawler): always set expiry on sitemap links in redis --- apps/api/src/scraper/WebScraper/crawler.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apps/api/src/scraper/WebScraper/crawler.ts b/apps/api/src/scraper/WebScraper/crawler.ts index 34d1426a..56c46faf 100644 --- a/apps/api/src/scraper/WebScraper/crawler.ts +++ b/apps/api/src/scraper/WebScraper/crawler.ts @@ -334,6 +334,12 @@ export class WebCrawler { count++; } + await redisConnection.expire( + "sitemap:" + this.jobId + ":links", + 3600, + "NX", + ); + return count; } catch (error) { if (error.message === "Sitemap fetch timeout") { From a36c6a4f401d06f8d27b135727bf10edded431d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Tue, 27 May 2025 21:33:44 +0200 Subject: 
[PATCH 05/12] feat(scrapeURL): add unnormalizedSourceURL for url matching DX (FIR-2137) (#1601) * feat(scrapeURL): add unnormalizedSourceURL for url matching DX * fix(tests): fixc --- apps/api/src/__tests__/snips/batch-scrape.test.ts | 8 ++++++++ apps/api/src/__tests__/snips/scrape.test.ts | 8 ++++++++ apps/api/src/controllers/v1/batch-scrape.ts | 15 +++++++-------- apps/api/src/controllers/v1/scrape.ts | 1 + apps/api/src/scraper/scrapeURL/index.ts | 3 ++- 5 files changed, 26 insertions(+), 9 deletions(-) diff --git a/apps/api/src/__tests__/snips/batch-scrape.test.ts b/apps/api/src/__tests__/snips/batch-scrape.test.ts index f3e9e585..8d4c3946 100644 --- a/apps/api/src/__tests__/snips/batch-scrape.test.ts +++ b/apps/api/src/__tests__/snips/batch-scrape.test.ts @@ -48,4 +48,12 @@ describe("Batch scrape tests", () => { }, 180000); }); } + + it.concurrent("sourceURL stays unnormalized", async () => { + const response = await batchScrape({ + urls: ["https://firecrawl.dev/?pagewanted=all&et_blog"], + }); + + expect(response.body.data[0].metadata.sourceURL).toBe("https://firecrawl.dev/?pagewanted=all&et_blog"); + }, 35000); }); diff --git a/apps/api/src/__tests__/snips/scrape.test.ts b/apps/api/src/__tests__/snips/scrape.test.ts index 4703964e..9c2a9e2d 100644 --- a/apps/api/src/__tests__/snips/scrape.test.ts +++ b/apps/api/src/__tests__/snips/scrape.test.ts @@ -366,4 +366,12 @@ describe("Scrape tests", () => { }, 30000); }); } + + it.concurrent("sourceURL stays unnormalized", async () => { + const response = await scrape({ + url: "https://firecrawl.dev/?pagewanted=all&et_blog", + }); + + expect(response.metadata.sourceURL).toBe("https://firecrawl.dev/?pagewanted=all&et_blog"); + }, 35000); }); diff --git a/apps/api/src/controllers/v1/batch-scrape.ts b/apps/api/src/controllers/v1/batch-scrape.ts index 326aba5f..d14f8cd7 100644 --- a/apps/api/src/controllers/v1/batch-scrape.ts +++ b/apps/api/src/controllers/v1/batch-scrape.ts @@ -22,7 +22,6 @@ import { getJobPriority } from "../../lib/job-priority"; import { addScrapeJobs } from "../../services/queue-jobs"; import { callWebhook } from "../../services/webhook"; import { logger as _logger } from "../../lib/logger"; -import { CostTracking } from "../../lib/extract/extraction-service"; import { BLOCKLISTED_URL_MESSAGE } from "../../lib/strings"; import { isUrlBlocked } from "../../scraper/WebScraper/utils/blocklist"; @@ -30,6 +29,8 @@ export async function batchScrapeController( req: RequestWithAuth<{}, BatchScrapeResponse, BatchScrapeRequest>, res: Response, ) { + const preNormalizedBody = { ...req.body }; + if (req.body?.ignoreInvalidURLs === true) { req.body = batchScrapeRequestSchemaNoURLValidation.parse(req.body); } else { @@ -46,6 +47,7 @@ export async function batchScrapeController( }); let urls = req.body.urls; + let unnormalizedURLs = preNormalizedBody.urls; let invalidURLs: string[] | undefined = undefined; if (req.body.ignoreInvalidURLs) { @@ -53,11 +55,13 @@ export async function batchScrapeController( let pendingURLs = urls; urls = []; + unnormalizedURLs = []; for (const u of pendingURLs) { try { const nu = urlSchema.parse(u); if (!isUrlBlocked(nu, req.acuc?.flags ?? 
null)) { urls.push(nu); + unnormalizedURLs.push(u); } else { invalidURLs.push(u); } @@ -86,12 +90,6 @@ export async function batchScrapeController( await logCrawl(id, req.auth.team_id); } - let { remainingCredits } = req.account!; - const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true"; - if (!useDbAuthentication) { - remainingCredits = Infinity; - } - const sc: StoredCrawl = req.body.appendToId ? ((await getCrawl(req.body.appendToId)) as StoredCrawl) : { @@ -127,7 +125,7 @@ export async function batchScrapeController( delete (scrapeOptions as any).urls; delete (scrapeOptions as any).appendToId; - const jobs = urls.map((x) => { + const jobs = urls.map((x, i) => { return { data: { url: x, @@ -142,6 +140,7 @@ export async function batchScrapeController( webhook: req.body.webhook, internalOptions: { saveScrapeResultToGCS: process.env.GCS_FIRE_ENGINE_BUCKET_NAME ? true : false, + unnormalizedSourceURL: unnormalizedURLs[i], }, }, opts: { diff --git a/apps/api/src/controllers/v1/scrape.ts b/apps/api/src/controllers/v1/scrape.ts index 8da72dae..092d86e8 100644 --- a/apps/api/src/controllers/v1/scrape.ts +++ b/apps/api/src/controllers/v1/scrape.ts @@ -51,6 +51,7 @@ export async function scrapeController( internalOptions: { teamId: req.auth.team_id, saveScrapeResultToGCS: process.env.GCS_FIRE_ENGINE_BUCKET_NAME ? true : false, + unnormalizedSourceURL: preNormalizedBody.url, }, origin: req.body.origin, is_scrape: true, diff --git a/apps/api/src/scraper/scrapeURL/index.ts b/apps/api/src/scraper/scrapeURL/index.ts index 85254f37..fbc0c53b 100644 --- a/apps/api/src/scraper/scrapeURL/index.ts +++ b/apps/api/src/scraper/scrapeURL/index.ts @@ -182,6 +182,7 @@ export type InternalOptions = { fromCache?: boolean; // Indicates if the document was retrieved from cache abort?: AbortSignal; urlInvisibleInCurrentCrawl?: boolean; + unnormalizedSourceURL?: string; saveScrapeResultToGCS?: boolean; // Passed along to fire-engine }; @@ -373,7 +374,7 @@ async function scrapeURLLoop(meta: Meta): Promise { screenshot: result.result.screenshot, actions: result.result.actions, metadata: { - sourceURL: meta.url, + sourceURL: meta.internalOptions.unnormalizedSourceURL ?? 
meta.url, url: result.result.url, statusCode: result.result.statusCode, error: result.result.error, From 299e3e29e0d391b6d93ab196880ef6fa625bc3dc Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 27 May 2025 18:44:24 -0300 Subject: [PATCH 06/12] Update batch_billing.ts --- apps/api/src/services/billing/batch_billing.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/api/src/services/billing/batch_billing.ts b/apps/api/src/services/billing/batch_billing.ts index 8006b880..1a13e8e0 100644 --- a/apps/api/src/services/billing/batch_billing.ts +++ b/apps/api/src/services/billing/batch_billing.ts @@ -9,7 +9,7 @@ import { getACUC, setCachedACUC, setCachedACUCTeam } from "../../controllers/aut // Configuration constants const BATCH_KEY = "billing_batch"; const BATCH_LOCK_KEY = "billing_batch_lock"; -const BATCH_SIZE = 50; // Batch size for processing +const BATCH_SIZE = 1000; // Batch size for processing const BATCH_TIMEOUT = 15000; // 15 seconds processing interval const LOCK_TIMEOUT = 30000; // 30 seconds lock timeout From 756b452a01f83bdf144e525a050efc913cd7bb93 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 27 May 2025 19:05:00 -0300 Subject: [PATCH 07/12] Update batch_billing.ts --- apps/api/src/services/billing/batch_billing.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/api/src/services/billing/batch_billing.ts b/apps/api/src/services/billing/batch_billing.ts index 1a13e8e0..fa72bbc7 100644 --- a/apps/api/src/services/billing/batch_billing.ts +++ b/apps/api/src/services/billing/batch_billing.ts @@ -9,7 +9,7 @@ import { getACUC, setCachedACUC, setCachedACUCTeam } from "../../controllers/aut // Configuration constants const BATCH_KEY = "billing_batch"; const BATCH_LOCK_KEY = "billing_batch_lock"; -const BATCH_SIZE = 1000; // Batch size for processing +const BATCH_SIZE = 100; // Batch size for processing const BATCH_TIMEOUT = 15000; // 15 seconds processing interval const LOCK_TIMEOUT = 30000; // 30 seconds lock timeout From a5efff07f909759978ad585695ce05ecb9ca3a1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Wed, 28 May 2025 09:58:04 +0200 Subject: [PATCH 08/12] feat(apps/api): add support for a separate, non-eviction Redis (#1600) * feat(apps/api): add support for a separate, non-eviction Redis * fix: misimport --- apps/api/src/controllers/v0/admin/cclog.ts | 6 +- apps/api/src/controllers/v0/crawl-cancel.ts | 4 +- apps/api/src/controllers/v0/crawl-status.ts | 5 +- apps/api/src/controllers/v0/crawl.ts | 4 +- apps/api/src/controllers/v0/keyAuth.ts | 4 +- apps/api/src/controllers/v0/scrape.ts | 5 +- apps/api/src/controllers/v0/search.ts | 5 +- .../src/controllers/v1/concurrency-check.ts | 4 +- apps/api/src/controllers/v1/crawl-errors.ts | 5 +- apps/api/src/lib/concurrency-limit.ts | 36 +++---- apps/api/src/lib/crawl-redis.ts | 94 +++++++++---------- .../lib/deep-research/deep-research-redis.ts | 14 +-- apps/api/src/lib/extract/extract-redis.ts | 14 +-- .../generate-llmstxt-redis.ts | 14 +-- apps/api/src/lib/job-priority.ts | 10 +- apps/api/src/scraper/WebScraper/crawler.ts | 14 +-- .../notification/email_notification.ts | 14 +-- apps/api/src/services/queue-service.ts | 4 - apps/api/src/services/queue-worker.ts | 18 ++-- apps/api/src/services/redis.ts | 5 +- 20 files changed, 140 insertions(+), 139 deletions(-) diff --git a/apps/api/src/controllers/v0/admin/cclog.ts b/apps/api/src/controllers/v0/admin/cclog.ts index f9654285..7d81201e 100644 --- a/apps/api/src/controllers/v0/admin/cclog.ts +++ 
b/apps/api/src/controllers/v0/admin/cclog.ts @@ -1,4 +1,4 @@ -import { redisConnection } from "../../../services/queue-service"; +import { redisEvictConnection } from "../../../services/redis"; import { supabase_service } from "../../../services/supabase"; import { logger as _logger } from "../../../lib/logger"; import { Request, Response } from "express"; @@ -10,7 +10,7 @@ async function cclog() { let cursor = 0; do { - const result = await redisConnection.scan(cursor, "MATCH", "concurrency-limiter:*", "COUNT", 100000); + const result = await redisEvictConnection.scan(cursor, "MATCH", "concurrency-limiter:*", "COUNT", 100000); cursor = parseInt(result[0], 10); const usable = result[1].filter(x => !x.includes("preview_")); @@ -25,7 +25,7 @@ async function cclog() { for (const x of usable) { const at = new Date(); - const concurrency = await redisConnection.zrangebyscore(x, Date.now(), Infinity); + const concurrency = await redisEvictConnection.zrangebyscore(x, Date.now(), Infinity); if (concurrency) { entries.push({ team_id: x.split(":")[1], diff --git a/apps/api/src/controllers/v0/crawl-cancel.ts b/apps/api/src/controllers/v0/crawl-cancel.ts index 16f6cc87..a9c07b8d 100644 --- a/apps/api/src/controllers/v0/crawl-cancel.ts +++ b/apps/api/src/controllers/v0/crawl-cancel.ts @@ -6,7 +6,7 @@ import { logger } from "../../../src/lib/logger"; import { getCrawl, saveCrawl } from "../../../src/lib/crawl-redis"; import * as Sentry from "@sentry/node"; import { configDotenv } from "dotenv"; -import { redisConnection } from "../../services/queue-service"; +import { redisEvictConnection } from "../../../src/services/redis"; configDotenv(); export async function crawlCancelController(req: Request, res: Response) { @@ -20,7 +20,7 @@ export async function crawlCancelController(req: Request, res: Response) { const { team_id } = auth; - redisConnection.sadd("teams_using_v0", team_id) + redisEvictConnection.sadd("teams_using_v0", team_id) .catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id })); const sc = await getCrawl(req.params.jobId); diff --git a/apps/api/src/controllers/v0/crawl-status.ts b/apps/api/src/controllers/v0/crawl-status.ts index f9ed7917..92482817 100644 --- a/apps/api/src/controllers/v0/crawl-status.ts +++ b/apps/api/src/controllers/v0/crawl-status.ts @@ -1,7 +1,8 @@ import { Request, Response } from "express"; import { authenticateUser } from "../auth"; import { RateLimiterMode } from "../../../src/types"; -import { getScrapeQueue, redisConnection } from "../../../src/services/queue-service"; +import { getScrapeQueue } from "../../../src/services/queue-service"; +import { redisEvictConnection } from "../../../src/services/redis"; import { logger } from "../../../src/lib/logger"; import { getCrawl, getCrawlJobs } from "../../../src/lib/crawl-redis"; import { supabaseGetJobsByCrawlId } from "../../../src/lib/supabase-jobs"; @@ -80,7 +81,7 @@ export async function crawlStatusController(req: Request, res: Response) { const { team_id } = auth; - redisConnection.sadd("teams_using_v0", team_id) + redisEvictConnection.sadd("teams_using_v0", team_id) .catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id })); const sc = await getCrawl(req.params.jobId); diff --git a/apps/api/src/controllers/v0/crawl.ts b/apps/api/src/controllers/v0/crawl.ts index 5755b3bb..e70c79d9 100644 --- a/apps/api/src/controllers/v0/crawl.ts +++ b/apps/api/src/controllers/v0/crawl.ts @@ -24,7 +24,7 @@ import { saveCrawl, StoredCrawl, } from 
"../../../src/lib/crawl-redis"; -import { getScrapeQueue, redisConnection } from "../../../src/services/queue-service"; +import { redisEvictConnection } from "../../../src/services/redis"; import { checkAndUpdateURL } from "../../../src/lib/validateUrl"; import * as Sentry from "@sentry/node"; import { getJobPriority } from "../../lib/job-priority"; @@ -41,7 +41,7 @@ export async function crawlController(req: Request, res: Response) { const { team_id, chunk } = auth; - redisConnection.sadd("teams_using_v0", team_id) + redisEvictConnection.sadd("teams_using_v0", team_id) .catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id })); if (req.headers["x-idempotency-key"]) { diff --git a/apps/api/src/controllers/v0/keyAuth.ts b/apps/api/src/controllers/v0/keyAuth.ts index ce8dc7ce..baf4427e 100644 --- a/apps/api/src/controllers/v0/keyAuth.ts +++ b/apps/api/src/controllers/v0/keyAuth.ts @@ -2,7 +2,7 @@ import { AuthResponse, RateLimiterMode } from "../../types"; import { Request, Response } from "express"; import { authenticateUser } from "../auth"; -import { redisConnection } from "../../services/queue-service"; +import { redisEvictConnection } from "../../../src/services/redis"; import { logger } from "../../lib/logger"; export const keyAuthController = async (req: Request, res: Response) => { @@ -13,7 +13,7 @@ export const keyAuthController = async (req: Request, res: Response) => { return res.status(auth.status).json({ error: auth.error }); } - redisConnection.sadd("teams_using_v0", auth.team_id) + redisEvictConnection.sadd("teams_using_v0", auth.team_id) .catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id: auth.team_id })); // if success, return success: true diff --git a/apps/api/src/controllers/v0/scrape.ts b/apps/api/src/controllers/v0/scrape.ts index 8eb38f81..f7a9fbdd 100644 --- a/apps/api/src/controllers/v0/scrape.ts +++ b/apps/api/src/controllers/v0/scrape.ts @@ -21,7 +21,8 @@ import { defaultOrigin, } from "../../lib/default-values"; import { addScrapeJob, waitForJob } from "../../services/queue-jobs"; -import { getScrapeQueue, redisConnection } from "../../services/queue-service"; +import { getScrapeQueue } from "../../services/queue-service"; +import { redisEvictConnection } from "../../../src/services/redis"; import { v4 as uuidv4 } from "uuid"; import { logger } from "../../lib/logger"; import * as Sentry from "@sentry/node"; @@ -184,7 +185,7 @@ export async function scrapeController(req: Request, res: Response) { const { team_id, chunk } = auth; - redisConnection.sadd("teams_using_v0", team_id) + redisEvictConnection.sadd("teams_using_v0", team_id) .catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id })); const crawlerOptions = req.body.crawlerOptions ?? 
{}; diff --git a/apps/api/src/controllers/v0/search.ts b/apps/api/src/controllers/v0/search.ts index 1d5d16fb..15172e37 100644 --- a/apps/api/src/controllers/v0/search.ts +++ b/apps/api/src/controllers/v0/search.ts @@ -11,7 +11,8 @@ import { search } from "../../search"; import { isUrlBlocked } from "../../scraper/WebScraper/utils/blocklist"; import { v4 as uuidv4 } from "uuid"; import { logger } from "../../lib/logger"; -import { getScrapeQueue, redisConnection } from "../../services/queue-service"; +import { getScrapeQueue } from "../../services/queue-service"; +import { redisEvictConnection } from "../../../src/services/redis"; import { addScrapeJob, waitForJob } from "../../services/queue-jobs"; import * as Sentry from "@sentry/node"; import { getJobPriority } from "../../lib/job-priority"; @@ -167,7 +168,7 @@ export async function searchController(req: Request, res: Response) { } const { team_id, chunk } = auth; - redisConnection.sadd("teams_using_v0", team_id) + redisEvictConnection.sadd("teams_using_v0", team_id) .catch(error => logger.error("Failed to add team to teams_using_v0", { error, team_id })); const crawlerOptions = req.body.crawlerOptions ?? {}; diff --git a/apps/api/src/controllers/v1/concurrency-check.ts b/apps/api/src/controllers/v1/concurrency-check.ts index 1aa69363..a6313c72 100644 --- a/apps/api/src/controllers/v1/concurrency-check.ts +++ b/apps/api/src/controllers/v1/concurrency-check.ts @@ -4,7 +4,7 @@ import { RequestWithAuth, } from "./types"; import { Response } from "express"; -import { redisConnection } from "../../services/queue-service"; +import { redisEvictConnection } from "../../../src/services/redis"; // Basically just middleware and error wrapping export async function concurrencyCheckController( @@ -13,7 +13,7 @@ export async function concurrencyCheckController( ) { const concurrencyLimiterKey = "concurrency-limiter:" + req.auth.team_id; const now = Date.now(); - const activeJobsOfTeam = await redisConnection.zrangebyscore( + const activeJobsOfTeam = await redisEvictConnection.zrangebyscore( concurrencyLimiterKey, now, Infinity, diff --git a/apps/api/src/controllers/v1/crawl-errors.ts b/apps/api/src/controllers/v1/crawl-errors.ts index 979a6d7a..d8d029ee 100644 --- a/apps/api/src/controllers/v1/crawl-errors.ts +++ b/apps/api/src/controllers/v1/crawl-errors.ts @@ -8,7 +8,8 @@ import { getCrawl, getCrawlJobs, } from "../../lib/crawl-redis"; -import { getScrapeQueue, redisConnection } from "../../services/queue-service"; +import { getScrapeQueue } from "../../services/queue-service"; +import { redisEvictConnection } from "../../../src/services/redis"; import { configDotenv } from "dotenv"; import { Job } from "bullmq"; configDotenv(); @@ -65,7 +66,7 @@ export async function crawlErrorsController( url: x.data.url, error: x.failedReason, })), - robotsBlocked: await redisConnection.smembers( + robotsBlocked: await redisEvictConnection.smembers( "crawl:" + req.params.jobId + ":robots_blocked", ), }); diff --git a/apps/api/src/lib/concurrency-limit.ts b/apps/api/src/lib/concurrency-limit.ts index 8901413f..59634e0b 100644 --- a/apps/api/src/lib/concurrency-limit.ts +++ b/apps/api/src/lib/concurrency-limit.ts @@ -1,4 +1,4 @@ -import { redisConnection } from "../services/queue-service"; +import { redisEvictConnection } from "../services/redis"; import type { JobsOptions } from "bullmq"; const constructKey = (team_id: string) => "concurrency-limiter:" + team_id; @@ -12,14 +12,14 @@ export async function cleanOldConcurrencyLimitEntries( team_id: string, now: 
number = Date.now(), ) { - await redisConnection.zremrangebyscore(constructKey(team_id), -Infinity, now); + await redisEvictConnection.zremrangebyscore(constructKey(team_id), -Infinity, now); } export async function getConcurrencyLimitActiveJobs( team_id: string, now: number = Date.now(), ): Promise { - return await redisConnection.zrangebyscore( + return await redisEvictConnection.zrangebyscore( constructKey(team_id), now, Infinity, @@ -32,7 +32,7 @@ export async function pushConcurrencyLimitActiveJob( timeout: number, now: number = Date.now(), ) { - await redisConnection.zadd( + await redisEvictConnection.zadd( constructKey(team_id), now + timeout, id, @@ -43,7 +43,7 @@ export async function removeConcurrencyLimitActiveJob( team_id: string, id: string, ) { - await redisConnection.zrem(constructKey(team_id), id); + await redisEvictConnection.zrem(constructKey(team_id), id); } export type ConcurrencyLimitedJob = { @@ -56,8 +56,8 @@ export type ConcurrencyLimitedJob = { export async function takeConcurrencyLimitedJob( team_id: string, ): Promise { - await redisConnection.zremrangebyscore(constructQueueKey(team_id), -Infinity, Date.now()); - const res = await redisConnection.zmpop(1, constructQueueKey(team_id), "MIN"); + await redisEvictConnection.zremrangebyscore(constructQueueKey(team_id), -Infinity, Date.now()); + const res = await redisEvictConnection.zmpop(1, constructQueueKey(team_id), "MIN"); if (res === null || res === undefined) { return null; } @@ -70,7 +70,7 @@ export async function pushConcurrencyLimitedJob( job: ConcurrencyLimitedJob, timeout: number, ) { - await redisConnection.zadd( + await redisEvictConnection.zadd( constructQueueKey(team_id), Date.now() + timeout, JSON.stringify(job), @@ -80,11 +80,11 @@ export async function pushConcurrencyLimitedJob( export async function getConcurrencyLimitedJobs( team_id: string, ) { - return new Set((await redisConnection.zrange(constructQueueKey(team_id), 0, -1)).map(x => JSON.parse(x).id)); + return new Set((await redisEvictConnection.zrange(constructQueueKey(team_id), 0, -1)).map(x => JSON.parse(x).id)); } export async function getConcurrencyQueueJobsCount(team_id: string): Promise { - const count = await redisConnection.zcard(constructQueueKey(team_id)); + const count = await redisEvictConnection.zcard(constructQueueKey(team_id)); return count; } @@ -92,14 +92,14 @@ export async function cleanOldCrawlConcurrencyLimitEntries( crawl_id: string, now: number = Date.now(), ) { - await redisConnection.zremrangebyscore(constructCrawlKey(crawl_id), -Infinity, now); + await redisEvictConnection.zremrangebyscore(constructCrawlKey(crawl_id), -Infinity, now); } export async function getCrawlConcurrencyLimitActiveJobs( crawl_id: string, now: number = Date.now(), ): Promise { - return await redisConnection.zrangebyscore( + return await redisEvictConnection.zrangebyscore( constructCrawlKey(crawl_id), now, Infinity, @@ -112,7 +112,7 @@ export async function pushCrawlConcurrencyLimitActiveJob( timeout: number, now: number = Date.now(), ) { - await redisConnection.zadd( + await redisEvictConnection.zadd( constructCrawlKey(crawl_id), now + timeout, id, @@ -123,13 +123,13 @@ export async function removeCrawlConcurrencyLimitActiveJob( crawl_id: string, id: string, ) { - await redisConnection.zrem(constructCrawlKey(crawl_id), id); + await redisEvictConnection.zrem(constructCrawlKey(crawl_id), id); } export async function takeCrawlConcurrencyLimitedJob( crawl_id: string, ): Promise { - const res = await redisConnection.zmpop(1, 
constructCrawlQueueKey(crawl_id), "MIN"); + const res = await redisEvictConnection.zmpop(1, constructCrawlQueueKey(crawl_id), "MIN"); if (res === null || res === undefined) { return null; } @@ -140,7 +140,7 @@ export async function pushCrawlConcurrencyLimitedJob( crawl_id: string, job: ConcurrencyLimitedJob, ) { - await redisConnection.zadd( + await redisEvictConnection.zadd( constructCrawlQueueKey(crawl_id), job.priority ?? 1, JSON.stringify(job), @@ -150,10 +150,10 @@ export async function pushCrawlConcurrencyLimitedJob( export async function getCrawlConcurrencyLimitedJobs( crawl_id: string, ) { - return new Set((await redisConnection.zrange(constructCrawlQueueKey(crawl_id), 0, -1)).map(x => JSON.parse(x).id)); + return new Set((await redisEvictConnection.zrange(constructCrawlQueueKey(crawl_id), 0, -1)).map(x => JSON.parse(x).id)); } export async function getCrawlConcurrencyQueueJobsCount(crawl_id: string): Promise { - const count = await redisConnection.zcard(constructCrawlQueueKey(crawl_id)); + const count = await redisEvictConnection.zcard(constructCrawlQueueKey(crawl_id)); return count; } diff --git a/apps/api/src/lib/crawl-redis.ts b/apps/api/src/lib/crawl-redis.ts index 96de520d..0984a628 100644 --- a/apps/api/src/lib/crawl-redis.ts +++ b/apps/api/src/lib/crawl-redis.ts @@ -1,7 +1,7 @@ import { InternalOptions } from "../scraper/scrapeURL"; import { ScrapeOptions, TeamFlags } from "../controllers/v1/types"; import { WebCrawler } from "../scraper/WebScraper/crawler"; -import { redisConnection } from "../services/queue-service"; +import { redisEvictConnection } from "../services/redis"; import { logger as _logger } from "./logger"; import { getAdjustedMaxDepth } from "../scraper/WebScraper/utils/maxDepthUtils"; @@ -24,24 +24,24 @@ export async function saveCrawl(id: string, crawl: StoredCrawl) { crawlId: id, teamId: crawl.team_id, }); - await redisConnection.set("crawl:" + id, JSON.stringify(crawl)); - await redisConnection.expire("crawl:" + id, 24 * 60 * 60); + await redisEvictConnection.set("crawl:" + id, JSON.stringify(crawl)); + await redisEvictConnection.expire("crawl:" + id, 24 * 60 * 60); } export async function getCrawl(id: string): Promise { - const x = await redisConnection.get("crawl:" + id); + const x = await redisEvictConnection.get("crawl:" + id); if (x === null) { return null; } - await redisConnection.expire("crawl:" + id, 24 * 60 * 60); + await redisEvictConnection.expire("crawl:" + id, 24 * 60 * 60); return JSON.parse(x); } export async function getCrawlExpiry(id: string): Promise { const d = new Date(); - const ttl = await redisConnection.pttl("crawl:" + id); + const ttl = await redisEvictConnection.pttl("crawl:" + id); d.setMilliseconds(d.getMilliseconds() + ttl); d.setMilliseconds(0); return d; @@ -54,8 +54,8 @@ export async function addCrawlJob(id: string, job_id: string) { method: "addCrawlJob", crawlId: id, }); - await redisConnection.sadd("crawl:" + id + ":jobs", job_id); - await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60); + await redisEvictConnection.sadd("crawl:" + id + ":jobs", job_id); + await redisEvictConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60); } export async function addCrawlJobs(id: string, job_ids: string[]) { @@ -67,8 +67,8 @@ export async function addCrawlJobs(id: string, job_ids: string[]) { method: "addCrawlJobs", crawlId: id, }); - await redisConnection.sadd("crawl:" + id + ":jobs", ...job_ids); - await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60); + await redisEvictConnection.sadd("crawl:" + id 
+ ":jobs", ...job_ids); + await redisEvictConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60); } export async function addCrawlJobDone( @@ -82,32 +82,32 @@ export async function addCrawlJobDone( method: "addCrawlJobDone", crawlId: id, }); - await redisConnection.sadd("crawl:" + id + ":jobs_done", job_id); - await redisConnection.expire( + await redisEvictConnection.sadd("crawl:" + id + ":jobs_done", job_id); + await redisEvictConnection.expire( "crawl:" + id + ":jobs_done", 24 * 60 * 60, ); if (success) { - await redisConnection.rpush("crawl:" + id + ":jobs_done_ordered", job_id); + await redisEvictConnection.rpush("crawl:" + id + ":jobs_done_ordered", job_id); } else { // in case it's already been pushed, make sure it's removed - await redisConnection.lrem( + await redisEvictConnection.lrem( "crawl:" + id + ":jobs_done_ordered", -1, job_id, ); } - await redisConnection.expire( + await redisEvictConnection.expire( "crawl:" + id + ":jobs_done_ordered", 24 * 60 * 60, ); } export async function getDoneJobsOrderedLength(id: string): Promise { - await redisConnection.expire("crawl:" + id + ":jobs_done_ordered", 24 * 60 * 60); - return await redisConnection.llen("crawl:" + id + ":jobs_done_ordered"); + await redisEvictConnection.expire("crawl:" + id + ":jobs_done_ordered", 24 * 60 * 60); + return await redisEvictConnection.llen("crawl:" + id + ":jobs_done_ordered"); } export async function getDoneJobsOrdered( @@ -115,8 +115,8 @@ export async function getDoneJobsOrdered( start = 0, end = -1, ): Promise { - await redisConnection.expire("crawl:" + id + ":jobs_done_ordered", 24 * 60 * 60); - return await redisConnection.lrange( + await redisEvictConnection.expire("crawl:" + id + ":jobs_done_ordered", 24 * 60 * 60); + return await redisEvictConnection.lrange( "crawl:" + id + ":jobs_done_ordered", start, end, @@ -124,27 +124,27 @@ export async function getDoneJobsOrdered( } export async function isCrawlFinished(id: string) { - await redisConnection.expire("crawl:" + id + ":kickoff:finish", 24 * 60 * 60); + await redisEvictConnection.expire("crawl:" + id + ":kickoff:finish", 24 * 60 * 60); return ( - (await redisConnection.scard("crawl:" + id + ":jobs_done")) === - (await redisConnection.scard("crawl:" + id + ":jobs")) && - (await redisConnection.get("crawl:" + id + ":kickoff:finish")) !== null + (await redisEvictConnection.scard("crawl:" + id + ":jobs_done")) === + (await redisEvictConnection.scard("crawl:" + id + ":jobs")) && + (await redisEvictConnection.get("crawl:" + id + ":kickoff:finish")) !== null ); } export async function isCrawlKickoffFinished(id: string) { - await redisConnection.expire("crawl:" + id + ":kickoff:finish", 24 * 60 * 60); + await redisEvictConnection.expire("crawl:" + id + ":kickoff:finish", 24 * 60 * 60); return ( - (await redisConnection.get("crawl:" + id + ":kickoff:finish")) !== null + (await redisEvictConnection.get("crawl:" + id + ":kickoff:finish")) !== null ); } export async function isCrawlFinishedLocked(id: string) { - return await redisConnection.exists("crawl:" + id + ":finish"); + return await redisEvictConnection.exists("crawl:" + id + ":finish"); } export async function finishCrawlKickoff(id: string) { - await redisConnection.set( + await redisEvictConnection.set( "crawl:" + id + ":kickoff:finish", "yes", "EX", @@ -159,18 +159,18 @@ export async function finishCrawlPre(id: string) { method: "finishCrawlPre", crawlId: id, }); - const set = await redisConnection.setnx("crawl:" + id + ":finished_pre", "yes"); - await redisConnection.expire("crawl:" + id + 
":finished_pre", 24 * 60 * 60); + const set = await redisEvictConnection.setnx("crawl:" + id + ":finished_pre", "yes"); + await redisEvictConnection.expire("crawl:" + id + ":finished_pre", 24 * 60 * 60); return set === 1; } else { // _logger.debug("Crawl can not be pre-finished yet, not marking as finished.", { // module: "crawl-redis", // method: "finishCrawlPre", // crawlId: id, - // jobs_done: await redisConnection.scard("crawl:" + id + ":jobs_done"), - // jobs: await redisConnection.scard("crawl:" + id + ":jobs"), + // jobs_done: await redisEvictConnection.scard("crawl:" + id + ":jobs_done"), + // jobs: await redisEvictConnection.scard("crawl:" + id + ":jobs"), // kickoff_finished: - // (await redisConnection.get("crawl:" + id + ":kickoff:finish")) !== null, + // (await redisEvictConnection.get("crawl:" + id + ":kickoff:finish")) !== null, // }); } } @@ -181,16 +181,16 @@ export async function finishCrawl(id: string) { method: "finishCrawl", crawlId: id, }); - await redisConnection.set("crawl:" + id + ":finish", "yes"); - await redisConnection.expire("crawl:" + id + ":finish", 24 * 60 * 60); + await redisEvictConnection.set("crawl:" + id + ":finish", "yes"); + await redisEvictConnection.expire("crawl:" + id + ":finish", 24 * 60 * 60); } export async function getCrawlJobs(id: string): Promise { - return await redisConnection.smembers("crawl:" + id + ":jobs"); + return await redisEvictConnection.smembers("crawl:" + id + ":jobs"); } export async function getCrawlJobCount(id: string): Promise { - return await redisConnection.scard("crawl:" + id + ":jobs"); + return await redisEvictConnection.scard("crawl:" + id + ":jobs"); } export function normalizeURL(url: string, sc: StoredCrawl): string { @@ -276,7 +276,7 @@ export async function lockURL( if (typeof sc.crawlerOptions?.limit === "number") { if ( - (await redisConnection.scard("crawl:" + id + ":visited_unique")) >= + (await redisEvictConnection.scard("crawl:" + id + ":visited_unique")) >= sc.crawlerOptions.limit ) { // logger.debug( @@ -291,22 +291,22 @@ export async function lockURL( let res: boolean; if (!sc.crawlerOptions?.deduplicateSimilarURLs) { - res = (await redisConnection.sadd("crawl:" + id + ":visited", url)) !== 0; + res = (await redisEvictConnection.sadd("crawl:" + id + ":visited", url)) !== 0; } else { const permutations = generateURLPermutations(url).map((x) => x.href); // logger.debug("Adding URL permutations for URL " + JSON.stringify(url) + "...", { permutations }); - const x = await redisConnection.sadd( + const x = await redisEvictConnection.sadd( "crawl:" + id + ":visited", ...permutations, ); res = x === permutations.length; } - await redisConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60); + await redisEvictConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60); if (res) { - await redisConnection.sadd("crawl:" + id + ":visited_unique", url); - await redisConnection.expire( + await redisEvictConnection.sadd("crawl:" + id + ":visited_unique", url); + await redisEvictConnection.expire( "crawl:" + id + ":visited_unique", 24 * 60 * 60, ); @@ -336,29 +336,29 @@ export async function lockURLs( // Add to visited_unique set logger.debug("Locking " + urls.length + " URLs..."); - await redisConnection.sadd("crawl:" + id + ":visited_unique", ...urls); - await redisConnection.expire( + await redisEvictConnection.sadd("crawl:" + id + ":visited_unique", ...urls); + await redisEvictConnection.expire( "crawl:" + id + ":visited_unique", 24 * 60 * 60, ); let res: boolean; if (!sc.crawlerOptions?.deduplicateSimilarURLs) 
{ - const x = await redisConnection.sadd("crawl:" + id + ":visited", ...urls); + const x = await redisEvictConnection.sadd("crawl:" + id + ":visited", ...urls); res = x === urls.length; } else { const allPermutations = urls.flatMap((url) => generateURLPermutations(url).map((x) => x.href), ); logger.debug("Adding " + allPermutations.length + " URL permutations..."); - const x = await redisConnection.sadd( + const x = await redisEvictConnection.sadd( "crawl:" + id + ":visited", ...allPermutations, ); res = x === allPermutations.length; } - await redisConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60); + await redisEvictConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60); logger.debug("lockURLs final result: " + res, { res }); return res; diff --git a/apps/api/src/lib/deep-research/deep-research-redis.ts b/apps/api/src/lib/deep-research/deep-research-redis.ts index 3e846b49..b2a0b8c8 100644 --- a/apps/api/src/lib/deep-research/deep-research-redis.ts +++ b/apps/api/src/lib/deep-research/deep-research-redis.ts @@ -1,4 +1,4 @@ -import { redisConnection } from "../../services/queue-service"; +import { redisEvictConnection } from "../../services/redis"; import { logger as _logger } from "../logger"; export enum DeepResearchStep { @@ -52,12 +52,12 @@ const DEEP_RESEARCH_TTL = 6 * 60 * 60; export async function saveDeepResearch(id: string, research: StoredDeepResearch) { _logger.debug("Saving deep research " + id + " to Redis..."); - await redisConnection.set("deep-research:" + id, JSON.stringify(research)); - await redisConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL); + await redisEvictConnection.set("deep-research:" + id, JSON.stringify(research)); + await redisEvictConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL); } export async function getDeepResearch(id: string): Promise { - const x = await redisConnection.get("deep-research:" + id); + const x = await redisEvictConnection.get("deep-research:" + id); return x ? 
JSON.parse(x) : null; } @@ -91,13 +91,13 @@ export async function updateDeepResearch( - await redisConnection.set("deep-research:" + id, JSON.stringify(updatedResearch)); - await redisConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL); + await redisEvictConnection.set("deep-research:" + id, JSON.stringify(updatedResearch)); + await redisEvictConnection.expire("deep-research:" + id, DEEP_RESEARCH_TTL); } export async function getDeepResearchExpiry(id: string): Promise { const d = new Date(); - const ttl = await redisConnection.pttl("deep-research:" + id); + const ttl = await redisEvictConnection.pttl("deep-research:" + id); d.setMilliseconds(d.getMilliseconds() + ttl); d.setMilliseconds(0); return d; diff --git a/apps/api/src/lib/extract/extract-redis.ts b/apps/api/src/lib/extract/extract-redis.ts index d256c582..397a8a60 100644 --- a/apps/api/src/lib/extract/extract-redis.ts +++ b/apps/api/src/lib/extract/extract-redis.ts @@ -1,4 +1,4 @@ -import { redisConnection } from "../../services/queue-service"; +import { redisEvictConnection } from "../../services/redis"; import { logger as _logger } from "../logger"; import { CostTracking } from "./extraction-service"; @@ -61,12 +61,12 @@ export async function saveExtract(id: string, extract: StoredExtract) { discoveredLinks: step.discoveredLinks?.slice(0, STEPS_MAX_DISCOVERED_LINKS) })) }; - await redisConnection.set("extract:" + id, JSON.stringify(minimalExtract)); - await redisConnection.expire("extract:" + id, EXTRACT_TTL); + await redisEvictConnection.set("extract:" + id, JSON.stringify(minimalExtract)); + await redisEvictConnection.expire("extract:" + id, EXTRACT_TTL); } export async function getExtract(id: string): Promise { - const x = await redisConnection.get("extract:" + id); + const x = await redisEvictConnection.get("extract:" + id); return x ? 
JSON.parse(x) : null; } @@ -111,13 +111,13 @@ export async function updateExtract( console.log(minimalExtract.sessionIds) - await redisConnection.set("extract:" + id, JSON.stringify(minimalExtract)); - await redisConnection.expire("extract:" + id, EXTRACT_TTL); + await redisEvictConnection.set("extract:" + id, JSON.stringify(minimalExtract)); + await redisEvictConnection.expire("extract:" + id, EXTRACT_TTL); } export async function getExtractExpiry(id: string): Promise { const d = new Date(); - const ttl = await redisConnection.pttl("extract:" + id); + const ttl = await redisEvictConnection.pttl("extract:" + id); d.setMilliseconds(d.getMilliseconds() + ttl); d.setMilliseconds(0); return d; diff --git a/apps/api/src/lib/generate-llmstxt/generate-llmstxt-redis.ts b/apps/api/src/lib/generate-llmstxt/generate-llmstxt-redis.ts index c5bf6479..f44148cf 100644 --- a/apps/api/src/lib/generate-llmstxt/generate-llmstxt-redis.ts +++ b/apps/api/src/lib/generate-llmstxt/generate-llmstxt-redis.ts @@ -1,4 +1,4 @@ -import { redisConnection } from "../../services/queue-service"; +import { redisEvictConnection } from "../../services/redis"; import { logger as _logger } from "../logger"; export interface GenerationData { @@ -20,12 +20,12 @@ const GENERATION_TTL = 24 * 60 * 60; export async function saveGeneratedLlmsTxt(id: string, data: GenerationData): Promise { _logger.debug("Saving llmstxt generation " + id + " to Redis..."); - await redisConnection.set("generation:" + id, JSON.stringify(data)); - await redisConnection.expire("generation:" + id, GENERATION_TTL); + await redisEvictConnection.set("generation:" + id, JSON.stringify(data)); + await redisEvictConnection.expire("generation:" + id, GENERATION_TTL); } export async function getGeneratedLlmsTxt(id: string): Promise { - const x = await redisConnection.get("generation:" + id); + const x = await redisEvictConnection.get("generation:" + id); return x ? JSON.parse(x) : null; } @@ -41,13 +41,13 @@ export async function updateGeneratedLlmsTxt( ...data }; - await redisConnection.set("generation:" + id, JSON.stringify(updatedGeneration)); - await redisConnection.expire("generation:" + id, GENERATION_TTL); + await redisEvictConnection.set("generation:" + id, JSON.stringify(updatedGeneration)); + await redisEvictConnection.expire("generation:" + id, GENERATION_TTL); } export async function getGeneratedLlmsTxtExpiry(id: string): Promise { const d = new Date(); - const ttl = await redisConnection.pttl("generation:" + id); + const ttl = await redisEvictConnection.pttl("generation:" + id); d.setMilliseconds(d.getMilliseconds() + ttl); d.setMilliseconds(0); return d; diff --git a/apps/api/src/lib/job-priority.ts b/apps/api/src/lib/job-priority.ts index 02356c21..3e225db5 100644 --- a/apps/api/src/lib/job-priority.ts +++ b/apps/api/src/lib/job-priority.ts @@ -1,6 +1,6 @@ import { RateLimiterMode } from "../types"; import { getACUC, getACUCTeam } from "../controllers/auth"; -import { redisConnection } from "../services/queue-service"; +import { redisEvictConnection } from "../services/redis"; import { logger } from "./logger"; const SET_KEY_PREFIX = "limit_team_id:"; @@ -9,10 +9,10 @@ export async function addJobPriority(team_id, job_id) { const setKey = SET_KEY_PREFIX + team_id; // Add scrape job id to the set - await redisConnection.sadd(setKey, job_id); + await redisEvictConnection.sadd(setKey, job_id); // This approach will reset the expiration time to 60 seconds every time a new job is added to the set. 
- await redisConnection.expire(setKey, 60); + await redisEvictConnection.expire(setKey, 60); } catch (e) { logger.error(`Add job priority (sadd) failed: ${team_id}, ${job_id}`); } @@ -23,7 +23,7 @@ export async function deleteJobPriority(team_id, job_id) { const setKey = SET_KEY_PREFIX + team_id; // remove job_id from the set - await redisConnection.srem(setKey, job_id); + await redisEvictConnection.srem(setKey, job_id); } catch (e) { logger.error(`Delete job priority (srem) failed: ${team_id}, ${job_id}`); } @@ -48,7 +48,7 @@ export async function getJobPriority({ const setKey = SET_KEY_PREFIX + team_id; // Get the length of the set - const setLength = await redisConnection.scard(setKey); + const setLength = await redisEvictConnection.scard(setKey); // Determine the priority based on the plan and set length let planModifier = acuc?.plan_priority.planModifier ?? 1; diff --git a/apps/api/src/scraper/WebScraper/crawler.ts b/apps/api/src/scraper/WebScraper/crawler.ts index 56c46faf..899e5157 100644 --- a/apps/api/src/scraper/WebScraper/crawler.ts +++ b/apps/api/src/scraper/WebScraper/crawler.ts @@ -7,7 +7,7 @@ import { getURLDepth } from "./utils/maxDepthUtils"; import { axiosTimeout } from "../../lib/timeout"; import { logger as _logger } from "../../lib/logger"; import https from "https"; -import { redisConnection } from "../../services/queue-service"; +import { redisEvictConnection } from "../../services/redis"; import { extractLinks } from "../../lib/html-transformer"; import { TimeoutSignal } from "../../controllers/v1/types"; export class WebCrawler { @@ -287,7 +287,7 @@ export class WebCrawler { let uniqueURLs: string[] = []; for (const url of filteredLinks) { if ( - await redisConnection.sadd( + await redisEvictConnection.sadd( "sitemap:" + this.jobId + ":links", normalizeUrl(url), ) @@ -296,7 +296,7 @@ export class WebCrawler { } } - await redisConnection.expire( + await redisEvictConnection.expire( "sitemap:" + this.jobId + ":links", 3600, "NX", @@ -324,7 +324,7 @@ export class WebCrawler { if (count > 0) { if ( - await redisConnection.sadd( + await redisEvictConnection.sadd( "sitemap:" + this.jobId + ":links", normalizeUrl(this.initialUrl), ) @@ -334,7 +334,7 @@ export class WebCrawler { count++; } - await redisConnection.expire( + await redisEvictConnection.expire( "sitemap:" + this.jobId + ":links", 3600, "NX", @@ -390,11 +390,11 @@ export class WebCrawler { !this.isRobotsAllowed(fullUrl, this.ignoreRobotsTxt) ) { (async () => { - await redisConnection.sadd( + await redisEvictConnection.sadd( "crawl:" + this.jobId + ":robots_blocked", fullUrl, ); - await redisConnection.expire( + await redisEvictConnection.expire( "crawl:" + this.jobId + ":robots_blocked", 24 * 60 * 60, ); diff --git a/apps/api/src/services/notification/email_notification.ts b/apps/api/src/services/notification/email_notification.ts index cf238b06..e85109f0 100644 --- a/apps/api/src/services/notification/email_notification.ts +++ b/apps/api/src/services/notification/email_notification.ts @@ -7,7 +7,7 @@ import { sendSlackWebhook } from "../alerts/slack"; import { getNotificationString } from "./notification_string"; import { AuthCreditUsageChunk } from "../../controllers/v1/types"; import { redlock } from "../redlock"; -import { redisConnection } from "../queue-service"; +import { redisEvictConnection } from "../redis"; const emailTemplates: Record< NotificationType, @@ -268,14 +268,14 @@ export async function sendNotificationWithCustomDays( ) => { const redisKey = "notification_sent:" + notificationType + ":" + 
team_id; - const didSendRecentNotification = (await redisConnection.get(redisKey)) !== null; + const didSendRecentNotification = (await redisEvictConnection.get(redisKey)) !== null; if (didSendRecentNotification && !bypassRecentChecks) { logger.debug(`Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`); return { success: true }; } - await redisConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60); + await redisEvictConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60); const now = new Date(); const pastDate = new Date(now.getTime() - daysBetweenEmails * 24 * 60 * 60 * 1000); @@ -289,13 +289,13 @@ export async function sendNotificationWithCustomDays( if (recentNotificationsError) { logger.debug(`Error fetching recent notifications: ${recentNotificationsError}`); - await redisConnection.del(redisKey); // free up redis, let it try again + await redisEvictConnection.del(redisKey); // free up redis, let it try again return { success: false }; } if (recentNotifications.length > 0 && !bypassRecentChecks) { logger.debug(`Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`); - await redisConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60); + await redisEvictConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60); return { success: true }; } @@ -310,7 +310,7 @@ export async function sendNotificationWithCustomDays( if (emailsError) { logger.debug(`Error fetching emails: ${emailsError}`); - await redisConnection.del(redisKey); // free up redis, let it try again + await redisEvictConnection.del(redisKey); // free up redis, let it try again return { success: false }; } @@ -341,7 +341,7 @@ export async function sendNotificationWithCustomDays( if (insertError) { logger.debug(`Error inserting notification record: ${insertError}`); - await redisConnection.del(redisKey); // free up redis, let it try again + await redisEvictConnection.del(redisKey); // free up redis, let it try again return { success: false }; } diff --git a/apps/api/src/services/queue-service.ts b/apps/api/src/services/queue-service.ts index e84c006c..b2353296 100644 --- a/apps/api/src/services/queue-service.ts +++ b/apps/api/src/services/queue-service.ts @@ -131,7 +131,3 @@ export function getBillingQueue() { } return billingQueue; } - -// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE -// import { QueueEvents } from 'bullmq'; -// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() }); diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 2738b8ee..601e319e 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -6,11 +6,8 @@ import { getScrapeQueue, getExtractQueue, getDeepResearchQueue, - redisConnection, - scrapeQueueName, - extractQueueName, - deepResearchQueueName, getIndexQueue, + redisConnection, getGenerateLlmsTxtQueue, getBillingQueue, } from "./queue-service"; @@ -88,6 +85,7 @@ import https from "https"; import { cacheableLookup } from "../scraper/scrapeURL/lib/cacheableLookup"; import { robustFetch } from "../scraper/scrapeURL/lib/fetch"; import { RateLimiterMode } from "../types"; +import { redisEvictConnection } from "./redis"; configDotenv(); @@ -132,11 +130,11 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) { 
logger.info("Crawl is pre-finished, checking if we need to add more jobs"); if ( job.data.crawlerOptions && - !(await redisConnection.exists( + !(await redisEvictConnection.exists( "crawl:" + job.data.crawl_id + ":invisible_urls", )) ) { - await redisConnection.set( + await redisEvictConnection.set( "crawl:" + job.data.crawl_id + ":invisible_urls", "done", "EX", @@ -146,7 +144,7 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) { const sc = (await getCrawl(job.data.crawl_id))!; const visitedUrls = new Set( - await redisConnection.smembers( + await redisEvictConnection.smembers( "crawl:" + job.data.crawl_id + ":visited_unique", ), ); @@ -261,7 +259,7 @@ async function finishCrawlIfNeeded(job: Job & { id: string }, sc: StoredCrawl) { ? normalizeUrlOnlyHostname(sc.originUrl) : undefined; // Get all visited unique URLs from Redis - const visitedUrls = await redisConnection.smembers( + const visitedUrls = await redisEvictConnection.smembers( "crawl:" + job.data.crawl_id + ":visited_unique", ); // Upload to Supabase if we have URLs and this is a crawl (not a batch scrape) @@ -1230,7 +1228,7 @@ async function processJob(job: Job & { id: string }, token: string) { // Prevent redirect target from being visited in the crawl again // See lockURL - const x = await redisConnection.sadd( + const x = await redisEvictConnection.sadd( "crawl:" + job.data.crawl_id + ":visited", ...p1.map((x) => x.href), ); @@ -1452,7 +1450,7 @@ async function processJob(job: Job & { id: string }, token: string) { logger.debug("Declaring job as done..."); await addCrawlJobDone(job.data.crawl_id, job.id, false); - await redisConnection.srem( + await redisEvictConnection.srem( "crawl:" + job.data.crawl_id + ":visited_unique", normalizeURL(job.data.url, sc), ); diff --git a/apps/api/src/services/redis.ts b/apps/api/src/services/redis.ts index d2c7dd3a..4076aab8 100644 --- a/apps/api/src/services/redis.ts +++ b/apps/api/src/services/redis.ts @@ -1,4 +1,4 @@ -import Redis from "ioredis"; +import IORedis from "ioredis"; import { redisRateLimitClient } from "./rate-limiter"; import { logger } from "../lib/logger"; @@ -70,3 +70,6 @@ const deleteKey = async (key: string) => { }; export { setValue, getValue, deleteKey }; + +const redisEvictURL = process.env.REDIS_EVICT_URL ?? 
process.env.REDIS_RATE_LIMIT_URL; +export const redisEvictConnection = new IORedis(redisEvictURL!); From 3557c90210191f4c691d3a562b35f11fac87f387 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Wed, 28 May 2025 19:31:48 +0200 Subject: [PATCH 09/12] feat(js-sdk): auto mode proxy (FIR-2145) (#1602) * feat(js-sdk): auto mode proxy * Nick: py sdk --------- Co-authored-by: Nicolas --- apps/js-sdk/firecrawl/package.json | 2 +- apps/js-sdk/firecrawl/src/index.ts | 2 +- apps/python-sdk/firecrawl/__init__.py | 2 +- apps/python-sdk/firecrawl/firecrawl.py | 20 ++++++++++---------- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/apps/js-sdk/firecrawl/package.json b/apps/js-sdk/firecrawl/package.json index 0baa2d5b..54851cd4 100644 --- a/apps/js-sdk/firecrawl/package.json +++ b/apps/js-sdk/firecrawl/package.json @@ -1,6 +1,6 @@ { "name": "@mendable/firecrawl-js", - "version": "1.25.1", + "version": "1.25.2", "description": "JavaScript SDK for Firecrawl API", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/apps/js-sdk/firecrawl/src/index.ts b/apps/js-sdk/firecrawl/src/index.ts index ca77d200..265bd6c5 100644 --- a/apps/js-sdk/firecrawl/src/index.ts +++ b/apps/js-sdk/firecrawl/src/index.ts @@ -119,7 +119,7 @@ export interface CrawlScrapeOptions { skipTlsVerification?: boolean; removeBase64Images?: boolean; blockAds?: boolean; - proxy?: "basic" | "stealth"; + proxy?: "basic" | "stealth" | "auto"; } export type Action = { diff --git a/apps/python-sdk/firecrawl/__init__.py b/apps/python-sdk/firecrawl/__init__.py index db682a25..de45132b 100644 --- a/apps/python-sdk/firecrawl/__init__.py +++ b/apps/python-sdk/firecrawl/__init__.py @@ -13,7 +13,7 @@ import os from .firecrawl import FirecrawlApp, AsyncFirecrawlApp, JsonConfig, ScrapeOptions, ChangeTrackingOptions # noqa -__version__ = "2.7.0" +__version__ = "2.7.1" # Define the logger for the Firecrawl project logger: logging.Logger = logging.getLogger("firecrawl") diff --git a/apps/python-sdk/firecrawl/firecrawl.py b/apps/python-sdk/firecrawl/firecrawl.py index 16a79314..7c3f5552 100644 --- a/apps/python-sdk/firecrawl/firecrawl.py +++ b/apps/python-sdk/firecrawl/firecrawl.py @@ -155,7 +155,7 @@ class ScrapeOptions(pydantic.BaseModel): skipTlsVerification: Optional[bool] = None removeBase64Images: Optional[bool] = None blockAds: Optional[bool] = None - proxy: Optional[Literal["basic", "stealth"]] = None + proxy: Optional[Literal["basic", "stealth", "auto"]] = None changeTrackingOptions: Optional[ChangeTrackingOptions] = None class WaitAction(pydantic.BaseModel): @@ -459,7 +459,7 @@ class FirecrawlApp: skip_tls_verification: Optional[bool] = None, remove_base64_images: Optional[bool] = None, block_ads: Optional[bool] = None, - proxy: Optional[Literal["basic", "stealth"]] = None, + proxy: Optional[Literal["basic", "stealth", "auto"]] = None, extract: Optional[JsonConfig] = None, json_options: Optional[JsonConfig] = None, actions: Optional[List[Union[WaitAction, ScreenshotAction, ClickAction, WriteAction, PressAction, ScrollAction, ScrapeAction, ExecuteJavascriptAction]]] = None, @@ -481,7 +481,7 @@ class FirecrawlApp: skip_tls_verification (Optional[bool]): Skip TLS verification remove_base64_images (Optional[bool]): Remove base64 images block_ads (Optional[bool]): Block ads - proxy (Optional[Literal["basic", "stealth"]]): Proxy type (basic/stealth) + proxy (Optional[Literal["basic", "stealth", "auto"]]): Proxy type (basic/stealth) extract (Optional[JsonConfig]): Content extraction settings json_options 
(Optional[JsonConfig]): JSON extraction settings actions (Optional[List[Union[WaitAction, ScreenshotAction, ClickAction, WriteAction, PressAction, ScrollAction, ScrapeAction, ExecuteJavascriptAction]]]): Actions to perform @@ -1191,7 +1191,7 @@ class FirecrawlApp: skip_tls_verification: Optional[bool] = None, remove_base64_images: Optional[bool] = None, block_ads: Optional[bool] = None, - proxy: Optional[Literal["basic", "stealth"]] = None, + proxy: Optional[Literal["basic", "stealth", "auto"]] = None, extract: Optional[JsonConfig] = None, json_options: Optional[JsonConfig] = None, actions: Optional[List[Union[WaitAction, ScreenshotAction, ClickAction, WriteAction, PressAction, ScrollAction, ScrapeAction, ExecuteJavascriptAction]]] = None, @@ -1325,7 +1325,7 @@ class FirecrawlApp: skip_tls_verification: Optional[bool] = None, remove_base64_images: Optional[bool] = None, block_ads: Optional[bool] = None, - proxy: Optional[Literal["basic", "stealth"]] = None, + proxy: Optional[Literal["basic", "stealth", "auto"]] = None, extract: Optional[JsonConfig] = None, json_options: Optional[JsonConfig] = None, actions: Optional[List[Union[WaitAction, ScreenshotAction, ClickAction, WriteAction, PressAction, ScrollAction, ScrapeAction, ExecuteJavascriptAction]]] = None, @@ -1457,7 +1457,7 @@ class FirecrawlApp: skip_tls_verification: Optional[bool] = None, remove_base64_images: Optional[bool] = None, block_ads: Optional[bool] = None, - proxy: Optional[Literal["basic", "stealth"]] = None, + proxy: Optional[Literal["basic", "stealth", "auto"]] = None, extract: Optional[JsonConfig] = None, json_options: Optional[JsonConfig] = None, actions: Optional[List[Union[WaitAction, ScreenshotAction, ClickAction, WriteAction, PressAction, ScrollAction, ScrapeAction, ExecuteJavascriptAction]]] = None, @@ -2852,7 +2852,7 @@ class AsyncFirecrawlApp(FirecrawlApp): skip_tls_verification: Optional[bool] = None, remove_base64_images: Optional[bool] = None, block_ads: Optional[bool] = None, - proxy: Optional[Literal["basic", "stealth"]] = None, + proxy: Optional[Literal["basic", "stealth", "auto"]] = None, extract: Optional[JsonConfig] = None, json_options: Optional[JsonConfig] = None, actions: Optional[List[Union[WaitAction, ScreenshotAction, ClickAction, WriteAction, PressAction, ScrollAction, ScrapeAction, ExecuteJavascriptAction]]] = None, @@ -2873,7 +2873,7 @@ class AsyncFirecrawlApp(FirecrawlApp): skip_tls_verification (Optional[bool]): Skip TLS verification remove_base64_images (Optional[bool]): Remove base64 images block_ads (Optional[bool]): Block ads - proxy (Optional[Literal["basic", "stealth"]]): Proxy type (basic/stealth) + proxy (Optional[Literal["basic", "stealth", "auto"]]): Proxy type (basic/stealth) extract (Optional[JsonConfig]): Content extraction settings json_options (Optional[JsonConfig]): JSON extraction settings actions (Optional[List[Union[WaitAction, ScreenshotAction, ClickAction, WriteAction, PressAction, ScrollAction, ScrapeAction, ExecuteJavascriptAction]]]): Actions to perform @@ -2981,7 +2981,7 @@ class AsyncFirecrawlApp(FirecrawlApp): skip_tls_verification: Optional[bool] = None, remove_base64_images: Optional[bool] = None, block_ads: Optional[bool] = None, - proxy: Optional[Literal["basic", "stealth"]] = None, + proxy: Optional[Literal["basic", "stealth", "auto"]] = None, extract: Optional[JsonConfig] = None, json_options: Optional[JsonConfig] = None, actions: Optional[List[Union[WaitAction, ScreenshotAction, ClickAction, WriteAction, PressAction, ScrollAction, ScrapeAction, 
ExecuteJavascriptAction]]] = None, @@ -3120,7 +3120,7 @@ class AsyncFirecrawlApp(FirecrawlApp): skip_tls_verification: Optional[bool] = None, remove_base64_images: Optional[bool] = None, block_ads: Optional[bool] = None, - proxy: Optional[Literal["basic", "stealth"]] = None, + proxy: Optional[Literal["basic", "stealth", "auto"]] = None, extract: Optional[JsonConfig] = None, json_options: Optional[JsonConfig] = None, actions: Optional[List[Union[WaitAction, ScreenshotAction, ClickAction, WriteAction, PressAction, ScrollAction, ScrapeAction, ExecuteJavascriptAction]]] = None, From 706d378a8994de7f658c121b2458737f82134b54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Thu, 29 May 2025 13:02:54 +0200 Subject: [PATCH 10/12] feat(api/v1/scrape-status): log supa lookup errors --- apps/api/src/controllers/v1/scrape-status.ts | 11 ++++++++++- apps/api/src/lib/supabase-jobs.ts | 6 +++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/apps/api/src/controllers/v1/scrape-status.ts b/apps/api/src/controllers/v1/scrape-status.ts index e89f724e..4f076a34 100644 --- a/apps/api/src/controllers/v1/scrape-status.ts +++ b/apps/api/src/controllers/v1/scrape-status.ts @@ -1,9 +1,18 @@ import { Response } from "express"; import { supabaseGetJobByIdOnlyData } from "../../lib/supabase-jobs"; import { getJob } from "./crawl-status"; +import { logger as _logger } from "../../lib/logger"; export async function scrapeStatusController(req: any, res: any) { - const job = await supabaseGetJobByIdOnlyData(req.params.jobId); + const logger = _logger.child({ + module: "scrape-status", + method: "scrapeStatusController", + teamId: req.auth.team_id, + jobId: req.params.jobId, + scrapeId: req.params.jobId, + }); + + const job = await supabaseGetJobByIdOnlyData(req.params.jobId, logger); if (!job) { return res.status(404).json({ diff --git a/apps/api/src/lib/supabase-jobs.ts b/apps/api/src/lib/supabase-jobs.ts index 874c5293..7e826849 100644 --- a/apps/api/src/lib/supabase-jobs.ts +++ b/apps/api/src/lib/supabase-jobs.ts @@ -1,3 +1,4 @@ +import type { Logger } from "winston"; import { supabase_rr_service, supabase_service } from "../services/supabase"; import { logger } from "./logger"; import * as Sentry from "@sentry/node"; @@ -73,7 +74,7 @@ export const supabaseGetJobsByCrawlId = async (crawlId: string) => { return data; }; -export const supabaseGetJobByIdOnlyData = async (jobId: string) => { +export const supabaseGetJobByIdOnlyData = async (jobId: string, logger?: Logger) => { const { data, error } = await supabase_rr_service .from("firecrawl_jobs") .select("team_id") @@ -81,6 +82,9 @@ export const supabaseGetJobByIdOnlyData = async (jobId: string) => { .single(); if (error) { + if (logger) { + logger.error("Error in supabaseGetJobByIdOnlyData", { error }); + } return null; } From 7e73b015996f752b5b93e50498c1f5d6eea81290 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Thu, 29 May 2025 14:40:47 +0200 Subject: [PATCH 11/12] fix(queue-worker): call webhook after job is in DB --- apps/api/src/services/queue-worker.ts | 30 +++++++++++++-------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 601e319e..44503732 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -1161,21 +1161,6 @@ async function processJob(job: Job & { id: string }, token: string) { document: doc, }; - if (job.data.webhook && job.data.mode !== 
"crawl" && job.data.v1) { - logger.debug("Calling webhook with success...", { - webhook: job.data.webhook, - }); - await callWebhook( - job.data.team_id, - job.data.crawl_id, - data, - job.data.webhook, - job.data.v1, - job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page", - true, - ); - } - if (job.data.crawl_id) { const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl; @@ -1261,6 +1246,21 @@ async function processJob(job: Job & { id: string }, token: string) { true, ); + if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) { + logger.debug("Calling webhook with success...", { + webhook: job.data.webhook, + }); + await callWebhook( + job.data.team_id, + job.data.crawl_id, + data, + job.data.webhook, + job.data.v1, + job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page", + true, + ); + } + indexJob(job, doc); logger.debug("Declaring job as done..."); From 38c96b524f120a068dfae3a35c3a13fcb2d5e889 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Thu, 29 May 2025 15:26:07 +0200 Subject: [PATCH 12/12] feat(scrapeURL): handle contentType JSON better in markdown conversion (#1604) --- apps/api/src/__tests__/snips/scrape.test.ts | 11 ++++++++++- apps/api/src/controllers/v1/types.ts | 1 + .../api/src/scraper/scrapeURL/engines/fetch/index.ts | 5 ++++- .../scraper/scrapeURL/engines/fire-engine/index.ts | 12 ++++++++++++ apps/api/src/scraper/scrapeURL/engines/index.ts | 2 ++ apps/api/src/scraper/scrapeURL/index.ts | 1 + apps/api/src/scraper/scrapeURL/transformers/index.ts | 11 +++++++++++ 7 files changed, 41 insertions(+), 2 deletions(-) diff --git a/apps/api/src/__tests__/snips/scrape.test.ts b/apps/api/src/__tests__/snips/scrape.test.ts index 9c2a9e2d..a27057f7 100644 --- a/apps/api/src/__tests__/snips/scrape.test.ts +++ b/apps/api/src/__tests__/snips/scrape.test.ts @@ -373,5 +373,14 @@ describe("Scrape tests", () => { }); expect(response.metadata.sourceURL).toBe("https://firecrawl.dev/?pagewanted=all&et_blog"); - }, 35000); + }, 30000); + + it.concurrent("application/json content type is markdownified properly", async () => { + const response = await scrape({ + url: "https://jsonplaceholder.typicode.com/todos/1", + formats: ["markdown"], + }); + + expect(response.markdown).toContain("```json"); + }, 30000); }); diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts index 908a51d3..5ee1328d 100644 --- a/apps/api/src/controllers/v1/types.ts +++ b/apps/api/src/controllers/v1/types.ts @@ -750,6 +750,7 @@ export type Document = { scrapeId?: string; error?: string; numPages?: number; + contentType?: string; proxyUsed: "basic" | "stealth"; // [key: string]: string | string[] | number | { smartScrape: number; other: number; total: number } | undefined; }; diff --git a/apps/api/src/scraper/scrapeURL/engines/fetch/index.ts b/apps/api/src/scraper/scrapeURL/engines/fetch/index.ts index 40c34399..299b0d35 100644 --- a/apps/api/src/scraper/scrapeURL/engines/fetch/index.ts +++ b/apps/api/src/scraper/scrapeURL/engines/fetch/index.ts @@ -30,7 +30,7 @@ export async function scrapeURLWithFetch( url: string; body: string, status: number; - headers: any; + headers: [string, string][]; }; if (meta.mock !== null) { @@ -117,5 +117,8 @@ export async function scrapeURLWithFetch( url: response.url, html: response.body, statusCode: response.status, + contentType: (response.headers.find( + (x) => x[0].toLowerCase() === "content-type", + ) ?? [])[1] ?? 
undefined, }; } diff --git a/apps/api/src/scraper/scrapeURL/engines/fire-engine/index.ts b/apps/api/src/scraper/scrapeURL/engines/fire-engine/index.ts index 6c1648bf..8bbacb4c 100644 --- a/apps/api/src/scraper/scrapeURL/engines/fire-engine/index.ts +++ b/apps/api/src/scraper/scrapeURL/engines/fire-engine/index.ts @@ -273,6 +273,10 @@ export async function scrapeURLWithFireEngineChromeCDP( error: response.pageError, statusCode: response.pageStatusCode, + contentType: (Object.entries(response.responseHeaders ?? {}).find( + (x) => x[0].toLowerCase() === "content-type", + ) ?? [])[1] ?? undefined, + screenshot: response.screenshot, ...(actions.length > 0 ? { @@ -336,6 +340,10 @@ export async function scrapeURLWithFireEnginePlaywright( error: response.pageError, statusCode: response.pageStatusCode, + contentType: (Object.entries(response.responseHeaders ?? {}).find( + (x) => x[0].toLowerCase() === "content-type", + ) ?? [])[1] ?? undefined, + ...(response.screenshots !== undefined && response.screenshots.length > 0 ? { screenshot: response.screenshots[0], @@ -391,5 +399,9 @@ export async function scrapeURLWithFireEngineTLSClient( html: response.content, error: response.pageError, statusCode: response.pageStatusCode, + + contentType: (Object.entries(response.responseHeaders ?? {}).find( + (x) => x[0].toLowerCase() === "content-type", + ) ?? [])[1] ?? undefined, }; } diff --git a/apps/api/src/scraper/scrapeURL/engines/index.ts b/apps/api/src/scraper/scrapeURL/engines/index.ts index 06c5e072..fe0ae8c7 100644 --- a/apps/api/src/scraper/scrapeURL/engines/index.ts +++ b/apps/api/src/scraper/scrapeURL/engines/index.ts @@ -111,6 +111,8 @@ export type EngineScrapeResult = { }; numPages?: number; + + contentType?: string; }; const engineHandlers: { diff --git a/apps/api/src/scraper/scrapeURL/index.ts b/apps/api/src/scraper/scrapeURL/index.ts index fbc0c53b..90873ca6 100644 --- a/apps/api/src/scraper/scrapeURL/index.ts +++ b/apps/api/src/scraper/scrapeURL/index.ts @@ -379,6 +379,7 @@ async function scrapeURLLoop(meta: Meta): Promise { statusCode: result.result.statusCode, error: result.result.error, numPages: result.result.numPages, + contentType: result.result.contentType, proxyUsed: meta.featureFlags.has("stealthProxy") ? "stealth" : "basic", }, }; diff --git a/apps/api/src/scraper/scrapeURL/transformers/index.ts b/apps/api/src/scraper/scrapeURL/transformers/index.ts index d9fc4b8c..5186d6b4 100644 --- a/apps/api/src/scraper/scrapeURL/transformers/index.ts +++ b/apps/api/src/scraper/scrapeURL/transformers/index.ts @@ -61,6 +61,17 @@ export async function deriveMarkdownFromHTML( ); } + if (document.metadata.contentType?.includes("application/json")) { + if (document.rawHtml === undefined) { + throw new Error( + "rawHtml is undefined -- this transformer is being called out of order", + ); + } + + document.markdown = "```json\n" + document.rawHtml + "\n```"; + return document; + } + document.markdown = await parseMarkdown(document.html); return document; }
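
A minimal usage sketch of the application/json markdown handling added in the final patch of this series, assuming the @mendable/firecrawl-js SDK against the hosted API; the client setup, async wrapper, and logging are illustrative, while the target URL and requested format mirror the new case in scrape.test.ts. Once the upstream response reports an application/json content type, the markdown field carries the raw body inside a ```json fence instead of HTML-derived markdown:

    import FirecrawlApp from "@mendable/firecrawl-js";

    async function scrapeJsonEndpoint() {
      // Client setup is assumed; FIRECRAWL_API_KEY is a placeholder environment variable.
      const app = new FirecrawlApp({ apiKey: process.env.FIRECRAWL_API_KEY ?? null });

      // Same target and format as the new test case in scrape.test.ts.
      const doc = await app.scrapeUrl("https://jsonplaceholder.typicode.com/todos/1", {
        formats: ["markdown"],
      });

      if (doc.success) {
        // With contentType plumbed through the engines and transformers, this markdown
        // is the raw JSON body wrapped in a ```json fence rather than markdownified HTML.
        console.log(doc.markdown);
      }
    }

    scrapeJsonEndpoint().catch(console.error);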
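
A hedged sketch of the "auto" proxy mode exposed by the feat(js-sdk) patch earlier in this series; the call shape follows the existing scrapeUrl options, and the URL is a stand-in. The patch only widens the accepted proxy literals from "basic" | "stealth" to also allow "auto", so how the service resolves "auto" (for example, retrying with the stealth proxy after a basic attempt fails) is an assumption, not something stated in these patches:

    import FirecrawlApp from "@mendable/firecrawl-js";

    async function scrapeWithAutoProxy() {
      // Assumed client setup; FIRECRAWL_API_KEY is a placeholder environment variable.
      const app = new FirecrawlApp({ apiKey: process.env.FIRECRAWL_API_KEY ?? null });

      // "auto" now type-checks alongside "basic" and "stealth" in CrawlScrapeOptions.proxy.
      const doc = await app.scrapeUrl("https://example.com", {
        formats: ["markdown"],
        proxy: "auto",
      });

      if (doc.success) {
        // The v1 API types in this series expose metadata.proxyUsed ("basic" | "stealth"),
        // so inspecting the metadata shows which proxy ultimately served the request.
        console.log(doc.metadata);
      }
    }

    scrapeWithAutoProxy().catch(console.error);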