fix: update gemini library; extract PDF links from markdown content
This commit is contained in:
parent cb3bc5e445
commit 32f9897670
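This commit migrates the example from the legacy google.generativeai SDK to the newer google.genai client, and switches PDF/image discovery from scraped sub-links to URLs parsed out of the page Markdown. As a minimal sketch of the call pattern the diff adopts (the API key, prompt text, and PDF bytes below are placeholders, not part of the commit):

import google.genai as genai

client = genai.Client(api_key="GEMINI_API_KEY")  # replaces genai.configure(...)
types = genai.types

pdf_bytes = b"%PDF-..."  # placeholder: raw bytes of a downloaded PDF
response = client.models.generate_content(
    model="gemini-2.0-flash",
    contents=[
        types.Part.from_bytes(data=pdf_bytes, mime_type="application/pdf"),
        "Extract only the text relevant to the objective.",
    ],
)
print(response.text)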
@@ -1,10 +1,23 @@
 import os
 from firecrawl import FirecrawlApp
 import json
+import re
 import requests
-from google.generativeai import types as genai_types
+from requests.exceptions import RequestException
 from dotenv import load_dotenv
-import google.generativeai as genai
+import google.genai as genai
+# Load environment variables
+load_dotenv()
+
+# Retrieve API keys from environment variables
+firecrawl_api_key = os.getenv("FIRECRAWL_API_KEY")
+gemini_api_key = os.getenv("GEMINI_API_KEY")
+
+# Initialize the FirecrawlApp and Gemini client
+app = FirecrawlApp(api_key=firecrawl_api_key)
+client = genai.Client(api_key=gemini_api_key)  # Create Gemini client
+model_name = "gemini-2.0-flash"
+types = genai.types
 
 # ANSI color codes
 
@@ -19,28 +32,37 @@ class Colors:
     RESET = '\033[0m'
 
 
-def is_pdf_url(u: str) -> bool:
-    return u.lower().split('?')[0].endswith('.pdf')
+def pdf_size_in_mb(data: bytes) -> float:
+    """Utility function to estimate PDF size in MB from raw bytes."""
+    return len(data) / (1024 * 1024)
 
 
-def is_image_url(u: str) -> bool:
-    exts = ['.jpg', '.jpeg', '.png', '.gif', '.webp', '.heic', '.heif']
-    url_no_q = u.lower().split('?')[0]
-    return any(url_no_q.endswith(ext) for ext in exts)
-
-
-def gemini_extract_pdf_content(pdf_url):
+def gemini_extract_pdf_content(pdf_url, objective):
     """
     Downloads a PDF from pdf_url, then calls Gemini to extract text.
     Returns a string with the extracted text only.
     """
     try:
         pdf_data = requests.get(pdf_url, timeout=15).content
-        model = genai.GenerativeModel('gemini-pro')
-        response = model.generate_content([
-            genai_types.Part.from_bytes(pdf_data, mime_type='application/pdf'),
-            "Extract all textual information from this PDF. Return only text."
-        ])
+        size_mb = pdf_size_in_mb(pdf_data)
+        if size_mb > 15:
+            print(
+                f"{Colors.YELLOW}Warning: PDF size is {size_mb} MB. Skipping PDF extraction.{Colors.RESET}")
+            return ""
+
+        prompt = f"""
+        The objective is: {objective}.
+        From this PDF, extract only the text that helps address this objective.
+        If it contains no relevant info, return an empty string.
+        """
+        response = client.models.generate_content(
+            model=model_name,
+            contents=[
+                types.Part.from_bytes(
+                    data=pdf_data, mime_type="application/pdf"),
+                prompt
+            ]
+        )
         return response.text.strip()
     except Exception as e:
         print(f"Error using Gemini to process PDF '{pdf_url}': {str(e)}")
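The new guard above skips Gemini extraction for PDFs larger than 15 MB. A quick illustration of the arithmetic behind pdf_size_in_mb (the byte string is a stand-in for real PDF data):

# pdf_size_in_mb divides the raw byte length by 1024 * 1024
data = b"\x00" * (20 * 1024 * 1024)  # 20 MiB of placeholder bytes
size_mb = len(data) / (1024 * 1024)  # -> 20.0
assert size_mb > 15                  # a PDF this large would be skipped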
@@ -51,45 +73,53 @@ def gemini_extract_image_data(image_url):
     """
     Downloads an image from image_url, then calls Gemini to:
     1) Summarize what's in the image
-    2) Return bounding boxes for the main objects
-    Returns a string merging the summary and bounding box info.
+    Returns a string with the summary.
     """
     try:
+        print(f"Gemini IMAGE extraction from: {image_url}")
         image_data = requests.get(image_url, timeout=15).content
-        model = genai.GenerativeModel('gemini-pro')
 
         # 1) Summarize
-        resp_summary = model.generate_content([
-            genai_types.Part.from_bytes(image_data, mime_type='image/jpeg'),
-            "Describe the contents of this image in a short paragraph."
+        resp_summary = client.models.generate_content([
+            "Describe the contents of this image in a short paragraph.",
+            types.Part.from_bytes(data=image_data, mime_type="image/jpeg"),
         ])
         summary_text = resp_summary.text.strip()
 
-        # 2) Get bounding boxes
-        resp_bbox = model.generate_content([
-            genai_types.Part.from_bytes(image_data, mime_type='image/jpeg'),
-            ("Return bounding boxes for the objects in this image in the "
-             "format: [{'object':'cat','bbox':[y_min,x_min,y_max,x_max]}, ...]. "
-             "Coordinates 0-1000. Output valid JSON only.")
-        ])
-        bbox_text = resp_bbox.text.strip()
-
-        return f"**Image Summary**:\n{summary_text}\n\n**Bounding Boxes**:\n{bbox_text}"
+        return f"**Image Summary**:\n{summary_text}"
     except Exception as e:
         print(f"Error using Gemini to process Image '{image_url}': {str(e)}")
         return ""
 
 
-# Load environment variables
-load_dotenv()
+def extract_urls_from_markdown(markdown_text):
+    """
+    Simple regex-based approach to extract potential URLs from a markdown string.
+    We look for http(s)://someurl up until a space or parenthesis or quote, etc.
+    """
+    pattern = r'(https?://[^\s\'")]+)'
+    found = re.findall(pattern, markdown_text)
+    return list(set(found))  # unique them
 
-# Retrieve API keys from environment variables
-firecrawl_api_key = os.getenv("FIRECRAWL_API_KEY")
-gemini_api_key = os.getenv("GEMINI_API_KEY")
 
-# Initialize the FirecrawlApp and Gemini client
-app = FirecrawlApp(api_key=firecrawl_api_key)
-genai.configure(api_key=gemini_api_key)  # Configure Gemini API
+def detect_mime_type(url, timeout=8):
+    """
+    Attempt a HEAD request to detect the Content-Type. Return 'pdf', 'image' or None if undetermined.
+    Also validates image extensions for supported formats.
+    """
+    try:
+        resp = requests.head(url, timeout=timeout, allow_redirects=True)
+        ctype = resp.headers.get('Content-Type', '').lower()
+        exts = ['.jpg', '.jpeg', '.png', '.gif', '.webp', '.heic', '.heif']
+
+        if 'pdf' in ctype:
+            return 'pdf'
+        elif ctype.startswith('image/') and any(url.lower().endswith(ext) for ext in exts):
+            return 'image'
+        else:
+            return None
+    except RequestException as e:
+        print(f"Warning: HEAD request failed for {url}. Error: {e}")
+        return None
 
 
 def find_relevant_page_via_map(objective, url, app):
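Together, extract_urls_from_markdown and detect_mime_type replace the old extension-based is_pdf_url/is_image_url checks: URLs are pulled out of the scraped Markdown with a regex, then classified by the Content-Type of a HEAD request rather than by file extension alone. A small usage sketch (the markdown string and URLs are made up):

md = "See [the paper](https://example.com/paper.pdf) and https://example.com/fig.png in the appendix."
for u in extract_urls_from_markdown(md):
    # 'pdf' if the server reports a PDF Content-Type; 'image' only when the
    # Content-Type is image/* AND the URL ends in a supported extension
    kind = detect_mime_type(u)
    print(u, "->", kind)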
@@ -105,8 +135,10 @@ def find_relevant_page_via_map(objective, url, app):
     print(
         f"{Colors.YELLOW}Analyzing objective to determine optimal search parameter...{Colors.RESET}")
     # Use gemini-pro instead of gemini-2.0-flash
-    model = genai.GenerativeModel('gemini-pro')
-    response = model.generate_content(map_prompt)
+    response = client.models.generate_content(
+        model=model_name,
+        contents=[map_prompt]
+    )
 
     map_search_parameter = response.text.strip()
     print(
@@ -160,8 +192,10 @@ def find_relevant_page_via_map(objective, url, app):
     {json.dumps(links, indent=2)}"""
 
     print(f"{Colors.YELLOW}Ranking URLs by relevance to objective...{Colors.RESET}")
-    model = genai.GenerativeModel('gemini-pro')
-    response = model.generate_content(rank_prompt)
+    response = client.models.generate_content(
+        model=model_name,
+        contents=[rank_prompt]
+    )
 
     print(f"{Colors.MAGENTA}Debug - Raw Gemini response:{Colors.RESET}")
     print(response.text)
@@ -228,28 +262,35 @@ def find_objective_in_top_pages(map_website, objective, app):
 
     for link in top_links:
         print(f"{Colors.YELLOW}Initiating scrape of page: {link}{Colors.RESET}")
-        # Include 'links' so we can parse sub-links for PDFs or images
         scrape_result = app.scrape_url(
-            link, params={'formats': ['markdown', 'links']})
+            link, params={'formats': ['markdown']})
         print(
             f"{Colors.GREEN}Page scraping completed successfully.{Colors.RESET}")
 
-        # Check sub-links for PDFs or images
+        # Now detect any PDF or image URLs in the Markdown text
+        page_markdown = scrape_result.get('markdown', '')
+        if not page_markdown:
+            print(
+                f"{Colors.RED}No markdown returned for {link}, skipping...{Colors.RESET}")
+            continue
+
+        found_urls = extract_urls_from_markdown(page_markdown)
         pdf_image_append = ""
-        sub_links = scrape_result.get('links', [])
-        for sublink in sub_links:
-            if is_pdf_url(sublink):
+        for sub_url in found_urls:
+            mime_type_short = detect_mime_type(sub_url)
+            if mime_type_short == 'pdf':
                 print(
-                    f"{Colors.BLUE}Detected PDF in sub-link: {sublink}{Colors.RESET}")
-                extracted_pdf_text = gemini_extract_pdf_content(sublink)
-                if extracted_pdf_text:
-                    pdf_image_append += f"\n\n[Sub-link PDF] {sublink}\n{extracted_pdf_text}"
-            elif is_image_url(sublink):
+                    f"{Colors.YELLOW}  Detected PDF: {sub_url}. Extracting content...{Colors.RESET}")
+                pdf_content = gemini_extract_pdf_content(sub_url)
+                if pdf_content:
+                    pdf_image_append += f"\n\n---\n[PDF from {sub_url}]:\n{pdf_content}"
+            elif mime_type_short == 'image':
                 print(
-                    f"{Colors.BLUE}Detected image in sub-link: {sublink}{Colors.RESET}")
-                extracted_img_text = gemini_extract_image_data(sublink)
-                if extracted_img_text:
-                    pdf_image_append += f"\n\n[Sub-link Image] {sublink}\n{extracted_img_text}"
+                    f"{Colors.YELLOW}  Detected Image: {sub_url}. Extracting content...{Colors.RESET}")
+                image_content = gemini_extract_image_data(sub_url)
+                if image_content:
+                    pdf_image_append += f"\n\n---\n[Image from {sub_url}]:\n{image_content}"
 
         # Append extracted PDF/image text to the main markdown for the page
         if pdf_image_append:
@@ -260,7 +301,8 @@ def find_objective_in_top_pages(map_website, objective, app):
         Analyze this content to find: {objective}
         If found, return ONLY a JSON object with information related to the objective. If not found, respond EXACTLY with: Objective not met
 
-        Content to analyze: {scrape_result['markdown']}
+        Content to analyze:
+        {scrape_result['markdown']}
 
         Remember:
         - Return valid JSON if information is found
@@ -268,8 +310,10 @@ def find_objective_in_top_pages(map_website, objective, app):
        - No other text or explanations
        """
 
-        response = genai.GenerativeModel(
-            'gemini-pro').generate_content(check_prompt)
+        response = client.models.generate_content(
+            model=model_name,
+            contents=[check_prompt]
+        )
 
         result = response.text.strip()
 
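Taken together, the per-page flow after this commit is: scrape Markdown only, regex out candidate URLs, classify each with a HEAD request, splice any Gemini-extracted PDF/image text back into the page content, and only then run the objective check. A condensed sketch of that loop (simplified from the diff; note the updated gemini_extract_pdf_content signature also accepts the objective, so this sketch passes it):

scrape_result = app.scrape_url(link, params={'formats': ['markdown']})
page_markdown = scrape_result.get('markdown', '')

pdf_image_append = ""
for sub_url in extract_urls_from_markdown(page_markdown):
    kind = detect_mime_type(sub_url)
    if kind == 'pdf':
        pdf_image_append += gemini_extract_pdf_content(sub_url, objective)
    elif kind == 'image':
        pdf_image_append += gemini_extract_image_data(sub_url)

# The augmented markdown is what gets sent to Gemini with the objective prompt.
augmented_markdown = page_markdown + pdf_image_append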