mirror of https://git.mirrors.martin98.com/https://github.com/mendableai/firecrawl (synced 2025-08-12 03:29:01 +08:00)
Add detection of PDF/image sub-links and extract text via Gemini
parent c67b052f4a
commit cb3bc5e445
@@ -1,10 +1,14 @@
 import os
 from firecrawl import FirecrawlApp
 import json
+import requests
+from google.generativeai import types as genai_types
 from dotenv import load_dotenv
 import google.generativeai as genai

 # ANSI color codes
+
+
 class Colors:
     CYAN = '\033[96m'
     YELLOW = '\033[93m'
@@ -14,6 +18,68 @@ class Colors:
     BLUE = '\033[94m'
     RESET = '\033[0m'


+def is_pdf_url(u: str) -> bool:
+    return u.lower().split('?')[0].endswith('.pdf')
+
+
+def is_image_url(u: str) -> bool:
+    exts = ['.jpg', '.jpeg', '.png', '.gif', '.webp', '.heic', '.heif']
+    url_no_q = u.lower().split('?')[0]
+    return any(url_no_q.endswith(ext) for ext in exts)
+
+
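The two helpers above classify sub-links purely by file extension, after stripping any query string. A quick sanity check with hypothetical URLs; note one caveat: URL fragments are not stripped, so a link like report.pdf#page=2 is missed.

# Hypothetical URLs exercising the helpers above
assert is_pdf_url("https://example.com/docs/report.PDF?download=1")   # query string stripped
assert is_image_url("https://example.com/photos/cat.jpeg")
assert not is_pdf_url("https://example.com/docs/report.pdf#page=2")   # fragment is not stripped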
+def gemini_extract_pdf_content(pdf_url):
+    """
+    Downloads a PDF from pdf_url, then calls Gemini to extract text.
+    Returns a string with the extracted text only.
+    """
+    try:
+        pdf_data = requests.get(pdf_url, timeout=15).content
+        model = genai.GenerativeModel('gemini-pro')
+        response = model.generate_content([
+            genai_types.Part.from_bytes(pdf_data, mime_type='application/pdf'),
+            "Extract all textual information from this PDF. Return only text."
+        ])
+        return response.text.strip()
+    except Exception as e:
+        print(f"Error using Gemini to process PDF '{pdf_url}': {str(e)}")
+        return ""
+
+
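One caution on the function above: Part.from_bytes belongs to the newer google-genai SDK (where it takes keyword-only data= and mime_type= arguments), while this file imports the classic google.generativeai package, whose types module does not expose that constructor. In the classic SDK, inline media is normally passed as a plain dict, and a multimodal model is needed; 'gemini-pro' is text-only. A minimal sketch under those assumptions, with an illustrative model name:

# Sketch for the classic google.generativeai SDK (assumed API shape);
# 'gemini-1.5-flash' stands in for any multimodal Gemini model.
model = genai.GenerativeModel('gemini-1.5-flash')
response = model.generate_content([
    {'mime_type': 'application/pdf', 'data': pdf_data},  # inline blob as a dict
    "Extract all textual information from this PDF. Return only text.",
])

The download itself also ignores HTTP errors; calling raise_for_status() on the requests response would fail fast on a 404 instead of handing an error page to the model.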
+def gemini_extract_image_data(image_url):
+    """
+    Downloads an image from image_url, then calls Gemini to:
+    1) Summarize what's in the image
+    2) Return bounding boxes for the main objects
+    Returns a string merging the summary and bounding box info.
+    """
+    try:
+        image_data = requests.get(image_url, timeout=15).content
+        model = genai.GenerativeModel('gemini-pro')
+
+        # 1) Summarize
+        resp_summary = model.generate_content([
+            genai_types.Part.from_bytes(image_data, mime_type='image/jpeg'),
+            "Describe the contents of this image in a short paragraph."
+        ])
+        summary_text = resp_summary.text.strip()
+
+        # 2) Get bounding boxes
+        resp_bbox = model.generate_content([
+            genai_types.Part.from_bytes(image_data, mime_type='image/jpeg'),
+            ("Return bounding boxes for the objects in this image in the "
+             "format: [{'object':'cat','bbox':[y_min,x_min,y_max,x_max]}, ...]. "
+             "Coordinates 0-1000. Output valid JSON only.")
+        ])
+        bbox_text = resp_bbox.text.strip()
+
+        return f"**Image Summary**:\n{summary_text}\n\n**Bounding Boxes**:\n{bbox_text}"
+    except Exception as e:
+        print(f"Error using Gemini to process Image '{image_url}': {str(e)}")
+        return ""
+
+
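Two observations on the image path above: the MIME type is hard-coded to image/jpeg even though is_image_url also accepts PNG, GIF, WebP and HEIC links, and the bounding-box prompt asks for single-quoted JSON, which json.loads would reject if anything downstream ever parsed it. A stdlib sketch for deriving the MIME type from the URL rather than hard-coding it:

import mimetypes

# Guess the MIME type from the URL path; fall back to JPEG when unknown.
mime_type, _ = mimetypes.guess_type(image_url.split('?')[0])
mime_type = mime_type or 'image/jpeg'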
 # Load environment variables
 load_dotenv()

@@ -25,6 +91,7 @@ gemini_api_key = os.getenv("GEMINI_API_KEY")
 app = FirecrawlApp(api_key=firecrawl_api_key)
 genai.configure(api_key=gemini_api_key)  # Configure Gemini API
+

 def find_relevant_page_via_map(objective, url, app):
     try:
         print(f"{Colors.CYAN}Understood. The objective is: {objective}{Colors.RESET}")
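Both clients read their keys from the environment via python-dotenv. A small guard that fails early when a key is missing; only GEMINI_API_KEY is visible in this diff, so the Firecrawl variable name below is an assumption inferred from firecrawl_api_key:

# Fail fast on missing credentials (the FIRECRAWL_API_KEY name is assumed).
for var in ("FIRECRAWL_API_KEY", "GEMINI_API_KEY"):
    if not os.getenv(var):
        raise SystemExit(f"Missing required environment variable: {var}")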
@@ -35,14 +102,18 @@ def find_relevant_page_via_map(objective, url, app):
         Respond with ONLY 1-2 words, no other text or formatting.
         """

-        print(f"{Colors.YELLOW}Analyzing objective to determine optimal search parameter...{Colors.RESET}")
-        model = genai.GenerativeModel('gemini-pro')  # Use gemini-pro instead of gemini-2.0-flash
+        print(
+            f"{Colors.YELLOW}Analyzing objective to determine optimal search parameter...{Colors.RESET}")
+        # Use gemini-pro instead of gemini-2.0-flash
+        model = genai.GenerativeModel('gemini-pro')
         response = model.generate_content(map_prompt)

         map_search_parameter = response.text.strip()
-        print(f"{Colors.GREEN}Optimal search parameter identified: {map_search_parameter}{Colors.RESET}")
+        print(
+            f"{Colors.GREEN}Optimal search parameter identified: {map_search_parameter}{Colors.RESET}")

-        print(f"{Colors.YELLOW}Mapping website using the identified search parameter...{Colors.RESET}")
+        print(
+            f"{Colors.YELLOW}Mapping website using the identified search parameter...{Colors.RESET}")
         map_website = app.map_url(url, params={"search": map_search_parameter})

         print(f"{Colors.MAGENTA}Debug - Map response structure: {json.dumps(map_website, indent=2)}{Colors.RESET}")
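The debug print of the raw map response is useful because the shape of app.map_url output has varied across Firecrawl versions: sometimes a bare list of URLs, sometimes a dict with a 'links' key. A defensive normalization sketch, assuming only those two shapes:

# Normalize the map response to a plain list of URL strings.
if isinstance(map_website, dict):
    links = map_website.get('links', [])
else:
    links = map_website or []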
@@ -105,7 +176,8 @@ def find_relevant_page_via_map(objective, url, app):
             end_idx = response_text.rfind(']') + 1
             json_str = response_text[start_idx:end_idx]

-            print(f"{Colors.MAGENTA}Debug - Extracted JSON string:{Colors.RESET}")
+            print(
+                f"{Colors.MAGENTA}Debug - Extracted JSON string:{Colors.RESET}")
             print(json_str)

             ranked_results = json.loads(json_str)
@@ -118,7 +190,8 @@ def find_relevant_page_via_map(objective, url, app):
         print(f"{Colors.CYAN}Top 3 ranked URLs:{Colors.RESET}")
         for result in ranked_results:
             print(f"{Colors.GREEN}URL: {result['url']}{Colors.RESET}")
-            print(f"{Colors.YELLOW}Relevance Score: {result['relevance_score']}{Colors.RESET}")
+            print(
+                f"{Colors.YELLOW}Relevance Score: {result['relevance_score']}{Colors.RESET}")
             print(f"{Colors.BLUE}Reason: {result['reason']}{Colors.RESET}")
             print("---")

@@ -138,9 +211,11 @@ def find_relevant_page_via_map(objective, url, app):
         return links

     except Exception as e:
-        print(f"{Colors.RED}Error encountered during relevant page identification: {str(e)}{Colors.RESET}")
+        print(
+            f"{Colors.RED}Error encountered during relevant page identification: {str(e)}{Colors.RESET}")
         return None


 def find_objective_in_top_pages(map_website, objective, app):
     try:
         if not map_website:
@@ -148,12 +223,38 @@ def find_objective_in_top_pages(map_website, objective, app):
             return None

         top_links = map_website[:3]
-        print(f"{Colors.CYAN}Proceeding to analyze top {len(top_links)} links: {top_links}{Colors.RESET}")
+        print(
+            f"{Colors.CYAN}Proceeding to analyze top {len(top_links)} links: {top_links}{Colors.RESET}")

         for link in top_links:
             print(f"{Colors.YELLOW}Initiating scrape of page: {link}{Colors.RESET}")
-            scrape_result = app.scrape_url(link, params={'formats': ['markdown']})
-            print(f"{Colors.GREEN}Page scraping completed successfully.{Colors.RESET}")
+            # Include 'links' so we can parse sub-links for PDFs or images
+            scrape_result = app.scrape_url(
+                link, params={'formats': ['markdown', 'links']})
+            print(
+                f"{Colors.GREEN}Page scraping completed successfully.{Colors.RESET}")
+
+            # Check sub-links for PDFs or images
+            pdf_image_append = ""
+            sub_links = scrape_result.get('links', [])
+            for sublink in sub_links:
+                if is_pdf_url(sublink):
+                    print(
+                        f"{Colors.BLUE}Detected PDF in sub-link: {sublink}{Colors.RESET}")
+                    extracted_pdf_text = gemini_extract_pdf_content(sublink)
+                    if extracted_pdf_text:
+                        pdf_image_append += f"\n\n[Sub-link PDF] {sublink}\n{extracted_pdf_text}"
+                elif is_image_url(sublink):
+                    print(
+                        f"{Colors.BLUE}Detected image in sub-link: {sublink}{Colors.RESET}")
+                    extracted_img_text = gemini_extract_image_data(sublink)
+                    if extracted_img_text:
+                        pdf_image_append += f"\n\n[Sub-link Image] {sublink}\n{extracted_img_text}"
+
+            # Append extracted PDF/image text to the main markdown for the page
+            if pdf_image_append:
+                scrape_result[
+                    'markdown'] += f"\n\n---\n**Additional Gemini Extraction:**\n{pdf_image_append}\n"

             check_prompt = f"""
             Analyze this content to find: {objective}
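Each matching sub-link in the loop above costs one download plus one or two Gemini calls, so a link-heavy page can get slow and expensive. A sketch that deduplicates the sub-links and caps the per-page media workload; MAX_MEDIA_LINKS is hypothetical:

# Deduplicate while preserving order, then cap the Gemini workload per page.
MAX_MEDIA_LINKS = 5  # hypothetical budget
media_links = [s for s in dict.fromkeys(sub_links)
               if is_pdf_url(s) or is_image_url(s)][:MAX_MEDIA_LINKS]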
@@ -167,7 +268,8 @@ def find_objective_in_top_pages(map_website, objective, app):
             - No other text or explanations
             """

-            response = genai.GenerativeModel('gemini-pro').generate_content(check_prompt)
+            response = genai.GenerativeModel(
+                'gemini-pro').generate_content(check_prompt)

             result = response.text.strip()

@@ -175,7 +277,8 @@ def find_objective_in_top_pages(map_website, objective, app):
             print(result)

             if result != "Objective not met":
-                print(f"{Colors.GREEN}Objective potentially fulfilled. Relevant information identified.{Colors.RESET}")
+                print(
+                    f"{Colors.GREEN}Objective potentially fulfilled. Relevant information identified.{Colors.RESET}")
                 try:
                     if '{' in result and '}' in result:
                         start_idx = result.find('{')
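The branch above hinges on an exact comparison with the sentinel string "Objective not met", which models often decorate with punctuation or extra whitespace. A more forgiving check, as a sketch:

# Case-insensitive substring match instead of exact equality.
objective_not_met = "objective not met" in result.lower()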
@@ -183,19 +286,24 @@ def find_objective_in_top_pages(map_website, objective, app):
                         json_str = result[start_idx:end_idx]
                         return json.loads(json_str)
                     else:
-                        print(f"{Colors.RED}No JSON object found in response{Colors.RESET}")
+                        print(
+                            f"{Colors.RED}No JSON object found in response{Colors.RESET}")
                 except json.JSONDecodeError:
-                    print(f"{Colors.RED}Error in parsing response. Proceeding to next page...{Colors.RESET}")
+                    print(
+                        f"{Colors.RED}Error in parsing response. Proceeding to next page...{Colors.RESET}")
             else:
-                print(f"{Colors.YELLOW}Objective not met on this page. Proceeding to next link...{Colors.RESET}")
+                print(
+                    f"{Colors.YELLOW}Objective not met on this page. Proceeding to next link...{Colors.RESET}")

         print(f"{Colors.RED}All available pages analyzed. Objective not fulfilled in examined content.{Colors.RESET}")
         return None

     except Exception as e:
-        print(f"{Colors.RED}Error encountered during page analysis: {str(e)}{Colors.RESET}")
+        print(
+            f"{Colors.RED}Error encountered during page analysis: {str(e)}{Colors.RESET}")
         return None


 def main():
     url = input(f"{Colors.BLUE}Enter the website to crawl : {Colors.RESET}")
     objective = input(f"{Colors.BLUE}Enter your objective: {Colors.RESET}")
@@ -208,12 +316,15 @@ def main():
         result = find_objective_in_top_pages(map_website, objective, app)

         if result:
-            print(f"{Colors.GREEN}Objective successfully fulfilled. Extracted information:{Colors.RESET}")
+            print(
+                f"{Colors.GREEN}Objective successfully fulfilled. Extracted information:{Colors.RESET}")
             print(f"{Colors.MAGENTA}{json.dumps(result, indent=2)}{Colors.RESET}")
         else:
-            print(f"{Colors.RED}Unable to fulfill the objective with the available content.{Colors.RESET}")
+            print(
+                f"{Colors.RED}Unable to fulfill the objective with the available content.{Colors.RESET}")
     else:
         print(f"{Colors.RED}No relevant pages identified. Consider refining the search parameters or trying a different website.{Colors.RESET}")


 if __name__ == "__main__":
     main()