diff --git a/apps/api/src/controllers/v1/extract-status.ts b/apps/api/src/controllers/v1/extract-status.ts new file mode 100644 index 00000000..ce58a342 --- /dev/null +++ b/apps/api/src/controllers/v1/extract-status.ts @@ -0,0 +1,40 @@ +import { Response } from "express"; +import { supabaseGetJobsById } from "../../lib/supabase-jobs"; +import { RequestWithAuth } from "./types"; +import { getExtract, getExtractExpiry } from "../../lib/extract/extract-redis"; + +export async function extractStatusController( + req: RequestWithAuth<{ jobId: string }, any, any>, + res: Response, +) { + const extract = await getExtract(req.params.jobId); + + if (!extract) { + return res.status(404).json({ + success: false, + error: "Extract job not found", + }); + } + + let data: any[] = []; + + if (extract.status === "completed") { + const jobData = await supabaseGetJobsById([req.params.jobId]); + if (!jobData || jobData.length === 0) { + return res.status(404).json({ + success: false, + error: "Job not found", + }); + } + + data = jobData[0].docs; + } + + return res.status(200).json({ + success: true, + data: data, + status: extract.status, + error: extract?.error ?? undefined, + expiresAt: (await getExtractExpiry(req.params.jobId)).toISOString(), + }); +} diff --git a/apps/api/src/controllers/v1/extract.ts b/apps/api/src/controllers/v1/extract.ts index af63f446..ab69ca93 100644 --- a/apps/api/src/controllers/v1/extract.ts +++ b/apps/api/src/controllers/v1/extract.ts @@ -5,8 +5,31 @@ import { extractRequestSchema, ExtractResponse, } from "./types"; +import { getExtractQueue } from "../../services/queue-service"; +import * as Sentry from "@sentry/node"; +import { saveExtract } from "../../lib/extract/extract-redis"; +import { getTeamIdSyncB } from "../../lib/extract/team-id-sync"; import { performExtraction } from "../../lib/extract/extraction-service"; +export async function oldExtract(req: RequestWithAuth<{}, ExtractResponse, ExtractRequest>, res: Response, extractId: string){ + // Means that are in the non-queue system + // TODO: Remove this once all teams have transitioned to the new system + try { + const result = await performExtraction(extractId, { + request: req.body, + teamId: req.auth.team_id, + plan: req.auth.plan ?? "free", + subId: req.acuc?.sub_id ?? undefined, + }); + + return res.status(200).json(result); + } catch (error) { + return res.status(500).json({ + success: false, + error: "Internal server error", + }); + } + } /** * Extracts data from the provided URLs based on the request parameters. * Currently in beta. @@ -21,20 +44,59 @@ export async function extractController( const selfHosted = process.env.USE_DB_AUTHENTICATION !== "true"; req.body = extractRequestSchema.parse(req.body); - if (!req.auth.plan) { - return res.status(400).json({ - success: false, - error: "No plan specified", - urlTrace: [], - }); - } - - const result = await performExtraction({ + const extractId = crypto.randomUUID(); + const jobData = { request: req.body, teamId: req.auth.team_id, plan: req.auth.plan, - subId: req.acuc?.sub_id || undefined, + subId: req.acuc?.sub_id, + extractId, + }; + + if(await getTeamIdSyncB(req.auth.team_id) && req.body.origin !== "api-sdk") { + return await oldExtract(req, res, extractId); + } + + await saveExtract(extractId, { + id: extractId, + team_id: req.auth.team_id, + plan: req.auth.plan, + createdAt: Date.now(), + status: "processing", }); - return res.status(result.success ? 200 : 400).json(result); + if (Sentry.isInitialized()) { + const size = JSON.stringify(jobData).length; + await Sentry.startSpan( + { + name: "Add extract job", + op: "queue.publish", + attributes: { + "messaging.message.id": extractId, + "messaging.destination.name": getExtractQueue().name, + "messaging.message.body.size": size, + }, + }, + async (span) => { + await getExtractQueue().add(extractId, { + ...jobData, + sentry: { + trace: Sentry.spanToTraceHeader(span), + baggage: Sentry.spanToBaggageHeader(span), + size, + }, + }); + }, + ); + } else { + await getExtractQueue().add(extractId, jobData, { + jobId: extractId, + }); + } + + return res.status(200).json({ + success: true, + id: extractId, + urlTrace: [], + }); } diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts index ccb11586..1c9112c5 100644 --- a/apps/api/src/controllers/v1/types.ts +++ b/apps/api/src/controllers/v1/types.ts @@ -186,6 +186,10 @@ export const scrapeOptions = z export type ScrapeOptions = z.infer; +import Ajv from "ajv"; + +const ajv = new Ajv(); + export const extractV1Options = z .object({ urls: url @@ -193,7 +197,20 @@ export const extractV1Options = z .max(10, "Maximum of 10 URLs allowed per request while in beta."), prompt: z.string().optional(), systemPrompt: z.string().optional(), - schema: z.any().optional(), + schema: z + .any() + .optional() + .refine((val) => { + if (!val) return true; // Allow undefined schema + try { + const validate = ajv.compile(val); + return typeof validate === "function"; + } catch (e) { + return false; + } + }, { + message: "Invalid JSON schema.", + }), limit: z.number().int().positive().finite().safe().optional(), ignoreSitemap: z.boolean().default(false), includeSubdomains: z.boolean().default(true), @@ -478,10 +495,11 @@ export interface URLTrace { export interface ExtractResponse { success: boolean; + error?: string; data?: any; scrape_id?: string; + id?: string; warning?: string; - error?: string; urlTrace?: URLTrace[]; } diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 20214d72..cc2d41b8 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -4,7 +4,7 @@ import * as Sentry from "@sentry/node"; import express, { NextFunction, Request, Response } from "express"; import bodyParser from "body-parser"; import cors from "cors"; -import { getScrapeQueue } from "./services/queue-service"; +import { getExtractQueue, getScrapeQueue } from "./services/queue-service"; import { v0Router } from "./routes/v0"; import os from "os"; import { logger } from "./lib/logger"; @@ -45,7 +45,7 @@ const serverAdapter = new ExpressAdapter(); serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`); const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({ - queues: [new BullAdapter(getScrapeQueue())], + queues: [new BullAdapter(getScrapeQueue()), new BullAdapter(getExtractQueue())], serverAdapter: serverAdapter, }); diff --git a/apps/api/src/lib/extract/extract-redis.ts b/apps/api/src/lib/extract/extract-redis.ts new file mode 100644 index 00000000..f4ed0369 --- /dev/null +++ b/apps/api/src/lib/extract/extract-redis.ts @@ -0,0 +1,38 @@ +import { redisConnection } from "../../services/queue-service"; +import { logger as _logger } from "../logger"; + +export type StoredExtract = { + id: string; + team_id: string; + plan?: string; + createdAt: number; + status: "processing" | "completed" | "failed" | "cancelled"; + error?: any; +}; + +export async function saveExtract(id: string, extract: StoredExtract) { + _logger.debug("Saving extract " + id + " to Redis..."); + await redisConnection.set("extract:" + id, JSON.stringify(extract)); + await redisConnection.expire("extract:" + id, 24 * 60 * 60, "NX"); +} + +export async function getExtract(id: string): Promise { + const x = await redisConnection.get("extract:" + id); + return x ? JSON.parse(x) : null; +} + +export async function updateExtract(id: string, extract: Partial) { + const current = await getExtract(id); + if (!current) return; + await redisConnection.set("extract:" + id, JSON.stringify({ ...current, ...extract })); + await redisConnection.expire("extract:" + id, 24 * 60 * 60, "NX"); +} + + +export async function getExtractExpiry(id: string): Promise { + const d = new Date(); + const ttl = await redisConnection.pttl("extract:" + id); + d.setMilliseconds(d.getMilliseconds() + ttl); + d.setMilliseconds(0); + return d; +} diff --git a/apps/api/src/lib/extract/extraction-service.ts b/apps/api/src/lib/extract/extraction-service.ts index 2791df3c..9793540b 100644 --- a/apps/api/src/lib/extract/extraction-service.ts +++ b/apps/api/src/lib/extract/extraction-service.ts @@ -9,6 +9,7 @@ import { billTeam } from "../../services/billing/credit_billing"; import { logJob } from "../../services/logging/log_job"; import { _addScrapeJobToBullMQ } from "../../services/queue-jobs"; import { saveCrawl, StoredCrawl } from "../crawl-redis"; +import { updateExtract } from "./extract-redis"; interface ExtractServiceOptions { request: ExtractRequest; @@ -20,7 +21,7 @@ interface ExtractServiceOptions { interface ExtractResult { success: boolean; data?: any; - scrapeId: string; + extractId: string; warning?: string; urlTrace?: URLTrace[]; error?: string; @@ -38,9 +39,8 @@ function getRootDomain(url: string): string { } } -export async function performExtraction(options: ExtractServiceOptions): Promise { +export async function performExtraction(extractId: string, options: ExtractServiceOptions): Promise { const { request, teamId, plan, subId } = options; - const scrapeId = crypto.randomUUID(); const urlTraces: URLTrace[] = []; let docs: Document[] = []; @@ -65,7 +65,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise return { success: false, error: "No valid URLs found to scrape. Try adjusting your search criteria or including more URLs.", - scrapeId, + extractId, urlTrace: urlTraces, }; } @@ -89,7 +89,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise return { success: false, error: error.message, - scrapeId, + extractId, urlTrace: urlTraces, }; } @@ -191,7 +191,7 @@ export async function performExtraction(options: ExtractServiceOptions): Promise // Log job logJob({ - job_id: scrapeId, + job_id: extractId, success: true, message: "Extract completed", num_docs: 1, @@ -203,12 +203,20 @@ export async function performExtraction(options: ExtractServiceOptions): Promise scrapeOptions: request, origin: request.origin ?? "api", num_tokens: completions.numTokens ?? 0, + }).then(() => { + updateExtract(extractId, { + status: "completed", + }).catch((error) => { + logger.error(`Failed to update extract ${extractId} status to completed: ${error}`); + }); }); + + return { success: true, data: completions.extract ?? {}, - scrapeId, + extractId, warning: completions.warning, urlTrace: request.urlTrace ? urlTraces : undefined, }; diff --git a/apps/api/src/lib/extract/team-id-sync.ts b/apps/api/src/lib/extract/team-id-sync.ts new file mode 100644 index 00000000..8cf21a14 --- /dev/null +++ b/apps/api/src/lib/extract/team-id-sync.ts @@ -0,0 +1,19 @@ +import { supabase_service } from "../../services/supabase"; +import { logger } from "../logger"; + +export async function getTeamIdSyncB(teamId: string) { + try { + const { data, error } = await supabase_service + .from("eb-sync") + .select("team_id") + .eq("team_id", teamId) + .limit(1); + if (error) { + throw new Error("Error getting team id (sync b)"); + } + return data[0] ?? null; + } catch (error) { + logger.error("Error getting team id (sync b)", error); + return null; + } +} diff --git a/apps/api/src/routes/v1.ts b/apps/api/src/routes/v1.ts index b6ab2ee8..bc500325 100644 --- a/apps/api/src/routes/v1.ts +++ b/apps/api/src/routes/v1.ts @@ -24,13 +24,7 @@ import { scrapeStatusController } from "../controllers/v1/scrape-status"; import { concurrencyCheckController } from "../controllers/v1/concurrency-check"; import { batchScrapeController } from "../controllers/v1/batch-scrape"; import { extractController } from "../controllers/v1/extract"; -// import { crawlPreviewController } from "../../src/controllers/v1/crawlPreview"; -// import { crawlJobStatusPreviewController } from "../../src/controllers/v1/status"; -// import { searchController } from "../../src/controllers/v1/search"; -// import { crawlCancelController } from "../../src/controllers/v1/crawl-cancel"; -// import { keyAuthController } from "../../src/controllers/v1/keyAuth"; -// import { livenessController } from "../controllers/v1/liveness"; -// import { readinessController } from "../controllers/v1/readiness"; +import { extractStatusController } from "../controllers/v1/extract-status"; import { creditUsageController } from "../controllers/v1/credit-usage"; import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings"; import { searchController } from "../controllers/v1/search"; @@ -215,6 +209,12 @@ v1Router.post( wrap(extractController), ); +v1Router.get( + "/extract/:jobId", + authMiddleware(RateLimiterMode.CrawlStatus), + wrap(extractStatusController), +); + // v1Router.post("/crawlWebsitePreview", crawlPreviewController); v1Router.delete( diff --git a/apps/api/src/services/queue-service.ts b/apps/api/src/services/queue-service.ts index 3cfd8c91..d3d8a4e5 100644 --- a/apps/api/src/services/queue-service.ts +++ b/apps/api/src/services/queue-service.ts @@ -3,12 +3,16 @@ import { logger } from "../lib/logger"; import IORedis from "ioredis"; let scrapeQueue: Queue; +let extractQueue: Queue; +let loggingQueue: Queue; export const redisConnection = new IORedis(process.env.REDIS_URL!, { maxRetriesPerRequest: null, }); export const scrapeQueueName = "{scrapeQueue}"; +export const extractQueueName = "{extractQueue}"; +export const loggingQueueName = "{loggingQueue}"; export function getScrapeQueue() { if (!scrapeQueue) { @@ -24,24 +28,35 @@ export function getScrapeQueue() { age: 90000, // 25 hours }, }, - }, - // { - // settings: { - // lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds, - // lockRenewTime: 15 * 1000, // 15 seconds in milliseconds - // stalledInterval: 30 * 1000, - // maxStalledCount: 10, - // }, - // defaultJobOptions:{ - // attempts: 5 - // } - // } + } ); logger.info("Web scraper queue created"); } return scrapeQueue; } +export function getExtractQueue() { + if (!extractQueue) { + extractQueue = new Queue( + extractQueueName, + { + connection: redisConnection, + defaultJobOptions: { + removeOnComplete: { + age: 90000, // 25 hours + }, + removeOnFail: { + age: 90000, // 25 hours + }, + }, + } + ); + logger.info("Extraction queue created"); + } + return extractQueue; +} + + // === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE // import { QueueEvents } from 'bullmq'; // export const scrapeQueueEvents = new QueueEvents(scrapeQueueName, { connection: redisConnection.duplicate() }); diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index ac04510e..f28f3f35 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -4,8 +4,10 @@ import * as Sentry from "@sentry/node"; import { CustomError } from "../lib/custom-error"; import { getScrapeQueue, + getExtractQueue, redisConnection, scrapeQueueName, + extractQueueName, } from "./queue-service"; import { startWebScraperPipeline } from "../main/runWebScraper"; import { callWebhook } from "./webhook"; @@ -52,8 +54,10 @@ import { isUrlBlocked } from "../scraper/WebScraper/utils/blocklist"; import { BLOCKLISTED_URL_MESSAGE } from "../lib/strings"; import { indexPage } from "../lib/extract/index/pinecone"; import { Document } from "../controllers/v1/types"; +import { performExtraction } from "../lib/extract/extraction-service"; import { supabase_service } from "../services/supabase"; import { normalizeUrl, normalizeUrlOnlyHostname } from "../lib/canonical-url"; +import { saveExtract, updateExtract } from "../lib/extract/extract-redis"; configDotenv(); @@ -310,6 +314,58 @@ const processJobInternal = async (token: string, job: Job & { id: string }) => { return err; }; +const processExtractJobInternal = async (token: string, job: Job & { id: string }) => { + const logger = _logger.child({ + module: "extract-worker", + method: "processJobInternal", + jobId: job.id, + extractId: job.data.extractId, + teamId: job.data?.teamId ?? undefined, + }); + + const extendLockInterval = setInterval(async () => { + logger.info(`🔄 Worker extending lock on job ${job.id}`); + await job.extendLock(token, jobLockExtensionTime); + }, jobLockExtendInterval); + + try { + const result = await performExtraction(job.data.extractId, { + request: job.data.request, + teamId: job.data.teamId, + plan: job.data.plan, + subId: job.data.subId, + }); + + if (result.success) { + // Move job to completed state in Redis + await job.moveToCompleted(result, token, false); + return result; + } else { + throw new Error(result.error || "Unknown error during extraction"); + } + } catch (error) { + logger.error(`🚫 Job errored ${job.id} - ${error}`, { error }); + + Sentry.captureException(error, { + data: { + job: job.id, + }, + }); + + // Move job to failed state in Redis + await job.moveToFailed(error, token, false); + + await updateExtract(job.data.extractId, { + status: "failed", + error: error.error ?? error ?? "Unknown error, please contact help@firecrawl.dev. Extract id: " + job.data.extractId, + }); + // throw error; + } finally { + + clearInterval(extendLockInterval); + } +}; + let isShuttingDown = false; process.on("SIGINT", () => { @@ -466,7 +522,9 @@ const workerFun = async ( } }; +// Start both workers workerFun(getScrapeQueue(), processJobInternal); +workerFun(getExtractQueue(), processExtractJobInternal); async function processKickoffJob(job: Job & { id: string }, token: string) { const logger = _logger.child({ diff --git a/apps/js-sdk/firecrawl/src/index.ts b/apps/js-sdk/firecrawl/src/index.ts index 474eea83..e7e8b65b 100644 --- a/apps/js-sdk/firecrawl/src/index.ts +++ b/apps/js-sdk/firecrawl/src/index.ts @@ -884,21 +884,35 @@ export default class FirecrawlApp { try { const response: AxiosResponse = await this.postRequest( this.apiUrl + `/v1/extract`, - { ...jsonData, schema: jsonSchema }, + { ...jsonData, schema: jsonSchema, origin: "api-sdk" }, headers ); + if (response.status === 200) { - const responseData = response.data as ExtractResponse; - if (responseData.success) { - return { - success: true, - data: responseData.data, - warning: responseData.warning, - error: responseData.error - }; - } else { - throw new FirecrawlError(`Failed to scrape URL. Error: ${responseData.error}`, response.status); - } + const jobId = response.data.id; + let extractStatus; + do { + const statusResponse: AxiosResponse = await this.getRequest( + `${this.apiUrl}/v1/extract/${jobId}`, + headers + ); + extractStatus = statusResponse.data; + if (extractStatus.status === "completed") { + if (extractStatus.success) { + return { + success: true, + data: extractStatus.data, + warning: extractStatus.warning, + error: extractStatus.error + }; + } else { + throw new FirecrawlError(`Failed to extract data. Error: ${extractStatus.error}`, statusResponse.status); + } + } else if (extractStatus.status === "failed" || extractStatus.status === "cancelled") { + throw new FirecrawlError(`Extract job ${extractStatus.status}. Error: ${extractStatus.error}`, statusResponse.status); + } + await new Promise(resolve => setTimeout(resolve, 1000)); // Polling interval + } while (extractStatus.status !== "completed"); } else { this.handleError(response, "extract"); } @@ -908,6 +922,72 @@ export default class FirecrawlApp { return { success: false, error: "Internal server error." }; } + /** + * Initiates an asynchronous extract job for a URL using the Firecrawl API. + * @param url - The URL to extract data from. + * @param params - Additional parameters for the extract request. + * @param idempotencyKey - Optional idempotency key for the request. + * @returns The response from the extract operation. + */ + async asyncExtract( + url: string, + params?: ExtractParams, + idempotencyKey?: string + ): Promise { + const headers = this.prepareHeaders(idempotencyKey); + let jsonData: any = { url, ...params }; + let jsonSchema: any; + + try { + if (params?.schema instanceof zt.ZodType) { + jsonSchema = zodToJsonSchema(params.schema); + } else { + jsonSchema = params?.schema; + } + } catch (error: any) { + throw new FirecrawlError("Invalid schema. Schema must be either a valid Zod schema or JSON schema object.", 400); + } + + try { + const response: AxiosResponse = await this.postRequest( + this.apiUrl + `/v1/extract`, + { ...jsonData, schema: jsonSchema }, + headers + ); + + if (response.status === 200) { + return response.data; + } else { + this.handleError(response, "start extract job"); + } + } catch (error: any) { + throw new FirecrawlError(error.message, 500); + } + return { success: false, error: "Internal server error." }; + } + + /** + * Retrieves the status of an extract job. + * @param jobId - The ID of the extract job. + * @returns The status of the extract job. + */ + async getExtractStatus(jobId: string): Promise { + try { + const response: AxiosResponse = await this.getRequest( + `${this.apiUrl}/v1/extract/${jobId}`, + this.prepareHeaders() + ); + + if (response.status === 200) { + return response.data; + } else { + this.handleError(response, "get extract status"); + } + } catch (error: any) { + throw new FirecrawlError(error.message, 500); + } + } + /** * Prepares the headers for an API request. * @param idempotencyKey - Optional key to ensure idempotency. diff --git a/apps/python-sdk/firecrawl/firecrawl.py b/apps/python-sdk/firecrawl/firecrawl.py index d3216405..41f8badf 100644 --- a/apps/python-sdk/firecrawl/firecrawl.py +++ b/apps/python-sdk/firecrawl/firecrawl.py @@ -538,10 +538,12 @@ class FirecrawlApp: request_data = { **jsonData, 'allowExternalLinks': params.get('allow_external_links', False), - 'schema': schema + 'schema': schema, + 'origin': 'api-sdk' } try: + # Send the initial extract request response = self._post_request( f'{self.api_url}/v1/extract', request_data, @@ -550,7 +552,29 @@ class FirecrawlApp: if response.status_code == 200: data = response.json() if data['success']: - return data + job_id = data.get('id') + if not job_id: + raise Exception('Job ID not returned from extract request.') + + # Poll for the extract status + while True: + status_response = self._get_request( + f'{self.api_url}/v1/extract/{job_id}', + headers + ) + if status_response.status_code == 200: + status_data = status_response.json() + if status_data['status'] == 'completed': + if status_data['success']: + return status_data + else: + raise Exception(f'Failed to extract. Error: {status_data["error"]}') + elif status_data['status'] in ['failed', 'cancelled']: + raise Exception(f'Extract job {status_data["status"]}. Error: {status_data["error"]}') + else: + self._handle_error(status_response, "extract-status") + + time.sleep(2) # Polling interval else: raise Exception(f'Failed to extract. Error: {data["error"]}') else: @@ -559,6 +583,69 @@ class FirecrawlApp: raise ValueError(str(e), 500) return {'success': False, 'error': "Internal server error."} + + def get_extract_status(self, job_id: str) -> Dict[str, Any]: + """ + Retrieve the status of an extract job. + + Args: + job_id (str): The ID of the extract job. + + Returns: + Dict[str, Any]: The status of the extract job. + + Raises: + ValueError: If there is an error retrieving the status. + """ + headers = self._prepare_headers() + try: + response = self._get_request(f'{self.api_url}/v1/extract/{job_id}', headers) + if response.status_code == 200: + return response.json() + else: + self._handle_error(response, "get extract status") + except Exception as e: + raise ValueError(str(e), 500) + + def async_extract(self, urls: List[str], params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> Dict[str, Any]: + """ + Initiate an asynchronous extract job. + + Args: + urls (List[str]): The URLs to extract data from. + params (Optional[Dict[str, Any]]): Additional parameters for the extract request. + idempotency_key (Optional[str]): A unique key to ensure idempotency of requests. + + Returns: + Dict[str, Any]: The response from the extract operation. + + Raises: + ValueError: If there is an error initiating the extract job. + """ + headers = self._prepare_headers(idempotency_key) + + schema = params.get('schema') if params else None + if schema: + if hasattr(schema, 'model_json_schema'): + # Convert Pydantic model to JSON schema + schema = schema.model_json_schema() + # Otherwise assume it's already a JSON schema dict + + jsonData = {'urls': urls, **(params or {})} + request_data = { + **jsonData, + 'allowExternalLinks': params.get('allow_external_links', False) if params else False, + 'schema': schema + } + + try: + response = self._post_request(f'{self.api_url}/v1/extract', request_data, headers) + if response.status_code == 200: + return response.json() + else: + self._handle_error(response, "async extract") + except Exception as e: + raise ValueError(str(e), 500) def _prepare_headers(self, idempotency_key: Optional[str] = None) -> Dict[str, str]: """