mirror of https://git.mirrors.martin98.com/https://github.com/mendableai/firecrawl
synced 2025-08-14 07:55:56 +08:00

sdk(v3): removed deep research and llmtxt

This commit is contained in:
parent c5b64bd294
commit 3d7ed80db7
@@ -22,8 +22,8 @@ import websockets
 import aiohttp
 import asyncio
 from pydantic import Field
-from .utils import convert_dict_keys_to_snake_case, convert_to_dot_dict, DotDict, DeepResearchResponse, DeepResearchData, DeepResearchDataSource, parse_scrape_options, ensure_schema_dict, scrape_formats_transform, scrape_formats_response_transform, change_tracking_response_transform
-from .types import LocationConfig, WebhookConfig, ChangeTrackingOptions, ScrapeOptions, ScrapeResponse, SearchResponse, CrawlStatusResponse, WaitAction, ScreenshotAction, ClickAction, WriteAction, PressAction, ScrollAction, ScrapeAction, ExecuteJavascriptAction, JsonConfig, CrawlResponse, CrawlErrorsResponse, CrawlParams, MapParams, MapResponse, AgentOptions, BatchScrapeStatusResponse, BatchScrapeResponse, ExtractResponse, GenerateLLMsTextStatusResponse, GenerateLLMsTextParams, GenerateLLMsTextResponse, ScrapeParams, DeepResearchParams, DeepResearchStatusResponse, DeepResearchResponse, SearchParams
+from .utils import parse_scrape_options, ensure_schema_dict, scrape_formats_transform, scrape_formats_response_transform, change_tracking_response_transform
+from .types import LocationConfig, WebhookConfig, ChangeTrackingOptions, ScrapeOptions, ScrapeResponse, SearchResponse, CrawlStatusResponse, WaitAction, ScreenshotAction, ClickAction, WriteAction, PressAction, ScrollAction, ScrapeAction, ExecuteJavascriptAction, JsonConfig, CrawlResponse, CrawlErrorsResponse, CrawlParams, MapParams, MapResponse, AgentOptions, BatchScrapeStatusResponse, BatchScrapeResponse, ExtractResponse, ScrapeParams, SearchParams

 def get_version():
     try:
@@ -1431,164 +1431,6 @@ class FirecrawlApp:
         except Exception as e:
             raise ValueError(str(e), 500)

-    def generate_llms_text(
-            self,
-            url: str,
-            *,
-            max_urls: Optional[int] = None,
-            show_full_text: Optional[bool] = None,
-            experimental_stream: Optional[bool] = None) -> GenerateLLMsTextStatusResponse:
-        """
-        Generate LLMs.txt for a given URL and poll until completion.
-
-        Args:
-            url (str): Target URL to generate LLMs.txt from
-            max_urls (Optional[int]): Maximum URLs to process (default: 10)
-            show_full_text (Optional[bool]): Include full text in output (default: False)
-            experimental_stream (Optional[bool]): Enable experimental streaming
-
-        Returns:
-            GenerateLLMsTextStatusResponse with:
-            * Generated LLMs.txt content
-            * Full version if requested
-            * Generation status
-            * Success/error information
-
-        Raises:
-            Exception: If generation fails
-        """
-        params = GenerateLLMsTextParams(
-            maxUrls=max_urls,
-            showFullText=show_full_text,
-            __experimental_stream=experimental_stream
-        )
-
-        response = self.async_generate_llms_text(
-            url,
-            max_urls=max_urls,
-            show_full_text=show_full_text,
-            experimental_stream=experimental_stream
-        )
-
-        if not response.success or not response.id:
-            return GenerateLLMsTextStatusResponse(
-                success=False,
-                error='Failed to start LLMs.txt generation',
-                status='failed',
-                expiresAt=''
-            )
-
-        job_id = response.id
-        while True:
-            status = self.check_generate_llms_text_status(job_id)
-
-            if status.status == 'completed':
-                return status
-            elif status.status == 'failed':
-                return status
-            elif status.status != 'processing':
-                return GenerateLLMsTextStatusResponse(
-                    success=False,
-                    error='LLMs.txt generation job terminated unexpectedly',
-                    status='failed',
-                    expiresAt=''
-                )
-
-            time.sleep(2)  # Polling interval
-
-    def async_generate_llms_text(
-            self,
-            url: str,
-            *,
-            max_urls: Optional[int] = None,
-            show_full_text: Optional[bool] = None,
-            experimental_stream: Optional[bool] = None) -> GenerateLLMsTextResponse:
-        """
-        Initiate an asynchronous LLMs.txt generation operation.
-
-        Args:
-            url (str): The target URL to generate LLMs.txt from. Must be a valid HTTP/HTTPS URL.
-            max_urls (Optional[int]): Maximum URLs to process (default: 10)
-            show_full_text (Optional[bool]): Include full text in output (default: False)
-            experimental_stream (Optional[bool]): Enable experimental streaming
-
-        Returns:
-            GenerateLLMsTextResponse: A response containing:
-            * success (bool): Whether the generation initiation was successful
-            * id (str): The unique identifier for the generation job
-            * error (str, optional): Error message if initiation failed
-
-        Raises:
-            Exception: If the generation job initiation fails.
-        """
-        params = GenerateLLMsTextParams(
-            maxUrls=max_urls,
-            showFullText=show_full_text,
-            __experimental_stream=experimental_stream
-        )
-
-        headers = self._prepare_headers()
-        json_data = {'url': url, **params.model_dump(exclude_none=True)}
-        json_data['origin'] = f"python-sdk@{version}"
-
-        try:
-            req = self._post_request(f'{self.api_url}/v1/llmstxt', json_data, headers)
-            response = req.json()
-            print("json_data", json_data)
-            print("response", response)
-            if response.get('success'):
-                try:
-                    return GenerateLLMsTextResponse(**response)
-                except:
-                    raise Exception('Failed to parse Firecrawl response as JSON.')
-            else:
-                self._handle_error(response, 'start LLMs.txt generation')
-        except Exception as e:
-            raise ValueError(str(e))
-
-        return GenerateLLMsTextResponse(
-            success=False,
-            error='Internal server error'
-        )
-
-    def check_generate_llms_text_status(self, id: str) -> GenerateLLMsTextStatusResponse:
-        """
-        Check the status of a LLMs.txt generation operation.
-
-        Args:
-            id (str): The unique identifier of the LLMs.txt generation job to check status for.
-
-        Returns:
-            GenerateLLMsTextStatusResponse: A response containing:
-            * success (bool): Whether the generation was successful
-            * status (str): Status of generation ("processing", "completed", "failed")
-            * data (Dict[str, str], optional): Generated text with fields:
-              * llmstxt (str): Generated LLMs.txt content
-              * llmsfulltxt (str, optional): Full version if requested
-            * error (str, optional): Error message if generation failed
-            * expiresAt (str): When the generated data expires
-
-        Raises:
-            Exception: If the status check fails.
-        """
-        headers = self._prepare_headers()
-        try:
-            response = self._get_request(f'{self.api_url}/v1/llmstxt/{id}', headers)
-            if response.status_code == 200:
-                try:
-                    json_data = response.json()
-                    return GenerateLLMsTextStatusResponse(**json_data)
-                except Exception as e:
-                    raise Exception(f'Failed to parse Firecrawl response as GenerateLLMsTextStatusResponse: {str(e)}')
-            elif response.status_code == 404:
-                raise Exception('LLMs.txt generation job not found')
-            else:
-                self._handle_error(response, 'check LLMs.txt generation status')
-        except Exception as e:
-            raise ValueError(str(e))
-
-        return GenerateLLMsTextStatusResponse(success=False, error='Internal server error', status='failed', expiresAt='')
-
     def _prepare_headers(
             self,
             idempotency_key: Optional[str] = None) -> Dict[str, str]:
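For reference, the removed synchronous flow was a start-then-poll pair wrapped by a blocking helper. A minimal usage sketch against the signatures above; the API key and target URL are placeholders:

from firecrawl.firecrawl import FirecrawlApp

app = FirecrawlApp(api_url="https://api.firecrawl.dev", api_key="fc-YOUR-KEY")  # placeholder key

# Blocking helper: starts the job and polls every 2 seconds until it settles.
status = app.generate_llms_text("https://example.com", max_urls=5, show_full_text=True)
if status.success and status.status == "completed":
    print(status.data.llmstxt)

# Equivalent manual flow: start the job, then poll its status by id.
job = app.async_generate_llms_text("https://example.com", max_urls=5)
if job.success:
    print(app.check_generate_llms_text_status(job.id).status)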
@@ -1805,226 +1647,6 @@ class FirecrawlApp:
         else:
             return f"Unexpected error during {action}: Status code {status_code}. {error_message} - {error_details}"

-    def deep_research(
-            self,
-            query: str,
-            *,
-            max_depth: Optional[int] = None,
-            time_limit: Optional[int] = None,
-            max_urls: Optional[int] = None,
-            analysis_prompt: Optional[str] = None,
-            system_prompt: Optional[str] = None,
-            __experimental_stream_steps: Optional[bool] = None,
-            on_activity: Optional[Callable[[Dict[str, Any]], None]] = None,
-            on_source: Optional[Callable[[Dict[str, Any]], None]] = None) -> Union[DotDict[DeepResearchResponse], Dict[str, Any]]:
-        """
-        Initiates a deep research operation on a given query and polls until completion.
-
-        Args:
-            query (str): Research query or topic to investigate
-            max_depth (Optional[int]): Maximum depth of research exploration
-            time_limit (Optional[int]): Time limit in seconds for research
-            max_urls (Optional[int]): Maximum number of URLs to process
-            analysis_prompt (Optional[str]): Custom prompt for analysis
-            system_prompt (Optional[str]): Custom system prompt
-            __experimental_stream_steps (Optional[bool]): Enable experimental streaming
-            on_activity (Optional[Callable]): Progress callback receiving {type, status, message, timestamp, depth}
-            on_source (Optional[Callable]): Source discovery callback receiving {url, title, description}
-
-        Returns:
-            DeepResearchStatusResponse containing:
-            * success (bool): Whether research completed successfully
-            * status (str): Current state (processing/completed/failed)
-            * error (Optional[str]): Error message if failed
-            * id (str): Unique identifier for the research job
-            * data (Any): Research findings and analysis with dot notation access
-              * final_analysis (str): Final analysis of the research (converted from camelCase)
-              * sources (List[Dict]): List of discovered sources
-              * activities (List[Dict]): Research progress log
-              * summaries (List[str]): Generated research summaries
-
-        Raises:
-            Exception: If research fails
-        """
-        research_params = {}
-        if max_depth is not None:
-            research_params['maxDepth'] = max_depth
-        if time_limit is not None:
-            research_params['timeLimit'] = time_limit
-        if max_urls is not None:
-            research_params['maxUrls'] = max_urls
-        if analysis_prompt is not None:
-            research_params['analysisPrompt'] = analysis_prompt
-        if system_prompt is not None:
-            research_params['systemPrompt'] = system_prompt
-        if __experimental_stream_steps is not None:
-            research_params['__experimental_streamSteps'] = __experimental_stream_steps
-        research_params = DeepResearchParams(**research_params)
-
-        response = self.async_deep_research(
-            query,
-            max_depth=max_depth,
-            time_limit=time_limit,
-            max_urls=max_urls,
-            analysis_prompt=analysis_prompt,
-            system_prompt=system_prompt
-        )
-
-        dot_dict_response = convert_to_dot_dict(response)
-
-        if not dot_dict_response.get('success') or 'id' not in dot_dict_response:
-            return dot_dict_response
-
-        job_id = dot_dict_response.id
-        last_activity_count = 0
-        last_source_count = 0
-
-        while True:
-            status = self.check_deep_research_status(job_id)
-
-            if on_activity and hasattr(status, 'activities'):
-                new_activities = status.activities[last_activity_count:]
-                for activity in new_activities:
-                    on_activity(activity)
-                last_activity_count = len(status.activities)
-
-            if on_source and hasattr(status, 'sources'):
-                new_sources = status.sources[last_source_count:]
-                for source in new_sources:
-                    on_source(source)
-                last_source_count = len(status.sources)
-
-            if status.status == 'completed':
-                return status
-            elif status.status == 'failed':
-                raise Exception(f'Deep research failed. Error: {status.get("error")}')
-            elif status.status != 'processing':
-                break
-
-            time.sleep(2)  # Polling interval
-
-        return convert_to_dot_dict({'success': False, 'error': 'Deep research job terminated unexpectedly'})
-
-    def async_deep_research(
-            self,
-            query: str,
-            *,
-            max_depth: Optional[int] = None,
-            time_limit: Optional[int] = None,
-            max_urls: Optional[int] = None,
-            analysis_prompt: Optional[str] = None,
-            system_prompt: Optional[str] = None,
-            __experimental_stream_steps: Optional[bool] = None) -> Dict[str, Any]:
-        """
-        Initiates an asynchronous deep research operation.
-
-        Args:
-            query (str): Research query or topic to investigate
-            max_depth (Optional[int]): Maximum depth of research exploration
-            time_limit (Optional[int]): Time limit in seconds for research
-            max_urls (Optional[int]): Maximum number of URLs to process
-            analysis_prompt (Optional[str]): Custom prompt for analysis
-            system_prompt (Optional[str]): Custom system prompt
-            __experimental_stream_steps (Optional[bool]): Enable experimental streaming
-
-        Returns:
-            Dict[str, Any]: A response containing:
-            * success (bool): Whether the research initiation was successful
-            * id (str): The unique identifier for the research job
-            * error (str, optional): Error message if initiation failed
-
-        Raises:
-            Exception: If the research initiation fails.
-        """
-        research_params = {}
-        if max_depth is not None:
-            research_params['maxDepth'] = max_depth
-        if time_limit is not None:
-            research_params['timeLimit'] = time_limit
-        if max_urls is not None:
-            research_params['maxUrls'] = max_urls
-        if analysis_prompt is not None:
-            research_params['analysisPrompt'] = analysis_prompt
-        if system_prompt is not None:
-            research_params['systemPrompt'] = system_prompt
-        if __experimental_stream_steps is not None:
-            research_params['__experimental_streamSteps'] = __experimental_stream_steps
-        research_params = DeepResearchParams(**research_params)
-
-        headers = self._prepare_headers()
-
-        json_data = {'query': query, **research_params.model_dump(exclude_none=True)}
-        json_data['origin'] = f"python-sdk@{version}"
-
-        # Handle json options schema if present
-        if 'jsonOptions' in json_data:
-            json_opts = json_data['jsonOptions']
-            if json_opts and 'schema' in json_opts and hasattr(json_opts['schema'], 'schema'):
-                json_data['jsonOptions']['schema'] = json_opts['schema'].schema()
-
-        try:
-            response = self._post_request(f'{self.api_url}/v1/deep-research', json_data, headers)
-            if response.status_code == 200:
-                try:
-                    return response.json()
-                except:
-                    raise Exception('Failed to parse Firecrawl response as JSON.')
-            else:
-                self._handle_error(response, 'start deep research')
-        except Exception as e:
-            raise ValueError(str(e))
-
-        return {'success': False, 'error': 'Internal server error'}
-
-    def check_deep_research_status(self, id: str) -> Union[DotDict[DeepResearchResponse], Dict[str, Any]]:
-        """
-        Check the status of a deep research operation.
-
-        Args:
-            id (str): The ID of the deep research operation.
-
-        Returns:
-            DeepResearchResponse containing:
-
-            Status:
-            * success - Whether research completed successfully
-            * status - Current state (processing/completed/failed)
-            * error - Error message if failed
-
-            Results:
-            * id - Unique identifier for the research job
-            * data - Research findings and analysis with dot notation access
-            * final_analysis - Final analysis of the research (converted from camelCase)
-            * sources - List of discovered sources
-            * activities - Research progress log
-            * summaries - Generated research summaries
-
-        Raises:
-            Exception: If the status check fails.
-        """
-        headers = self._prepare_headers()
-        try:
-            response = self._get_request(f'{self.api_url}/v1/deep-research/{id}', headers)
-            if response.status_code == 200:
-                try:
-                    json_response = response.json()
-
-                    snake_case_response = convert_dict_keys_to_snake_case(json_response)
-
-                    dot_dict_response = convert_to_dot_dict(snake_case_response)
-
-                    return dot_dict_response
-                except:
-                    raise Exception('Failed to parse Firecrawl response as JSON.')
-            elif response.status_code == 404:
-                raise Exception('Deep research job not found')
-            else:
-                self._handle_error(response, 'check deep research status')
-        except Exception as e:
-            raise ValueError(str(e))
-
-        return {'success': False, 'error': 'Internal server error'}
-
     def _validate_kwargs(self, kwargs: Dict[str, Any], method_name: str) -> None:
         """
         Validate additional keyword arguments before they are passed to the API.
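The removed deep research entry points followed the same start-then-poll shape, adding optional progress callbacks. A minimal sketch based on the signatures above and the now-deleted tests; the API key is a placeholder:

from firecrawl.firecrawl import FirecrawlApp

app = FirecrawlApp(api_url="https://api.firecrawl.dev", api_key="fc-YOUR-KEY")  # placeholder key

def on_activity(activity):
    # Each activity carries {type, status, message, timestamp, depth}.
    print(activity["message"])

result = app.deep_research("What is the capital of France?", max_urls=2, on_activity=on_activity)
if result.status == "completed":
    # Keys arrive snake_cased and dot-accessible via the utils removed below.
    print(result.data.final_analysis)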
@@ -3452,338 +3074,6 @@ class AsyncFirecrawlApp(FirecrawlApp):
         except Exception as e:
             raise ValueError(str(e))

-    async def generate_llms_text(
-            self,
-            url: str,
-            *,
-            max_urls: Optional[int] = None,
-            show_full_text: Optional[bool] = None,
-            experimental_stream: Optional[bool] = None) -> GenerateLLMsTextStatusResponse:
-        """
-        Generate LLMs.txt for a given URL and monitor until completion.
-
-        Args:
-            url (str): Target URL to generate LLMs.txt from
-            max_urls (Optional[int]): Maximum URLs to process (default: 10)
-            show_full_text (Optional[bool]): Include full text in output (default: False)
-            experimental_stream (Optional[bool]): Enable experimental streaming
-
-        Returns:
-            GenerateLLMsTextStatusResponse containing:
-            * success (bool): Whether generation completed successfully
-            * status (str): Status of generation (processing/completed/failed)
-            * data (Dict[str, str], optional): Generated text with fields:
-              - llmstxt (str): Generated LLMs.txt content
-              - llmsfulltxt (str, optional): Full version if requested
-            * error (str, optional): Error message if generation failed
-            * expiresAt (str): When the generated data expires
-
-        Raises:
-            Exception: If generation fails
-        """
-        params = {}
-        if max_urls is not None:
-            params['maxUrls'] = max_urls
-        if show_full_text is not None:
-            params['showFullText'] = show_full_text
-        if experimental_stream is not None:
-            params['__experimental_stream'] = experimental_stream
-
-        response = await self.async_generate_llms_text(
-            url,
-            max_urls=max_urls,
-            show_full_text=show_full_text,
-            experimental_stream=experimental_stream
-        )
-        if not response.get('success') or 'id' not in response:
-            return response
-
-        job_id = response['id']
-        while True:
-            status = await self.check_generate_llms_text_status(job_id)
-
-            if status['status'] == 'completed':
-                return status
-            elif status['status'] == 'failed':
-                raise Exception(f'LLMs.txt generation failed. Error: {status.get("error")}')
-            elif status['status'] != 'processing':
-                break
-
-            await asyncio.sleep(2)
-
-        return GenerateLLMsTextStatusResponse(success=False, error='LLMs.txt generation job terminated unexpectedly')
-
-    async def async_generate_llms_text(
-            self,
-            url: str,
-            *,
-            max_urls: Optional[int] = None,
-            show_full_text: Optional[bool] = None,
-            experimental_stream: Optional[bool] = None) -> GenerateLLMsTextResponse:
-        """
-        Initiate an asynchronous LLMs.txt generation job without waiting for completion.
-
-        Args:
-            url (str): Target URL to generate LLMs.txt from
-            max_urls (Optional[int]): Maximum URLs to process (default: 10)
-            show_full_text (Optional[bool]): Include full text in output (default: False)
-            experimental_stream (Optional[bool]): Enable experimental streaming
-
-        Returns:
-            GenerateLLMsTextResponse containing:
-            * success (bool): Whether job started successfully
-            * id (str): Unique identifier for the job
-            * error (str, optional): Error message if start failed
-
-        Raises:
-            ValueError: If job initiation fails
-        """
-        params = {}
-        if max_urls is not None:
-            params['maxUrls'] = max_urls
-        if show_full_text is not None:
-            params['showFullText'] = show_full_text
-        if experimental_stream is not None:
-            params['__experimental_stream'] = experimental_stream
-
-        params = GenerateLLMsTextParams(
-            maxUrls=max_urls,
-            showFullText=show_full_text,
-            __experimental_stream=experimental_stream
-        )
-
-        headers = self._prepare_headers()
-        json_data = {'url': url, **params.model_dump(exclude_none=True)}
-        json_data['origin'] = f"python-sdk@{version}"
-
-        try:
-            return await self._async_post_request(
-                f'{self.api_url}/v1/llmstxt',
-                json_data,
-                headers
-            )
-        except Exception as e:
-            raise ValueError(str(e))
-
-    async def check_generate_llms_text_status(self, id: str) -> GenerateLLMsTextStatusResponse:
-        """
-        Check the status of an asynchronous LLMs.txt generation job.
-
-        Args:
-            id (str): The ID of the generation job
-
-        Returns:
-            GenerateLLMsTextStatusResponse containing:
-            * success (bool): Whether generation completed successfully
-            * status (str): Status of generation (processing/completed/failed)
-            * data (Dict[str, str], optional): Generated text with fields:
-              - llmstxt (str): Generated LLMs.txt content
-              - llmsfulltxt (str, optional): Full version if requested
-            * error (str, optional): Error message if generation failed
-            * expiresAt (str): When the generated data expires
-
-        Raises:
-            ValueError: If status check fails
-        """
-        headers = self._prepare_headers()
-        try:
-            return await self._async_get_request(
-                f'{self.api_url}/v1/llmstxt/{id}',
-                headers
-            )
-        except Exception as e:
-            raise ValueError(str(e))
-
-    async def deep_research(
-            self,
-            query: str,
-            *,
-            max_depth: Optional[int] = None,
-            time_limit: Optional[int] = None,
-            max_urls: Optional[int] = None,
-            analysis_prompt: Optional[str] = None,
-            system_prompt: Optional[str] = None,
-            __experimental_stream_steps: Optional[bool] = None,
-            on_activity: Optional[Callable[[Dict[str, Any]], None]] = None,
-            on_source: Optional[Callable[[Dict[str, Any]], None]] = None) -> DeepResearchStatusResponse:
-        """
-        Initiates a deep research operation on a given query and polls until completion.
-
-        Args:
-            query (str): Research query or topic to investigate
-            max_depth (Optional[int]): Maximum depth of research exploration
-            time_limit (Optional[int]): Time limit in seconds for research
-            max_urls (Optional[int]): Maximum number of URLs to process
-            analysis_prompt (Optional[str]): Custom prompt for analysis
-            system_prompt (Optional[str]): Custom system prompt
-            __experimental_stream_steps (Optional[bool]): Enable experimental streaming
-            on_activity (Optional[Callable]): Progress callback receiving {type, status, message, timestamp, depth}
-            on_source (Optional[Callable]): Source discovery callback receiving {url, title, description}
-
-        Returns:
-            DeepResearchStatusResponse containing:
-            * success (bool): Whether research completed successfully
-            * status (str): Current state (processing/completed/failed)
-            * error (Optional[str]): Error message if failed
-            * id (str): Unique identifier for the research job
-            * data (Any): Research findings and analysis
-            * sources (List[Dict]): List of discovered sources
-            * activities (List[Dict]): Research progress log
-            * summaries (List[str]): Generated research summaries
-
-        Raises:
-            Exception: If research fails
-        """
-        research_params = {}
-        if max_depth is not None:
-            research_params['maxDepth'] = max_depth
-        if time_limit is not None:
-            research_params['timeLimit'] = time_limit
-        if max_urls is not None:
-            research_params['maxUrls'] = max_urls
-        if analysis_prompt is not None:
-            research_params['analysisPrompt'] = analysis_prompt
-        if system_prompt is not None:
-            research_params['systemPrompt'] = system_prompt
-        if __experimental_stream_steps is not None:
-            research_params['__experimental_streamSteps'] = __experimental_stream_steps
-        research_params = DeepResearchParams(**research_params)
-
-        response = await self.async_deep_research(
-            query,
-            max_depth=max_depth,
-            time_limit=time_limit,
-            max_urls=max_urls,
-            analysis_prompt=analysis_prompt,
-            system_prompt=system_prompt
-        )
-        if not response.get('success') or 'id' not in response:
-            return response
-
-        job_id = response['id']
-        last_activity_count = 0
-        last_source_count = 0
-
-        while True:
-            status = await self.check_deep_research_status(job_id)
-
-            if on_activity and 'activities' in status:
-                new_activities = status['activities'][last_activity_count:]
-                for activity in new_activities:
-                    on_activity(activity)
-                last_activity_count = len(status['activities'])
-
-            if on_source and 'sources' in status:
-                new_sources = status['sources'][last_source_count:]
-                for source in new_sources:
-                    on_source(source)
-                last_source_count = len(status['sources'])
-
-            if status['status'] == 'completed':
-                return status
-            elif status['status'] == 'failed':
-                raise Exception(f'Deep research failed. Error: {status.get("error")}')
-            elif status['status'] != 'processing':
-                break
-
-            await asyncio.sleep(2)
-
-        return DeepResearchStatusResponse(success=False, error='Deep research job terminated unexpectedly')
-
-    async def async_deep_research(
-            self,
-            query: str,
-            *,
-            max_depth: Optional[int] = None,
-            time_limit: Optional[int] = None,
-            max_urls: Optional[int] = None,
-            analysis_prompt: Optional[str] = None,
-            system_prompt: Optional[str] = None,
-            __experimental_stream_steps: Optional[bool] = None) -> Dict[str, Any]:
-        """
-        Initiates an asynchronous deep research operation.
-
-        Args:
-            query (str): Research query or topic to investigate
-            max_depth (Optional[int]): Maximum depth of research exploration
-            time_limit (Optional[int]): Time limit in seconds for research
-            max_urls (Optional[int]): Maximum number of URLs to process
-            analysis_prompt (Optional[str]): Custom prompt for analysis
-            system_prompt (Optional[str]): Custom system prompt
-            __experimental_stream_steps (Optional[bool]): Enable experimental streaming
-
-        Returns:
-            Dict[str, Any]: A response containing:
-            * success (bool): Whether the research initiation was successful
-            * id (str): The unique identifier for the research job
-            * error (str, optional): Error message if initiation failed
-
-        Raises:
-            Exception: If the research initiation fails.
-        """
-        research_params = {}
-        if max_depth is not None:
-            research_params['maxDepth'] = max_depth
-        if time_limit is not None:
-            research_params['timeLimit'] = time_limit
-        if max_urls is not None:
-            research_params['maxUrls'] = max_urls
-        if analysis_prompt is not None:
-            research_params['analysisPrompt'] = analysis_prompt
-        if system_prompt is not None:
-            research_params['systemPrompt'] = system_prompt
-        if __experimental_stream_steps is not None:
-            research_params['__experimental_streamSteps'] = __experimental_stream_steps
-        research_params = DeepResearchParams(**research_params)
-
-        headers = self._prepare_headers()
-
-        json_data = {'query': query, **research_params.model_dump(exclude_none=True)}
-        json_data['origin'] = f"python-sdk@{version}"
-
-        try:
-            return await self._async_post_request(
-                f'{self.api_url}/v1/deep-research',
-                json_data,
-                headers
-            )
-        except Exception as e:
-            raise ValueError(str(e))
-
-    async def check_deep_research_status(self, id: str) -> DeepResearchStatusResponse:
-        """
-        Check the status of a deep research operation.
-
-        Args:
-            id (str): The ID of the deep research operation.
-
-        Returns:
-            DeepResearchResponse containing:
-
-            Status:
-            * success - Whether research completed successfully
-            * status - Current state (processing/completed/failed)
-            * error - Error message if failed
-
-            Results:
-            * id - Unique identifier for the research job
-            * data - Research findings and analysis
-            * sources - List of discovered sources
-            * activities - Research progress log
-            * summaries - Generated research summaries
-
-        Raises:
-            Exception: If the status check fails.
-        """
-        headers = self._prepare_headers()
-        try:
-            return await self._async_get_request(
-                f'{self.api_url}/v1/deep-research/{id}',
-                headers
-            )
-        except Exception as e:
-            raise ValueError(str(e))
-
     async def search(
             self,
             query: str,
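The AsyncFirecrawlApp variants removed here awaited the same endpoints and slept with asyncio.sleep(2) between polls, returning plain dict-shaped responses. A minimal sketch under the same placeholder-key assumption:

import asyncio
from firecrawl.firecrawl import AsyncFirecrawlApp

app = AsyncFirecrawlApp(api_url="https://api.firecrawl.dev", api_key="fc-YOUR-KEY")  # placeholder key

async def main():
    # Polls check_generate_llms_text_status until completed or failed.
    status = await app.generate_llms_text("https://example.com", max_urls=5)
    if status["status"] == "completed":
        print(status["data"]["llmstxt"])

asyncio.run(main())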
@@ -162,7 +162,6 @@ class ExecuteJavascriptAction(pydantic.BaseModel):
     type: Literal["executeJavascript"] = pydantic.Field(default="executeJavascript")
     script: str

-
 class ExtractAgent(pydantic.BaseModel):
     """Configuration for the agent in extract operations."""
     model: Literal["FIRE-1"] = "FIRE-1"
@@ -303,66 +302,6 @@ class SearchResponse(pydantic.BaseModel):
     data: List[FirecrawlDocument]
     warning: Optional[str] = None
     error: Optional[str] = None
-
-class GenerateLLMsTextParams(pydantic.BaseModel):
-    """
-    Parameters for the LLMs.txt generation operation.
-    """
-    max_urls: Optional[int] = 10
-    show_full_text: Optional[bool] = False
-    __experimental_stream: Optional[bool] = None
-
-class DeepResearchParams(pydantic.BaseModel):
-    """
-    Parameters for the deep research operation.
-    """
-    max_depth: Optional[int] = 7
-    time_limit: Optional[int] = 270
-    max_urls: Optional[int] = 20
-    analysis_prompt: Optional[str] = None
-    system_prompt: Optional[str] = None
-    __experimental_stream_steps: Optional[bool] = None
-
-class DeepResearchResponse(pydantic.BaseModel):
-    """
-    Response from the deep research operation.
-    """
-    success: bool
-    id: str
-    error: Optional[str] = None
-
-class DeepResearchStatusResponse(pydantic.BaseModel):
-    """
-    Status response from the deep research operation.
-    """
-    success: bool
-    data: Optional[Dict[str, Any]] = None
-    status: str
-    error: Optional[str] = None
-    expires_at: str
-    current_depth: int
-    max_depth: int
-    activities: List[Dict[str, Any]]
-    sources: List[Dict[str, Any]]
-    summaries: List[str]
-
-class GenerateLLMsTextResponse(pydantic.BaseModel):
-    """Response from LLMs.txt generation operations."""
-    success: bool = True
-    id: str
-    error: Optional[str] = None
-
-class GenerateLLMsTextStatusResponseData(pydantic.BaseModel):
-    llmstxt: str
-    llmsfulltxt: Optional[str] = None
-
-class GenerateLLMsTextStatusResponse(pydantic.BaseModel):
-    """Status response from LLMs.txt generation operations."""
-    success: bool = True
-    data: Optional[GenerateLLMsTextStatusResponseData] = None
-    status: Literal["processing", "completed", "failed"]
-    error: Optional[str] = None
-    expires_at: str
-
 class SearchResponse(pydantic.BaseModel):
     """
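These pydantic models mostly validated inputs and built request bodies through model_dump(exclude_none=True), so unset optional fields never reached the wire. A small self-contained illustration of that pattern; the model below is a hypothetical stand-in mirroring DeepResearchParams, not part of the SDK:

from typing import Optional
from pydantic import BaseModel

class ResearchParams(BaseModel):  # hypothetical stand-in mirroring DeepResearchParams
    max_depth: Optional[int] = 7
    time_limit: Optional[int] = 270
    max_urls: Optional[int] = 20
    analysis_prompt: Optional[str] = None

params = ResearchParams(max_depth=2)
payload = {"query": "recent AI advancements", **params.model_dump(exclude_none=True)}
# payload == {'query': 'recent AI advancements', 'max_depth': 2, 'time_limit': 270, 'max_urls': 20}
# analysis_prompt is omitted because it is None.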
@@ -1,116 +1,9 @@
 """
 Utility functions for the Firecrawl SDK.
 """
 import re
-from typing import Any, Dict, List, Union, TypeVar, Generic, Optional, TypedDict, Literal
+from typing import Any, Dict, List, Union, TypeVar, Optional, Literal
 from .types import LocationConfig, JsonConfig, ChangeTrackingOptions, WaitAction, ScreenshotAction, WriteAction, PressAction, ScrollAction, ExecuteJavascriptAction, AgentOptions
 T = TypeVar('T')
-
-class DeepResearchDataSource(TypedDict, total=False):
-    """Type definition for a source in deep research data."""
-    url: str
-    title: str
-    content: str
-    summary: str
-
-
-class DeepResearchData(TypedDict, total=False):
-    """Type definition for deep research data."""
-    final_analysis: str
-    sources: List[DeepResearchDataSource]
-
-
-class DeepResearchResponse(TypedDict, total=False):
-    """Type definition for deep research response."""
-    success: bool
-    status: str
-    current_depth: int
-    max_depth: int
-    activities: List[Dict[str, Any]]
-    summaries: List[str]
-    data: DeepResearchData
-
-
-def camel_to_snake(name: str) -> str:
-    """
-    Convert a camelCase string to snake_case.
-
-    Args:
-        name (str): The camelCase string to convert.
-
-    Returns:
-        str: The snake_case string.
-    """
-    if not name:
-        return name
-
-    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
-    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
-
-
-def convert_dict_keys_to_snake_case(data: Any) -> Any:
-    """
-    Recursively convert all dictionary keys from camelCase to snake_case.
-
-    Args:
-        data (Any): The data to convert. Can be a dictionary, list, or primitive type.
-
-    Returns:
-        Any: The converted data with snake_case keys.
-    """
-    if isinstance(data, dict):
-        return {camel_to_snake(k): convert_dict_keys_to_snake_case(v) for k, v in data.items()}
-    elif isinstance(data, list):
-        return [convert_dict_keys_to_snake_case(item) for item in data]
-    else:
-        return data
-
-
-class DotDict(dict, Generic[T]):
-    """
-    A dictionary that supports dot notation access to its items.
-
-    Example:
-        >>> d = DotDict({'foo': 'bar'})
-        >>> d.foo
-        'bar'
-        >>> d['foo']
-        'bar'
-    """
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        for key, value in self.items():
-            if isinstance(value, dict):
-                self[key] = DotDict(value)
-            elif isinstance(value, list):
-                self[key] = [DotDict(item) if isinstance(item, dict) else item for item in value]
-
-    def __getattr__(self, key: str) -> Any:
-        try:
-            return self[key]
-        except KeyError:
-            raise AttributeError(f"'DotDict' object has no attribute '{key}'")
-
-    def __setattr__(self, key: str, value: Any) -> None:
-        self[key] = value
-
-
-def convert_to_dot_dict(data: Union[Dict[str, Any], List[Any], Any]) -> Union[DotDict[Any], List[Any], Any]:
-    """
-    Convert a dictionary or list of dictionaries to DotDict objects.
-
-    Args:
-        data (Union[Dict[str, Any], List[Any], Any]): The data to convert.
-
-    Returns:
-        Union[DotDict[Any], List[Any], Any]: The converted data with DotDict objects.
-    """
-    if isinstance(data, dict):
-        return DotDict(data)
-    elif isinstance(data, list):
-        return [convert_to_dot_dict(item) for item in data]
-    else:
-        return data
-
 def ensure_schema_dict(schema):
     """
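Together, the removed camel_to_snake/convert_to_dot_dict helpers turned an API payload like {'finalAnalysis': ...} into an object readable as result.final_analysis. A quick demonstration of the two-pass regex; the function body is quoted from the removed code:

import re

def camel_to_snake(name: str) -> str:
    # Pass 1: insert an underscore before an uppercase letter that starts a
    # capitalized word, e.g. 'HTTPResponse' -> 'HTTP_Response'.
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    # Pass 2: split lowercase/digit-to-uppercase boundaries, then lowercase everything.
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

assert camel_to_snake("finalAnalysis") == "final_analysis"
assert camel_to_snake("maxUrls") == "max_urls"
# DotDict then wraps the converted dict: DotDict({'final_analysis': 'x'}).final_analysis == 'x'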
@@ -1,48 +0,0 @@
-import os
-import sys
-import pytest
-from dotenv import load_dotenv
-
-sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
-from firecrawl.firecrawl import AsyncFirecrawlApp
-
-load_dotenv()
-
-API_URL = "https://api.firecrawl.dev"
-API_KEY = os.getenv("TEST_API_KEY")
-
-app = AsyncFirecrawlApp(api_url=API_URL, api_key=API_KEY)
-
-@pytest.mark.asyncio
-async def test_deep_research_async_simple():
-    result = await app.deep_research("What is the capital of France?", max_urls=2)
-    assert hasattr(result, "status")
-    assert result.status == "completed"
-    assert hasattr(result, "success")
-    assert result.success
-    assert hasattr(result, "data")
-    assert result.data is not None
-    assert any("Paris" in str(result.data) or "France" in str(result.data) for _ in [0])
-
-@pytest.mark.asyncio
-async def test_deep_research_async_all_params():
-    result = await app.deep_research(
-        "What are the latest advancements in AI?",
-        max_depth=2,
-        time_limit=60,
-        max_urls=3,
-        analysis_prompt="Summarize the most important recent AI advancements.",
-        system_prompt="You are an expert AI researcher."
-    )
-    assert hasattr(result, "status")
-    assert result.status == "completed"
-    assert hasattr(result, "success")
-    assert result.success
-    assert hasattr(result, "data")
-    assert hasattr(result, "activities")
-    assert isinstance(result.activities, list)
-    assert result.data is not None
-    assert hasattr(result.data, "sources")
-    assert isinstance(result.data.sources, list)
-    assert hasattr(result.data, "final_analysis")
-    assert isinstance(result.data.final_analysis, str)
@@ -1,43 +0,0 @@
-import os
-import sys
-import pytest
-from dotenv import load_dotenv
-
-sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')))
-from firecrawl.firecrawl import AsyncFirecrawlApp
-
-load_dotenv()
-
-API_URL = "https://api.firecrawl.dev"
-API_KEY = os.getenv("TEST_API_KEY")
-
-app = AsyncFirecrawlApp(api_url=API_URL, api_key=API_KEY)
-
-@pytest.mark.asyncio
-async def test_generate_llms_text_async_simple():
-    result = await app.generate_llms_text("https://example.com")
-    assert hasattr(result, "status")
-    assert result.status == "completed"
-    assert hasattr(result, "data")
-    assert result.data is not None
-    assert hasattr(result.data, "llmstxt")
-    assert isinstance(result.data.llmstxt, str)
-    assert len(result.data.llmstxt) > 0
-
-@pytest.mark.asyncio
-async def test_generate_llms_text_async_all_params():
-    result = await app.generate_llms_text(
-        "https://www.iana.org",
-        max_urls=5,
-        show_full_text=True,
-        experimental_stream=True
-    )
-    assert hasattr(result, "status")
-    assert result.status == "completed"
-    assert hasattr(result, "data")
-    assert result.data is not None
-    assert hasattr(result.data, "llmstxt")
-    assert isinstance(result.data.llmstxt, str)
-    assert len(result.data.llmstxt) > 0
-    assert hasattr(result.data, "llmsfulltxt")
-    assert result.data.llmsfulltxt is None or isinstance(result.data.llmsfulltxt, str)
@@ -1,46 +0,0 @@
-import os
-import sys
-import pytest
-from dotenv import load_dotenv
-
-sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../')))
-from firecrawl.firecrawl import FirecrawlApp
-
-load_dotenv()
-
-API_URL = "https://api.firecrawl.dev"
-API_KEY = os.getenv("TEST_API_KEY")
-
-app = FirecrawlApp(api_url=API_URL, api_key=API_KEY)
-
-def test_deep_research_simple():
-    result = app.deep_research("What is the capital of France?", max_urls=2)
-    assert hasattr(result, "status")
-    assert result.status == "completed"
-    assert hasattr(result, "success")
-    assert result.success
-    assert hasattr(result, "data")
-    assert result.data is not None
-    assert any("Paris" in str(result.data) or "France" in str(result.data) for _ in [0])
-
-def test_deep_research_all_params():
-    result = app.deep_research(
-        "What are the latest advancements in AI?",
-        max_depth=2,
-        time_limit=60,
-        max_urls=3,
-        analysis_prompt="Summarize the most important recent AI advancements.",
-        system_prompt="You are an expert AI researcher."
-    )
-    assert hasattr(result, "status")
-    assert result.status == "completed"
-    assert hasattr(result, "success")
-    assert result.success
-    assert hasattr(result, "data")
-    assert hasattr(result, "activities")
-    assert isinstance(result.activities, list)
-    assert result.data is not None
-    assert hasattr(result.data, "sources")
-    assert isinstance(result.data.sources, list)
-    assert hasattr(result.data, "final_analysis")
-    assert isinstance(result.data.final_analysis, str)
@@ -1,41 +0,0 @@
-import os
-import sys
-import pytest
-from dotenv import load_dotenv
-
-sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../')))
-from firecrawl.firecrawl import FirecrawlApp
-
-load_dotenv()
-
-API_URL = "https://api.firecrawl.dev"
-API_KEY = os.getenv("TEST_API_KEY")
-
-app = FirecrawlApp(api_url=API_URL, api_key=API_KEY)
-
-def test_generate_llms_text_simple():
-    result = app.generate_llms_text("https://example.com")
-    assert hasattr(result, "status")
-    assert result.status == "completed"
-    assert hasattr(result, "data")
-    assert result.data is not None
-    assert hasattr(result.data, "llmstxt")
-    assert isinstance(result.data.llmstxt, str)
-    assert len(result.data.llmstxt) > 0
-
-def test_generate_llms_text_all_params():
-    result = app.generate_llms_text(
-        "https://www.iana.org",
-        max_urls=5,
-        show_full_text=True,
-        experimental_stream=True
-    )
-    assert hasattr(result, "status")
-    assert result.status == "completed"
-    assert hasattr(result, "data")
-    assert result.data is not None
-    assert hasattr(result.data, "llmstxt")
-    assert isinstance(result.data.llmstxt, str)
-    assert len(result.data.llmstxt) > 0
-    assert hasattr(result.data, "llmsfulltxt")
-    assert result.data.llmsfulltxt is None or isinstance(result.data.llmsfulltxt, str)