diff --git a/.github/workflows/test-server-self-host.yml b/.github/workflows/test-server-self-host.yml
index 7ab4d5c6..39a260ab 100644
--- a/.github/workflows/test-server-self-host.yml
+++ b/.github/workflows/test-server-self-host.yml
@@ -127,7 +127,7 @@ jobs:
       - name: Kill SearXNG
         if: always() && matrix.search == 'searxng'
         run: |
-          docker logs searxng > searxng/searxng.log 2>&1
+          docker logs searxng > searxng.log 2>&1
           docker kill searxng
         working-directory: ./
       - uses: actions/upload-artifact@v4
@@ -148,5 +148,4 @@ jobs:
         with:
           name: SearXNG (${{ matrix.ai }}, ${{ matrix.engine }}, ${{ matrix.proxy }})
           path: |
-            ./searxng/searxng.log
-            ./searxng/settings.yml
+            ./searxng.log
diff --git a/apps/api/src/controllers/v0/scrape.ts b/apps/api/src/controllers/v0/scrape.ts
index f7a9fbdd..559d222c 100644
--- a/apps/api/src/controllers/v0/scrape.ts
+++ b/apps/api/src/controllers/v0/scrape.ts
@@ -82,6 +82,7 @@ export async function scrapeHelper(
       internalOptions,
       origin: req.body.origin ?? defaultOrigin,
       is_scrape: true,
+      startTime: Date.now(),
     },
     {},
     jobId,
diff --git a/apps/api/src/controllers/v0/search.ts b/apps/api/src/controllers/v0/search.ts
index 15172e37..e4be9bd7 100644
--- a/apps/api/src/controllers/v0/search.ts
+++ b/apps/api/src/controllers/v0/search.ts
@@ -112,6 +112,7 @@ export async function searchHelper(
       team_id: team_id,
       scrapeOptions,
       internalOptions,
+      startTime: Date.now(),
     },
     opts: {
       jobId: uuid,
diff --git a/apps/api/src/controllers/v1/scrape.ts b/apps/api/src/controllers/v1/scrape.ts
index a75a58d1..4b681438 100644
--- a/apps/api/src/controllers/v1/scrape.ts
+++ b/apps/api/src/controllers/v1/scrape.ts
@@ -31,7 +31,6 @@ export async function scrapeController(
   });
 
   req.body = scrapeRequestSchema.parse(req.body);
-  let earlyReturn = false;
 
   const origin = req.body.origin;
   const timeout = req.body.timeout;
@@ -55,7 +54,7 @@ export async function scrapeController(
         unnormalizedSourceURL: preNormalizedBody.url,
       },
       origin: req.body.origin,
-      is_scrape: true,
+      startTime,
     },
     {},
     jobId,
@@ -125,30 +124,6 @@ export async function scrapeController(
 
   await getScrapeQueue().remove(jobId);
 
-  const endTime = new Date().getTime();
-  const timeTakenInSeconds = (endTime - startTime) / 1000;
-  const numTokens =
-    doc && doc.extract
-      ? // numTokensFromString(doc.markdown, "gpt-3.5-turbo")
-        0 // TODO: fix
-      : 0;
-
-  if (earlyReturn) {
-    // Don't bill if we're early returning
-    return;
-  }
-
-  let creditsToBeBilled = await calculateCreditsToBeBilled(req.body, doc, jobId);
-
-  billTeam(req.auth.team_id, req.acuc?.sub_id, creditsToBeBilled).catch(
-    (error) => {
-      logger.error(
-        `Failed to bill team ${req.auth.team_id} for ${creditsToBeBilled} credits: ${error}`,
-      );
-      // Optionally, you could notify an admin or add to a retry queue here
-    },
-  );
-
   if (!req.body.formats.includes("rawHtml")) {
     if (doc && doc.rawHtml) {
       delete doc.rawHtml;
diff --git a/apps/api/src/controllers/v1/search.ts b/apps/api/src/controllers/v1/search.ts
index fe104dc1..68e426da 100644
--- a/apps/api/src/controllers/v1/search.ts
+++ b/apps/api/src/controllers/v1/search.ts
@@ -103,6 +103,7 @@ async function scrapeSearchResult(
         internalOptions: { teamId: options.teamId, useCache: true },
         origin: options.origin,
         is_scrape: true,
+        startTime: Date.now(),
       },
       {},
       jobId,
diff --git a/apps/api/src/lib/extract/document-scraper.ts b/apps/api/src/lib/extract/document-scraper.ts
index 190252c8..799062b6 100644
--- a/apps/api/src/lib/extract/document-scraper.ts
+++ b/apps/api/src/lib/extract/document-scraper.ts
@@ -49,6 +49,7 @@ export async function scrapeDocument(
       origin: options.origin,
       is_scrape: true,
       from_extract: true,
+      startTime: Date.now(),
     },
     {},
     jobId,
diff --git a/apps/api/src/lib/extract/fire-0/document-scraper-f0.ts b/apps/api/src/lib/extract/fire-0/document-scraper-f0.ts
index b5f8f0bb..fa2f289e 100644
--- a/apps/api/src/lib/extract/fire-0/document-scraper-f0.ts
+++ b/apps/api/src/lib/extract/fire-0/document-scraper-f0.ts
@@ -47,6 +47,7 @@ export async function scrapeDocument_F0(
       origin: options.origin,
       is_scrape: true,
       from_extract: true,
+      startTime: Date.now(),
     },
     {},
     jobId,
diff --git a/apps/api/src/services/logging/log_job.ts b/apps/api/src/services/logging/log_job.ts
index 162ddb43..efa6b158 100644
--- a/apps/api/src/services/logging/log_job.ts
+++ b/apps/api/src/services/logging/log_job.ts
@@ -63,6 +63,7 @@ export async function logJob(job: FirecrawlJob, force: boolean = false) {
     is_migrated: true,
     cost_tracking: job.cost_tracking,
     pdf_num_pages: job.pdf_num_pages ?? null,
+    credits_billed: job.credits_billed ?? null,
   };
 
   if (process.env.GCS_BUCKET_NAME) {
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index c007e18b..afc93b6a 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -86,6 +86,7 @@
 import { robustFetch } from "../scraper/scrapeURL/lib/fetch";
 import { RateLimiterMode } from "../types";
 import { calculateCreditsToBeBilled } from "../lib/scrape-billing";
 import { redisEvictConnection } from "./redis";
+import type { Logger } from "winston";
 
 configDotenv();
@@ -1044,6 +1045,59 @@ async function processKickoffJob(job: Job & { id: string }, token: string) {
   }
 }
 
+async function billScrapeJob(job: Job & { id: string }, document: Document, logger: Logger, costTracking?: CostTracking) {
+  let creditsToBeBilled: number | null = null;
+
+  if (job.data.is_scrape !== true) {
+    creditsToBeBilled = await calculateCreditsToBeBilled(job.data.scrapeOptions, document, job.id, costTracking);
+
+    if (
+      job.data.team_id !== process.env.BACKGROUND_INDEX_TEAM_ID! &&
+      process.env.USE_DB_AUTHENTICATION === "true"
+    ) {
+      try {
+        const billingJobId = uuidv4();
+        logger.debug(
+          `Adding billing job to queue for team ${job.data.team_id}`,
+          {
+            billingJobId,
+            credits: creditsToBeBilled,
+            is_extract: false,
+          },
+        );
+
+        // Add directly to the billing queue - the billing worker will handle the rest
+        await getBillingQueue().add(
+          "bill_team",
+          {
+            team_id: job.data.team_id,
+            subscription_id: undefined,
+            credits: creditsToBeBilled,
+            is_extract: false,
+            timestamp: new Date().toISOString(),
+            originating_job_id: job.id,
+          },
+          {
+            jobId: billingJobId,
+            priority: 10,
+          },
+        );
+
+        return creditsToBeBilled;
+      } catch (error) {
+        logger.error(
+          `Failed to add billing job to queue for team ${job.data.team_id} for ${creditsToBeBilled} credits`,
+          { error },
+        );
+        Sentry.captureException(error);
+        return creditsToBeBilled;
+      }
+    }
+  }
+
+  return creditsToBeBilled;
+}
+
 async function processJob(job: Job & { id: string }, token: string) {
   const logger = _logger.child({
     module: "queue-worker",
@@ -1054,25 +1108,9 @@ async function processJob(job: Job & { id: string }, token: string) {
     teamId: job.data?.team_id ?? undefined,
   });
   logger.info(`🐂 Worker taking job ${job.id}`, { url: job.data.url });
-  const start = Date.now();
+  const start = job.data.startTime ?? Date.now();
+  const remainingTime = job.data.scrapeOptions.timeout ? (job.data.scrapeOptions.timeout - (Date.now() - start)) : undefined;
 
-  // Check if the job URL is researchhub and block it immediately
-  // TODO: remove this once solve the root issue
-  // if (
-  //   job.data.url &&
-  //   (job.data.url.includes("researchhub.com") ||
-  //     job.data.url.includes("ebay.com"))
-  // ) {
-  //   logger.info(`🐂 Blocking job ${job.id} with URL ${job.data.url}`);
-  //   const data = {
-  //     success: false,
-  //     document: null,
-  //     project_id: job.data.project_id,
-  //     error:
-  //       "URL is blocked. Suspecious activity detected. Please contact help@firecrawl.com if you believe this is an error.",
-  //   };
-  //   return data;
-  // }
-
   const costTracking = new CostTracking();
 
   try {
@@ -1083,6 +1121,11 @@ async function processJob(job: Job & { id: string }, token: string) {
       current_url: "",
     });
 
+    if (remainingTime !== undefined && remainingTime < 0) {
+      throw new Error("timeout");
+    }
+    const signal = remainingTime ? AbortSignal.timeout(remainingTime) : undefined;
+
     if (job.data.crawl_id) {
       const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
       if (sc && sc.cancelled) {
@@ -1096,16 +1139,22 @@ async function processJob(job: Job & { id: string }, token: string) {
         token,
         costTracking,
       }),
-      ...(job.data.scrapeOptions.timeout !== undefined
+      ...(remainingTime !== undefined
         ? [
             (async () => {
-              await sleep(job.data.scrapeOptions.timeout);
+              await sleep(remainingTime);
               throw new Error("timeout");
             })(),
           ]
         : []),
     ]);
 
+    try {
+      signal?.throwIfAborted();
+    } catch (e) {
+      throw new Error("timeout");
+    }
+
     if (!pipeline.success) {
       throw pipeline.error;
     }
@@ -1206,45 +1255,6 @@ async function processJob(job: Job & { id: string }, token: string) {
         }
       }
 
-      logger.debug("Logging job to DB...");
-      await logJob(
-        {
-          job_id: job.id as string,
-          success: true,
-          num_docs: 1,
-          docs: [doc],
-          time_taken: timeTakenInSeconds,
-          team_id: job.data.team_id,
-          mode: job.data.mode,
-          url: job.data.url,
-          crawlerOptions: sc.crawlerOptions,
-          scrapeOptions: job.data.scrapeOptions,
-          origin: job.data.origin,
-          crawl_id: job.data.crawl_id,
-          cost_tracking: costTracking,
-          pdf_num_pages: doc.metadata.numPages,
-        },
-        true,
-      );
-
-      if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
-        logger.debug("Calling webhook with success...", {
-          webhook: job.data.webhook,
-        });
-        await callWebhook(
-          job.data.team_id,
-          job.data.crawl_id,
-          data,
-          job.data.webhook,
-          job.data.v1,
-          job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page",
-          true,
-        );
-      }
-
-      logger.debug("Declaring job as done...");
-      await addCrawlJobDone(job.data.crawl_id, job.id, true);
-
       if (job.data.crawlerOptions !== null) {
         if (!sc.cancelled) {
           const crawler = crawlToCrawler(
@@ -1340,8 +1350,64 @@ async function processJob(job: Job & { id: string }, token: string) {
         }
       }
 
+      try {
+        signal?.throwIfAborted();
+      } catch (e) {
+        throw new Error("timeout");
+      }
+
+      const credits_billed = await billScrapeJob(job, doc, logger, costTracking);
+
+      logger.debug("Logging job to DB...");
+      await logJob(
+        {
+          job_id: job.id as string,
+          success: true,
+          num_docs: 1,
+          docs: [doc],
+          time_taken: timeTakenInSeconds,
+          team_id: job.data.team_id,
+          mode: job.data.mode,
+          url: job.data.url,
+          crawlerOptions: sc.crawlerOptions,
+          scrapeOptions: job.data.scrapeOptions,
+          origin: job.data.origin,
+          crawl_id: job.data.crawl_id,
+          cost_tracking: costTracking,
+          pdf_num_pages: doc.metadata.numPages,
+          credits_billed,
+        },
+        true,
+      );
+
+      if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
+        logger.debug("Calling webhook with success...", {
+          webhook: job.data.webhook,
+        });
+        await callWebhook(
+          job.data.team_id,
+          job.data.crawl_id,
+          data,
+          job.data.webhook,
+          job.data.v1,
+          job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page",
+          true,
+        );
+      }
+
+      logger.debug("Declaring job as done...");
+      await addCrawlJobDone(job.data.crawl_id, job.id, true);
+
       await finishCrawlIfNeeded(job, sc);
     } else {
+      try {
+        signal?.throwIfAborted();
+      } catch (e) {
+        throw new Error("timeout");
+      }
+
+      const credits_billed = await billScrapeJob(job, doc, logger, costTracking);
+
       await logJob({
         job_id: job.id,
         success: true,
@@ -1357,53 +1423,10 @@ async function processJob(job: Job & { id: string }, token: string) {
         num_tokens: 0, // TODO: fix
         cost_tracking: costTracking,
         pdf_num_pages: doc.metadata.numPages,
+        credits_billed,
       });
     }
 
-    if (job.data.is_scrape !== true) {
-      let creditsToBeBilled = await calculateCreditsToBeBilled(job.data.scrapeOptions, doc, job.id, costTracking);
-
-      if (
-        job.data.team_id !== process.env.BACKGROUND_INDEX_TEAM_ID! &&
-        process.env.USE_DB_AUTHENTICATION === "true"
-      ) {
-        try {
-          const billingJobId = uuidv4();
-          logger.debug(
-            `Adding billing job to queue for team ${job.data.team_id}`,
-            {
-              billingJobId,
-              credits: creditsToBeBilled,
-              is_extract: false,
-            },
-          );
-
-          // Add directly to the billing queue - the billing worker will handle the rest
-          await getBillingQueue().add(
-            "bill_team",
-            {
-              team_id: job.data.team_id,
-              subscription_id: undefined,
-              credits: creditsToBeBilled,
-              is_extract: false,
-              timestamp: new Date().toISOString(),
-              originating_job_id: job.id,
-            },
-            {
-              jobId: billingJobId,
-              priority: 10,
-            },
-          );
-        } catch (error) {
-          logger.error(
-            `Failed to add billing job to queue for team ${job.data.team_id} for ${creditsToBeBilled} credits`,
-            { error },
-          );
-          Sentry.captureException(error);
-        }
-      }
-    }
-
     logger.info(`🐂 Job done ${job.id}`);
     return data;
   } catch (error) {
diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts
index 7b2cc640..2e18ba3d 100644
--- a/apps/api/src/types.ts
+++ b/apps/api/src/types.ts
@@ -44,9 +44,15 @@ export interface WebScraperOptions {
   sitemapped?: boolean;
   webhook?: z.infer<typeof webhookSchema>;
   v1?: boolean;
+
+  /**
+   * Disables billing on the worker side.
+   */
   is_scrape?: boolean;
+
   isCrawlSourceScrape?: boolean;
   from_extract?: boolean;
+  startTime?: number;
 }
 
 export interface RunWebScraperParams {
@@ -95,6 +101,7 @@ export interface FirecrawlJob {
   sources?: Record<string, string[]>;
   cost_tracking?: CostTracking;
   pdf_num_pages?: number;
+  credits_billed?: number | null;
 }
 
 export interface FirecrawlScrapeResponse {
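For context (not part of the patch): the behavioral core of this change is that the enqueueing controllers now stamp `startTime: Date.now()` onto the job payload, and the worker derives its remaining timeout budget from that stamp instead of restarting the clock when it dequeues the job. Below is a minimal, self-contained sketch of that pattern under stated assumptions; `JobData`, `runWithRemainingBudget`, and `doWork` are hypothetical stand-ins, not the actual Firecrawl code.

```ts
// Illustrative sketch only; JobData and doWork are hypothetical stand-ins.
interface JobData {
  startTime?: number; // stamped by the controller at enqueue time
  timeout?: number;   // total budget in milliseconds, if any
}

async function runWithRemainingBudget(data: JobData): Promise<string> {
  // Older payloads have no startTime; fall back to "now" so nothing breaks.
  const start = data.startTime ?? Date.now();

  // Remaining budget = total timeout minus time already spent waiting in the queue.
  const remainingTime =
    data.timeout !== undefined ? data.timeout - (Date.now() - start) : undefined;

  if (remainingTime !== undefined && remainingTime < 0) {
    // The job sat in the queue longer than its entire budget.
    throw new Error("timeout");
  }

  // AbortSignal.timeout (Node 17.3+) fires once the remaining budget elapses.
  const signal =
    remainingTime !== undefined ? AbortSignal.timeout(remainingTime) : undefined;

  const result = await doWork(signal);

  // Surface a timeout even if the underlying work ignored the abort signal.
  if (signal?.aborted) {
    throw new Error("timeout");
  }
  return result;
}

// Hypothetical stand-in for the scrape pipeline.
function doWork(signal?: AbortSignal): Promise<string> {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(() => resolve("document"), 1_000);
    signal?.addEventListener("abort", () => {
      clearTimeout(timer);
      reject(new Error("timeout"));
    });
  });
}
```

The final `signal?.aborted` check plays the same role as the repeated `signal?.throwIfAborted()` checks the patch inserts between pipeline stages in `processJob`.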
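Likewise hedged: the patch consolidates worker-side billing into `billScrapeJob`, which both enqueues the billing work and returns the amount so the same number can be persisted as `credits_billed` on the job log. A rough sketch of that shape, where `calculateCredits`, `enqueueBilling`, and `writeJobLog` are invented placeholders rather than the real helpers:

```ts
// Sketch only; calculateCredits, enqueueBilling, and writeJobLog are invented placeholders.
async function billAndReturnCredits(
  teamId: string,
  jobId: string,
  skipBilling: boolean, // mirrors is_scrape: when true, the worker does not bill
): Promise<number | null> {
  if (skipBilling) return null;

  const credits = await calculateCredits(jobId);
  try {
    // Billing happens asynchronously via a queue; failures are logged, not rethrown,
    // so the job itself still completes and the computed amount is still recorded.
    await enqueueBilling(teamId, credits, jobId);
  } catch (err) {
    console.error("failed to enqueue billing job", err);
  }
  return credits;
}

async function completeJob(teamId: string, jobId: string): Promise<void> {
  const creditsBilled = await billAndReturnCredits(teamId, jobId, false);
  // The same number that was sent to billing is stored on the job record.
  await writeJobLog({ jobId, creditsBilled });
}

// Invented placeholders so the sketch type-checks on its own.
async function calculateCredits(_jobId: string): Promise<number> { return 1; }
async function enqueueBilling(_teamId: string, _credits: number, _jobId: string): Promise<void> {}
async function writeJobLog(_row: { jobId: string; creditsBilled: number | null }): Promise<void> {}
```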