feat(api/scrape): tighten timeout and handle billing and logging on queue-worker

Gergő Móricz 2025-05-30 17:35:32 +02:00
parent 9297afd1ff
commit 42c8adf9e5
9 changed files with 133 additions and 132 deletions
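
Summary of the change: the API controllers now stamp startTime: Date.now() on the queued job, and the queue worker budgets the caller's timeout against that stamp instead of restarting the clock; billing and job logging move out of the scrape controller and into the worker, which records credits_billed alongside the job log. Below is a minimal sketch of the timeout-budget pattern only; the job shape and the sleep() helper are simplified assumptions, not the actual Firecrawl types.

// Minimal sketch of the timeout budget introduced by this commit.
// QueuedScrapeData and sleep() are simplified stand-ins, not the real types.
type QueuedScrapeData = {
  startTime?: number;                  // stamped by the controller at enqueue time
  scrapeOptions: { timeout?: number }; // the caller's overall timeout in ms
};

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function runWithTimeoutBudget<T>(
  data: QueuedScrapeData,
  work: (signal?: AbortSignal) => Promise<T>,
): Promise<T> {
  // Count time already spent waiting in the queue against the caller's timeout.
  const start = data.startTime ?? Date.now();
  const remainingTime =
    data.scrapeOptions.timeout !== undefined
      ? data.scrapeOptions.timeout - (Date.now() - start)
      : undefined;
  const signal =
    remainingTime !== undefined ? AbortSignal.timeout(remainingTime) : undefined;

  const result = await Promise.race([
    work(signal),
    ...(remainingTime !== undefined
      ? [
          sleep(remainingTime).then(() => {
            throw new Error("timeout");
          }),
        ]
      : []),
  ]);

  // Re-check after the race so a result that arrives too late is still rejected.
  signal?.throwIfAborted();
  return result;
}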

View File

@@ -82,6 +82,7 @@ export async function scrapeHelper(
       internalOptions,
       origin: req.body.origin ?? defaultOrigin,
       is_scrape: true,
+      startTime: Date.now(),
     },
     {},
     jobId,

View File

@@ -112,6 +112,7 @@ export async function searchHelper(
       team_id: team_id,
       scrapeOptions,
       internalOptions,
+      startTime: Date.now(),
     },
     opts: {
       jobId: uuid,

View File

@@ -31,7 +31,6 @@ export async function scrapeController(
   });
   req.body = scrapeRequestSchema.parse(req.body);
-  let earlyReturn = false;
   const origin = req.body.origin;
   const timeout = req.body.timeout;
@@ -55,7 +54,7 @@ export async function scrapeController(
         unnormalizedSourceURL: preNormalizedBody.url,
       },
       origin: req.body.origin,
-      is_scrape: true,
+      startTime,
     },
     {},
     jobId,
@@ -125,30 +124,6 @@ export async function scrapeController(
   await getScrapeQueue().remove(jobId);
-  const endTime = new Date().getTime();
-  const timeTakenInSeconds = (endTime - startTime) / 1000;
-  const numTokens =
-    doc && doc.extract
-      ? // ? numTokensFromString(doc.markdown, "gpt-3.5-turbo")
-        0 // TODO: fix
-      : 0;
-  if (earlyReturn) {
-    // Don't bill if we're early returning
-    return;
-  }
-  let creditsToBeBilled = await calculateCreditsToBeBilled(req.body, doc, jobId);
-  billTeam(req.auth.team_id, req.acuc?.sub_id, creditsToBeBilled).catch(
-    (error) => {
-      logger.error(
-        `Failed to bill team ${req.auth.team_id} for ${creditsToBeBilled} credits: ${error}`,
-      );
-      // Optionally, you could notify an admin or add to a retry queue here
-    },
-  );
   if (!req.body.formats.includes("rawHtml")) {
     if (doc && doc.rawHtml) {
       delete doc.rawHtml;

View File

@@ -102,7 +102,7 @@ async function scrapeSearchResult(
       internalOptions: { teamId: options.teamId, useCache: true },
       origin: options.origin,
       is_scrape: true,
+      startTime: Date.now(),
     },
     {},
     jobId,

View File

@@ -49,6 +49,7 @@ export async function scrapeDocument(
       origin: options.origin,
       is_scrape: true,
       from_extract: true,
+      startTime: Date.now(),
     },
     {},
     jobId,

View File

@@ -47,6 +47,7 @@ export async function scrapeDocument_F0(
       origin: options.origin,
       is_scrape: true,
       from_extract: true,
+      startTime: Date.now(),
     },
     {},
     jobId,

View File

@@ -104,6 +104,7 @@ export async function logJob(job: FirecrawlJob, force: boolean = false) {
     is_migrated: true,
     cost_tracking: job.cost_tracking,
     pdf_num_pages: job.pdf_num_pages ?? null,
+    credits_billed: job.credits_billed ?? null,
   };
   // Send job to external server

View File

@@ -87,6 +87,7 @@ import { robustFetch } from "../scraper/scrapeURL/lib/fetch";
 import { RateLimiterMode } from "../types";
 import { calculateCreditsToBeBilled } from "../lib/scrape-billing";
 import { redisEvictConnection } from "./redis";
+import type { Logger } from "winston";
 configDotenv();
@@ -1064,6 +1065,59 @@ async function indexJob(job: Job & { id: string }, document: Document) {
   }
 }
+async function billScrapeJob(job: Job & { id: string }, document: Document, logger: Logger, costTracking?: CostTracking) {
+  let creditsToBeBilled: number | null = null;
+  if (job.data.is_scrape !== true) {
+    creditsToBeBilled = await calculateCreditsToBeBilled(job.data.scrapeOptions, document, job.id, costTracking);
+    if (
+      job.data.team_id !== process.env.BACKGROUND_INDEX_TEAM_ID! &&
+      process.env.USE_DB_AUTHENTICATION === "true"
+    ) {
+      try {
+        const billingJobId = uuidv4();
+        logger.debug(
+          `Adding billing job to queue for team ${job.data.team_id}`,
+          {
+            billingJobId,
+            credits: creditsToBeBilled,
+            is_extract: false,
+          },
+        );
+        // Add directly to the billing queue - the billing worker will handle the rest
+        await getBillingQueue().add(
+          "bill_team",
+          {
+            team_id: job.data.team_id,
+            subscription_id: undefined,
+            credits: creditsToBeBilled,
+            is_extract: false,
+            timestamp: new Date().toISOString(),
+            originating_job_id: job.id,
+          },
+          {
+            jobId: billingJobId,
+            priority: 10,
+          },
+        );
+        return creditsToBeBilled;
+      } catch (error) {
+        logger.error(
+          `Failed to add billing job to queue for team ${job.data.team_id} for ${creditsToBeBilled} credits`,
+          { error },
+        );
+        Sentry.captureException(error);
+        return creditsToBeBilled;
+      }
+    }
+  }
+  return creditsToBeBilled;
+}
 async function processJob(job: Job & { id: string }, token: string) {
   const logger = _logger.child({
     module: "queue-worker",
@@ -1074,25 +1128,10 @@ async function processJob(job: Job & { id: string }, token: string) {
     teamId: job.data?.team_id ?? undefined,
   });
   logger.info(`🐂 Worker taking job ${job.id}`, { url: job.data.url });
-  const start = Date.now();
+  const start = job.data.startTime ?? Date.now();
+  const remainingTime = job.data.scrapeOptions.timeout ? (job.data.scrapeOptions.timeout - (Date.now() - start)) : undefined;
+  const signal = remainingTime ? AbortSignal.timeout(remainingTime) : undefined;
-  // Check if the job URL is researchhub and block it immediately
-  // TODO: remove this once solve the root issue
-  // if (
-  //   job.data.url &&
-  //   (job.data.url.includes("researchhub.com") ||
-  //     job.data.url.includes("ebay.com"))
-  // ) {
-  //   logger.info(`🐂 Blocking job ${job.id} with URL ${job.data.url}`);
-  //   const data = {
-  //     success: false,
-  //     document: null,
-  //     project_id: job.data.project_id,
-  //     error:
-  //       "URL is blocked. Suspecious activity detected. Please contact help@firecrawl.com if you believe this is an error.",
-  //   };
-  //   return data;
-  // }
   const costTracking = new CostTracking();
   try {
@@ -1116,16 +1155,22 @@
         token,
         costTracking,
       }),
-      ...(job.data.scrapeOptions.timeout !== undefined
+      ...(remainingTime !== undefined
        ? [
            (async () => {
-              await sleep(job.data.scrapeOptions.timeout);
+              await sleep(remainingTime);
              throw new Error("timeout");
            })(),
          ]
        : []),
    ]);
+    try {
+      signal?.throwIfAborted();
+    } catch (e) {
+      throw new Error("timeout");
+    }
    if (!pipeline.success) {
      throw pipeline.error;
    }
@@ -1226,47 +1271,6 @@
         }
       }
-      logger.debug("Logging job to DB...");
-      await logJob(
-        {
-          job_id: job.id as string,
-          success: true,
-          num_docs: 1,
-          docs: [doc],
-          time_taken: timeTakenInSeconds,
-          team_id: job.data.team_id,
-          mode: job.data.mode,
-          url: job.data.url,
-          crawlerOptions: sc.crawlerOptions,
-          scrapeOptions: job.data.scrapeOptions,
-          origin: job.data.origin,
-          crawl_id: job.data.crawl_id,
-          cost_tracking: costTracking,
-          pdf_num_pages: doc.metadata.numPages,
-        },
-        true,
-      );
-      if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
-        logger.debug("Calling webhook with success...", {
-          webhook: job.data.webhook,
-        });
-        await callWebhook(
-          job.data.team_id,
-          job.data.crawl_id,
-          data,
-          job.data.webhook,
-          job.data.v1,
-          job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page",
-          true,
-        );
-      }
-      indexJob(job, doc);
-      logger.debug("Declaring job as done...");
-      await addCrawlJobDone(job.data.crawl_id, job.id, true);
       if (job.data.crawlerOptions !== null) {
         if (!sc.cancelled) {
           const crawler = crawlToCrawler(
@@ -1362,8 +1366,66 @@
         }
       }
+      try {
+        signal?.throwIfAborted();
+      } catch (e) {
+        throw new Error("timeout");
+      }
+      const credits_billed = await billScrapeJob(job, doc, logger, costTracking);
+      logger.debug("Logging job to DB...");
+      await logJob(
+        {
+          job_id: job.id as string,
+          success: true,
+          num_docs: 1,
+          docs: [doc],
+          time_taken: timeTakenInSeconds,
+          team_id: job.data.team_id,
+          mode: job.data.mode,
+          url: job.data.url,
+          crawlerOptions: sc.crawlerOptions,
+          scrapeOptions: job.data.scrapeOptions,
+          origin: job.data.origin,
+          crawl_id: job.data.crawl_id,
+          cost_tracking: costTracking,
+          pdf_num_pages: doc.metadata.numPages,
+          credits_billed,
+        },
+        true,
+      );
+      if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
+        logger.debug("Calling webhook with success...", {
+          webhook: job.data.webhook,
+        });
+        await callWebhook(
+          job.data.team_id,
+          job.data.crawl_id,
+          data,
+          job.data.webhook,
+          job.data.v1,
+          job.data.crawlerOptions !== null ? "crawl.page" : "batch_scrape.page",
+          true,
+        );
+      }
+      indexJob(job, doc);
+      logger.debug("Declaring job as done...");
+      await addCrawlJobDone(job.data.crawl_id, job.id, true);
       await finishCrawlIfNeeded(job, sc);
     } else {
+      try {
+        signal?.throwIfAborted();
+      } catch (e) {
+        throw new Error("timeout");
+      }
+      const credits_billed = await billScrapeJob(job, doc, logger, costTracking);
       await logJob({
         job_id: job.id,
         success: true,
@@ -1379,55 +1441,12 @@
         num_tokens: 0, // TODO: fix
         cost_tracking: costTracking,
         pdf_num_pages: doc.metadata.numPages,
+        credits_billed,
       });
       indexJob(job, doc);
     }
-    if (job.data.is_scrape !== true) {
-      let creditsToBeBilled = await calculateCreditsToBeBilled(job.data.scrapeOptions, doc, job.id, costTracking);
-      if (
-        job.data.team_id !== process.env.BACKGROUND_INDEX_TEAM_ID! &&
-        process.env.USE_DB_AUTHENTICATION === "true"
-      ) {
-        try {
-          const billingJobId = uuidv4();
-          logger.debug(
-            `Adding billing job to queue for team ${job.data.team_id}`,
-            {
-              billingJobId,
-              credits: creditsToBeBilled,
-              is_extract: false,
-            },
-          );
-          // Add directly to the billing queue - the billing worker will handle the rest
-          await getBillingQueue().add(
-            "bill_team",
-            {
-              team_id: job.data.team_id,
-              subscription_id: undefined,
-              credits: creditsToBeBilled,
-              is_extract: false,
-              timestamp: new Date().toISOString(),
-              originating_job_id: job.id,
-            },
-            {
-              jobId: billingJobId,
-              priority: 10,
-            },
-          );
-        } catch (error) {
-          logger.error(
-            `Failed to add billing job to queue for team ${job.data.team_id} for ${creditsToBeBilled} credits`,
-            { error },
-          );
-          Sentry.captureException(error);
-        }
-      }
-    }
     logger.info(`🐂 Job done ${job.id}`);
     return data;
   } catch (error) {
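
On the worker side, the flow after a successful scrape is now: enforce the remaining timeout, bill via the new billScrapeJob helper (which enqueues a bill_team job and returns the credit count, or null for is_scrape jobs), then write the job log so credits_billed lands on the same record. A condensed sketch of that ordering, with the billing and logging helpers as illustrative placeholders rather than the real signatures:

// Condensed sketch of the new post-scrape ordering in the worker: bill
// first, then log the job together with the credits that were billed.
// billFn/logFn are placeholders standing in for billScrapeJob and logJob.
type JobLogEntry = {
  job_id: string;
  success: boolean;
  credits_billed: number | null;
};

async function billThenLog(
  jobId: string,
  billFn: () => Promise<number | null>,
  logFn: (entry: JobLogEntry) => Promise<void>,
): Promise<void> {
  // Billing runs before logging so the resulting credit count can be
  // persisted on the job record as credits_billed.
  const credits_billed = await billFn();
  await logFn({ job_id: jobId, success: true, credits_billed });
}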

View File

@@ -47,6 +47,7 @@ export interface WebScraperOptions {
   is_scrape?: boolean;
   isCrawlSourceScrape?: boolean;
   from_extract?: boolean;
+  startTime?: number;
 }
 export interface RunWebScraperParams {
@@ -95,6 +96,7 @@ export interface FirecrawlJob {
   sources?: Record<string, string[]>;
   cost_tracking?: CostTracking;
   pdf_num_pages?: number;
+  credits_billed?: number | null;
 }
 export interface FirecrawlScrapeResponse {