mirror of
https://git.mirrors.martin98.com/https://github.com/mendableai/firecrawl
synced 2025-06-22 04:22:07 +08:00
252 lines
10 KiB
Python
252 lines
10 KiB
Python
import os
|
|
import json
|
|
import time
|
|
import requests
|
|
from dotenv import load_dotenv
|
|
from serpapi.google_search import GoogleSearch
|
|
from together import Together
|
|
|
|
# ANSI color codes
|
|
class Colors:
|
|
CYAN = '\033[96m'
|
|
YELLOW = '\033[93m'
|
|
GREEN = '\033[92m'
|
|
RED = '\033[91m'
|
|
MAGENTA = '\033[95m'
|
|
BLUE = '\033[94m'
|
|
RESET = '\033[0m'
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
# Initialize clients
|
|
together_api_key = os.getenv("TOGETHER_API_KEY")
|
|
if not together_api_key:
|
|
print(f"{Colors.RED}Error: TOGETHER_API_KEY not found in environment variables{Colors.RESET}")
|
|
|
|
client = Together(api_key=together_api_key)
|
|
firecrawl_api_key = os.getenv("FIRECRAWL_API_KEY")
|
|
serp_api_key = os.getenv("SERP_API_KEY")
|
|
|
|
if not firecrawl_api_key:
|
|
print(f"{Colors.RED}Warning: FIRECRAWL_API_KEY not found in environment variables{Colors.RESET}")
|
|
|
|
def search_google(query):
|
|
"""Search Google using SerpAPI and return top results."""
|
|
print(f"{Colors.YELLOW}Searching Google for '{query}'...{Colors.RESET}")
|
|
search = GoogleSearch({"q": query, "api_key": serp_api_key})
|
|
return search.get_dict().get("organic_results", [])
|
|
|
|
def select_urls_with_gemini(company, objective, serp_results):
|
|
"""
|
|
Use Llama 4 Maverick to select URLs from SERP results.
|
|
Returns a list of URLs.
|
|
"""
|
|
try:
|
|
serp_data = [{"title": r.get("title"), "link": r.get("link"), "snippet": r.get("snippet")}
|
|
for r in serp_results if r.get("link")]
|
|
|
|
print(f"{Colors.CYAN}Found {len(serp_data)} search results to analyze{Colors.RESET}")
|
|
|
|
if not serp_data:
|
|
print(f"{Colors.YELLOW}No search results found to analyze{Colors.RESET}")
|
|
return []
|
|
|
|
prompt = (
|
|
"Task: Select the most relevant URLs from search results, prioritizing official sources.\n\n"
|
|
"Instructions:\n"
|
|
"1. PRIORITIZE official company websites, documentation, and press releases first\n"
|
|
"2. Select ONLY URLs that directly contain information about the requested topic\n"
|
|
"3. Return ONLY a JSON object with the following structure: {\"selected_urls\": [\"url1\", \"url2\"]}\n"
|
|
"4. Do not include social media links (Twitter, LinkedIn, Facebook, etc.)\n"
|
|
"5. Exclude any LinkedIn URLs as they cannot be accessed\n"
|
|
"6. Select a MAXIMUM of 3 most relevant URLs\n"
|
|
"7. Order URLs by relevance: official sources first, then trusted news/industry sources\n"
|
|
"8. IMPORTANT: Only output the JSON object, no other text or explanation\n\n"
|
|
f"Company: {company}\n"
|
|
f"Information Needed: {objective}\n"
|
|
f"Search Results: {json.dumps(serp_data, indent=2)}\n\n"
|
|
"Response Format: {\"selected_urls\": [\"https://example.com\", \"https://example2.com\"]}\n\n"
|
|
"Remember: Prioritize OFFICIAL sources and limit to 3 MOST RELEVANT URLs only."
|
|
)
|
|
|
|
try:
|
|
print(f"{Colors.YELLOW}Calling Together AI model...{Colors.RESET}")
|
|
response = client.chat.completions.create(
|
|
model="meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8",
|
|
messages=[{"role": "user", "content": prompt}],
|
|
)
|
|
print(f"{Colors.GREEN}Got response from Together AI{Colors.RESET}")
|
|
print(f"{Colors.CYAN}Raw response: {response.choices[0].message.content}{Colors.RESET}")
|
|
|
|
cleaned_response = response.choices[0].message.content.strip()
|
|
|
|
# Find the JSON object in the response
|
|
import re
|
|
json_match = re.search(r'\{[\s\S]*"selected_urls"[\s\S]*\}', cleaned_response)
|
|
if json_match:
|
|
cleaned_response = json_match.group(0)
|
|
print(f"{Colors.CYAN}Extracted JSON: {cleaned_response}{Colors.RESET}")
|
|
|
|
# Clean the response text
|
|
if cleaned_response.startswith('```'):
|
|
cleaned_response = cleaned_response.split('```')[1]
|
|
if cleaned_response.startswith('json'):
|
|
cleaned_response = cleaned_response[4:]
|
|
cleaned_response = cleaned_response.strip()
|
|
|
|
try:
|
|
# Parse JSON response
|
|
result = json.loads(cleaned_response)
|
|
if isinstance(result, dict) and "selected_urls" in result:
|
|
urls = result["selected_urls"]
|
|
else:
|
|
print(f"{Colors.YELLOW}Response did not contain the expected 'selected_urls' key{Colors.RESET}")
|
|
urls = []
|
|
except json.JSONDecodeError as e:
|
|
print(f"{Colors.YELLOW}Failed to parse JSON: {str(e)}{Colors.RESET}")
|
|
# Fallback to text parsing
|
|
urls = [line.strip() for line in cleaned_response.split('\n')
|
|
if line.strip().startswith(('http://', 'https://'))]
|
|
|
|
# Clean up URLs
|
|
cleaned_urls = [url.replace('/*', '').rstrip('/') for url in urls]
|
|
cleaned_urls = [url for url in cleaned_urls if url]
|
|
|
|
if not cleaned_urls:
|
|
print(f"{Colors.YELLOW}No valid URLs found in response.{Colors.RESET}")
|
|
return []
|
|
|
|
print(f"{Colors.CYAN}Selected URLs for extraction:{Colors.RESET}")
|
|
for url in cleaned_urls:
|
|
print(f"- {url}")
|
|
|
|
return cleaned_urls
|
|
|
|
except Exception as e:
|
|
print(f"{Colors.RED}Error calling Together AI: {str(e)}{Colors.RESET}")
|
|
return []
|
|
|
|
except Exception as e:
|
|
print(f"{Colors.RED}Error selecting URLs: {str(e)}{Colors.RESET}")
|
|
return []
|
|
|
|
def extract_company_info(urls, prompt, company, api_key):
|
|
"""Use requests to call Firecrawl's extract endpoint with selected URLs."""
|
|
print(f"{Colors.YELLOW}Extracting structured data from the provided URLs using Firecrawl...{Colors.RESET}")
|
|
|
|
headers = {
|
|
'Content-Type': 'application/json',
|
|
'Authorization': f'Bearer {api_key}'
|
|
}
|
|
|
|
payload = {
|
|
"urls": urls,
|
|
"prompt": prompt + " for " + company,
|
|
"enableWebSearch": True
|
|
}
|
|
|
|
try:
|
|
print(f"{Colors.CYAN}Making request to Firecrawl API...{Colors.RESET}")
|
|
response = requests.post(
|
|
"https://api.firecrawl.dev/v1/extract",
|
|
headers=headers,
|
|
json=payload,
|
|
timeout=120 # Increased timeout to 120 seconds
|
|
)
|
|
|
|
if response.status_code != 200:
|
|
print(f"{Colors.RED}API returned status code {response.status_code}: {response.text}{Colors.RESET}")
|
|
return None
|
|
|
|
data = response.json()
|
|
|
|
if not data.get('success'):
|
|
print(f"{Colors.RED}API returned error: {data.get('error', 'No error message')}{Colors.RESET}")
|
|
return None
|
|
|
|
extraction_id = data.get('id')
|
|
if not extraction_id:
|
|
print(f"{Colors.RED}No extraction ID found in response.{Colors.RESET}")
|
|
return None
|
|
|
|
return poll_firecrawl_result(extraction_id, api_key, interval=5, max_attempts=120) # Increased polling attempts
|
|
|
|
except requests.exceptions.Timeout:
|
|
print(f"{Colors.RED}Request timed out. The operation might still be processing in the background.{Colors.RESET}")
|
|
print(f"{Colors.YELLOW}You may want to try again with fewer URLs or a more specific prompt.{Colors.RESET}")
|
|
return None
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"{Colors.RED}Request failed: {e}{Colors.RESET}")
|
|
return None
|
|
except json.JSONDecodeError as e:
|
|
print(f"{Colors.RED}Failed to parse response: {e}{Colors.RESET}")
|
|
return None
|
|
except Exception as e:
|
|
print(f"{Colors.RED}Failed to extract data: {e}{Colors.RESET}")
|
|
return None
|
|
|
|
def poll_firecrawl_result(extraction_id, api_key, interval=10, max_attempts=60):
|
|
"""Poll Firecrawl API to get the extraction result."""
|
|
url = f"https://api.firecrawl.dev/v1/extract/{extraction_id}"
|
|
headers = {
|
|
'Authorization': f'Bearer {api_key}'
|
|
}
|
|
|
|
print(f"{Colors.YELLOW}Waiting for extraction to complete...{Colors.RESET}")
|
|
|
|
for attempt in range(1, max_attempts + 1):
|
|
try:
|
|
response = requests.get(url, headers=headers, timeout=30)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
if data.get('success') and data.get('data'):
|
|
print(f"{Colors.GREEN}Data successfully extracted:{Colors.RESET}")
|
|
print(json.dumps(data['data'], indent=2))
|
|
return data['data']
|
|
elif data.get('success') and not data.get('data'):
|
|
if attempt % 6 == 0:
|
|
print(f"{Colors.YELLOW}Still processing... (attempt {attempt}/{max_attempts}){Colors.RESET}")
|
|
time.sleep(interval)
|
|
else:
|
|
print(f"{Colors.RED}API Error: {data.get('error', 'No error message provided')}{Colors.RESET}")
|
|
return None
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"{Colors.RED}Request error: {str(e)}{Colors.RESET}")
|
|
return None
|
|
except json.JSONDecodeError as e:
|
|
print(f"{Colors.RED}JSON parsing error: {str(e)}{Colors.RESET}")
|
|
return None
|
|
except Exception as e:
|
|
print(f"{Colors.RED}Unexpected error: {str(e)}{Colors.RESET}")
|
|
return None
|
|
|
|
print(f"{Colors.RED}Max polling attempts reached. Extraction did not complete in time.{Colors.RESET}")
|
|
return None
|
|
|
|
def main():
|
|
company = input(f"{Colors.BLUE}Enter the company name: {Colors.RESET}")
|
|
objective = input(f"{Colors.BLUE}Enter what information you want about the company: {Colors.RESET}")
|
|
|
|
serp_results = search_google(f"{company}")
|
|
if not serp_results:
|
|
print(f"{Colors.RED}No search results found.{Colors.RESET}")
|
|
return
|
|
|
|
selected_urls = select_urls_with_gemini(company, objective, serp_results)
|
|
|
|
if not selected_urls:
|
|
print(f"{Colors.RED}No URLs were selected.{Colors.RESET}")
|
|
return
|
|
|
|
data = extract_company_info(selected_urls, objective, company, firecrawl_api_key)
|
|
|
|
if data:
|
|
print(f"{Colors.GREEN}Extraction completed successfully.{Colors.RESET}")
|
|
else:
|
|
print(f"{Colors.RED}Failed to extract the requested information. Try refining your prompt or choosing a different company.{Colors.RESET}")
|
|
|
|
if __name__ == "__main__":
|
|
main() |