Merge pull request #946 from BexTuychiev/price-tracker

Add assets for the Automated Amazon Price Tracking article
Eric Ciarla 2024-12-10 10:27:42 -05:00 committed by GitHub
commit 5d90a6c1cd
25 changed files with 6252 additions and 0 deletions

View File: GitHub Actions workflow (Price Check)

@@ -0,0 +1,33 @@
name: Price Check

on:
  schedule:
    # Runs every 6 hours
    - cron: "0 0,6,12,18 * * *"
  workflow_dispatch: # Allows manual triggering

jobs:
  check-prices:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.10"
          cache: "pip"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt

      - name: Run price checker
        env:
          FIRECRAWL_API_KEY: ${{ secrets.FIRECRAWL_API_KEY }}
          POSTGRES_URL: ${{ secrets.POSTGRES_URL }}
          DISCORD_WEBHOOK_URL: ${{ secrets.DISCORD_WEBHOOK_URL }}
        run: python check_prices.py

View File: .gitignore

@@ -0,0 +1 @@
.venv

View File: README.md

@@ -0,0 +1,31 @@
# Automated Price Tracking System

A robust price tracking system that monitors product prices across e-commerce websites and notifies users of price changes through Discord.

## Features

- Automated price checking every 6 hours
- Support for multiple e-commerce platforms through the Firecrawl API
- Discord notifications for price changes
- Historical price data storage in a PostgreSQL database
- Interactive price history visualization with Streamlit

## Setup

1. Clone the repository
2. Install dependencies:

   ```bash
   pip install -r requirements.txt
   ```

3. Configure environment variables:

   ```bash
   cp .env.example .env
   ```

   Then edit `.env` with your:

   - Discord webhook URL
   - Database credentials
   - Firecrawl API key
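For reference, a minimal `.env` might look like the sketch below. The values are placeholders; the variable names match the secrets the GitHub Actions workflow passes to `check_prices.py`:

```bash
# Placeholder values: replace with your own credentials
FIRECRAWL_API_KEY=your-firecrawl-api-key
POSTGRES_URL=postgresql://user:password@host:5432/price_tracker
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/your-webhook-id/your-token
```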

View File: check_prices.py

@@ -0,0 +1,49 @@
import os
import asyncio

from database import Database
from dotenv import load_dotenv
from firecrawl import FirecrawlApp
from scraper import scrape_product
from notifications import send_price_alert

load_dotenv()

db = Database(os.getenv("POSTGRES_URL"))
app = FirecrawlApp()  # unused here; scraper.py creates its own client

# Threshold percentage for price drop alerts (e.g., 5% = 0.05)
PRICE_DROP_THRESHOLD = 0.05


async def check_prices():
    products = db.get_all_products()
    product_urls = set(product.url for product in products)

    for product_url in product_urls:
        # Get the price history
        price_history = db.get_price_history(product_url)

        if not price_history:
            continue

        # Get the earliest recorded price (history is ordered newest-first)
        earliest_price = price_history[-1].price

        # Retrieve updated product data
        updated_product = scrape_product(product_url)
        current_price = updated_product["price"]

        # Add the price to the database
        db.add_price(updated_product)
        print(f"Added new price entry for {updated_product['name']}")

        # Check if the price dropped below the threshold
        if earliest_price > 0:  # Avoid division by zero
            price_drop = (earliest_price - current_price) / earliest_price
            if price_drop >= PRICE_DROP_THRESHOLD:
                await send_price_alert(
                    updated_product["name"], earliest_price, current_price, product_url
                )


if __name__ == "__main__":
    asyncio.run(check_prices())
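To make the threshold check concrete, here is a small worked example with hypothetical prices:

```python
# Hypothetical numbers for the 5% threshold check above
earliest_price, current_price = 100.0, 94.0
price_drop = (earliest_price - current_price) / earliest_price  # (100 - 94) / 100 = 0.06
assert price_drop >= 0.05  # a 6% drop clears the 5% threshold, so an alert fires
```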

View File: database.py

@@ -0,0 +1,134 @@
from sqlalchemy import create_engine, Column, String, Float, DateTime, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship, declarative_base
from datetime import datetime

Base = declarative_base()


class Product(Base):
    __tablename__ = "products"

    url = Column(String, primary_key=True)
    prices = relationship(
        "PriceHistory", back_populates="product", cascade="all, delete-orphan"
    )


class PriceHistory(Base):
    __tablename__ = "price_histories"

    id = Column(String, primary_key=True)
    product_url = Column(String, ForeignKey("products.url"))
    name = Column(String, nullable=False)
    price = Column(Float, nullable=False)
    currency = Column(String, nullable=False)
    main_image_url = Column(String)
    timestamp = Column(DateTime, nullable=False)
    product = relationship("Product", back_populates="prices")


class Database:
    def __init__(self, connection_string):
        self.engine = create_engine(connection_string)
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def add_product(self, url):
        session = self.Session()
        try:
            # Create the product entry
            product = Product(url=url)
            session.merge(product)  # merge will update if exists, insert if not
            session.commit()
        finally:
            session.close()

    def product_exists(self, url):
        session = self.Session()
        try:
            return session.query(Product).filter(Product.url == url).first() is not None
        finally:
            session.close()

    def add_price(self, product_data):
        session = self.Session()
        try:
            # First ensure the product exists
            if not self.product_exists(product_data["url"]):
                # Create the product if it doesn't exist
                product = Product(url=product_data["url"])
                session.add(product)
                session.flush()  # Flush so the product exists before adding a price

            # Convert timestamp string to datetime if it's a string
            timestamp = product_data["timestamp"]
            if isinstance(timestamp, str):
                timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H-%M")

            price_history = PriceHistory(
                id=f"{product_data['url']}_{timestamp.strftime('%Y%m%d%H%M%S')}",
                product_url=product_data["url"],
                name=product_data["name"],
                price=product_data["price"],
                currency=product_data["currency"],
                main_image_url=product_data["main_image_url"],
                timestamp=timestamp,
            )
            session.add(price_history)
            session.commit()
        finally:
            session.close()

    def get_all_products(self):
        session = self.Session()
        try:
            return session.query(Product).all()
        finally:
            session.close()

    def get_price_history(self, url):
        """Get price history for a product, ordered newest-first"""
        session = self.Session()
        try:
            return (
                session.query(PriceHistory)
                .filter(PriceHistory.product_url == url)
                .order_by(PriceHistory.timestamp.desc())
                .all()
            )
        finally:
            session.close()

    def remove_all_products(self):
        session = self.Session()
        try:
            # First delete all price histories
            session.query(PriceHistory).delete()
            # Then delete all products
            session.query(Product).delete()
            session.commit()
        finally:
            session.close()

    # def remove_product(self, url):
    #     """Remove a product and its price history"""
    #     session = self.Session()
    #     try:
    #         product = session.query(Product).filter(Product.url == url).first()
    #         if product:
    #             session.delete(
    #                 product
    #             )  # This will also delete associated price history due to cascade
    #             session.commit()
    #     finally:
    #         session.close()


if __name__ == "__main__":
    from dotenv import load_dotenv
    import os

    load_dotenv()

    db = Database(os.getenv("POSTGRES_URL"))
    db.remove_all_products()

View File: notifications.py

@@ -0,0 +1,36 @@
from dotenv import load_dotenv
import os
import aiohttp
import asyncio

load_dotenv()


async def send_price_alert(
    product_name: str, old_price: float, new_price: float, url: str
):
    """Send a price drop alert to Discord"""
    drop_percentage = ((old_price - new_price) / old_price) * 100

    message = {
        "embeds": [
            {
                "title": "Price Drop Alert! 🎉",
                "description": f"**{product_name}**\nPrice dropped by {drop_percentage:.1f}%!\n"
                f"Old price: ${old_price:.2f}\n"
                f"New price: ${new_price:.2f}\n"
                f"[View Product]({url})",
                "color": 3066993,
            }
        ]
    }

    try:
        async with aiohttp.ClientSession() as session:
            # Use the response as a context manager so the connection is released
            async with session.post(
                os.getenv("DISCORD_WEBHOOK_URL"), json=message
            ) as response:
                response.raise_for_status()
    except Exception as e:
        print(f"Error sending Discord notification: {e}")


if __name__ == "__main__":
    asyncio.run(send_price_alert("Test Product", 100, 90, "https://www.google.com"))

View File: requirements.txt

@@ -0,0 +1,9 @@
streamlit
firecrawl-py
pydantic
psycopg2-binary
python-dotenv
sqlalchemy==2.0.35
pandas
plotly
aiohttp

View File: scraper.py

@@ -0,0 +1,38 @@
from firecrawl import FirecrawlApp
from pydantic import BaseModel, Field
from datetime import datetime
from dotenv import load_dotenv

load_dotenv()

app = FirecrawlApp()


class Product(BaseModel):
    """Schema for creating a new product"""

    url: str = Field(description="The URL of the product")
    name: str = Field(description="The product name/title")
    price: float = Field(description="The current price of the product")
    currency: str = Field(description="Currency code (USD, EUR, etc)")
    main_image_url: str = Field(description="The URL of the main image of the product")


def scrape_product(url: str):
    extracted_data = app.scrape_url(
        url,
        params={
            "formats": ["extract"],
            "extract": {"schema": Product.model_json_schema()},
        },
    )

    # Add the scraping date to the extracted data
    extracted_data["extract"]["timestamp"] = datetime.utcnow()

    return extracted_data["extract"]


if __name__ == "__main__":
    product = "https://www.amazon.com/gp/product/B002U21ZZK/"
    print(scrape_product(product))
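For orientation, the dictionary returned by `scrape_product` carries the `Product` schema fields plus the injected timestamp. A hypothetical result (illustrative values, not real scrape output) might look like:

```python
import datetime

# Hypothetical shape of scrape_product()'s return value
example = {
    "url": "https://www.amazon.com/gp/product/B002U21ZZK/",
    "name": "Example Product Name",
    "price": 29.99,
    "currency": "USD",
    "main_image_url": "https://m.media-amazon.com/images/I/example.jpg",
    "timestamp": datetime.datetime(2024, 12, 10, 15, 27, 42),
}
```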

View File: Streamlit dashboard app

@@ -0,0 +1,86 @@
import os

import streamlit as st
import pandas as pd
import plotly.express as px

from utils import is_valid_url
from database import Database
from dotenv import load_dotenv
from scraper import scrape_product

load_dotenv()

st.set_page_config(page_title="Price Tracker", page_icon="📊", layout="wide")

with st.spinner("Loading database..."):
    db = Database(os.getenv("POSTGRES_URL"))

# Set up sidebar
with st.sidebar:
    st.title("Add New Product")
    product_url = st.text_input("Product URL")
    add_button = st.button("Add Product")

    if add_button:
        if not product_url:
            st.error("Please enter a product URL")
        elif not is_valid_url(product_url):
            st.error("Please enter a valid URL")
        else:
            db.add_product(product_url)
            with st.spinner("Added product to database. Scraping product data..."):
                product_data = scrape_product(product_url)
                db.add_price(product_data)
                st.success("Product is now being tracked!")

# Main content
st.title("Price Tracker Dashboard")
st.markdown("## Tracked Products")

# Get all products and their price histories
products = db.get_all_products()

# Create a card for each product
for product in products:
    price_history = db.get_price_history(product.url)
    if price_history:
        # Create a DataFrame for plotting
        df = pd.DataFrame(
            [
                {"timestamp": ph.timestamp, "price": ph.price, "name": ph.name}
                for ph in price_history
            ]
        )

        # Create a card-like container for each product
        with st.expander(df["name"][0], expanded=False):
            st.markdown("---")
            col1, col2 = st.columns([1, 3])

            with col1:
                if price_history[0].main_image_url:
                    st.image(price_history[0].main_image_url, width=200)
                st.metric(
                    label="Current Price",
                    value=f"{price_history[0].price} {price_history[0].currency}",
                )

            with col2:
                # Create the price history plot
                fig = px.line(
                    df,
                    x="timestamp",
                    y="price",
                    title=None,
                )
                fig.update_layout(
                    xaxis_title=None,
                    yaxis_title="Price ($)",
                    showlegend=False,
                    margin=dict(l=0, r=0, t=0, b=0),
                    height=300,
                )
                fig.update_xaxes(tickformat="%Y-%m-%d %H:%M", tickangle=45)
                fig.update_yaxes(tickprefix="$", tickformat=".2f")
                st.plotly_chart(fig, use_container_width=True)
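The diff does not show this file's name; assuming it were saved as `ui.py`, the dashboard could be launched locally with:

```bash
streamlit run ui.py
```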

View File: utils.py

@@ -0,0 +1,28 @@
from urllib.parse import urlparse
import re


def is_valid_url(url: str) -> bool:
    try:
        # Parse the URL
        result = urlparse(url)

        # Check if scheme and netloc are present
        if not all([result.scheme, result.netloc]):
            return False

        # Check if scheme is http or https
        if result.scheme not in ["http", "https"]:
            return False

        # Basic regex pattern for domain validation
        domain_pattern = (
            r"^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z]{2,})+$"
        )
        if not re.match(domain_pattern, result.netloc):
            return False

        return True
    except Exception:
        return False
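A few illustrative calls, assuming `utils.py` is importable from the project root:

```python
from utils import is_valid_url

assert is_valid_url("https://www.amazon.com/gp/product/B002U21ZZK/")  # https scheme, valid domain
assert not is_valid_url("not-a-url")          # no scheme or netloc
assert not is_valid_url("ftp://example.com")  # only http/https are accepted
```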

(11 binary image files added, 22 KiB to 410 KiB each, not shown; 4 file diffs suppressed because they are too large.)