feat(api/scrape): credits_billed column + handle billing for /scrape calls on worker side with stricter timeout enforcement (FIR-2162) (#1607)

* feat(api/scrape): tighten timeout enforcement and handle billing and logging on the queue-worker

* fix: abortsignal pre-check

* fix: proper level

* add comment to clarify is_scrape

* reenable billing tests

* Revert "reenable billing tests"

This reverts commit 98236fdfa03dde8cecdd6b763fcf86810e468a28.

* oof

* fix searxng logging

---------

Co-authored-by: Nicolas <nicolascamara29@gmail.com>
Commit: 72be73473f (parent 4167ec53eb)
Author: Gergő Móricz
Date: 2025-06-02 22:56:27 +02:00 (committed by GitHub)
10 changed files with 142 additions and 132 deletions

View File

@@ -127,7 +127,7 @@ jobs:
       - name: Kill SearXNG
         if: always() && matrix.search == 'searxng'
         run: |
-          docker logs searxng > searxng/searxng.log 2>&1
+          docker logs searxng > searxng.log 2>&1
           docker kill searxng
         working-directory: ./
       - uses: actions/upload-artifact@v4
@@ -148,5 +148,4 @@ jobs:
         with:
           name: SearXNG (${{ matrix.ai }}, ${{ matrix.engine }}, ${{ matrix.proxy }})
           path: |
-            ./searxng/searxng.log
-            ./searxng/settings.yml
+            ./searxng.log

View File

@@ -82,6 +82,7 @@ export async function scrapeHelper(
       internalOptions,
       origin: req.body.origin ?? defaultOrigin,
       is_scrape: true,
+      startTime: Date.now(),
     },
     {},
     jobId,
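This and the following enqueue sites stamp startTime: Date.now() into the job payload at the moment the API accepts the request, so the worker can enforce the caller's timeout against total elapsed time (queue wait included) rather than against worker processing time alone. A minimal sketch of that arithmetic, using only fields that appear in this commit (startTime and scrapeOptions.timeout); the helper name and local variables are illustrative, not from the codebase:

// Sketch: derive the time budget left for a job whose timeout was set by the
// API caller. Only startTime and timeout come from the diffs; the rest is illustrative.
function remainingBudgetMs(
  startTime: number | undefined, // job.data.startTime
  timeoutMs: number | undefined, // job.data.scrapeOptions.timeout
  now: number = Date.now(),
): number | undefined {
  if (timeoutMs === undefined) return undefined; // no timeout requested
  const start = startTime ?? now;                // older jobs carry no startTime
  return timeoutMs - (now - start);              // negative => budget already spent in the queue
}

// Example: a 30 s timeout with 12 s already spent queued leaves roughly 18 s.
console.log(remainingBudgetMs(Date.now() - 12_000, 30_000)); // ≈ 18000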

View File

@@ -112,6 +112,7 @@ export async function searchHelper(
       team_id: team_id,
       scrapeOptions,
       internalOptions,
+      startTime: Date.now(),
     },
     opts: {
       jobId: uuid,

View File

@@ -31,7 +31,6 @@ export async function scrapeController(
   });
   req.body = scrapeRequestSchema.parse(req.body);
 
-  let earlyReturn = false;
   const origin = req.body.origin;
   const timeout = req.body.timeout;
 
@@ -55,7 +54,7 @@ export async function scrapeController(
         unnormalizedSourceURL: preNormalizedBody.url,
       },
       origin: req.body.origin,
-      is_scrape: true,
+      startTime,
     },
     {},
     jobId,
@@ -125,30 +124,6 @@ export async function scrapeController(
     await getScrapeQueue().remove(jobId);
 
-  const endTime = new Date().getTime();
-  const timeTakenInSeconds = (endTime - startTime) / 1000;
-  const numTokens =
-    doc && doc.extract
-      ? // ? numTokensFromString(doc.markdown, "gpt-3.5-turbo")
-        0 // TODO: fix
-      : 0;
-
-  if (earlyReturn) {
-    // Don't bill if we're early returning
-    return;
-  }
-
-  let creditsToBeBilled = await calculateCreditsToBeBilled(req.body, doc, jobId);
-
-  billTeam(req.auth.team_id, req.acuc?.sub_id, creditsToBeBilled).catch(
-    (error) => {
-      logger.error(
-        `Failed to bill team ${req.auth.team_id} for ${creditsToBeBilled} credits: ${error}`,
-      );
-      // Optionally, you could notify an admin or add to a retry queue here
-    },
-  );
-
   if (!req.body.formats.includes("rawHtml")) {
     if (doc && doc.rawHtml) {
       delete doc.rawHtml;
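With billing moved to the worker, the controller no longer needs earlyReturn, the placeholder token accounting, or the direct billTeam call removed above; it now forwards startTime and stops setting is_scrape: true, the flag that (per the comment added in types.ts below) disables worker-side billing. A hedged sketch of the payload difference, using a hypothetical enqueueScrapeJob stand-in for the real queue call and only the fields shown in the diffs:

// Illustrative only: the real call sites pass many more fields (team_id,
// internalOptions, webhook config, ...). enqueueScrapeJob is a stand-in.
import { randomUUID } from "node:crypto";

interface ScrapeJobPayload {
  url: string;
  scrapeOptions: { timeout?: number };
  origin?: string;
  is_scrape?: boolean; // true => worker skips billing (v0 /scrape, search, extract paths)
  startTime?: number;  // enqueue timestamp used for end-to-end timeout enforcement
}

async function enqueueScrapeJob(payload: ScrapeJobPayload, jobId: string): Promise<void> {
  console.log("enqueue", jobId, payload); // stand-in for getScrapeQueue().add(...)
}

// v1 /scrape after this change: no is_scrape flag, so the worker bills the job.
enqueueScrapeJob(
  { url: "https://example.com", scrapeOptions: { timeout: 30_000 }, startTime: Date.now() },
  randomUUID(),
).catch(console.error);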

View File

@@ -103,6 +103,7 @@ async function scrapeSearchResult(
       internalOptions: { teamId: options.teamId, useCache: true },
       origin: options.origin,
       is_scrape: true,
+      startTime: Date.now(),
     },
     {},
     jobId,

View File

@@ -49,6 +49,7 @@ export async function scrapeDocument(
       origin: options.origin,
       is_scrape: true,
       from_extract: true,
+      startTime: Date.now(),
     },
     {},
     jobId,

View File

@@ -47,6 +47,7 @@ export async function scrapeDocument_F0(
       origin: options.origin,
       is_scrape: true,
       from_extract: true,
+      startTime: Date.now(),
     },
     {},
     jobId,

View File

@@ -63,6 +63,7 @@ export async function logJob(job: FirecrawlJob, force: boolean = false) {
     is_migrated: true,
     cost_tracking: job.cost_tracking,
     pdf_num_pages: job.pdf_num_pages ?? null,
+    credits_billed: job.credits_billed ?? null,
   };
 
   if (process.env.GCS_BUCKET_NAME) {

View File

@@ -86,6 +86,7 @@ import { robustFetch } from "../scraper/scrapeURL/lib/fetch";
 import { RateLimiterMode } from "../types";
 import { calculateCreditsToBeBilled } from "../lib/scrape-billing";
 import { redisEvictConnection } from "./redis";
+import type { Logger } from "winston";
 
 configDotenv();
 
@@ -1044,6 +1045,59 @@ async function processKickoffJob(job: Job & { id: string }, token: string) {
   }
 }
 
+async function billScrapeJob(job: Job & { id: string }, document: Document, logger: Logger, costTracking?: CostTracking) {
+  let creditsToBeBilled: number | null = null;
+
+  if (job.data.is_scrape !== true) {
+    creditsToBeBilled = await calculateCreditsToBeBilled(job.data.scrapeOptions, document, job.id, costTracking);
+
+    if (
+      job.data.team_id !== process.env.BACKGROUND_INDEX_TEAM_ID! &&
+      process.env.USE_DB_AUTHENTICATION === "true"
+    ) {
+      try {
+        const billingJobId = uuidv4();
+        logger.debug(
+          `Adding billing job to queue for team ${job.data.team_id}`,
+          {
+            billingJobId,
+            credits: creditsToBeBilled,
+            is_extract: false,
+          },
+        );
+
+        // Add directly to the billing queue - the billing worker will handle the rest
+        await getBillingQueue().add(
+          "bill_team",
+          {
+            team_id: job.data.team_id,
+            subscription_id: undefined,
+            credits: creditsToBeBilled,
+            is_extract: false,
+            timestamp: new Date().toISOString(),
+            originating_job_id: job.id,
+          },
+          {
+            jobId: billingJobId,
+            priority: 10,
+          },
+        );
+        return creditsToBeBilled;
+      } catch (error) {
+        logger.error(
+          `Failed to add billing job to queue for team ${job.data.team_id} for ${creditsToBeBilled} credits`,
+          { error },
+        );
+        Sentry.captureException(error);
+        return creditsToBeBilled;
+      }
+    }
+  }
+
+  return creditsToBeBilled;
+}
+
 async function processJob(job: Job & { id: string }, token: string) {
   const logger = _logger.child({
     module: "queue-worker",
@@ -1054,25 +1108,9 @@ async function processJob(job: Job & { id: string }, token: string) {
     teamId: job.data?.team_id ?? undefined,
   });
   logger.info(`🐂 Worker taking job ${job.id}`, { url: job.data.url });
-  const start = Date.now();
+  const start = job.data.startTime ?? Date.now();
+  const remainingTime = job.data.scrapeOptions.timeout ? (job.data.scrapeOptions.timeout - (Date.now() - start)) : undefined;
 
-  // Check if the job URL is researchhub and block it immediately
-  // TODO: remove this once solve the root issue
-  // if (
-  //   job.data.url &&
-  //   (job.data.url.includes("researchhub.com") ||
-  //     job.data.url.includes("ebay.com"))
-  // ) {
-  //   logger.info(`🐂 Blocking job ${job.id} with URL ${job.data.url}`);
-  //   const data = {
-  //     success: false,
-  //     document: null,
-  //     project_id: job.data.project_id,
-  //     error:
-  //       "URL is blocked. Suspecious activity detected. Please contact help@firecrawl.com if you believe this is an error.",
-  //   };
-  //   return data;
-  // }
   const costTracking = new CostTracking();
 
   try {
@@ -1083,6 +1121,11 @@ async function processJob(job: Job & { id: string }, token: string) {
       current_url: "",
     });
 
+    if (remainingTime !== undefined && remainingTime < 0) {
+      throw new Error("timeout");
+    }
+    const signal = remainingTime ? AbortSignal.timeout(remainingTime) : undefined;
+
     if (job.data.crawl_id) {
       const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
       if (sc && sc.cancelled) {
@@ -1096,16 +1139,22 @@ async function processJob(job: Job & { id: string }, token: string) {
         token,
         costTracking,
       }),
-      ...(job.data.scrapeOptions.timeout !== undefined
+      ...(remainingTime !== undefined
        ? [
            (async () => {
-              await sleep(job.data.scrapeOptions.timeout);
+              await sleep(remainingTime);
              throw new Error("timeout");
            })(),
          ]
        : []),
    ]);
 
+    try {
+      signal?.throwIfAborted();
+    } catch (e) {
+      throw new Error("timeout");
+    }
+
     if (!pipeline.success) {
       throw pipeline.error;
     }
@@ -1206,45 +1255,6 @@ async function processJob(job: Job & { id: string }, token: string) {
         }
       }
 
-      logger.debug("Logging job to DB...");
-      await logJob(
-        {
-          job_id: job.id as string,
-          success: true,
-          num_docs: 1,
-          docs: [doc],
-          time_taken: timeTakenInSeconds,
-          team_id: job.data.team_id,
-          mode: job.data.mode,
-          url: job.data.url,
-          crawlerOptions: sc.crawlerOptions,
-          scrapeOptions: job.data.scrapeOptions,
-          origin: job.data.origin,
-          crawl_id: job.data.crawl_id,
-          cost_tracking: costTracking,
-          pdf_num_pages: doc.metadata.numPages,
-        },
-        true,
-      );
-
-      if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
-        logger.debug("Calling webhook with success...", {
-          webhook: job.data.webhook,
-        });
-        await callWebhook(
-          job.data.team_id,
-          job.data.crawl_id,
-          data,
-          job.data.webhook,
-          job.data.v1,
-          job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page",
-          true,
-        );
-      }
-
-      logger.debug("Declaring job as done...");
-      await addCrawlJobDone(job.data.crawl_id, job.id, true);
-
       if (job.data.crawlerOptions !== null) {
         if (!sc.cancelled) {
           const crawler = crawlToCrawler(
@@ -1340,8 +1350,64 @@ async function processJob(job: Job & { id: string }, token: string) {
         }
       }
 
+      try {
+        signal?.throwIfAborted();
+      } catch (e) {
+        throw new Error("timeout");
+      }
+
+      const credits_billed = await billScrapeJob(job, doc, logger, costTracking);
+
+      logger.debug("Logging job to DB...");
+      await logJob(
+        {
+          job_id: job.id as string,
+          success: true,
+          num_docs: 1,
+          docs: [doc],
+          time_taken: timeTakenInSeconds,
+          team_id: job.data.team_id,
+          mode: job.data.mode,
+          url: job.data.url,
+          crawlerOptions: sc.crawlerOptions,
+          scrapeOptions: job.data.scrapeOptions,
+          origin: job.data.origin,
+          crawl_id: job.data.crawl_id,
+          cost_tracking: costTracking,
+          pdf_num_pages: doc.metadata.numPages,
+          credits_billed,
+        },
+        true,
+      );
+
+      if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
+        logger.debug("Calling webhook with success...", {
+          webhook: job.data.webhook,
+        });
+        await callWebhook(
+          job.data.team_id,
+          job.data.crawl_id,
+          data,
+          job.data.webhook,
+          job.data.v1,
+          job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page",
+          true,
+        );
+      }
+
+      logger.debug("Declaring job as done...");
+      await addCrawlJobDone(job.data.crawl_id, job.id, true);
+
       await finishCrawlIfNeeded(job, sc);
     } else {
+      try {
+        signal?.throwIfAborted();
+      } catch (e) {
+        throw new Error("timeout");
+      }
+
+      const credits_billed = await billScrapeJob(job, doc, logger, costTracking);
+
       await logJob({
         job_id: job.id,
         success: true,
@@ -1357,53 +1423,10 @@ async function processJob(job: Job & { id: string }, token: string) {
         num_tokens: 0, // TODO: fix
         cost_tracking: costTracking,
         pdf_num_pages: doc.metadata.numPages,
+        credits_billed,
       });
     }
 
-    if (job.data.is_scrape !== true) {
-      let creditsToBeBilled = await calculateCreditsToBeBilled(job.data.scrapeOptions, doc, job.id, costTracking);
-
-      if (
-        job.data.team_id !== process.env.BACKGROUND_INDEX_TEAM_ID! &&
-        process.env.USE_DB_AUTHENTICATION === "true"
-      ) {
-        try {
-          const billingJobId = uuidv4();
-          logger.debug(
-            `Adding billing job to queue for team ${job.data.team_id}`,
-            {
-              billingJobId,
-              credits: creditsToBeBilled,
-              is_extract: false,
-            },
-          );
-
-          // Add directly to the billing queue - the billing worker will handle the rest
-          await getBillingQueue().add(
-            "bill_team",
-            {
-              team_id: job.data.team_id,
-              subscription_id: undefined,
-              credits: creditsToBeBilled,
-              is_extract: false,
-              timestamp: new Date().toISOString(),
-              originating_job_id: job.id,
-            },
-            {
-              jobId: billingJobId,
-              priority: 10,
-            },
-          );
-        } catch (error) {
-          logger.error(
-            `Failed to add billing job to queue for team ${job.data.team_id} for ${creditsToBeBilled} credits`,
-            { error },
-          );
-          Sentry.captureException(error);
-        }
-      }
-    }
-
     logger.info(`🐂 Job done ${job.id}`);
     return data;
   } catch (error) {
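Taken together, the worker changes enforce the timeout in three layers: a fast-fail when the budget was already exhausted while the job sat in the queue, a race between the scrape pipeline and the remaining budget, and an AbortSignal re-check before billing and logging so a result that arrives after the deadline is never billed as a success. A standalone sketch of that pattern under the same assumptions (startTime and a caller timeout on the job data); runPipeline and processWithDeadline are hypothetical stand-ins, not the real pipeline:

// Self-contained sketch of the enforcement pattern; only the timeout
// mechanics mirror the diff, the pipeline itself is faked.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function runPipeline(): Promise<{ ok: boolean }> {
  await sleep(50); // pretend scrape work
  return { ok: true };
}

async function processWithDeadline(startTime: number, timeoutMs?: number) {
  const remainingTime =
    timeoutMs !== undefined ? timeoutMs - (Date.now() - startTime) : undefined;

  // Layer 1: budget already spent while queued -> fail before doing any work.
  if (remainingTime !== undefined && remainingTime < 0) {
    throw new Error("timeout");
  }
  const signal =
    remainingTime !== undefined ? AbortSignal.timeout(remainingTime) : undefined;

  // Layer 2: race the work against the remaining budget.
  const timeoutPromise =
    remainingTime !== undefined
      ? (async () => {
          await sleep(remainingTime);
          throw new Error("timeout");
        })()
      : undefined;
  timeoutPromise?.catch(() => {}); // avoid an unhandled rejection if the work wins

  const result = await Promise.race(
    timeoutPromise ? [runPipeline(), timeoutPromise] : [runPipeline()],
  );

  // Layer 3: even a result that made it past the race is rejected if the
  // deadline has passed, so it is never billed or logged as a success.
  if (signal?.aborted) {
    throw new Error("timeout");
  }
  return result;
}

// Example: 200 ms budget, ~50 ms of work -> resolves with { ok: true }.
processWithDeadline(Date.now(), 200).then(console.log).catch(console.error);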

View File

@@ -44,9 +44,15 @@ export interface WebScraperOptions {
   sitemapped?: boolean;
   webhook?: z.infer<typeof webhookSchema>;
   v1?: boolean;
+
+  /**
+   * Disables billing on the worker side.
+   */
   is_scrape?: boolean;
+
   isCrawlSourceScrape?: boolean;
   from_extract?: boolean;
+  startTime?: number;
 }
 
 export interface RunWebScraperParams {
@@ -95,6 +101,7 @@ export interface FirecrawlJob {
   sources?: Record<string, string[]>;
   cost_tracking?: CostTracking;
   pdf_num_pages?: number;
+  credits_billed?: number | null;
 }
 
 export interface FirecrawlScrapeResponse {
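The new FirecrawlJob.credits_billed field is both optional and nullable: absent on call sites that never set it, null when the worker skipped the credit calculation (billScrapeJob returns null for jobs flagged is_scrape: true), and a number when the worker computed credits for the job. A small, hedged helper showing how a log consumer might read it under those semantics; the function is illustrative and not part of this change:

// Illustrative reader for the credits_billed value written by logJob.
function describeCreditsBilled(credits: number | null | undefined): string {
  if (credits === undefined) return "not recorded for this job";
  if (credits === null) return "worker-side billing skipped (is_scrape job)";
  return `${credits} credit(s) computed by the worker`;
}

console.log(describeCreditsBilled(5));         // "5 credit(s) computed by the worker"
console.log(describeCreditsBilled(null));      // "worker-side billing skipped (is_scrape job)"
console.log(describeCreditsBilled(undefined)); // "not recorded for this job"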