diff --git a/examples/R1_web_crawler/R1_web_crawler.py b/examples/R1_web_crawler/R1_web_crawler.py index 10dac06c..29e719d6 100644 --- a/examples/R1_web_crawler/R1_web_crawler.py +++ b/examples/R1_web_crawler/R1_web_crawler.py @@ -1,10 +1,10 @@ import os import json +import time import requests from dotenv import load_dotenv from openai import OpenAI -from serpapi import GoogleSearch -from firecrawl import FirecrawlApp +from serpapi.google_search import GoogleSearch # ANSI color codes class Colors: @@ -22,17 +22,18 @@ load_dotenv() # Initialize clients client = OpenAI(api_key=os.getenv("DEEPSEEK_API_KEY"), base_url="https://api.deepseek.com") firecrawl_api_key = os.getenv("FIRECRAWL_API_KEY") +serp_api_key = os.getenv("SERP_API_KEY") def search_google(query): """Search Google using SerpAPI and return top results.""" print(f"{Colors.YELLOW}Searching Google for '{query}'...{Colors.RESET}") - search = GoogleSearch({"q": query, "api_key": os.getenv("SERP_API_KEY")}) + search = GoogleSearch({"q": query, "api_key": serp_api_key}) return search.get_dict().get("organic_results", []) def select_urls_with_r1(company, objective, serp_results): """ Use R1 to select the most relevant URLs from SERP results for the given company and objective. - Returns a JSON object with a "selected_urls" property that is an array of strings. + Returns a list of URLs. """ try: # Prepare the data for R1 @@ -44,7 +45,7 @@ def select_urls_with_r1(company, objective, serp_results): messages=[ { "role": "system", - "content": "You select URLs from the SERP results relevant to the company and objective." + "content": "You are a URL selector that always responds with valid JSON. You select URLs from the SERP results relevant to the company and objective. Your response must be a JSON object with a 'selected_urls' array property containing strings." }, { "role": "user", @@ -53,90 +54,127 @@ def select_urls_with_r1(company, objective, serp_results): f"Objective: {objective}\n" f"SERP Results: {json.dumps(serp_data)}\n\n" "Return a JSON object with a property 'selected_urls' that contains an array " - "of URLs most likely to help meet the objective. If you think the data might not be on the homepage, add a /* to the end of the URL. Do not return any social media links. For example: {\"selected_urls\": [\"https://example.com\", \"https://example2.com\"]}" + "of URLs most likely to help meet the objective. Add a /* to the end of the URL if you think it should search all of the pages in the site. Do not return any social media links. For example: {\"selected_urls\": [\"https://example.com\", \"https://example2.com\"]}" ) } - ], + ] ) - # The response is guaranteed to follow the specified JSON schema - result = json.loads(response.choices[0].message.content) - urls = result.get("selected_urls", []) - return urls + try: + # First try to parse as JSON + result = json.loads(response.choices[0].message.content) + if isinstance(result, dict) and "selected_urls" in result: + urls = result["selected_urls"] + else: + # If JSON doesn't have the expected structure, fall back to text parsing + response_text = response.choices[0].message.content + urls = [line.strip() for line in response_text.split('\n') + if line.strip().startswith(('http://', 'https://'))] + except json.JSONDecodeError: + # If JSON parsing fails, fall back to text parsing + response_text = response.choices[0].message.content + urls = [line.strip() for line in response_text.split('\n') + if line.strip().startswith(('http://', 'https://'))] + + # Clean up URLs - remove wildcards and trailing slashes + cleaned_urls = [url.replace('/*', '').rstrip('/') for url in urls] + cleaned_urls = [url for url in cleaned_urls if url] + + if not cleaned_urls: + print(f"{Colors.YELLOW}No valid URLs found.{Colors.RESET}") + return [] + + print(f"{Colors.CYAN}Selected URLs for extraction by R1:{Colors.RESET}") + for url in cleaned_urls: + print(f"- {url}") + + return cleaned_urls except Exception as e: print(f"{Colors.RED}Error selecting URLs with R1: {e}{Colors.RESET}") return [] - - def extract_company_info(urls, prompt, company, api_key): """Use requests to call Firecrawl's extract endpoint with selected URLs.""" - print(f"{Colors.YELLOW}Extracting structured data from the provided URLs using Firecrawl's /extract endpoint...{Colors.RESET}") - app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY")) + print(f"{Colors.YELLOW}Extracting structured data from the provided URLs using Firecrawl...{Colors.RESET}") + + headers = { + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {api_key}' + } + + payload = { + "urls": urls, + "prompt": prompt + " for " + company, + "enableWebSearch": True + } + try: - extract_prompt = prompt + " for " + company - response = app.extract(urls, {"prompt": extract_prompt, "enableWebSearch": True}) - print(response) - return response + response = requests.post( + "https://api.firecrawl.dev/v1/extract", + headers=headers, + json=payload, + timeout=30 + ) + + data = response.json() + + if not data.get('success'): + print(f"{Colors.RED}API returned error: {data.get('error', 'No error message')}{Colors.RESET}") + return None + + # Assuming Firecrawl provides a way to retrieve data with 'id' + extraction_id = data.get('id') + if not extraction_id: + print(f"{Colors.RED}No extraction ID found in response.{Colors.RESET}") + return None + + # Polling for the extraction result + return poll_firecrawl_result(extraction_id, api_key) + + except requests.exceptions.RequestException as e: + print(f"{Colors.RED}Request failed: {e}{Colors.RESET}") + return None + except json.JSONDecodeError as e: + print(f"{Colors.RED}Failed to parse response: {e}{Colors.RESET}") + return None except Exception as e: print(f"{Colors.RED}Failed to extract data: {e}{Colors.RESET}") return None -def deduplicate_with_r1(data, company, objective): - """Use R1 to deduplicate and consolidate extracted information.""" - print(f"{Colors.YELLOW}Deduplicating and consolidating information using R1...{Colors.RESET}") - - try: - # Ensure data is valid JSON before sending - if not data: - return {} - - response = client.chat.completions.create( - model="deepseek-reasoner", - messages=[ - { - "role": "system", - "content": "You are an expert at consolidating information and removing duplicates. Analyze the extracted data and provide a clean, consolidated response." - }, - { - "role": "user", - "content": ( - f"Company: {company}\n" - f"Objective: {objective}\n" - f"Extracted Data: {json.dumps(data, indent=2)}\n\n" - "Please analyze this data and:\n" - "1. Remove any duplicate information\n" - "2. Consolidate similar points\n" - "3. Format the response as a clean JSON object\n" - "4. Ensure all information is relevant to the objective\n" - "Return only the JSON response." - ) - } - ], - ) - - # Handle empty or invalid responses - response_text = response.choices[0].message.content.strip() - if not response_text: - return {} - +def poll_firecrawl_result(extraction_id, api_key, interval=5, max_attempts=12): + """Poll Firecrawl API to get the extraction result.""" + url = f"https://api.firecrawl.dev/v1/extract/{extraction_id}" + headers = { + 'Authorization': f'Bearer {api_key}' + } + + for attempt in range(1, max_attempts + 1): try: - consolidated_data = json.loads(response_text) - return consolidated_data + # print(f"{Colors.YELLOW}Polling for extraction result (Attempt {attempt}/{max_attempts})...{Colors.RESET}") + response = requests.get(url, headers=headers, timeout=30) + response.raise_for_status() + data = response.json() + + if data.get('success') and data.get('data'): + print(f"{Colors.GREEN}Data successfully extracted:{Colors.RESET}") + print(json.dumps(data['data'], indent=2)) + return data['data'] + elif data.get('success') and not data.get('data'): + time.sleep(interval) + else: + print(f"{Colors.RED}API Error: {data.get('error', 'No error message provided')}{Colors.RESET}") + return None + + except requests.exceptions.RequestException: + return None except json.JSONDecodeError: - # If JSON parsing fails, try to extract JSON from the response - # Look for content between curly braces - start = response_text.find('{') - end = response_text.rfind('}') - if start >= 0 and end >= 0: - json_str = response_text[start:end+1] - return json.loads(json_str) - return {} - - except Exception as e: - print(f"{Colors.RED}Error deduplicating data with R1: {e}{Colors.RESET}") - return data + return None + except Exception: + return None + + print(f"{Colors.RED}Max polling attempts reached. Extraction did not complete in time.{Colors.RESET}") + return None def main(): company = input(f"{Colors.BLUE}Enter the company name: {Colors.RESET}") @@ -154,20 +192,12 @@ def main(): print(f"{Colors.RED}R1 did not return any URLs.{Colors.RESET}") return - print(f"{Colors.CYAN}Selected URLs for extraction by R1:{Colors.RESET}") - for url in selected_urls: - print(f"- {url}") - data = extract_company_info(selected_urls, objective, company, firecrawl_api_key) - if data and data.get('success') and data.get('data'): - # Deduplicate and consolidate the extracted data - consolidated_data = deduplicate_with_r1(data['data'], company, objective) - - print(f"\n{Colors.GREEN}Consolidated and deduplicated data:{Colors.RESET}") - print(json.dumps(consolidated_data, indent=2)) + if data: + print(f"{Colors.GREEN}Extraction completed successfully.{Colors.RESET}") else: print(f"{Colors.RED}Failed to extract the requested information. Try refining your prompt or choosing a different company.{Colors.RESET}") if __name__ == "__main__": - main() + main()