Update R1_web_crawler.py

This commit is contained in:
Aparup Ganguly 2025-01-31 02:58:56 +05:30 committed by GitHub
parent a3b5666cba
commit db740a0c96
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,10 +1,10 @@
import os
import json
import time
import requests
from dotenv import load_dotenv
from openai import OpenAI
from serpapi import GoogleSearch
from firecrawl import FirecrawlApp
from serpapi.google_search import GoogleSearch
# ANSI color codes
class Colors:
@ -22,17 +22,18 @@ load_dotenv()
# Initialize clients
client = OpenAI(api_key=os.getenv("DEEPSEEK_API_KEY"), base_url="https://api.deepseek.com")
firecrawl_api_key = os.getenv("FIRECRAWL_API_KEY")
serp_api_key = os.getenv("SERP_API_KEY")
def search_google(query):
"""Search Google using SerpAPI and return top results."""
print(f"{Colors.YELLOW}Searching Google for '{query}'...{Colors.RESET}")
search = GoogleSearch({"q": query, "api_key": os.getenv("SERP_API_KEY")})
search = GoogleSearch({"q": query, "api_key": serp_api_key})
return search.get_dict().get("organic_results", [])
def select_urls_with_r1(company, objective, serp_results):
"""
Use R1 to select the most relevant URLs from SERP results for the given company and objective.
Returns a JSON object with a "selected_urls" property that is an array of strings.
Returns a list of URLs.
"""
try:
# Prepare the data for R1
@ -44,7 +45,7 @@ def select_urls_with_r1(company, objective, serp_results):
messages=[
{
"role": "system",
"content": "You select URLs from the SERP results relevant to the company and objective."
"content": "You are a URL selector that always responds with valid JSON. You select URLs from the SERP results relevant to the company and objective. Your response must be a JSON object with a 'selected_urls' array property containing strings."
},
{
"role": "user",
@ -53,90 +54,127 @@ def select_urls_with_r1(company, objective, serp_results):
f"Objective: {objective}\n"
f"SERP Results: {json.dumps(serp_data)}\n\n"
"Return a JSON object with a property 'selected_urls' that contains an array "
"of URLs most likely to help meet the objective. If you think the data might not be on the homepage, add a /* to the end of the URL. Do not return any social media links. For example: {\"selected_urls\": [\"https://example.com\", \"https://example2.com\"]}"
"of URLs most likely to help meet the objective. Add a /* to the end of the URL if you think it should search all of the pages in the site. Do not return any social media links. For example: {\"selected_urls\": [\"https://example.com\", \"https://example2.com\"]}"
)
}
],
]
)
# The response is guaranteed to follow the specified JSON schema
result = json.loads(response.choices[0].message.content)
urls = result.get("selected_urls", [])
return urls
try:
# First try to parse as JSON
result = json.loads(response.choices[0].message.content)
if isinstance(result, dict) and "selected_urls" in result:
urls = result["selected_urls"]
else:
# If JSON doesn't have the expected structure, fall back to text parsing
response_text = response.choices[0].message.content
urls = [line.strip() for line in response_text.split('\n')
if line.strip().startswith(('http://', 'https://'))]
except json.JSONDecodeError:
# If JSON parsing fails, fall back to text parsing
response_text = response.choices[0].message.content
urls = [line.strip() for line in response_text.split('\n')
if line.strip().startswith(('http://', 'https://'))]
# Clean up URLs - remove wildcards and trailing slashes
cleaned_urls = [url.replace('/*', '').rstrip('/') for url in urls]
cleaned_urls = [url for url in cleaned_urls if url]
if not cleaned_urls:
print(f"{Colors.YELLOW}No valid URLs found.{Colors.RESET}")
return []
print(f"{Colors.CYAN}Selected URLs for extraction by R1:{Colors.RESET}")
for url in cleaned_urls:
print(f"- {url}")
return cleaned_urls
except Exception as e:
print(f"{Colors.RED}Error selecting URLs with R1: {e}{Colors.RESET}")
return []
def extract_company_info(urls, prompt, company, api_key):
"""Use requests to call Firecrawl's extract endpoint with selected URLs."""
print(f"{Colors.YELLOW}Extracting structured data from the provided URLs using Firecrawl's /extract endpoint...{Colors.RESET}")
app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY"))
print(f"{Colors.YELLOW}Extracting structured data from the provided URLs using Firecrawl...{Colors.RESET}")
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}'
}
payload = {
"urls": urls,
"prompt": prompt + " for " + company,
"enableWebSearch": True
}
try:
extract_prompt = prompt + " for " + company
response = app.extract(urls, {"prompt": extract_prompt, "enableWebSearch": True})
print(response)
return response
response = requests.post(
"https://api.firecrawl.dev/v1/extract",
headers=headers,
json=payload,
timeout=30
)
data = response.json()
if not data.get('success'):
print(f"{Colors.RED}API returned error: {data.get('error', 'No error message')}{Colors.RESET}")
return None
# Assuming Firecrawl provides a way to retrieve data with 'id'
extraction_id = data.get('id')
if not extraction_id:
print(f"{Colors.RED}No extraction ID found in response.{Colors.RESET}")
return None
# Polling for the extraction result
return poll_firecrawl_result(extraction_id, api_key)
except requests.exceptions.RequestException as e:
print(f"{Colors.RED}Request failed: {e}{Colors.RESET}")
return None
except json.JSONDecodeError as e:
print(f"{Colors.RED}Failed to parse response: {e}{Colors.RESET}")
return None
except Exception as e:
print(f"{Colors.RED}Failed to extract data: {e}{Colors.RESET}")
return None
def deduplicate_with_r1(data, company, objective):
"""Use R1 to deduplicate and consolidate extracted information."""
print(f"{Colors.YELLOW}Deduplicating and consolidating information using R1...{Colors.RESET}")
try:
# Ensure data is valid JSON before sending
if not data:
return {}
response = client.chat.completions.create(
model="deepseek-reasoner",
messages=[
{
"role": "system",
"content": "You are an expert at consolidating information and removing duplicates. Analyze the extracted data and provide a clean, consolidated response."
},
{
"role": "user",
"content": (
f"Company: {company}\n"
f"Objective: {objective}\n"
f"Extracted Data: {json.dumps(data, indent=2)}\n\n"
"Please analyze this data and:\n"
"1. Remove any duplicate information\n"
"2. Consolidate similar points\n"
"3. Format the response as a clean JSON object\n"
"4. Ensure all information is relevant to the objective\n"
"Return only the JSON response."
)
}
],
)
# Handle empty or invalid responses
response_text = response.choices[0].message.content.strip()
if not response_text:
return {}
def poll_firecrawl_result(extraction_id, api_key, interval=5, max_attempts=12):
"""Poll Firecrawl API to get the extraction result."""
url = f"https://api.firecrawl.dev/v1/extract/{extraction_id}"
headers = {
'Authorization': f'Bearer {api_key}'
}
for attempt in range(1, max_attempts + 1):
try:
consolidated_data = json.loads(response_text)
return consolidated_data
except json.JSONDecodeError:
# If JSON parsing fails, try to extract JSON from the response
# Look for content between curly braces
start = response_text.find('{')
end = response_text.rfind('}')
if start >= 0 and end >= 0:
json_str = response_text[start:end+1]
return json.loads(json_str)
return {}
# print(f"{Colors.YELLOW}Polling for extraction result (Attempt {attempt}/{max_attempts})...{Colors.RESET}")
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
data = response.json()
except Exception as e:
print(f"{Colors.RED}Error deduplicating data with R1: {e}{Colors.RESET}")
return data
if data.get('success') and data.get('data'):
print(f"{Colors.GREEN}Data successfully extracted:{Colors.RESET}")
print(json.dumps(data['data'], indent=2))
return data['data']
elif data.get('success') and not data.get('data'):
time.sleep(interval)
else:
print(f"{Colors.RED}API Error: {data.get('error', 'No error message provided')}{Colors.RESET}")
return None
except requests.exceptions.RequestException:
return None
except json.JSONDecodeError:
return None
except Exception:
return None
print(f"{Colors.RED}Max polling attempts reached. Extraction did not complete in time.{Colors.RESET}")
return None
def main():
company = input(f"{Colors.BLUE}Enter the company name: {Colors.RESET}")
@ -154,20 +192,12 @@ def main():
print(f"{Colors.RED}R1 did not return any URLs.{Colors.RESET}")
return
print(f"{Colors.CYAN}Selected URLs for extraction by R1:{Colors.RESET}")
for url in selected_urls:
print(f"- {url}")
data = extract_company_info(selected_urls, objective, company, firecrawl_api_key)
if data and data.get('success') and data.get('data'):
# Deduplicate and consolidate the extracted data
consolidated_data = deduplicate_with_r1(data['data'], company, objective)
print(f"\n{Colors.GREEN}Consolidated and deduplicated data:{Colors.RESET}")
print(json.dumps(consolidated_data, indent=2))
if data:
print(f"{Colors.GREEN}Extraction completed successfully.{Colors.RESET}")
else:
print(f"{Colors.RED}Failed to extract the requested information. Try refining your prompt or choosing a different company.{Colors.RESET}")
if __name__ == "__main__":
main()
main()