Merge pull request #1044 from mendableai/nsc/extract-queue

(feat/extract) Move extract to a queue system
Nicolas 2025-01-07 18:10:46 -03:00 committed by GitHub
commit f82a742cd1
12 changed files with 480 additions and 55 deletions

View File

@ -0,0 +1,40 @@
import { Response } from "express";
import { supabaseGetJobsById } from "../../lib/supabase-jobs";
import { RequestWithAuth } from "./types";
import { getExtract, getExtractExpiry } from "../../lib/extract/extract-redis";
export async function extractStatusController(
req: RequestWithAuth<{ jobId: string }, any, any>,
res: Response,
) {
const extract = await getExtract(req.params.jobId);
if (!extract) {
return res.status(404).json({
success: false,
error: "Extract job not found",
});
}
let data: any[] = [];
if (extract.status === "completed") {
const jobData = await supabaseGetJobsById([req.params.jobId]);
if (!jobData || jobData.length === 0) {
return res.status(404).json({
success: false,
error: "Job not found",
});
}
data = jobData[0].docs;
}
return res.status(200).json({
success: true,
data: data,
status: extract.status,
error: extract?.error ?? undefined,
expiresAt: (await getExtractExpiry(req.params.jobId)).toISOString(),
});
}
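
For reference, the success-path JSON this controller produces can be described with a small type (a sketch inferred from the handler above; the 404 branches instead return { success: false, error }):

// Shape of a 200 response from GET /v1/extract/:jobId, as assembled above
type ExtractStatusResponse = {
  success: true;
  data: any[]; // only populated once status === "completed", read back from the job store
  status: "processing" | "completed" | "failed" | "cancelled";
  error?: string;
  expiresAt: string; // ISO timestamp derived from the Redis TTL of the extract record
};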

View File

@ -5,8 +5,31 @@ import {
extractRequestSchema,
ExtractResponse,
} from "./types";
import { getExtractQueue } from "../../services/queue-service";
import * as Sentry from "@sentry/node";
import { saveExtract } from "../../lib/extract/extract-redis";
import { getTeamIdSyncB } from "../../lib/extract/team-id-sync";
import { performExtraction } from "../../lib/extract/extraction-service";
export async function oldExtract(req: RequestWithAuth<{}, ExtractResponse, ExtractRequest>, res: Response<ExtractResponse>, extractId: string) {
// Means that we are in the non-queue system
// TODO: Remove this once all teams have transitioned to the new system
try {
const result = await performExtraction(extractId, {
request: req.body,
teamId: req.auth.team_id,
plan: req.auth.plan ?? "free",
subId: req.acuc?.sub_id ?? undefined,
});
return res.status(200).json(result);
} catch (error) {
return res.status(500).json({
success: false,
error: "Internal server error",
});
}
}
/**
* Extracts data from the provided URLs based on the request parameters.
* Currently in beta.
@ -21,20 +44,59 @@ export async function extractController(
const selfHosted = process.env.USE_DB_AUTHENTICATION !== "true";
req.body = extractRequestSchema.parse(req.body);
if (!req.auth.plan) {
return res.status(400).json({
success: false,
error: "No plan specified",
urlTrace: [],
});
}
const result = await performExtraction({
const extractId = crypto.randomUUID();
const jobData = {
request: req.body,
teamId: req.auth.team_id,
plan: req.auth.plan,
subId: req.acuc?.sub_id || undefined,
subId: req.acuc?.sub_id,
extractId,
};
if (await getTeamIdSyncB(req.auth.team_id) && req.body.origin !== "api-sdk") {
return await oldExtract(req, res, extractId);
}
await saveExtract(extractId, {
id: extractId,
team_id: req.auth.team_id,
plan: req.auth.plan,
createdAt: Date.now(),
status: "processing",
});
return res.status(result.success ? 200 : 400).json(result);
if (Sentry.isInitialized()) {
const size = JSON.stringify(jobData).length;
await Sentry.startSpan(
{
name: "Add extract job",
op: "queue.publish",
attributes: {
"messaging.message.id": extractId,
"messaging.destination.name": getExtractQueue().name,
"messaging.message.body.size": size,
},
},
async (span) => {
await getExtractQueue().add(extractId, {
...jobData,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
});
},
);
} else {
await getExtractQueue().add(extractId, jobData, {
jobId: extractId,
});
}
return res.status(200).json({
success: true,
id: extractId,
urlTrace: [],
});
}
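
With the queue in place, the endpoint acknowledges the job immediately instead of blocking until extraction finishes. A minimal client-side sketch of the kickoff call (the base URL, API key, and request body are placeholders):

// Start an extract job; only the job id comes back right away.
const apiKey = process.env.FIRECRAWL_API_KEY;
const res = await fetch("https://api.firecrawl.dev/v1/extract", {
  method: "POST",
  headers: { "Content-Type": "application/json", Authorization: `Bearer ${apiKey}` },
  body: JSON.stringify({ urls: ["https://example.com"], prompt: "Extract the page title" }),
});
const { success, id } = await res.json(); // { success: true, id: "<extractId>", urlTrace: [] }
// The result is fetched later from GET /v1/extract/:id (extractStatusController above).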

View File

@ -186,6 +186,10 @@ export const scrapeOptions = z
export type ScrapeOptions = z.infer<typeof scrapeOptions>;
import Ajv from "ajv";
const ajv = new Ajv();
export const extractV1Options = z
.object({
urls: url
@ -193,7 +197,20 @@ export const extractV1Options = z
.max(10, "Maximum of 10 URLs allowed per request while in beta."),
prompt: z.string().optional(),
systemPrompt: z.string().optional(),
schema: z.any().optional(),
schema: z
.any()
.optional()
.refine((val) => {
if (!val) return true; // Allow undefined schema
try {
const validate = ajv.compile(val);
return typeof validate === "function";
} catch (e) {
return false;
}
}, {
message: "Invalid JSON schema.",
}),
limit: z.number().int().positive().finite().safe().optional(),
ignoreSitemap: z.boolean().default(false),
includeSubdomains: z.boolean().default(true),
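
The refine above accepts any schema that Ajv can compile and rejects anything that makes ajv.compile throw. A standalone sketch of the two cases, using the same default Ajv construction as this file:

import Ajv from "ajv";
const ajv = new Ajv();

// Compiles cleanly, so the refine returns true and the request proceeds.
ajv.compile({ type: "object", properties: { title: { type: "string" } } });

// An invalid schema makes ajv.compile throw, so the refine returns false and
// zod reports "Invalid JSON schema." to the caller.
try {
  ajv.compile({ type: "strnig" }); // misspelled type value fails meta-schema validation
} catch {
  // rejected
}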
@ -478,10 +495,11 @@ export interface URLTrace {
export interface ExtractResponse {
success: boolean;
error?: string;
data?: any;
scrape_id?: string;
id?: string;
warning?: string;
error?: string;
urlTrace?: URLTrace[];
}

View File

@ -4,7 +4,7 @@ import * as Sentry from "@sentry/node";
import express, { NextFunction, Request, Response } from "express";
import bodyParser from "body-parser";
import cors from "cors";
import { getScrapeQueue } from "./services/queue-service";
import { getExtractQueue, getScrapeQueue } from "./services/queue-service";
import { v0Router } from "./routes/v0";
import os from "os";
import { logger } from "./lib/logger";
@ -45,7 +45,7 @@ const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
queues: [new BullAdapter(getScrapeQueue())],
queues: [new BullAdapter(getScrapeQueue()), new BullAdapter(getExtractQueue())],
serverAdapter: serverAdapter,
});

View File

@ -0,0 +1,38 @@
import { redisConnection } from "../../services/queue-service";
import { logger as _logger } from "../logger";
export type StoredExtract = {
id: string;
team_id: string;
plan?: string;
createdAt: number;
status: "processing" | "completed" | "failed" | "cancelled";
error?: any;
};
export async function saveExtract(id: string, extract: StoredExtract) {
_logger.debug("Saving extract " + id + " to Redis...");
await redisConnection.set("extract:" + id, JSON.stringify(extract));
await redisConnection.expire("extract:" + id, 24 * 60 * 60, "NX");
}
export async function getExtract(id: string): Promise<StoredExtract | null> {
const x = await redisConnection.get("extract:" + id);
return x ? JSON.parse(x) : null;
}
export async function updateExtract(id: string, extract: Partial<StoredExtract>) {
const current = await getExtract(id);
if (!current) return;
await redisConnection.set("extract:" + id, JSON.stringify({ ...current, ...extract }));
await redisConnection.expire("extract:" + id, 24 * 60 * 60, "NX");
}
export async function getExtractExpiry(id: string): Promise<Date> {
const d = new Date();
const ttl = await redisConnection.pttl("extract:" + id);
d.setMilliseconds(d.getMilliseconds() + ttl);
d.setMilliseconds(0);
return d;
}
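
A usage sketch of the lifecycle this module supports (the team id is a placeholder). Because expire is called with NX, the 24-hour TTL is set once when the record is first saved and is not pushed back by later updates; getExtractExpiry turns the remaining TTL back into an absolute timestamp.

import { randomUUID } from "node:crypto";
import { saveExtract, updateExtract, getExtract, getExtractExpiry } from "./extract-redis";

const id = randomUUID();
await saveExtract(id, { id, team_id: "team_placeholder", createdAt: Date.now(), status: "processing" });
await updateExtract(id, { status: "completed" }); // overwrites the record, TTL untouched (NX)
const stored = await getExtract(id);              // { ..., status: "completed" } or null after expiry
const expiresAt = await getExtractExpiry(id);     // roughly 24h after the original saveExtract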

View File

@ -9,6 +9,7 @@ import { billTeam } from "../../services/billing/credit_billing";
import { logJob } from "../../services/logging/log_job";
import { _addScrapeJobToBullMQ } from "../../services/queue-jobs";
import { saveCrawl, StoredCrawl } from "../crawl-redis";
import { updateExtract } from "./extract-redis";
interface ExtractServiceOptions {
request: ExtractRequest;
@ -20,7 +21,7 @@ interface ExtractServiceOptions {
interface ExtractResult {
success: boolean;
data?: any;
scrapeId: string;
extractId: string;
warning?: string;
urlTrace?: URLTrace[];
error?: string;
@ -38,9 +39,8 @@ function getRootDomain(url: string): string {
}
}
export async function performExtraction(options: ExtractServiceOptions): Promise<ExtractResult> {
export async function performExtraction(extractId: string, options: ExtractServiceOptions): Promise<ExtractResult> {
const { request, teamId, plan, subId } = options;
const scrapeId = crypto.randomUUID();
const urlTraces: URLTrace[] = [];
let docs: Document[] = [];
@ -65,7 +65,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise
return {
success: false,
error: "No valid URLs found to scrape. Try adjusting your search criteria or including more URLs.",
scrapeId,
extractId,
urlTrace: urlTraces,
};
}
@ -89,7 +89,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise
return {
success: false,
error: error.message,
scrapeId,
extractId,
urlTrace: urlTraces,
};
}
@ -191,7 +191,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise
// Log job
logJob({
job_id: scrapeId,
job_id: extractId,
success: true,
message: "Extract completed",
num_docs: 1,
@ -203,12 +203,20 @@ export async function performExtraction(options: ExtractServiceOptions): Promise
scrapeOptions: request,
origin: request.origin ?? "api",
num_tokens: completions.numTokens ?? 0,
}).then(() => {
updateExtract(extractId, {
status: "completed",
}).catch((error) => {
logger.error(`Failed to update extract ${extractId} status to completed: ${error}`);
});
});
return {
success: true,
data: completions.extract ?? {},
scrapeId,
extractId,
warning: completions.warning,
urlTrace: request.urlTrace ? urlTraces : undefined,
};

View File

@ -0,0 +1,19 @@
import { supabase_service } from "../../services/supabase";
import { logger } from "../logger";
export async function getTeamIdSyncB(teamId: string) {
try {
const { data, error } = await supabase_service
.from("eb-sync")
.select("team_id")
.eq("team_id", teamId)
.limit(1);
if (error) {
throw new Error("Error getting team id (sync b)");
}
return data[0] ?? null;
} catch (error) {
logger.error("Error getting team id (sync b)", error);
return null;
}
}

View File

@ -24,13 +24,7 @@ import { scrapeStatusController } from "../controllers/v1/scrape-status";
import { concurrencyCheckController } from "../controllers/v1/concurrency-check";
import { batchScrapeController } from "../controllers/v1/batch-scrape";
import { extractController } from "../controllers/v1/extract";
// import { crawlPreviewController } from "../../src/controllers/v1/crawlPreview";
// import { crawlJobStatusPreviewController } from "../../src/controllers/v1/status";
// import { searchController } from "../../src/controllers/v1/search";
// import { crawlCancelController } from "../../src/controllers/v1/crawl-cancel";
// import { keyAuthController } from "../../src/controllers/v1/keyAuth";
// import { livenessController } from "../controllers/v1/liveness";
// import { readinessController } from "../controllers/v1/readiness";
import { extractStatusController } from "../controllers/v1/extract-status";
import { creditUsageController } from "../controllers/v1/credit-usage";
import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings";
import { searchController } from "../controllers/v1/search";
@ -215,6 +209,12 @@ v1Router.post(
wrap(extractController),
);
v1Router.get(
"/extract/:jobId",
authMiddleware(RateLimiterMode.CrawlStatus),
wrap(extractStatusController),
);
// v1Router.post("/crawlWebsitePreview", crawlPreviewController);
v1Router.delete(

View File

@ -3,12 +3,16 @@ import { logger } from "../lib/logger";
import IORedis from "ioredis";
let scrapeQueue: Queue;
let extractQueue: Queue;
let loggingQueue: Queue;
export const redisConnection = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
});
export const scrapeQueueName = "{scrapeQueue}";
export const extractQueueName = "{extractQueue}";
export const loggingQueueName = "{loggingQueue}";
export function getScrapeQueue() {
if (!scrapeQueue) {
@ -24,24 +28,35 @@ export function getScrapeQueue() {
age: 90000, // 25 hours
},
},
},
// {
// settings: {
// lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds,
// lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
// stalledInterval: 30 * 1000,
// maxStalledCount: 10,
// },
// defaultJobOptions:{
// attempts: 5
// }
// }
}
);
logger.info("Web scraper queue created");
}
return scrapeQueue;
}
export function getExtractQueue() {
if (!extractQueue) {
extractQueue = new Queue(
extractQueueName,
{
connection: redisConnection,
defaultJobOptions: {
removeOnComplete: {
age: 90000, // 25 hours
},
removeOnFail: {
age: 90000, // 25 hours
},
},
}
);
logger.info("Extraction queue created");
}
return extractQueue;
}
// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
// import { QueueEvents } from 'bullmq';
// export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() });
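
Not part of this diff, but a hypothetical sketch of how the exported queue handle can be reused elsewhere, e.g. to inspect the raw BullMQ state of an extract job (this assumes the job was enqueued with jobId set to the extract id, as in the non-Sentry branch of the controller):

import { getExtractQueue } from "./queue-service";

// Hypothetical helper, not in this PR: map an extract id to its BullMQ job state.
export async function getExtractJobState(extractId: string): Promise<string> {
  const job = await getExtractQueue().getJob(extractId);
  return job ? await job.getState() : "not-found";
}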

View File

@ -4,8 +4,10 @@ import * as Sentry from "@sentry/node";
import { CustomError } from "../lib/custom-error";
import {
getScrapeQueue,
getExtractQueue,
redisConnection,
scrapeQueueName,
extractQueueName,
} from "./queue-service";
import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook";
@ -52,8 +54,10 @@ import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist";
import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings";
import { indexPage } from "../lib/extract/index/pinecone";
import { Document } from "../controllers/v1/types";
import { performExtraction } from "../lib/extract/extraction-service";
import { supabase_service } from "../services/supabase";
import { normalizeUrl, normalizeUrlOnlyHostname } from "../lib/canonical-url";
import { saveExtract, updateExtract } from "../lib/extract/extract-redis";
configDotenv();
@ -310,6 +314,58 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => {
return err;
};
const processExtractJobInternal = async (token: string, job: Job & { id: string }) => {
const logger = _logger.child({
module: "extract-worker",
method: "processJobInternal",
jobId: job.id,
extractId: job.data.extractId,
teamId: job.data?.teamId ?? undefined,
});
const extendLockInterval = setInterval(async () => {
logger.info(`🔄 Worker extending lock on job ${job.id}`);
await job.extendLock(token, jobLockExtensionTime);
}, jobLockExtendInterval);
try {
const result = await performExtraction(job.data.extractId, {
request: job.data.request,
teamId: job.data.teamId,
plan: job.data.plan,
subId: job.data.subId,
});
if (result.success) {
// Move job to completed state in Redis
await job.moveToCompleted(result, token, false);
return result;
} else {
throw new Error(result.error || "Unknown error during extraction");
}
} catch (error) {
logger.error(`🚫 Job errored ${job.id} - ${error}`, { error });
Sentry.captureException(error, {
data: {
job: job.id,
},
});
// Move job to failed state in Redis
await job.moveToFailed(error, token, false);
await updateExtract(job.data.extractId, {
status: "failed",
error: error.error ?? error ?? "Unknown error, please contact help@firecrawl.dev. Extract id: " + job.data.extractId,
});
// throw error;
} finally {
clearInterval(extendLockInterval);
}
};
let isShuttingDown = false;
process.on("SIGINT", () => {
@ -466,7 +522,9 @@ const workerFun = async (
}
};
// Start both workers
workerFun(getScrapeQueue(), processJobInternal);
workerFun(getExtractQueue(), processExtractJobInternal);
async function processKickoffJob(job: Job & { id: string }, token: string) {
const logger = _logger.child({

View File

@ -884,21 +884,35 @@ export default class FirecrawlApp {
try {
const response: AxiosResponse = await this.postRequest(
this.apiUrl + `/v1/extract`,
{ ...jsonData, schema: jsonSchema },
{ ...jsonData, schema: jsonSchema, origin: "api-sdk" },
headers
);
if (response.status === 200) {
const responseData = response.data as ExtractResponse<T>;
if (responseData.success) {
const jobId = response.data.id;
let extractStatus;
do {
const statusResponse: AxiosResponse = await this.getRequest(
`${this.apiUrl}/v1/extract/${jobId}`,
headers
);
extractStatus = statusResponse.data;
if (extractStatus.status === "completed") {
if (extractStatus.success) {
return {
success: true,
data: responseData.data,
warning: responseData.warning,
error: responseData.error
data: extractStatus.data,
warning: extractStatus.warning,
error: extractStatus.error
};
} else {
throw new FirecrawlError(`Failed to scrape URL. Error: ${responseData.error}`, response.status);
throw new FirecrawlError(`Failed to extract data. Error: ${extractStatus.error}`, statusResponse.status);
}
} else if (extractStatus.status === "failed" || extractStatus.status === "cancelled") {
throw new FirecrawlError(`Extract job ${extractStatus.status}. Error: ${extractStatus.error}`, statusResponse.status);
}
await new Promise(resolve => setTimeout(resolve, 1000)); // Polling interval
} while (extractStatus.status !== "completed");
} else {
this.handleError(response, "extract");
}
@ -908,6 +922,72 @@ export default class FirecrawlApp {
return { success: false, error: "Internal server error." };
}
/**
* Initiates an asynchronous extract job for a URL using the Firecrawl API.
* @param url - The URL to extract data from.
* @param params - Additional parameters for the extract request.
* @param idempotencyKey - Optional idempotency key for the request.
* @returns The response from the extract operation.
*/
async asyncExtract(
url: string,
params?: ExtractParams,
idempotencyKey?: string
): Promise<ExtractResponse | ErrorResponse> {
const headers = this.prepareHeaders(idempotencyKey);
let jsonData: any = { url, ...params };
let jsonSchema: any;
try {
if (params?.schema instanceof zt.ZodType) {
jsonSchema = zodToJsonSchema(params.schema);
} else {
jsonSchema = params?.schema;
}
} catch (error: any) {
throw new FirecrawlError("Invalid schema. Schema must be either a valid Zod schema or JSON schema object.", 400);
}
try {
const response: AxiosResponse = await this.postRequest(
this.apiUrl + `/v1/extract`,
{ ...jsonData, schema: jsonSchema },
headers
);
if (response.status === 200) {
return response.data;
} else {
this.handleError(response, "start extract job");
}
} catch (error: any) {
throw new FirecrawlError(error.message, 500);
}
return { success: false, error: "Internal server error." };
}
/**
* Retrieves the status of an extract job.
* @param jobId - The ID of the extract job.
* @returns The status of the extract job.
*/
async getExtractStatus(jobId: string): Promise<any> {
try {
const response: AxiosResponse = await this.getRequest(
`${this.apiUrl}/v1/extract/${jobId}`,
this.prepareHeaders()
);
if (response.status === 200) {
return response.data;
} else {
this.handleError(response, "get extract status");
}
} catch (error: any) {
throw new FirecrawlError(error.message, 500);
}
}
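
Taken together, the two new methods let callers manage the polling themselves. A usage sketch (the API key and prompt are placeholders; the extract() method above performs the same loop internally):

import FirecrawlApp from "@mendable/firecrawl-js";

const app = new FirecrawlApp({ apiKey: "fc-..." });

// Kick the job off without waiting for the result.
const started = await app.asyncExtract("https://example.com", { prompt: "Extract the page title" });

if ("id" in started && started.id) {
  // Poll until the queue worker marks the extract completed or failed.
  let status = await app.getExtractStatus(started.id);
  while (status.status === "processing") {
    await new Promise((resolve) => setTimeout(resolve, 1000));
    status = await app.getExtractStatus(started.id);
  }
  console.log(status.status, status.data);
}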
/**
* Prepares the headers for an API request.
* @param idempotencyKey - Optional key to ensure idempotency.

View File

@ -538,10 +538,12 @@ class FirecrawlApp:
request_data = {
**jsonData,
'allowExternalLinks': params.get('allow_external_links', False),
'schema': schema
'schema': schema,
'origin': 'api-sdk'
}
try:
# Send the initial extract request
response = self._post_request(
f'{self.api_url}/v1/extract',
request_data,
@ -550,7 +552,29 @@ class FirecrawlApp:
if response.status_code == 200:
data = response.json()
if data['success']:
return data
job_id = data.get('id')
if not job_id:
raise Exception('Job ID not returned from extract request.')
# Poll for the extract status
while True:
status_response = self._get_request(
f'{self.api_url}/v1/extract/{job_id}',
headers
)
if status_response.status_code == 200:
status_data = status_response.json()
if status_data['status'] == 'completed':
if status_data['success']:
return status_data
else:
raise Exception(f'Failed to extract. Error: {status_data["error"]}')
elif status_data['status'] in ['failed', 'cancelled']:
raise Exception(f'Extract job {status_data["status"]}. Error: {status_data["error"]}')
else:
self._handle_error(status_response, "extract-status")
time.sleep(2) # Polling interval
else:
raise Exception(f'Failed to extract. Error: {data["error"]}')
else:
@ -560,6 +584,69 @@ class FirecrawlApp:
return {'success': False, 'error': "Internal server error."}
def get_extract_status(self, job_id: str) -> Dict[str, Any]:
"""
Retrieve the status of an extract job.
Args:
job_id (str): The ID of the extract job.
Returns:
Dict[str, Any]: The status of the extract job.
Raises:
ValueError: If there is an error retrieving the status.
"""
headers = self._prepare_headers()
try:
response = self._get_request(f'{self.api_url}/v1/extract/{job_id}', headers)
if response.status_code == 200:
return response.json()
else:
self._handle_error(response, "get extract status")
except Exception as e:
raise ValueError(str(e), 500)
def async_extract(self, urls: List[str], params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> Dict[str, Any]:
"""
Initiate an asynchronous extract job.
Args:
urls (List[str]): The URLs to extract data from.
params (Optional[Dict[str, Any]]): Additional parameters for the extract request.
idempotency_key (Optional[str]): A unique key to ensure idempotency of requests.
Returns:
Dict[str, Any]: The response from the extract operation.
Raises:
ValueError: If there is an error initiating the extract job.
"""
headers = self._prepare_headers(idempotency_key)
schema = params.get('schema') if params else None
if schema:
if hasattr(schema, 'model_json_schema'):
# Convert Pydantic model to JSON schema
schema = schema.model_json_schema()
# Otherwise assume it's already a JSON schema dict
jsonData = {'urls': urls, **(params or {})}
request_data = {
**jsonData,
'allowExternalLinks': params.get('allow_external_links', False) if params else False,
'schema': schema
}
try:
response = self._post_request(f'{self.api_url}/v1/extract', request_data, headers)
if response.status_code == 200:
return response.json()
else:
self._handle_error(response, "async extract")
except Exception as e:
raise ValueError(str(e), 500)
def _prepare_headers(self, idempotency_key: Optional[str] = None) -> Dict[str, str]:
"""
Prepare the headers for API requests.