135 lines
4.3 KiB
Python

from sqlalchemy import create_engine, Column, String, Float, DateTime, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship, declarative_base
from datetime import datetime
Base = declarative_base()
class Product(Base):
__tablename__ = "products"
url = Column(String, primary_key=True)
prices = relationship(
"PriceHistory", back_populates="product", cascade="all, delete-orphan"
)
class PriceHistory(Base):
__tablename__ = "price_histories"
id = Column(String, primary_key=True)
product_url = Column(String, ForeignKey("products.url"))
name = Column(String, nullable=False)
price = Column(Float, nullable=False)
currency = Column(String, nullable=False)
main_image_url = Column(String)
timestamp = Column(DateTime, nullable=False)
product = relationship("Product", back_populates="prices")
class Database:
def __init__(self, connection_string):
self.engine = create_engine(connection_string)
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
def add_product(self, url):
session = self.Session()
try:
# Create the product entry
product = Product(url=url)
session.merge(product) # merge will update if exists, insert if not
session.commit()
finally:
session.close()
def product_exists(self, url):
session = self.Session()
try:
return session.query(Product).filter(Product.url == url).first() is not None
finally:
session.close()
def add_price(self, product_data):
session = self.Session()
try:
# First ensure the product exists
if not self.product_exists(product_data["url"]):
# Create the product if it doesn't exist
product = Product(url=product_data["url"])
session.add(product)
session.flush() # Flush to ensure the product is created before adding price
# Convert timestamp string to datetime if it's a string
timestamp = product_data["timestamp"]
if isinstance(timestamp, str):
timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H-%M")
price_history = PriceHistory(
id=f"{product_data['url']}_{timestamp.strftime('%Y%m%d%H%M%S')}",
product_url=product_data["url"],
name=product_data["name"],
price=product_data["price"],
currency=product_data["currency"],
main_image_url=product_data["main_image_url"],
timestamp=timestamp,
)
session.add(price_history)
session.commit()
finally:
session.close()
def get_all_products(self):
session = self.Session()
try:
return session.query(Product).all()
finally:
session.close()
def get_price_history(self, url):
"""Get price history for a product"""
session = self.Session()
try:
return (
session.query(PriceHistory)
.filter(PriceHistory.product_url == url)
.order_by(PriceHistory.timestamp.desc())
.all()
)
finally:
session.close()
def remove_all_products(self):
session = self.Session()
try:
# First delete all price histories
session.query(PriceHistory).delete()
# Then delete all products
session.query(Product).delete()
session.commit()
finally:
session.close()
# def remove_product(self, url):
# """Remove a product and its price history"""
# session = self.Session()
# try:
# product = session.query(Product).filter(Product.url == url).first()
# if product:
# session.delete(
# product
# ) # This will also delete associated price history due to cascade
# session.commit()
# finally:
# session.close()
if __name__ == "__main__":
from dotenv import load_dotenv
import os
load_dotenv()
db = Database(os.getenv("POSTGRES_URL"))
db.remove_all_products()