From 3641070ece62d26a007f86385a4fe2aecfd96b16 Mon Sep 17 00:00:00 2001 From: rafaelmmiller <150964962+rafaelsideguide@users.noreply.github.com> Date: Thu, 13 Mar 2025 16:27:59 -0300 Subject: [PATCH] async --- apps/python-sdk/example.py | 4 +- apps/python-sdk/example_async.py | 168 +++ apps/python-sdk/firecrawl/firecrawl.py | 1770 +++++++++++++++++++++--- apps/python-sdk/requirements.txt | 3 +- 4 files changed, 1762 insertions(+), 183 deletions(-) create mode 100644 apps/python-sdk/example_async.py diff --git a/apps/python-sdk/example.py b/apps/python-sdk/example.py index fb960187..ae4258f7 100644 --- a/apps/python-sdk/example.py +++ b/apps/python-sdk/example.py @@ -47,7 +47,7 @@ while attempts > 0 and crawl_status['status'] != 'completed': attempts -= 1 time.sleep(1) -crawl_status = app.get_crawl_status(async_result['id']) +crawl_status = app.check_crawl_status(async_result['id']) print(crawl_status) # LLM Extraction: @@ -155,4 +155,4 @@ async def start_crawl_and_watch(): watcher.add_event_listener("done", on_done) # Start the watcher - await watcher.connect() + await watcher.connect() \ No newline at end of file diff --git a/apps/python-sdk/example_async.py b/apps/python-sdk/example_async.py new file mode 100644 index 00000000..7afe6a70 --- /dev/null +++ b/apps/python-sdk/example_async.py @@ -0,0 +1,168 @@ +import time +import nest_asyncio +import uuid +import asyncio +from firecrawl.firecrawl import AsyncFirecrawlApp +from pydantic import BaseModel, Field +from typing import List + +app = AsyncFirecrawlApp(api_key="fc-") + +async def example_scrape(): + # Scrape a website: + scrape_result = await app.scrape_url('firecrawl.dev') + print(scrape_result['markdown']) + +async def example_batch_scrape(): + # Batch scrape + urls = ['https://example.com', 'https://docs.firecrawl.dev'] + batch_scrape_params = { + 'formats': ['markdown', 'html'], + } + + # Synchronous batch scrape + batch_result = await app.batch_scrape_urls(urls, batch_scrape_params) + print("Synchronous Batch Scrape Result:") + print(batch_result['data'][0]['markdown']) + + # Asynchronous batch scrape + async_batch_result = await app.async_batch_scrape_urls(urls, batch_scrape_params) + print("\nAsynchronous Batch Scrape Result:") + print(async_batch_result) + +async def example_crawl(): + # Crawl a website: + idempotency_key = str(uuid.uuid4()) # optional idempotency key + crawl_result = await app.crawl_url('firecrawl.dev', {'excludePaths': ['blog/*']}, 2, idempotency_key) + print(crawl_result) + + # Asynchronous Crawl a website: + async_result = await app.async_crawl_url('firecrawl.dev', {'excludePaths': ['blog/*']}, "") + print(async_result) + + crawl_status = await app.check_crawl_status(async_result['id']) + print(crawl_status) + + attempts = 15 + while attempts > 0 and crawl_status['status'] != 'completed': + print(crawl_status) + crawl_status = await app.check_crawl_status(async_result['id']) + attempts -= 1 + await asyncio.sleep(1) # Use async sleep instead of time.sleep + + crawl_status = await app.check_crawl_status(async_result['id']) + print(crawl_status) + +async def example_llm_extraction(): + # Define schema to extract contents into using pydantic + class ArticleSchema(BaseModel): + title: str + points: int + by: str + commentsURL: str + + class TopArticlesSchema(BaseModel): + top: List[ArticleSchema] = Field(..., description="Top 5 stories") + + llm_extraction_result = await app.scrape_url('https://news.ycombinator.com', { + 'formats': ['extract'], + 'extract': { + 'schema': TopArticlesSchema.model_json_schema() + 
} + }) + + print(llm_extraction_result['extract']) + + # Define schema to extract contents into using json schema + json_schema = { + "type": "object", + "properties": { + "top": { + "type": "array", + "items": { + "type": "object", + "properties": { + "title": {"type": "string"}, + "points": {"type": "number"}, + "by": {"type": "string"}, + "commentsURL": {"type": "string"} + }, + "required": ["title", "points", "by", "commentsURL"] + }, + "minItems": 5, + "maxItems": 5, + "description": "Top 5 stories on Hacker News" + } + }, + "required": ["top"] + } + + app2 = AsyncFirecrawlApp(api_key="fc-", version="v0") + + llm_extraction_result = await app2.scrape_url('https://news.ycombinator.com', { + 'extractorOptions': { + 'extractionSchema': json_schema, + 'mode': 'llm-extraction' + }, + 'pageOptions':{ + 'onlyMainContent': True + } + }) + +async def example_map_and_extract(): + # Map a website: + map_result = await app.map_url('https://firecrawl.dev', { 'search': 'blog' }) + print(map_result) + + # Extract URLs: + class ExtractSchema(BaseModel): + title: str + description: str + links: List[str] + + # Define the schema using Pydantic + extract_schema = ExtractSchema.schema() + + # Perform the extraction + extract_result = await app.extract(['https://firecrawl.dev'], { + 'prompt': "Extract the title, description, and links from the website", + 'schema': extract_schema + }) + print(extract_result) + +# Define event handlers for websocket +def on_document(detail): + print("DOC", detail) + +def on_error(detail): + print("ERR", detail['error']) + +def on_done(detail): + print("DONE", detail['status']) + +async def example_websocket_crawl(): + # Initiate the crawl job and get the watcher + watcher = await app.crawl_url_and_watch('firecrawl.dev', { 'excludePaths': ['blog/*'], 'limit': 5 }) + + # Add event listeners + watcher.add_event_listener("document", on_document) + watcher.add_event_listener("error", on_error) + watcher.add_event_listener("done", on_done) + + # Start the watcher + await watcher.connect() + +async def main(): + # Apply nest_asyncio to allow nested event loops + nest_asyncio.apply() + + # Run all the examples + await example_scrape() + await example_batch_scrape() + await example_crawl() + await example_llm_extraction() + await example_map_and_extract() + await example_websocket_crawl() + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/apps/python-sdk/firecrawl/firecrawl.py b/apps/python-sdk/firecrawl/firecrawl.py index d212dea7..e0f8c940 100644 --- a/apps/python-sdk/firecrawl/firecrawl.py +++ b/apps/python-sdk/firecrawl/firecrawl.py @@ -19,6 +19,8 @@ from datetime import datetime import requests import pydantic import websockets +import aiohttp +import asyncio logger : logging.Logger = logging.getLogger("firecrawl") @@ -326,21 +328,19 @@ class FirecrawlApp: Args: url (str): Target URL to scrape - params (Optional[Dict[str, Any]]): See ScrapeParams model for configuration: - Content Options: * formats - Content types to retrieve (markdown/html/etc) * includeTags - HTML tags to include * excludeTags - HTML tags to exclude * onlyMainContent - Extract main content only - + Request Options: * headers - Custom HTTP headers * timeout - Request timeout (ms) * mobile - Use mobile user agent * proxy - Proxy type (basic/stealth) - + Extraction Options: * extract - Content extraction settings * jsonOptions - JSON extraction settings @@ -348,7 +348,6 @@ class FirecrawlApp: Returns: ScrapeResponse with: - * Requested content formats * Page metadata * Extraction results @@ 
-465,7 +464,7 @@ class FirecrawlApp: raise Exception(f'Failed to parse Firecrawl response as JSON.') def crawl_url(self, url: str, - params: Optional[Dict[str, Any]] = None, + params: Optional[CrawlParams] = None, poll_interval: Optional[int] = 2, idempotency_key: Optional[str] = None) -> CrawlStatusResponse: """ @@ -473,9 +472,7 @@ class FirecrawlApp: Args: url (str): Target URL to start crawling from - - params (Optional[Dict[str, Any]]): See CrawlParams model for configuration: - + params (Optional[CrawlParams]): See CrawlParams model: URL Discovery: * includePaths - Patterns of URLs to include * excludePaths - Patterns of URLs to exclude @@ -494,10 +491,8 @@ class FirecrawlApp: * deduplicateSimilarURLs - Remove similar URLs * ignoreQueryParameters - Ignore URL parameters * regexOnFullURL - Apply regex to full URLs - - poll_interval: Seconds between status checks (default: 2) - - idempotency_key: Request deduplication key + poll_interval (int): Seconds between status checks (default: 2) + idempotency_key (Optional[str]): Unique key to prevent duplicate requests Returns: CrawlStatusResponse with: @@ -667,10 +662,19 @@ class FirecrawlApp: Returns information about crawl errors. Args: - id (str): The ID of the crawl job. + id (str): The ID of the crawl job Returns: - Dict[str, Any]: Information about crawl errors. + CrawlErrorsResponse containing: + * errors (List[Dict[str, str]]): List of errors with fields: + - id (str): Error ID + - timestamp (str): When the error occurred + - url (str): URL that caused the error + - error (str): Error message + * robotsBlocked (List[str]): List of URLs blocked by robots.txt + + Raises: + Exception: If error check fails """ headers = self._prepare_headers() response = self._get_request(f'{self.api_url}/v1/crawl/{id}/errors', headers) @@ -684,13 +688,18 @@ class FirecrawlApp: def cancel_crawl(self, id: str) -> Dict[str, Any]: """ - Cancel an asynchronous crawl job using the Firecrawl API. + Cancel an asynchronous crawl job. Args: - id (str): The ID of the crawl job to cancel. + id (str): The ID of the crawl job to cancel Returns: - Dict[str, Any]: The response from the cancel crawl request. + Dict[str, Any] containing: + * success (bool): Whether cancellation was successful + * error (str, optional): Error message if cancellation failed + + Raises: + Exception: If cancellation fails """ headers = self._prepare_headers() response = self._delete_request(f'{self.api_url}/v1/crawl/{id}', headers) @@ -702,17 +711,42 @@ class FirecrawlApp: else: self._handle_error(response, "cancel crawl job") - def crawl_url_and_watch(self, url: str, params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> 'CrawlWatcher': + def crawl_url_and_watch( + self, + url: str, + params: Optional[CrawlParams] = None, + idempotency_key: Optional[str] = None) -> 'CrawlWatcher': """ Initiate a crawl job and return a CrawlWatcher to monitor the job via WebSocket. Args: - url (str): The URL to crawl. - params (Optional[Dict[str, Any]]): Additional parameters for the crawl request. - idempotency_key (Optional[str]): A unique uuid key to ensure idempotency of requests. 
+ url (str): Target URL to start crawling from + params (Optional[CrawlParams]): See CrawlParams model for configuration: + URL Discovery: + * includePaths - Patterns of URLs to include + * excludePaths - Patterns of URLs to exclude + * maxDepth - Maximum crawl depth + * maxDiscoveryDepth - Maximum depth for finding new URLs + * limit - Maximum pages to crawl + + Link Following: + * allowBackwardLinks - Follow parent directory links + * allowExternalLinks - Follow external domain links + * ignoreSitemap - Skip sitemap.xml processing + + Advanced: + * scrapeOptions - Page scraping configuration + * webhook - Notification webhook settings + * deduplicateSimilarURLs - Remove similar URLs + * ignoreQueryParameters - Ignore URL parameters + * regexOnFullURL - Apply regex to full URLs + idempotency_key (Optional[str]): Unique key to prevent duplicate requests Returns: - CrawlWatcher: An instance of CrawlWatcher to monitor the crawl job. + AsyncCrawlWatcher: An instance to monitor the crawl job via WebSocket + + Raises: + Exception: If crawl job fails to start """ crawl_response = self.async_crawl_url(url, params, idempotency_key) if crawl_response['success'] and 'id' in crawl_response: @@ -725,27 +759,27 @@ class FirecrawlApp: Map and discover links from a URL. Args: - url: Target URL to map + url: Target URL to map - params: See MapParams model: + params: See MapParams model: - Discovery Options: - * search - Filter pattern for URLs - * ignoreSitemap - Skip sitemap.xml - * includeSubdomains - Include subdomain links - * sitemapOnly - Only use sitemap.xml - - Limits: - * limit - Max URLs to return - * timeout - Request timeout (ms) + Discovery Options: + * search - Filter pattern for URLs + * ignoreSitemap - Skip sitemap.xml + * includeSubdomains - Include subdomain links + * sitemapOnly - Only use sitemap.xml + + Limits: + * limit - Max URLs to return + * timeout - Request timeout (ms) Returns: - MapResponse with: - * Discovered URLs - * Success/error status + MapResponse with: + * Discovered URLs + * Success/error status Raises: - Exception: If mapping fails + Exception: If mapping fails """ endpoint = f'/v1/map' headers = self._prepare_headers() @@ -776,46 +810,40 @@ class FirecrawlApp: self._handle_error(response, 'map') def batch_scrape_urls(self, urls: List[str], - params: Optional[Dict[str, Any]] = None, + params: Optional[ScrapeParams] = None, poll_interval: Optional[int] = 2, idempotency_key: Optional[str] = None) -> BatchScrapeStatusResponse: """ Batch scrape multiple URLs and monitor until completion. 
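To make the watcher flow above concrete, here is a minimal sketch on the synchronous client, mirroring the listener pattern already used in example.py; the API key is a placeholder and the URL/params are illustrative.

import asyncio
from firecrawl.firecrawl import FirecrawlApp

app = FirecrawlApp(api_key="fc-YOUR_KEY")  # placeholder key

# Start the crawl and get a CrawlWatcher bound to the new job id
watcher = app.crawl_url_and_watch('firecrawl.dev', {'excludePaths': ['blog/*'], 'limit': 5})

# React to pages as they arrive instead of polling check_crawl_status
watcher.add_event_listener("document", lambda detail: print("DOC", detail))
watcher.add_event_listener("done", lambda detail: print("DONE", detail['status']))

# connect() is a coroutine, so it has to be driven by an event loop
asyncio.run(watcher.connect())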
Args: - urls: URLs to scrape - - params: See ScrapeParams model: - - Content Options: - * formats - Content formats to retrieve - * includeTags - HTML tags to include - * excludeTags - HTML tags to exclude - * onlyMainContent - Extract main content only + urls (List[str]): URLs to scrape + params (Optional[ScrapeParams]): See ScrapeParams model: + Content Options: + * formats - Content formats to retrieve + * includeTags - HTML tags to include + * excludeTags - HTML tags to exclude + * onlyMainContent - Extract main content only - Request Options: - * headers - Custom HTTP headers - * timeout - Request timeout (ms) - * mobile - Use mobile user agent - * proxy - Proxy type - - Extraction Options: - * extract - Content extraction config - * jsonOptions - JSON extraction config - * actions - Actions to perform - - poll_interval: Seconds between status checks (default: 2) - - idempotency_key: Request deduplication key + Request Options: + * headers - Custom HTTP headers + * timeout - Request timeout (ms) + * mobile - Use mobile user agent + * proxy - Proxy type + + Extraction Options: + * extract - Content extraction config + * jsonOptions - JSON extraction config + * actions - Actions to perform Returns: - BatchScrapeStatusResponse with: - * Scraping status and progress - * Scraped content for each URL - * Success/error information + BatchScrapeStatusResponse with: + * Scraping status and progress + * Scraped content for each URL + * Success/error information Raises: - Exception: If batch scrape fails + Exception: If batch scrape fails """ endpoint = f'/v1/batch/scrape' headers = self._prepare_headers(idempotency_key) @@ -837,21 +865,41 @@ class FirecrawlApp: def async_batch_scrape_urls( self, urls: List[str], - params: Optional[Dict[str, Any]] = None, + params: Optional[ScrapeParams] = None, idempotency_key: Optional[str] = None) -> BatchScrapeResponse: """ Initiate a batch scrape job asynchronously. Args: - urls (List[str]): The URLs to scrape. - params (Optional[Dict[str, Any]]): Additional parameters for the scraper. - idempotency_key (Optional[str]): A unique uuid key to ensure idempotency of requests. + urls (List[str]): List of URLs to scrape + params (Optional[ScrapeParams]): See ScrapeParams model for configuration: + Content Options: + * formats - Content formats to retrieve + * includeTags - HTML tags to include + * excludeTags - HTML tags to exclude + * onlyMainContent - Extract main content only + + Request Options: + * headers - Custom HTTP headers + * timeout - Request timeout (ms) + * mobile - Use mobile user agent + * proxy - Proxy type + + Extraction Options: + * extract - Content extraction config + * jsonOptions - JSON extraction config + * actions - Actions to perform + idempotency_key (Optional[str]): Unique key to prevent duplicate requests Returns: - BatchScrapeResponse: A dictionary containing the batch scrape initiation response. The structure includes: - - 'success' (bool): Indicates if the batch scrape initiation was successful. - - 'id' (str): The unique identifier for the batch scrape job. - - 'url' (str): The URL to check the status of the batch scrape job. 
+ BatchScrapeResponse with: + * success - Whether job started successfully + * id - Unique identifier for the job + * url - Status check URL + * error - Error message if start failed + + Raises: + Exception: If job initiation fails """ endpoint = f'/v1/batch/scrape' headers = self._prepare_headers(idempotency_key) @@ -876,12 +924,32 @@ class FirecrawlApp: Initiate a batch scrape job and return a CrawlWatcher to monitor the job via WebSocket. Args: - urls (List[str]): The URLs to scrape. - params (Optional[ScrapeParams]): Additional parameters for the scraper. - idempotency_key (Optional[str]): A unique uuid key to ensure idempotency of requests. + urls (List[str]): List of URLs to scrape + params (Optional[ScrapeParams]): See ScrapeParams model for configuration: + + Content Options: + * formats - Content formats to retrieve + * includeTags - HTML tags to include + * excludeTags - HTML tags to exclude + * onlyMainContent - Extract main content only + + Request Options: + * headers - Custom HTTP headers + * timeout - Request timeout (ms) + * mobile - Use mobile user agent + * proxy - Proxy type + + Extraction Options: + * extract - Content extraction config + * jsonOptions - JSON extraction config + * actions - Actions to perform + idempotency_key (Optional[str]): Unique key to prevent duplicate requests Returns: - CrawlWatcher: An instance of CrawlWatcher to monitor the batch scrape job. + AsyncCrawlWatcher: An instance to monitor the batch scrape job via WebSocket + + Raises: + Exception: If batch scrape job fails to start """ crawl_response = self.async_batch_scrape_urls(urls, params, idempotency_key) if crawl_response['success'] and 'id' in crawl_response: @@ -964,16 +1032,16 @@ class FirecrawlApp: Returns information about batch scrape errors. Args: - id (str): The ID of the crawl job. + id (str): The ID of the crawl job. 
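A quick sketch of the fire-and-forget batch flow documented above, on the synchronous client. It assumes the job id returned by async_batch_scrape_urls can be passed to check_batch_scrape_status (the status endpoint used elsewhere in this SDK); the key and URLs are placeholders.

import time
from firecrawl.firecrawl import FirecrawlApp

app = FirecrawlApp(api_key="fc-YOUR_KEY")  # placeholder key

# Kick off the job without blocking; the response carries the job id
job = app.async_batch_scrape_urls(
    ['https://example.com', 'https://docs.firecrawl.dev'],
    {'formats': ['markdown']},
)

# Poll the status endpoint until the batch finishes
status = app.check_batch_scrape_status(job['id'])
while status['status'] not in ('completed', 'failed', 'cancelled'):
    time.sleep(2)
    status = app.check_batch_scrape_status(job['id'])

print(status['completed'], 'of', status['total'], 'URLs scraped')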
Returns: CrawlErrorsResponse: A response containing: - - errors (List[Dict[str, str]]): List of errors with fields: - - id (str): Error ID - - timestamp (str): When the error occurred - - url (str): URL that caused the error - - error (str): Error message - - robotsBlocked (List[str]): List of URLs blocked by robots.txt + * errors (List[Dict[str, str]]): List of errors with fields: + * id (str): Error ID + * timestamp (str): When the error occurred + * url (str): URL that caused the error + * error (str): Error message + * robotsBlocked (List[str]): List of URLs blocked by robots.txt """ headers = self._prepare_headers() response = self._get_request(f'{self.api_url}/v1/batch/scrape/{id}/errors', headers) @@ -997,19 +1065,19 @@ class FirecrawlApp: params: See ExtractParams model: - Extraction Config: - * prompt - Custom extraction prompt - * schema - JSON schema/Pydantic model - * systemPrompt - System context - - Behavior Options: - * allowExternalLinks - Follow external links - * enableWebSearch - Enable web search - * includeSubdomains - Include subdomains - * showSources - Include source URLs - - Scraping Options: - * scrapeOptions - Page scraping config + Extraction Config: + * prompt - Custom extraction prompt + * schema - JSON schema/Pydantic model + * systemPrompt - System context + + Behavior Options: + * allowExternalLinks - Follow external links + * enableWebSearch - Enable web search + * includeSubdomains - Include subdomains + * showSources - Include source URLs + + Scraping Options: + * scrapeOptions - Page scraping config Returns: ExtractResponse with: @@ -1120,32 +1188,40 @@ class FirecrawlApp: except Exception as e: raise ValueError(str(e), 500) - def async_extract(self, urls: List[str], params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> ExtractResponse[Any]: + def async_extract( + self, + urls: List[str], + params: Optional[ExtractParams] = None, + idempotency_key: Optional[str] = None) -> ExtractResponse[Any]: """ Initiate an asynchronous extract job. Args: - urls (List[str]): List of URLs to extract information from. Must be valid HTTP/HTTPS URLs. - params (Optional[Dict[str, Any]]): Extraction configuration parameters: - - prompt (str, optional): Custom prompt for extraction - - schema (Any, optional): JSON schema or Pydantic model for structured extraction - - systemPrompt (str, optional): System prompt for extraction - - allowExternalLinks (bool, optional): Allow following external links - - enableWebSearch (bool, optional): Enable web search during extraction - - includeSubdomains (bool, optional): Include content from subdomains - - origin (str, optional): Source of the extraction request - - showSources (bool, optional): Include source URLs in response - - scrapeOptions (CrawlScrapeOptions, optional): Configuration for scraping pages - idempotency_key (Optional[str]): Unique identifier to prevent duplicate requests. 
+ urls (List[str]): URLs to extract information from + params (Optional[ExtractParams]): See ExtractParams model: + Extraction Config: + * prompt - Custom extraction prompt + * schema - JSON schema/Pydantic model + * systemPrompt - System context + + Behavior Options: + * allowExternalLinks - Follow external links + * enableWebSearch - Enable web search + * includeSubdomains - Include subdomains + * showSources - Include source URLs + + Scraping Options: + * scrapeOptions - Page scraping config + idempotency_key (Optional[str]): Unique key to prevent duplicate requests Returns: - ExtractResponse[Any]: A response containing: - - success (bool): Whether the extraction initiation was successful - - id (str): The unique identifier for the extract job - - error (str, optional): Error message if initiation failed + ExtractResponse containing: + * success (bool): Whether job started successfully + * id (str): Unique identifier for the job + * error (str, optional): Error message if start failed Raises: - ValueError: If neither prompt nor schema is provided, or if there is an error during initiation. + ValueError: If job initiation fails """ headers = self._prepare_headers(idempotency_key) @@ -1184,24 +1260,26 @@ class FirecrawlApp: Generate LLMs.txt for a given URL and poll until completion. Args: - url: Target URL to generate LLMs.txt from + url: Target URL to generate LLMs.txt from params: See GenerateLLMsTextParams model: + params: See GenerateLLMsTextParams model: - Generation Options: - * maxUrls - Maximum URLs to process (default: 10) - * showFullText - Include full text in output (default: False) - * __experimental_stream - Enable streaming of generation progress + params: See GenerateLLMsTextParams model: + + Generation Options: + * maxUrls - Maximum URLs to process (default: 10) + * showFullText - Include full text in output (default: False) Returns: - GenerateLLMsTextStatusResponse with: - * Generated LLMs.txt content - * Full version if requested - * Generation status - * Success/error information + GenerateLLMsTextStatusResponse with: + * Generated LLMs.txt content + * Full version if requested + * Generation status + * Success/error information Raises: - Exception: If generation fails + Exception: If generation fails """ if params is None: params = {} @@ -1238,20 +1316,19 @@ class FirecrawlApp: Initiate an asynchronous LLMs.txt generation operation. Args: - url (str): The target URL to generate LLMs.txt from. Must be a valid HTTP/HTTPS URL. - params (Optional[Union[Dict[str, Any], GenerateLLMsTextParams]]): Generation configuration parameters: - - maxUrls (int, optional): Maximum number of URLs to process (default: 10) - - showFullText (bool, optional): Include full text in output (default: False) - - __experimental_stream (bool, optional): Enable streaming of generation progress + url (str): The target URL to generate LLMs.txt from. Must be a valid HTTP/HTTPS URL. 
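The extract options above pair naturally with a Pydantic model. The sketch below mirrors the pattern from example_async.py on the synchronous client; the schema and field names are illustrative and the key is a placeholder.

from typing import List
from pydantic import BaseModel
from firecrawl.firecrawl import FirecrawlApp

class PageSummary(BaseModel):  # illustrative schema
    title: str
    description: str
    links: List[str]

app = FirecrawlApp(api_key="fc-YOUR_KEY")  # placeholder key

# extract() accepts either a JSON schema dict or a Pydantic-derived schema
result = app.extract(['https://firecrawl.dev'], {
    'prompt': 'Extract the title, description, and links from the website',
    'schema': PageSummary.model_json_schema(),
})
print(result)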
+ params (Optional[Union[Dict[str, Any], GenerateLLMsTextParams]]): Generation configuration parameters: + * maxUrls (int, optional): Maximum number of URLs to process (default: 10) + * showFullText (bool, optional): Include full text in output (default: False) Returns: - GenerateLLMsTextResponse: A response containing: - - success (bool): Whether the generation initiation was successful - - id (str): The unique identifier for the generation job - - error (str, optional): Error message if initiation failed + GenerateLLMsTextResponse: A response containing: + - success (bool): Whether the generation initiation was successful + - id (str): The unique identifier for the generation job + - error (str, optional): Error message if initiation failed Raises: - Exception: If the generation job initiation fails. + Exception: If the generation job initiation fails. """ if params is None: params = {} @@ -1283,20 +1360,20 @@ class FirecrawlApp: Check the status of a LLMs.txt generation operation. Args: - id (str): The unique identifier of the LLMs.txt generation job to check status for. + id (str): The unique identifier of the LLMs.txt generation job to check status for. Returns: - GenerateLLMsTextStatusResponse: A response containing: - - success (bool): Whether the generation was successful - - status (str): Status of generation ("processing", "completed", "failed") - - data (Dict[str, str], optional): Generated text with fields: - - llmstxt (str): Generated LLMs.txt content - - llmsfulltxt (str, optional): Full version if requested - - error (str, optional): Error message if generation failed - - expiresAt (str): When the generated data expires + GenerateLLMsTextStatusResponse: A response containing: + * success (bool): Whether the generation was successful + * status (str): Status of generation ("processing", "completed", "failed") + * data (Dict[str, str], optional): Generated text with fields: + * llmstxt (str): Generated LLMs.txt content + * llmsfulltxt (str, optional): Full version if requested + * error (str, optional): Error message if generation failed + * expiresAt (str): When the generated data expires Raises: - Exception: If the status check fails. + Exception: If the status check fails. """ headers = self._prepare_headers() try: @@ -1525,37 +1602,37 @@ class FirecrawlApp: Initiates a deep research operation on a given query and polls until completion. 
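Putting the LLMs.txt docstrings above into practice, a minimal synchronous sketch; the key is a placeholder and field access follows the documented GenerateLLMsTextStatusResponse shape.

from firecrawl.firecrawl import FirecrawlApp

app = FirecrawlApp(api_key="fc-YOUR_KEY")  # placeholder key

# Blocking helper: starts generation and polls until completed/failed
result = app.generate_llms_text('https://firecrawl.dev', {
    'maxUrls': 10,         # documented default
    'showFullText': True,  # also populate llmsfulltxt
})

if result.get('success'):
    print(result['data']['llmstxt'])          # generated LLMs.txt content
    print(result['data'].get('llmsfulltxt'))  # full text when requested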
Args: - query: Research query or topic to investigate + query: Research query or topic to investigate - params: See DeepResearchParams model: - Research Settings: - * maxDepth - Maximum research depth (default: 7) - * timeLimit - Time limit in seconds (default: 270) - * maxUrls - Maximum URLs to process (default: 20) + params: See DeepResearchParams model: + Research Settings: + * maxDepth - Maximum research depth (default: 7) + * timeLimit - Time limit in seconds (default: 270) + * maxUrls - Maximum URLs to process (default: 20) - Callbacks: - * on_activity - Progress callback receiving: - {type, status, message, timestamp, depth} - * on_source - Source discovery callback receiving: - {url, title, description} + Callbacks: + * on_activity - Progress callback receiving: + {type, status, message, timestamp, depth} + * on_source - Source discovery callback receiving: + {url, title, description} Returns: - DeepResearchResponse containing: + DeepResearchResponse containing: - Status: - * success - Whether research completed successfully - * status - Current state (processing/completed/failed) - * error - Error message if failed - - Results: - * id - Unique identifier for the research job - * data - Research findings and analysis - * sources - List of discovered sources - * activities - Research progress log - * summaries - Generated research summaries + Status: + * success - Whether research completed successfully + * status - Current state (processing/completed/failed) + * error - Error message if failed + + Results: + * id - Unique identifier for the research job + * data - Research findings and analysis + * sources - List of discovered sources + * activities - Research progress log + * summaries - Generated research summaries Raises: - Exception: If research fails + Exception: If research fails """ if params is None: params = {} @@ -1609,16 +1686,15 @@ class FirecrawlApp: Args: query (str): The research query to investigate. Should be a clear, specific question or topic. params (Optional[Union[Dict[str, Any], DeepResearchParams]]): Research configuration parameters: - - maxDepth (int, optional): Maximum depth of research exploration (default: 7) - - timeLimit (int, optional): Time limit in seconds for research (default: 270) - - maxUrls (int, optional): Maximum number of URLs to process (default: 20) - - __experimental_streamSteps (bool, optional): Enable streaming of research steps + * maxDepth (int, optional): Maximum depth of research exploration (default: 7) + * timeLimit (int, optional): Time limit in seconds for research (default: 270) + * maxUrls (int, optional): Maximum number of URLs to process (default: 20) Returns: - DeepResearchResponse: A response containing: - - success (bool): Whether the research initiation was successful - - id (str): The unique identifier for the research job - - error (str, optional): Error message if initiation failed + DeepResearchResponse: A response containing: + * success (bool): Whether the research initiation was successful + * id (str): The unique identifier for the research job + * error (str, optional): Error message if initiation failed Raises: Exception: If the research initiation fails. @@ -1689,6 +1765,7 @@ class FirecrawlApp: raise ValueError(str(e)) return {'success': False, 'error': 'Internal server error'} + class CrawlWatcher: """ A class to watch and handle crawl job events via WebSocket connection. 
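The callback payload shapes listed above are easiest to see in use. A minimal sketch, assuming the synchronous deep_research accepts the same on_activity/on_source callbacks as the async variant further down; the query, limits, and key are illustrative placeholders.

from firecrawl.firecrawl import FirecrawlApp

app = FirecrawlApp(api_key="fc-YOUR_KEY")  # placeholder key

def on_activity(activity):
    # activity: {type, status, message, timestamp, depth}
    print(f"[{activity['depth']}] {activity['type']}: {activity['message']}")

def on_source(source):
    # source: {url, title, description}
    print("source:", source['url'])

result = app.deep_research(
    'What are the latest developments in quantum error correction?',
    {'maxDepth': 3, 'timeLimit': 120, 'maxUrls': 10},
    on_activity=on_activity,
    on_source=on_source,
)
print(result['data'])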
@@ -1775,3 +1852,1336 @@ class CrawlWatcher: elif msg['type'] == 'document': self.data.append(msg['data']) self.dispatch_event('document', {'data': msg['data'], 'id': self.id}) + +class AsyncFirecrawlApp(FirecrawlApp): + """ + Asynchronous version of FirecrawlApp that implements async methods using aiohttp. + Provides non-blocking alternatives to all FirecrawlApp operations. + """ + + async def _async_post_request( + self, + url: str, + data: Dict[str, Any], + headers: Dict[str, str], + retries: int = 3, + backoff_factor: float = 0.5) -> Dict[str, Any]: + """ + Make an async POST request with exponential backoff retry logic. + + Args: + url (str): The URL to send the POST request to + data (Dict[str, Any]): The JSON data to include in the request body + headers (Dict[str, str]): Headers to include in the request + retries (int): Maximum number of retry attempts (default: 3) + backoff_factor (float): Factor to calculate delay between retries (default: 0.5) + Delay will be backoff_factor * (2 ** retry_count) + + Returns: + Dict[str, Any]: The parsed JSON response from the server + + Raises: + aiohttp.ClientError: If the request fails after all retries + Exception: If max retries are exceeded or other errors occur + """ + async with aiohttp.ClientSession() as session: + for attempt in range(retries): + try: + async with session.post(url, headers=headers, json=data) as response: + if response.status == 502: + await asyncio.sleep(backoff_factor * (2 ** attempt)) + continue + if response.status != 200: + await self._handle_error(response, "make POST request") + return await response.json() + except aiohttp.ClientError as e: + if attempt == retries - 1: + raise e + await asyncio.sleep(backoff_factor * (2 ** attempt)) + raise Exception("Max retries exceeded") + + async def _async_get_request( + self, + url: str, + headers: Dict[str, str], + retries: int = 3, + backoff_factor: float = 0.5) -> Dict[str, Any]: + """ + Make an async GET request with exponential backoff retry logic. + + Args: + url (str): The URL to send the GET request to + headers (Dict[str, str]): Headers to include in the request + retries (int): Maximum number of retry attempts (default: 3) + backoff_factor (float): Factor to calculate delay between retries (default: 0.5) + Delay will be backoff_factor * (2 ** retry_count) + + Returns: + Dict[str, Any]: The parsed JSON response from the server + + Raises: + aiohttp.ClientError: If the request fails after all retries + Exception: If max retries are exceeded or other errors occur + """ + async with aiohttp.ClientSession() as session: + for attempt in range(retries): + try: + async with session.get(url, headers=headers) as response: + if response.status == 502: + await asyncio.sleep(backoff_factor * (2 ** attempt)) + continue + if response.status != 200: + await self._handle_error(response, "make GET request") + return await response.json() + except aiohttp.ClientError as e: + if attempt == retries - 1: + raise e + await asyncio.sleep(backoff_factor * (2 ** attempt)) + raise Exception("Max retries exceeded") + + async def _handle_error(self, response: aiohttp.ClientResponse, action: str) -> None: + """ + Handle errors from async API responses with detailed error messages. 
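The async request helpers above back off exponentially on HTTP 502 and client errors. The delay schedule they document, backoff_factor * (2 ** attempt), works out as in this small sketch:

# Delay before each retry, per the _async_post_request/_async_get_request docstrings
backoff_factor = 0.5
for attempt in range(3):
    print(f"retry {attempt}: sleep {backoff_factor * (2 ** attempt):.1f}s")
# retry 0: sleep 0.5s
# retry 1: sleep 1.0s
# retry 2: sleep 2.0s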
+ + Args: + response (aiohttp.ClientResponse): The response object from the failed request + action (str): Description of the action that was being attempted + + Raises: + aiohttp.ClientError: With a detailed error message based on the response status: + - 402: Payment Required + - 408: Request Timeout + - 409: Conflict + - 500: Internal Server Error + - Other: Unexpected error with status code + """ + try: + error_data = await response.json() + error_message = error_data.get('error', 'No error message provided.') + error_details = error_data.get('details', 'No additional error details provided.') + except: + raise aiohttp.ClientError(f'Failed to parse Firecrawl error response as JSON. Status code: {response.status}') + + if response.status == 402: + message = f"Payment Required: Failed to {action}. {error_message} - {error_details}" + elif response.status == 408: + message = f"Request Timeout: Failed to {action} as the request timed out. {error_message} - {error_details}" + elif response.status == 409: + message = f"Conflict: Failed to {action} due to a conflict. {error_message} - {error_details}" + elif response.status == 500: + message = f"Internal Server Error: Failed to {action}. {error_message} - {error_details}" + else: + message = f"Unexpected error during {action}: Status code {response.status}. {error_message} - {error_details}" + + raise aiohttp.ClientError(message) + + async def crawl_url_and_watch( + self, + url: str, + params: Optional[CrawlParams] = None, + idempotency_key: Optional[str] = None) -> 'AsyncCrawlWatcher': + """ + Initiate an async crawl job and return an AsyncCrawlWatcher to monitor progress via WebSocket. + + Args: + url (str): Target URL to start crawling from + params (Optional[CrawlParams]): See CrawlParams model for configuration: + URL Discovery: + * includePaths - Patterns of URLs to include + * excludePaths - Patterns of URLs to exclude + * maxDepth - Maximum crawl depth + * maxDiscoveryDepth - Maximum depth for finding new URLs + * limit - Maximum pages to crawl + + Link Following: + * allowBackwardLinks - Follow parent directory links + * allowExternalLinks - Follow external domain links + * ignoreSitemap - Skip sitemap.xml processing + + Advanced: + * scrapeOptions - Page scraping configuration + * webhook - Notification webhook settings + * deduplicateSimilarURLs - Remove similar URLs + * ignoreQueryParameters - Ignore URL parameters + * regexOnFullURL - Apply regex to full URLs + idempotency_key (Optional[str]): Unique key to prevent duplicate requests + + Returns: + AsyncCrawlWatcher: An instance to monitor the crawl job via WebSocket + + Raises: + Exception: If crawl job fails to start + """ + crawl_response = await self.async_crawl_url(url, params, idempotency_key) + if crawl_response.get('success') and 'id' in crawl_response: + return AsyncCrawlWatcher(crawl_response['id'], self) + else: + raise Exception("Crawl job failed to start") + + async def batch_scrape_urls_and_watch( + self, + urls: List[str], + params: Optional[ScrapeParams] = None, + idempotency_key: Optional[str] = None) -> 'AsyncCrawlWatcher': + """ + Initiate an async batch scrape job and return an AsyncCrawlWatcher to monitor progress. 
+ + Args: + urls (List[str]): List of URLs to scrape + params (Optional[ScrapeParams]): See ScrapeParams model for configuration: + + Content Options: + * formats - Content formats to retrieve + * includeTags - HTML tags to include + * excludeTags - HTML tags to exclude + * onlyMainContent - Extract main content only + + Request Options: + * headers - Custom HTTP headers + * timeout - Request timeout (ms) + * mobile - Use mobile user agent + * proxy - Proxy type + + Extraction Options: + * extract - Content extraction config + * jsonOptions - JSON extraction config + * actions - Actions to perform + idempotency_key (Optional[str]): Unique key to prevent duplicate requests + + Returns: + AsyncCrawlWatcher: An instance to monitor the batch scrape job via WebSocket + + Raises: + Exception: If batch scrape job fails to start + """ + batch_response = await self.async_batch_scrape_urls(urls, params, idempotency_key) + if batch_response.get('success') and 'id' in batch_response: + return AsyncCrawlWatcher(batch_response['id'], self) + else: + raise Exception("Batch scrape job failed to start") + + async def scrape_url(self, url: str, params: Optional[Dict[str, Any]] = None) -> ScrapeResponse[Any]: + """ + Asynchronously scrape and extract content from a URL. + + Args: + url (str): Target URL to scrape + params (Optional[Dict[str, Any]]): See ScrapeParams model for configuration: + Content Options: + * formats - Content types to retrieve (markdown/html/etc) + * includeTags - HTML tags to include + * excludeTags - HTML tags to exclude + * onlyMainContent - Extract main content only + + Request Options: + * headers - Custom HTTP headers + * timeout - Request timeout (ms) + * mobile - Use mobile user agent + * proxy - Proxy type (basic/stealth) + + Extraction Options: + * extract - Content extraction settings + * jsonOptions - JSON extraction settings + * actions - Actions to perform + + Returns: + ScrapeResponse with: + * Requested content formats + * Page metadata + * Extraction results + * Success/error status + + Raises: + Exception: If scraping fails + """ + headers = self._prepare_headers() + scrape_params = {'url': url} + + if params: + extract = params.get('extract', {}) + if extract: + if 'schema' in extract and hasattr(extract['schema'], 'schema'): + extract['schema'] = extract['schema'].schema() + scrape_params['extract'] = extract + + for key, value in params.items(): + if key not in ['extract']: + scrape_params[key] = value + + endpoint = f'/v1/scrape' + response = await self._async_post_request( + f'{self.api_url}{endpoint}', + scrape_params, + headers + ) + + if response.get('success') and 'data' in response: + return response['data'] + elif "error" in response: + raise Exception(f'Failed to scrape URL. Error: {response["error"]}') + else: + raise Exception(f'Failed to scrape URL. Error: {response}') + + async def batch_scrape_urls(self, urls: List[str], params: Optional[ScrapeParams] = None) -> BatchScrapeStatusResponse: + """ + Asynchronously scrape multiple URLs and monitor until completion. 
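Because AsyncFirecrawlApp.scrape_url is awaitable, several pages can be fetched concurrently with standard asyncio tooling. A minimal sketch; the key and URLs are placeholders, and the gather pattern is one option rather than anything the SDK requires.

import asyncio
from firecrawl.firecrawl import AsyncFirecrawlApp

app = AsyncFirecrawlApp(api_key="fc-YOUR_KEY")  # placeholder key

async def main():
    urls = ['https://firecrawl.dev', 'https://docs.firecrawl.dev']
    # Each scrape_url call is an independent coroutine, so gather runs them concurrently
    pages = await asyncio.gather(*(app.scrape_url(u, {'formats': ['markdown']}) for u in urls))
    for url, page in zip(urls, pages):
        print(url, len(page['markdown']), 'markdown chars')

asyncio.run(main())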
+ + Args: + urls (List[str]): URLs to scrape + params (Optional[ScrapeParams]): See ScrapeParams model: + Content Options: + * formats - Content formats to retrieve + * includeTags - HTML tags to include + * excludeTags - HTML tags to exclude + * onlyMainContent - Extract main content only + + Request Options: + * headers - Custom HTTP headers + * timeout - Request timeout (ms) + * mobile - Use mobile user agent + * proxy - Proxy type + + Extraction Options: + * extract - Content extraction config + * jsonOptions - JSON extraction config + * actions - Actions to perform + + Returns: + BatchScrapeStatusResponse with: + * Scraping status and progress + * Scraped content for each URL + * Success/error information + + Raises: + Exception: If batch scrape fails + """ + headers = self._prepare_headers() + json_data = {'urls': urls} + if params: + json_data.update(params) + + endpoint = f'/v1/batch/scrape' + response = await self._async_post_request( + f'{self.api_url}{endpoint}', + json_data, + headers + ) + + if response.get('success') and 'id' in response: + return await self._async_monitor_job_status(response['id'], headers) + else: + raise Exception(f'Failed to start batch scrape. Error: {response.get("error")}') + + async def async_batch_scrape_urls( + self, + urls: List[str], + params: Optional[ScrapeParams] = None, + idempotency_key: Optional[str] = None) -> BatchScrapeResponse: + """ + Initiate an asynchronous batch scrape job without waiting for completion. + + Args: + urls (List[str]): List of URLs to scrape + params (Optional[ScrapeParams]): See ScrapeParams model for configuration: + Content Options: + * formats - Content formats to retrieve + * includeTags - HTML tags to include + * excludeTags - HTML tags to exclude + * onlyMainContent - Extract main content only + + Request Options: + * headers - Custom HTTP headers + * timeout - Request timeout (ms) + * mobile - Use mobile user agent + * proxy - Proxy type + + Extraction Options: + * extract - Content extraction config + * jsonOptions - JSON extraction config + * actions - Actions to perform + idempotency_key (Optional[str]): Unique key to prevent duplicate requests + + Returns: + BatchScrapeResponse with: + * success - Whether job started successfully + * id - Unique identifier for the job + * url - Status check URL + * error - Error message if start failed + + Raises: + Exception: If job initiation fails + """ + headers = self._prepare_headers(idempotency_key) + json_data = {'urls': urls} + if params: + json_data.update(params) + + endpoint = f'/v1/batch/scrape' + return await self._async_post_request( + f'{self.api_url}{endpoint}', + json_data, + headers + ) + + async def crawl_url( + self, + url: str, + params: Optional[CrawlParams] = None, + poll_interval: int = 2, + idempotency_key: Optional[str] = None) -> CrawlStatusResponse: + """ + Asynchronously crawl a website starting from a URL and monitor until completion. 
+ + Args: + url (str): Target URL to start crawling from + params (Optional[CrawlParams]): See CrawlParams model: + URL Discovery: + * includePaths - Patterns of URLs to include + * excludePaths - Patterns of URLs to exclude + * maxDepth - Maximum crawl depth + * maxDiscoveryDepth - Maximum depth for finding new URLs + * limit - Maximum pages to crawl + + Link Following: + * allowBackwardLinks - Follow parent directory links + * allowExternalLinks - Follow external domain links + * ignoreSitemap - Skip sitemap.xml processing + + Advanced: + * scrapeOptions - Page scraping configuration + * webhook - Notification webhook settings + * deduplicateSimilarURLs - Remove similar URLs + * ignoreQueryParameters - Ignore URL parameters + * regexOnFullURL - Apply regex to full URLs + poll_interval (int): Seconds between status checks (default: 2) + idempotency_key (Optional[str]): Unique key to prevent duplicate requests + + Returns: + CrawlStatusResponse with: + * Crawling status and progress + * Crawled page contents + * Success/error information + + Raises: + Exception: If crawl fails + """ + headers = self._prepare_headers(idempotency_key) + json_data = {'url': url} + if params: + json_data.update(params) + + endpoint = f'/v1/crawl' + response = await self._async_post_request( + f'{self.api_url}{endpoint}', + json_data, + headers + ) + + if response.get('success') and 'id' in response: + return await self._async_monitor_job_status(response['id'], headers, poll_interval) + else: + raise Exception(f'Failed to start crawl. Error: {response.get("error")}') + + async def async_crawl_url(self, url: str, params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> CrawlResponse: + """ + Initiate an asynchronous crawl job without waiting for completion. + + Args: + url (str): Target URL to start crawling from + params (Optional[Dict[str, Any]]): See CrawlParams model: + URL Discovery: + * includePaths - Patterns of URLs to include + * excludePaths - Patterns of URLs to exclude + * maxDepth - Maximum crawl depth + * maxDiscoveryDepth - Maximum depth for finding new URLs + * limit - Maximum pages to crawl + + Link Following: + * allowBackwardLinks - Follow parent directory links + * allowExternalLinks - Follow external domain links + * ignoreSitemap - Skip sitemap.xml processing + + Advanced: + * scrapeOptions - Page scraping configuration + * webhook - Notification webhook settings + * deduplicateSimilarURLs - Remove similar URLs + * ignoreQueryParameters - Ignore URL parameters + * regexOnFullURL - Apply regex to full URLs + idempotency_key (Optional[str]): Unique key to prevent duplicate requests + + Returns: + CrawlResponse with: + * success - Whether job started successfully + * id - Unique identifier for the job + * url - Status check URL + * error - Error message if start failed + + Raises: + Exception: If job initiation fails + """ + headers = self._prepare_headers(idempotency_key) + json_data = {'url': url} + if params: + json_data.update(params) + + endpoint = f'/v1/crawl' + return await self._async_post_request( + f'{self.api_url}{endpoint}', + json_data, + headers + ) + + async def check_crawl_status(self, id: str) -> CrawlStatusResponse: + """ + Check the status and results of an asynchronous crawl job. 
+ + Args: + id (str): Unique identifier for the crawl job + + Returns: + CrawlStatusResponse containing: + Status Information: + * status - Current state (scraping/completed/failed/cancelled) + * completed - Number of pages crawled + * total - Total pages to crawl + * creditsUsed - API credits consumed + * expiresAt - Data expiration timestamp + + Results: + * data - List of crawled documents + * next - URL for next page of results (if paginated) + * success - Whether status check succeeded + * error - Error message if failed + + Raises: + Exception: If status check fails + """ + headers = self._prepare_headers() + endpoint = f'/v1/crawl/{id}' + + status_data = await self._async_get_request( + f'{self.api_url}{endpoint}', + headers + ) + + if status_data['status'] == 'completed': + if 'data' in status_data: + data = status_data['data'] + while 'next' in status_data: + if len(status_data['data']) == 0: + break + next_url = status_data.get('next') + if not next_url: + logger.warning("Expected 'next' URL is missing.") + break + next_data = await self._async_get_request(next_url, headers) + data.extend(next_data.get('data', [])) + status_data = next_data + status_data['data'] = data + + response = { + 'status': status_data.get('status'), + 'total': status_data.get('total'), + 'completed': status_data.get('completed'), + 'creditsUsed': status_data.get('creditsUsed'), + 'expiresAt': status_data.get('expiresAt'), + 'data': status_data.get('data') + } + + if 'error' in status_data: + response['error'] = status_data['error'] + + if 'next' in status_data: + response['next'] = status_data['next'] + + return { + 'success': False if 'error' in status_data else True, + **response + } + + async def _async_monitor_job_status(self, id: str, headers: Dict[str, str], poll_interval: int = 2) -> CrawlStatusResponse: + """ + Monitor the status of an asynchronous job until completion. + + Args: + id (str): The ID of the job to monitor + headers (Dict[str, str]): Headers to include in status check requests + poll_interval (int): Seconds between status checks (default: 2) + + Returns: + CrawlStatusResponse: The job results if completed successfully + + Raises: + Exception: If the job fails or an error occurs during status checks + """ + while True: + status_data = await self._async_get_request( + f'{self.api_url}/v1/crawl/{id}', + headers + ) + + if status_data['status'] == 'completed': + if 'data' in status_data: + data = status_data['data'] + while 'next' in status_data: + if len(status_data['data']) == 0: + break + next_url = status_data.get('next') + if not next_url: + logger.warning("Expected 'next' URL is missing.") + break + next_data = await self._async_get_request(next_url, headers) + data.extend(next_data.get('data', [])) + status_data = next_data + status_data['data'] = data + return status_data + else: + raise Exception('Job completed but no data was returned') + elif status_data['status'] in ['active', 'paused', 'pending', 'queued', 'waiting', 'scraping']: + await asyncio.sleep(max(poll_interval, 2)) + else: + raise Exception(f'Job failed or was stopped. Status: {status_data["status"]}') + + async def map_url(self, url: str, params: Optional[Dict[str, Any]] = None) -> MapResponse: + """ + Asynchronously map and discover links from a URL. 
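map_url accepts the MapParams options documented earlier; a minimal async sketch combining a search filter with a result limit (placeholder key, illustrative values).

import asyncio
from firecrawl.firecrawl import AsyncFirecrawlApp

app = AsyncFirecrawlApp(api_key="fc-YOUR_KEY")  # placeholder key

async def main():
    # 'search' filters discovered URLs; 'limit' caps how many are returned
    result = await app.map_url('https://firecrawl.dev', {'search': 'blog', 'limit': 20})
    for link in result['links']:
        print(link)

asyncio.run(main())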
+ + Args: + url (str): Target URL to map + params (Optional[Dict[str, Any]]): See MapParams model: + Discovery Options: + * search - Filter pattern for URLs + * ignoreSitemap - Skip sitemap.xml + * includeSubdomains - Include subdomain links + * sitemapOnly - Only use sitemap.xml + + Limits: + * limit - Max URLs to return + * timeout - Request timeout (ms) + + Returns: + MapResponse with: + * Discovered URLs + * Success/error status + + Raises: + Exception: If mapping fails + """ + headers = self._prepare_headers() + json_data = {'url': url} + if params: + json_data.update(params) + + endpoint = f'/v1/map' + response = await self._async_post_request( + f'{self.api_url}{endpoint}', + json_data, + headers + ) + + if response.get('success') and 'links' in response: + return response + elif 'error' in response: + raise Exception(f'Failed to map URL. Error: {response["error"]}') + else: + raise Exception(f'Failed to map URL. Error: {response}') + + async def extract(self, urls: List[str], params: Optional[ExtractParams] = None) -> ExtractResponse[Any]: + """ + Asynchronously extract structured information from URLs. + + Args: + urls (List[str]): URLs to extract from + params (Optional[ExtractParams]): See ExtractParams model: + Extraction Config: + * prompt - Custom extraction prompt + * schema - JSON schema/Pydantic model + * systemPrompt - System context + + Behavior Options: + * allowExternalLinks - Follow external links + * enableWebSearch - Enable web search + * includeSubdomains - Include subdomains + * showSources - Include source URLs + + Scraping Options: + * scrapeOptions - Page scraping config + + Returns: + ExtractResponse with: + * Structured data matching schema + * Source information if requested + * Success/error status + + Raises: + ValueError: If prompt/schema missing or extraction fails + """ + headers = self._prepare_headers() + + if not params or (not params.get('prompt') and not params.get('schema')): + raise ValueError("Either prompt or schema is required") + + schema = params.get('schema') + if schema: + if hasattr(schema, 'model_json_schema'): + schema = schema.model_json_schema() + + request_data = { + 'urls': urls, + 'allowExternalLinks': params.get('allow_external_links', params.get('allowExternalLinks', False)), + 'enableWebSearch': params.get('enable_web_search', params.get('enableWebSearch', False)), + 'showSources': params.get('show_sources', params.get('showSources', False)), + 'schema': schema, + 'origin': 'api-sdk' + } + + if params.get('prompt'): + request_data['prompt'] = params['prompt'] + if params.get('system_prompt'): + request_data['systemPrompt'] = params['system_prompt'] + elif params.get('systemPrompt'): + request_data['systemPrompt'] = params['systemPrompt'] + + response = await self._async_post_request( + f'{self.api_url}/v1/extract', + request_data, + headers + ) + + if response.get('success'): + job_id = response.get('id') + if not job_id: + raise Exception('Job ID not returned from extract request.') + + while True: + status_data = await self._async_get_request( + f'{self.api_url}/v1/extract/{job_id}', + headers + ) + + if status_data['status'] == 'completed': + return status_data + elif status_data['status'] in ['failed', 'cancelled']: + raise Exception(f'Extract job {status_data["status"]}. Error: {status_data["error"]}') + + await asyncio.sleep(2) + else: + raise Exception(f'Failed to extract. 
Error: {response.get("error")}') + + async def check_batch_scrape_status(self, id: str) -> BatchScrapeStatusResponse: + """ + Check the status of an asynchronous batch scrape job. + + Args: + id (str): The ID of the batch scrape job + + Returns: + BatchScrapeStatusResponse containing: + Status Information: + * status - Current state (scraping/completed/failed/cancelled) + * completed - Number of URLs scraped + * total - Total URLs to scrape + * creditsUsed - API credits consumed + * expiresAt - Data expiration timestamp + + Results: + * data - List of scraped documents + * next - URL for next page of results (if paginated) + * success - Whether status check succeeded + * error - Error message if failed + + Raises: + Exception: If status check fails + """ + headers = self._prepare_headers() + endpoint = f'/v1/batch/scrape/{id}' + + status_data = await self._async_get_request( + f'{self.api_url}{endpoint}', + headers + ) + + if status_data['status'] == 'completed': + if 'data' in status_data: + data = status_data['data'] + while 'next' in status_data: + if len(status_data['data']) == 0: + break + next_url = status_data.get('next') + if not next_url: + logger.warning("Expected 'next' URL is missing.") + break + next_data = await self._async_get_request(next_url, headers) + data.extend(next_data.get('data', [])) + status_data = next_data + status_data['data'] = data + + response = { + 'status': status_data.get('status'), + 'total': status_data.get('total'), + 'completed': status_data.get('completed'), + 'creditsUsed': status_data.get('creditsUsed'), + 'expiresAt': status_data.get('expiresAt'), + 'data': status_data.get('data') + } + + if 'error' in status_data: + response['error'] = status_data['error'] + + if 'next' in status_data: + response['next'] = status_data['next'] + + return { + 'success': False if 'error' in status_data else True, + **response + } + + async def check_batch_scrape_errors(self, id: str) -> CrawlErrorsResponse: + """ + Get information about errors from an asynchronous batch scrape job. + + Args: + id (str): The ID of the batch scrape job + + Returns: + CrawlErrorsResponse containing: + errors (List[Dict[str, str]]): List of errors with fields: + * id (str): Error ID + * timestamp (str): When the error occurred + * url (str): URL that caused the error + * error (str): Error message + * robotsBlocked (List[str]): List of URLs blocked by robots.txt + + Raises: + Exception: If error check fails + """ + headers = self._prepare_headers() + return await self._async_get_request( + f'{self.api_url}/v1/batch/scrape/{id}/errors', + headers + ) + + async def check_crawl_errors(self, id: str) -> CrawlErrorsResponse: + """ + Get information about errors from an asynchronous crawl job. + + Args: + id (str): The ID of the crawl job + + Returns: + CrawlErrorsResponse containing: + * errors (List[Dict[str, str]]): List of errors with fields: + - id (str): Error ID + - timestamp (str): When the error occurred + - url (str): URL that caused the error + - error (str): Error message + * robotsBlocked (List[str]): List of URLs blocked by robots.txt + + Raises: + Exception: If error check fails + """ + headers = self._prepare_headers() + return await self._async_get_request( + f'{self.api_url}/v1/crawl/{id}/errors', + headers + ) + + async def cancel_crawl(self, id: str) -> Dict[str, Any]: + """ + Cancel an asynchronous crawl job. 
+ + Args: + id (str): The ID of the crawl job to cancel + + Returns: + Dict[str, Any] containing: + * success (bool): Whether cancellation was successful + * error (str, optional): Error message if cancellation failed + + Raises: + Exception: If cancellation fails + """ + headers = self._prepare_headers() + async with aiohttp.ClientSession() as session: + async with session.delete(f'{self.api_url}/v1/crawl/{id}', headers=headers) as response: + return await response.json() + + async def get_extract_status(self, job_id: str) -> ExtractResponse[Any]: + """ + Check the status of an asynchronous extraction job. + + Args: + job_id (str): The ID of the extraction job + + Returns: + ExtractResponse containing: + * success (bool): Whether extraction completed successfully + * data (Any): Extracted structured data + * error (str, optional): Error message if extraction failed + * warning (str, optional): Warning message if any + * sources (List[str], optional): Source URLs if requested + + Raises: + ValueError: If status check fails + """ + headers = self._prepare_headers() + try: + return await self._async_get_request( + f'{self.api_url}/v1/extract/{job_id}', + headers + ) + except Exception as e: + raise ValueError(str(e)) + + async def async_extract( + self, + urls: List[str], + params: Optional[ExtractParams] = None, + idempotency_key: Optional[str] = None) -> ExtractResponse[Any]: + """ + Initiate an asynchronous extraction job without waiting for completion. + + Args: + urls (List[str]): URLs to extract information from + params (Optional[ExtractParams]): See ExtractParams model: + Extraction Config: + * prompt - Custom extraction prompt + * schema - JSON schema/Pydantic model + * systemPrompt - System context + + Behavior Options: + * allowExternalLinks - Follow external links + * enableWebSearch - Enable web search + * includeSubdomains - Include subdomains + * showSources - Include source URLs + + Scraping Options: + * scrapeOptions - Page scraping config + idempotency_key (Optional[str]): Unique key to prevent duplicate requests + + Returns: + ExtractResponse containing: + * success (bool): Whether job started successfully + * id (str): Unique identifier for the job + * error (str, optional): Error message if start failed + + Raises: + ValueError: If job initiation fails + """ + headers = self._prepare_headers(idempotency_key) + + schema = params.get('schema') if params else None + if schema: + if hasattr(schema, 'model_json_schema'): + schema = schema.model_json_schema() + + jsonData = {'urls': urls, **(params or {})} + request_data = { + **jsonData, + 'allowExternalLinks': params.get('allow_external_links', False) if params else False, + 'schema': schema, + 'origin': 'api-sdk' + } + + try: + return await self._async_post_request( + f'{self.api_url}/v1/extract', + request_data, + headers + ) + except Exception as e: + raise ValueError(str(e)) + + async def generate_llms_text(self, url: str, params: Optional[Union[Dict[str, Any], GenerateLLMsTextParams]] = None) -> GenerateLLMsTextStatusResponse: + """ + Generate LLMs.txt for a given URL and monitor until completion. 
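Tying the async job-control methods together: a minimal sketch that starts a crawl, inspects it once, and cancels it. The key is a placeholder, and the cancellation response fields follow the docstring above.

import asyncio
from firecrawl.firecrawl import AsyncFirecrawlApp

app = AsyncFirecrawlApp(api_key="fc-YOUR_KEY")  # placeholder key

async def main():
    job = await app.async_crawl_url('firecrawl.dev', {'limit': 100})
    status = await app.check_crawl_status(job['id'])
    print('status:', status['status'])

    # Stop the job early; the response reports whether cancellation succeeded
    cancelled = await app.cancel_crawl(job['id'])
    print('cancelled:', cancelled.get('success'))

asyncio.run(main())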
+ + Args: + url (str): Target URL to generate LLMs.txt from + params (Optional[Union[Dict[str, Any], GenerateLLMsTextParams]]): See GenerateLLMsTextParams model: + Generation Options: + * maxUrls - Maximum URLs to process (default: 10) + * showFullText - Include full text in output (default: False) + + Returns: + GenerateLLMsTextStatusResponse containing: + * success (bool): Whether generation completed successfully + * status (str): Status of generation (processing/completed/failed) + * data (Dict[str, str], optional): Generated text with fields: + - llmstxt (str): Generated LLMs.txt content + - llmsfulltxt (str, optional): Full version if requested + * error (str, optional): Error message if generation failed + * expiresAt (str): When the generated data expires + + Raises: + Exception: If generation fails + """ + if params is None: + params = {} + + if isinstance(params, dict): + generation_params = GenerateLLMsTextParams(**params) + else: + generation_params = params + + response = await self.async_generate_llms_text(url, generation_params) + if not response.get('success') or 'id' not in response: + return response + + job_id = response['id'] + while True: + status = await self.check_generate_llms_text_status(job_id) + + if status['status'] == 'completed': + return status + elif status['status'] == 'failed': + raise Exception(f'LLMs.txt generation failed. Error: {status.get("error")}') + elif status['status'] != 'processing': + break + + await asyncio.sleep(2) + + return {'success': False, 'error': 'LLMs.txt generation job terminated unexpectedly'} + + async def async_generate_llms_text(self, url: str, params: Optional[Union[Dict[str, Any], GenerateLLMsTextParams]] = None) -> GenerateLLMsTextResponse: + """ + Initiate an asynchronous LLMs.txt generation job without waiting for completion. + + Args: + url (str): Target URL to generate LLMs.txt from + params (Optional[Union[Dict[str, Any], GenerateLLMsTextParams]]): See GenerateLLMsTextParams model: + Generation Options: + * maxUrls - Maximum URLs to process (default: 10) + * showFullText - Include full text in output (default: False) + + Returns: + GenerateLLMsTextResponse containing: + * success (bool): Whether job started successfully + * id (str): Unique identifier for the job + * error (str, optional): Error message if start failed + + Raises: + ValueError: If job initiation fails + """ + if params is None: + params = {} + + if isinstance(params, dict): + generation_params = GenerateLLMsTextParams(**params) + else: + generation_params = params + + headers = self._prepare_headers() + json_data = {'url': url, **generation_params.dict(exclude_none=True)} + + try: + return await self._async_post_request( + f'{self.api_url}/v1/llmstxt', + json_data, + headers + ) + except Exception as e: + raise ValueError(str(e)) + + async def check_generate_llms_text_status(self, id: str) -> GenerateLLMsTextStatusResponse: + """ + Check the status of an asynchronous LLMs.txt generation job. 
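+
+        Example (illustrative sketch; assumes a job previously started with
+        async_generate_llms_text and an AsyncFirecrawlApp instance `app`):
+            >>> job = await app.async_generate_llms_text('https://docs.firecrawl.dev')
+            >>> status = await app.check_generate_llms_text_status(job['id'])
+            >>> status['status']  # 'processing', 'completed' or 'failed'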
+ + Args: + id (str): The ID of the generation job + + Returns: + GenerateLLMsTextStatusResponse containing: + * success (bool): Whether generation completed successfully + * status (str): Status of generation (processing/completed/failed) + * data (Dict[str, str], optional): Generated text with fields: + - llmstxt (str): Generated LLMs.txt content + - llmsfulltxt (str, optional): Full version if requested + * error (str, optional): Error message if generation failed + * expiresAt (str): When the generated data expires + + Raises: + ValueError: If status check fails + """ + headers = self._prepare_headers() + try: + return await self._async_get_request( + f'{self.api_url}/v1/llmstxt/{id}', + headers + ) + except Exception as e: + raise ValueError(str(e)) + + async def deep_research( + self, + query: str, + params: Optional[Union[Dict[str, Any], DeepResearchParams]] = None, + on_activity: Optional[Callable[[Dict[str, Any]], None]] = None, + on_source: Optional[Callable[[Dict[str, Any]], None]] = None) -> DeepResearchStatusResponse: + """ + Initiates a deep research operation on a given query and polls until completion, providing real-time updates via callbacks. + + Args: + query: Research query or topic to investigate + + params: See DeepResearchParams model: + Research Settings: + * maxDepth - Maximum research depth (default: 7) + * timeLimit - Time limit in seconds (default: 270) + * maxUrls - Maximum URLs to process (default: 20) + + Callbacks: + * on_activity - Progress callback receiving: + {type, status, message, timestamp, depth} + * on_source - Source discovery callback receiving: + {url, title, description} + + Returns: + DeepResearchResponse containing: + + Status: + * success - Whether research completed successfully + * status - Current state (processing/completed/failed) + * error - Error message if failed + + Results: + * id - Unique identifier for the research job + * data - Research findings and analysis + * sources - List of discovered sources + * activities - Research progress log + * summaries - Generated research summaries + + Raises: + Exception: If research fails + """ + if params is None: + params = {} + + if isinstance(params, dict): + research_params = DeepResearchParams(**params) + else: + research_params = params + + response = await self.async_deep_research(query, research_params) + if not response.get('success') or 'id' not in response: + return response + + job_id = response['id'] + last_activity_count = 0 + last_source_count = 0 + + while True: + status = await self.check_deep_research_status(job_id) + + if on_activity and 'activities' in status: + new_activities = status['activities'][last_activity_count:] + for activity in new_activities: + on_activity(activity) + last_activity_count = len(status['activities']) + + if on_source and 'sources' in status: + new_sources = status['sources'][last_source_count:] + for source in new_sources: + on_source(source) + last_source_count = len(status['sources']) + + if status['status'] == 'completed': + return status + elif status['status'] == 'failed': + raise Exception(f'Deep research failed. Error: {status.get("error")}') + elif status['status'] != 'processing': + break + + await asyncio.sleep(2) + + return {'success': False, 'error': 'Deep research job terminated unexpectedly'} + + async def async_deep_research(self, query: str, params: Optional[Union[Dict[str, Any], DeepResearchParams]] = None) -> DeepResearchResponse: + """ + Initiate an asynchronous deep research job without waiting for completion. 
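+
+        Example (illustrative sketch; the query and research settings are
+        placeholders and `app` is assumed to be an AsyncFirecrawlApp instance):
+            >>> job = await app.async_deep_research(
+            ...     'How do open-source LLMs compare on coding tasks?',
+            ...     {'maxDepth': 3, 'timeLimit': 120, 'maxUrls': 10}
+            ... )
+            >>> if job.get('success'):
+            ...     status = await app.check_deep_research_status(job['id'])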
+ + Args: + query (str): Research query or topic to investigate + params (Optional[Union[Dict[str, Any], DeepResearchParams]]): See DeepResearchParams model: + Research Settings: + * maxDepth - Maximum research depth (default: 7) + * timeLimit - Time limit in seconds (default: 270) + * maxUrls - Maximum URLs to process (default: 20) + + Returns: + DeepResearchResponse containing: + * success (bool): Whether job started successfully + * id (str): Unique identifier for the job + * error (str, optional): Error message if start failed + + Raises: + ValueError: If job initiation fails + """ + if params is None: + params = {} + + if isinstance(params, dict): + research_params = DeepResearchParams(**params) + else: + research_params = params + + headers = self._prepare_headers() + json_data = {'query': query, **research_params.dict(exclude_none=True)} + + try: + return await self._async_post_request( + f'{self.api_url}/v1/deep-research', + json_data, + headers + ) + except Exception as e: + raise ValueError(str(e)) + + async def check_deep_research_status(self, id: str) -> DeepResearchStatusResponse: + """ + Check the status of an asynchronous deep research job. + + Args: + id (str): The ID of the research job + + Returns: + DeepResearchStatusResponse containing: + * success (bool): Whether research completed successfully + * status (str): Current state (processing/completed/failed) + * data (Dict[str, Any], optional): Research findings and analysis + * error (str, optional): Error message if failed + * expiresAt (str): When the research data expires + * currentDepth (int): Current research depth + * maxDepth (int): Maximum research depth + * activities (List[Dict[str, Any]]): Research progress log + * sources (List[Dict[str, Any]]): Discovered sources + * summaries (List[str]): Generated research summaries + + Raises: + ValueError: If status check fails + """ + headers = self._prepare_headers() + try: + return await self._async_get_request( + f'{self.api_url}/v1/deep-research/{id}', + headers + ) + except Exception as e: + raise ValueError(str(e)) + + async def search(self, query: str, params: Optional[Union[Dict[str, Any], SearchParams]] = None) -> SearchResponse: + """ + Asynchronously search for content using Firecrawl. + + Args: + query (str): Search query string + params (Optional[Union[Dict[str, Any], SearchParams]]): See SearchParams model: + Search Options: + * limit - Max results (default: 5) + * tbs - Time filter (e.g. "qdr:d") + * filter - Custom result filter + + Localization: + * lang - Language code (default: "en") + * country - Country code (default: "us") + * location - Geo-targeting + + Request Options: + * timeout - Request timeout (ms) + * scrapeOptions - Result scraping config + + Returns: + SearchResponse containing: + * success (bool): Whether search completed successfully + * data (List[FirecrawlDocument]): Search results + * warning (str, optional): Warning message if any + * error (str, optional): Error message if search failed + + Raises: + Exception: If search fails + """ + if params is None: + params = {} + + if isinstance(params, dict): + search_params = SearchParams(query=query, **params) + else: + search_params = params + search_params.query = query + + return await self._async_post_request( + f"{self.api_url}/v1/search", + search_params.dict(exclude_none=True), + {"Authorization": f"Bearer {self.api_key}"} + ) + +class AsyncCrawlWatcher(CrawlWatcher): + """ + Async version of CrawlWatcher that properly handles async operations. 
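+
+    Example (illustrative sketch; assumes a crawl already started via
+    async_crawl_url and event handlers defined elsewhere, using the listener
+    helpers inherited from CrawlWatcher):
+        >>> watcher = AsyncCrawlWatcher(async_result['id'], app)
+        >>> watcher.add_event_listener('document', on_document)
+        >>> watcher.add_event_listener('done', on_done)
+        >>> await watcher.connect()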
+ """ + def __init__(self, id: str, app: AsyncFirecrawlApp): + super().__init__(id, app) + + async def connect(self) -> None: + """ + Establishes async WebSocket connection and starts listening for messages. + """ + async with websockets.connect(self.ws_url, extra_headers={"Authorization": f"Bearer {self.app.api_key}"}) as websocket: + await self._listen(websocket) + + async def _listen(self, websocket) -> None: + """ + Listens for incoming WebSocket messages and handles them asynchronously. + + Args: + websocket: The WebSocket connection object + """ + async for message in websocket: + msg = json.loads(message) + await self._handle_message(msg) + + async def _handle_message(self, msg: Dict[str, Any]) -> None: + """ + Handles incoming WebSocket messages based on their type asynchronously. + + Args: + msg (Dict[str, Any]): The message to handle + """ + if msg['type'] == 'done': + self.status = 'completed' + self.dispatch_event('done', {'status': self.status, 'data': self.data, 'id': self.id}) + elif msg['type'] == 'error': + self.status = 'failed' + self.dispatch_event('error', {'status': self.status, 'data': self.data, 'error': msg['error'], 'id': self.id}) + elif msg['type'] == 'catchup': + self.status = msg['data']['status'] + self.data.extend(msg['data'].get('data', [])) + for doc in self.data: + self.dispatch_event('document', {'data': doc, 'id': self.id}) + elif msg['type'] == 'document': + self.data.append(msg['data']) + self.dispatch_event('document', {'data': msg['data'], 'id': self.id}) + + async def _handle_error(self, response: aiohttp.ClientResponse, action: str) -> None: + """ + Handle errors from async API responses. + """ + try: + error_data = await response.json() + error_message = error_data.get('error', 'No error message provided.') + error_details = error_data.get('details', 'No additional error details provided.') + except: + raise aiohttp.ClientError(f'Failed to parse Firecrawl error response as JSON. Status code: {response.status}') + + if response.status == 402: + message = f"Payment Required: Failed to {action}. {error_message} - {error_details}" + elif response.status == 408: + message = f"Request Timeout: Failed to {action} as the request timed out. {error_message} - {error_details}" + elif response.status == 409: + message = f"Conflict: Failed to {action} due to a conflict. {error_message} - {error_details}" + elif response.status == 500: + message = f"Internal Server Error: Failed to {action}. {error_message} - {error_details}" + else: + message = f"Unexpected error during {action}: Status code {response.status}. {error_message} - {error_details}" + + raise aiohttp.ClientError(message) diff --git a/apps/python-sdk/requirements.txt b/apps/python-sdk/requirements.txt index 5dcd8f6c..360d9e76 100644 --- a/apps/python-sdk/requirements.txt +++ b/apps/python-sdk/requirements.txt @@ -3,4 +3,5 @@ pytest python-dotenv websockets nest-asyncio -pydantic \ No newline at end of file +pydantic +aiohttp \ No newline at end of file