📝 Add source examples for Python 3.9 and 3.10 (#715)

* 📝 Add source examples for Python 3.9 and 3.10

*  Add tests for new source examples for Python 3.9 and 3.10, still needs pytest markers

*  Add tests for fastapi examples

*  Update tests for FastAPI app testing, for Python 3.9 and 3.10, fixing multi-app testing conflicts

*  Require Python 3.9 and 3.10 for tests

*  Update tests with missing markers
This commit is contained in:
Sebastián Ramírez 2023-11-29 16:51:55 +01:00 committed by GitHub
parent cce30d7546
commit d8effcbc5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
243 changed files with 20057 additions and 80 deletions

View File

@ -0,0 +1,59 @@
from pydantic import condecimal
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
money: condecimal(max_digits=5, decimal_places=3) = Field(default=0)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson", money=1.1)
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador", money=0.001)
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48, money=2.2)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.commit()
def select_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.name == "Deadpond")
results = session.exec(statement)
hero_1 = results.one()
print("Hero 1:", hero_1)
statement = select(Hero).where(Hero.name == "Rusty-Man")
results = session.exec(statement)
hero_2 = results.one()
print("Hero 2:", hero_2)
total_money = hero_1.money + hero_2.money
print(f"Total money: {total_money}")
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,79 @@
from sqlmodel import Field, Session, SQLModel, create_engine
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str
secret_name: str
age: int | None = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
print("Before interacting with the database")
print("Hero 1:", hero_1)
print("Hero 2:", hero_2)
print("Hero 3:", hero_3)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
print("After adding to the session")
print("Hero 1:", hero_1)
print("Hero 2:", hero_2)
print("Hero 3:", hero_3)
session.commit()
print("After committing the session")
print("Hero 1:", hero_1)
print("Hero 2:", hero_2)
print("Hero 3:", hero_3)
print("After committing the session, show IDs")
print("Hero 1 ID:", hero_1.id)
print("Hero 2 ID:", hero_2.id)
print("Hero 3 ID:", hero_3.id)
print("After committing the session, show names")
print("Hero 1 name:", hero_1.name)
print("Hero 2 name:", hero_2.name)
print("Hero 3 name:", hero_3.name)
session.refresh(hero_1)
session.refresh(hero_2)
session.refresh(hero_3)
print("After refreshing the heroes")
print("Hero 1:", hero_1)
print("Hero 2:", hero_2)
print("Hero 3:", hero_3)
print("After the session closes")
print("Hero 1:", hero_1)
print("Hero 2:", hero_2)
print("Hero 3:", hero_3)
def main():
create_db_and_tables()
create_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,80 @@
from sqlmodel import Field, Session, SQLModel, create_engine
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str
secret_name: str
age: int | None = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson") # (1)!
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador") # (2)!
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48) # (3)!
print("Before interacting with the database") # (4)!
print("Hero 1:", hero_1) # (5)!
print("Hero 2:", hero_2) # (6)!
print("Hero 3:", hero_3) # (7)!
with Session(engine) as session: # (8)!
session.add(hero_1) # (9)!
session.add(hero_2) # (10)!
session.add(hero_3) # (11)!
print("After adding to the session") # (12)!
print("Hero 1:", hero_1) # (13)!
print("Hero 2:", hero_2) # (14)!
print("Hero 3:", hero_3) # (15)!
session.commit() # (16)!
print("After committing the session") # (17)!
print("Hero 1:", hero_1) # (18)!
print("Hero 2:", hero_2) # (19)!
print("Hero 3:", hero_3) # (20)!
print("After committing the session, show IDs") # (21)!
print("Hero 1 ID:", hero_1.id) # (22)!
print("Hero 2 ID:", hero_2.id) # (23)!
print("Hero 3 ID:", hero_3.id) # (24)!
print("After committing the session, show names") # (25)!
print("Hero 1 name:", hero_1.name) # (26)!
print("Hero 2 name:", hero_2.name) # (27)!
print("Hero 3 name:", hero_3.name) # (28)!
session.refresh(hero_1) # (29)!
session.refresh(hero_2) # (30)!
session.refresh(hero_3) # (31)!
print("After refreshing the heroes") # (32)!
print("Hero 1:", hero_1) # (33)!
print("Hero 2:", hero_2) # (34)!
print("Hero 3:", hero_3) # (35)!
# (36)!
print("After the session closes") # (37)!
print("Hero 1:", hero_1) # (38)!
print("Hero 2:", hero_2) # (39)!
print("Hero 3:", hero_3) # (40)!
def main():
create_db_and_tables()
create_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,29 @@
from sqlmodel import Session
from .database import create_db_and_tables, engine
from .models import Hero, Team
def create_heroes():
with Session(engine) as session:
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
hero_deadpond = Hero(
name="Deadpond", secret_name="Dive Wilson", team=team_z_force
)
session.add(hero_deadpond)
session.commit()
session.refresh(hero_deadpond)
print("Created hero:", hero_deadpond)
print("Hero's team:", hero_deadpond.team)
def main():
create_db_and_tables()
create_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,10 @@
from sqlmodel import SQLModel, create_engine
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)

View File

@ -0,0 +1,19 @@
from sqlmodel import Field, Relationship, SQLModel
class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
heroes: list["Hero"] = Relationship(back_populates="team")
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
team_id: int | None = Field(default=None, foreign_key="team.id")
team: Team | None = Relationship(back_populates="heroes")

View File

@ -0,0 +1,29 @@
from sqlmodel import Session
from .database import create_db_and_tables, engine
from .models import Hero, Team
def create_heroes():
with Session(engine) as session:
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
hero_deadpond = Hero(
name="Deadpond", secret_name="Dive Wilson", team=team_z_force
)
session.add(hero_deadpond)
session.commit()
session.refresh(hero_deadpond)
print("Created hero:", hero_deadpond)
print("Hero's team:", hero_deadpond.team)
def main():
create_db_and_tables()
create_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,10 @@
from sqlmodel import SQLModel, create_engine
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)

View File

@ -0,0 +1,21 @@
from typing import Optional
from sqlmodel import Field, Relationship, SQLModel
class Team(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
heroes: list["Hero"] = Relationship(back_populates="team")
class Hero(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
team_id: Optional[int] = Field(default=None, foreign_key="team.id")
team: Optional[Team] = Relationship(back_populates="heroes")

View File

@ -0,0 +1,30 @@
from sqlmodel import Session
from .database import create_db_and_tables, engine
from .hero_model import Hero
from .team_model import Team
def create_heroes():
with Session(engine) as session:
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
hero_deadpond = Hero(
name="Deadpond", secret_name="Dive Wilson", team=team_z_force
)
session.add(hero_deadpond)
session.commit()
session.refresh(hero_deadpond)
print("Created hero:", hero_deadpond)
print("Hero's team:", hero_deadpond.team)
def main():
create_db_and_tables()
create_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,10 @@
from sqlmodel import SQLModel, create_engine
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)

View File

@ -0,0 +1,16 @@
from typing import TYPE_CHECKING, Optional
from sqlmodel import Field, Relationship, SQLModel
if TYPE_CHECKING:
from .team_model import Team
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
team_id: int | None = Field(default=None, foreign_key="team.id")
team: Optional["Team"] = Relationship(back_populates="heroes")

View File

@ -0,0 +1,14 @@
from typing import TYPE_CHECKING
from sqlmodel import Field, Relationship, SQLModel
if TYPE_CHECKING:
from .hero_model import Hero
class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
heroes: list["Hero"] = Relationship(back_populates="team")

View File

@ -0,0 +1,30 @@
from sqlmodel import Session
from .database import create_db_and_tables, engine
from .hero_model import Hero
from .team_model import Team
def create_heroes():
with Session(engine) as session:
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
hero_deadpond = Hero(
name="Deadpond", secret_name="Dive Wilson", team=team_z_force
)
session.add(hero_deadpond)
session.commit()
session.refresh(hero_deadpond)
print("Created hero:", hero_deadpond)
print("Hero's team:", hero_deadpond.team)
def main():
create_db_and_tables()
create_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,10 @@
from sqlmodel import SQLModel, create_engine
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)

View File

@ -0,0 +1,16 @@
from typing import TYPE_CHECKING, Optional
from sqlmodel import Field, Relationship, SQLModel
if TYPE_CHECKING:
from .team_model import Team
class Hero(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
team_id: Optional[int] = Field(default=None, foreign_key="team.id")
team: Optional["Team"] = Relationship(back_populates="heroes")

View File

@ -0,0 +1,14 @@
from typing import TYPE_CHECKING, Optional
from sqlmodel import Field, Relationship, SQLModel
if TYPE_CHECKING:
from .hero_model import Hero
class Team(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
heroes: list["Hero"] = Relationship(back_populates="team")

View File

@ -0,0 +1,34 @@
from sqlmodel import Field, SQLModel, create_engine
class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
team_id: int | None = Field(default=None, foreign_key="team.id")
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def main():
create_db_and_tables()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,79 @@
from sqlmodel import Field, Session, SQLModel, create_engine
class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
team_id: int | None = Field(default=None, foreign_key="team.id")
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
session.add(team_preventers)
session.add(team_z_force)
session.commit()
hero_deadpond = Hero(
name="Deadpond", secret_name="Dive Wilson", team_id=team_z_force.id
)
hero_rusty_man = Hero(
name="Rusty-Man",
secret_name="Tommy Sharp",
age=48,
team_id=team_preventers.id,
)
hero_spider_boy = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
session.add(hero_deadpond)
session.add(hero_rusty_man)
session.add(hero_spider_boy)
session.commit()
session.refresh(hero_deadpond)
session.refresh(hero_rusty_man)
session.refresh(hero_spider_boy)
print("Created hero:", hero_deadpond)
print("Created hero:", hero_rusty_man)
print("Created hero:", hero_spider_boy)
hero_spider_boy.team_id = team_preventers.id
session.add(hero_spider_boy)
session.commit()
session.refresh(hero_spider_boy)
print("Updated hero:", hero_spider_boy)
hero_spider_boy.team_id = None
session.add(hero_spider_boy)
session.commit()
session.refresh(hero_spider_boy)
print("No longer Preventer:", hero_spider_boy)
def main():
create_db_and_tables()
create_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,67 @@
from sqlmodel import Field, Session, SQLModel, create_engine
class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
team_id: int | None = Field(default=None, foreign_key="team.id")
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
session.add(team_preventers)
session.add(team_z_force)
session.commit()
hero_deadpond = Hero(
name="Deadpond", secret_name="Dive Wilson", team_id=team_z_force.id
)
hero_rusty_man = Hero(
name="Rusty-Man",
secret_name="Tommy Sharp",
age=48,
team_id=team_preventers.id,
)
hero_spider_boy = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
session.add(hero_deadpond)
session.add(hero_rusty_man)
session.add(hero_spider_boy)
session.commit()
session.refresh(hero_deadpond)
session.refresh(hero_rusty_man)
session.refresh(hero_spider_boy)
print("Created hero:", hero_deadpond)
print("Created hero:", hero_rusty_man)
print("Created hero:", hero_spider_boy)
def main():
create_db_and_tables()
create_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,76 @@
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
team_id: int | None = Field(default=None, foreign_key="team.id")
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
session.add(team_preventers)
session.add(team_z_force)
session.commit()
hero_deadpond = Hero(
name="Deadpond", secret_name="Dive Wilson", team_id=team_z_force.id
)
hero_rusty_man = Hero(
name="Rusty-Man",
secret_name="Tommy Sharp",
age=48,
team_id=team_preventers.id,
)
hero_spider_boy = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
session.add(hero_deadpond)
session.add(hero_rusty_man)
session.add(hero_spider_boy)
session.commit()
session.refresh(hero_deadpond)
session.refresh(hero_rusty_man)
session.refresh(hero_spider_boy)
print("Created hero:", hero_deadpond)
print("Created hero:", hero_rusty_man)
print("Created hero:", hero_spider_boy)
def select_heroes():
with Session(engine) as session:
statement = select(Hero, Team).where(Hero.team_id == Team.id)
results = session.exec(statement)
for hero, team in results:
print("Hero:", hero, "Team:", team)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,76 @@
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
team_id: int | None = Field(default=None, foreign_key="team.id")
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
session.add(team_preventers)
session.add(team_z_force)
session.commit()
hero_deadpond = Hero(
name="Deadpond", secret_name="Dive Wilson", team_id=team_z_force.id
)
hero_rusty_man = Hero(
name="Rusty-Man",
secret_name="Tommy Sharp",
age=48,
team_id=team_preventers.id,
)
hero_spider_boy = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
session.add(hero_deadpond)
session.add(hero_rusty_man)
session.add(hero_spider_boy)
session.commit()
session.refresh(hero_deadpond)
session.refresh(hero_rusty_man)
session.refresh(hero_spider_boy)
print("Created hero:", hero_deadpond)
print("Created hero:", hero_rusty_man)
print("Created hero:", hero_spider_boy)
def select_heroes():
with Session(engine) as session:
statement = select(Hero, Team).join(Team)
results = session.exec(statement)
for hero, team in results:
print("Hero:", hero, "Team:", team)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,76 @@
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
team_id: int | None = Field(default=None, foreign_key="team.id")
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
session.add(team_preventers)
session.add(team_z_force)
session.commit()
hero_deadpond = Hero(
name="Deadpond", secret_name="Dive Wilson", team_id=team_z_force.id
)
hero_rusty_man = Hero(
name="Rusty-Man",
secret_name="Tommy Sharp",
age=48,
team_id=team_preventers.id,
)
hero_spider_boy = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
session.add(hero_deadpond)
session.add(hero_rusty_man)
session.add(hero_spider_boy)
session.commit()
session.refresh(hero_deadpond)
session.refresh(hero_rusty_man)
session.refresh(hero_spider_boy)
print("Created hero:", hero_deadpond)
print("Created hero:", hero_rusty_man)
print("Created hero:", hero_spider_boy)
def select_heroes():
with Session(engine) as session:
statement = select(Hero, Team).join(Team, isouter=True)
results = session.exec(statement)
for hero, team in results:
print("Hero:", hero, "Team:", team)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,76 @@
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
team_id: int | None = Field(default=None, foreign_key="team.id")
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
session.add(team_preventers)
session.add(team_z_force)
session.commit()
hero_deadpond = Hero(
name="Deadpond", secret_name="Dive Wilson", team_id=team_z_force.id
)
hero_rusty_man = Hero(
name="Rusty-Man",
secret_name="Tommy Sharp",
age=48,
team_id=team_preventers.id,
)
hero_spider_boy = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
session.add(hero_deadpond)
session.add(hero_rusty_man)
session.add(hero_spider_boy)
session.commit()
session.refresh(hero_deadpond)
session.refresh(hero_rusty_man)
session.refresh(hero_spider_boy)
print("Created hero:", hero_deadpond)
print("Created hero:", hero_rusty_man)
print("Created hero:", hero_spider_boy)
def select_heroes():
with Session(engine) as session:
statement = select(Hero).join(Team).where(Team.name == "Preventers")
results = session.exec(statement)
for hero in results:
print("Preventer Hero:", hero)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,76 @@
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
team_id: int | None = Field(default=None, foreign_key="team.id")
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
session.add(team_preventers)
session.add(team_z_force)
session.commit()
hero_deadpond = Hero(
name="Deadpond", secret_name="Dive Wilson", team_id=team_z_force.id
)
hero_rusty_man = Hero(
name="Rusty-Man",
secret_name="Tommy Sharp",
age=48,
team_id=team_preventers.id,
)
hero_spider_boy = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
session.add(hero_deadpond)
session.add(hero_rusty_man)
session.add(hero_spider_boy)
session.commit()
session.refresh(hero_deadpond)
session.refresh(hero_rusty_man)
session.refresh(hero_spider_boy)
print("Created hero:", hero_deadpond)
print("Created hero:", hero_rusty_man)
print("Created hero:", hero_spider_boy)
def select_heroes():
with Session(engine) as session:
statement = select(Hero, Team).join(Team).where(Team.name == "Preventers")
results = session.exec(statement)
for hero, team in results:
print("Preventer Hero:", hero, "Team:", team)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,73 @@
from sqlmodel import Field, Session, SQLModel, create_engine
class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
team_id: int | None = Field(default=None, foreign_key="team.id")
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
session.add(team_preventers)
session.add(team_z_force)
session.commit()
hero_deadpond = Hero(
name="Deadpond", secret_name="Dive Wilson", team_id=team_z_force.id
)
hero_rusty_man = Hero(
name="Rusty-Man",
secret_name="Tommy Sharp",
age=48,
team_id=team_preventers.id,
)
hero_spider_boy = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
session.add(hero_deadpond)
session.add(hero_rusty_man)
session.add(hero_spider_boy)
session.commit()
session.refresh(hero_deadpond)
session.refresh(hero_rusty_man)
session.refresh(hero_spider_boy)
print("Created hero:", hero_deadpond)
print("Created hero:", hero_rusty_man)
print("Created hero:", hero_spider_boy)
hero_spider_boy.team_id = team_preventers.id
session.add(hero_spider_boy)
session.commit()
session.refresh(hero_spider_boy)
print("Updated hero:", hero_spider_boy)
def main():
create_db_and_tables()
create_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,16 @@
from sqlmodel import Field, SQLModel, create_engine
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str
secret_name: str
age: int | None = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
SQLModel.metadata.create_all(engine)

View File

@ -0,0 +1,22 @@
from sqlmodel import Field, SQLModel, create_engine
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str
secret_name: str
age: int | None = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
if __name__ == "__main__":
create_db_and_tables()

View File

@ -0,0 +1,22 @@
from sqlmodel import Field, SQLModel, create_engine # (2)!
class Hero(SQLModel, table=True): # (3)!
id: int | None = Field(default=None, primary_key=True) # (4)!
name: str # (5)!
secret_name: str # (6)!
age: int | None = None # (7)!
sqlite_file_name = "database.db" # (8)!
sqlite_url = f"sqlite:///{sqlite_file_name}" # (9)!
engine = create_engine(sqlite_url, echo=True) # (10)!
def create_db_and_tables(): # (11)!
SQLModel.metadata.create_all(engine) # (12)!
if __name__ == "__main__": # (13)!
create_db_and_tables() # (14)!

View File

@ -0,0 +1,98 @@
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.add(hero_4)
session.add(hero_5)
session.add(hero_6)
session.add(hero_7)
session.commit()
def update_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.name == "Spider-Boy")
results = session.exec(statement)
hero_1 = results.one()
print("Hero 1:", hero_1)
statement = select(Hero).where(Hero.name == "Captain North America")
results = session.exec(statement)
hero_2 = results.one()
print("Hero 2:", hero_2)
hero_1.age = 16
hero_1.name = "Spider-Youngster"
session.add(hero_1)
hero_2.name = "Captain North America Except Canada"
hero_2.age = 110
session.add(hero_2)
session.commit()
session.refresh(hero_1)
session.refresh(hero_2)
print("Updated hero 1:", hero_1)
print("Updated hero 2:", hero_2)
def delete_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.name == "Spider-Youngster")
results = session.exec(statement)
hero = results.one()
print("Hero: ", hero)
session.delete(hero)
session.commit()
print("Deleted hero:", hero)
statement = select(Hero).where(Hero.name == "Spider-Youngster")
results = session.exec(statement)
hero = results.first()
if hero is None:
print("There's no hero named Spider-Youngster")
def main():
create_db_and_tables()
create_heroes()
update_heroes()
delete_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,99 @@
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.add(hero_4)
session.add(hero_5)
session.add(hero_6)
session.add(hero_7)
session.commit()
def update_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.name == "Spider-Boy")
results = session.exec(statement)
hero_1 = results.one()
print("Hero 1:", hero_1)
statement = select(Hero).where(Hero.name == "Captain North America")
results = session.exec(statement)
hero_2 = results.one()
print("Hero 2:", hero_2)
hero_1.age = 16
hero_1.name = "Spider-Youngster"
session.add(hero_1)
hero_2.name = "Captain North America Except Canada"
hero_2.age = 110
session.add(hero_2)
session.commit()
session.refresh(hero_1)
session.refresh(hero_2)
print("Updated hero 1:", hero_1)
print("Updated hero 2:", hero_2)
def delete_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.name == "Spider-Youngster") # (1)!
results = session.exec(statement) # (2)!
hero = results.one() # (3)!
print("Hero: ", hero) # (4)!
session.delete(hero) # (5)!
session.commit() # (6)!
print("Deleted hero:", hero) # (7)!
statement = select(Hero).where(Hero.name == "Spider-Youngster") # (8)!
results = session.exec(statement) # (9)!
hero = results.first() # (10)!
if hero is None: # (11)!
print("There's no hero named Spider-Youngster") # (12)!
# (13)!
def main():
create_db_and_tables()
create_heroes()
update_heroes()
delete_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,38 @@
from fastapi.testclient import TestClient
from sqlalchemy import Inspector, inspect
from sqlmodel import Session, create_engine
from . import main as app_mod
from .test_main import client_fixture, session_fixture
assert client_fixture, "This keeps the client fixture used below"
assert session_fixture, "This keeps the session fixture used by client_fixture"
def test_startup():
app_mod.engine = create_engine("sqlite://")
app_mod.on_startup()
insp: Inspector = inspect(app_mod.engine)
assert insp.has_table(str(app_mod.Hero.__tablename__))
def test_get_session():
app_mod.engine = create_engine("sqlite://")
for session in app_mod.get_session():
assert isinstance(session, Session)
assert session.bind == app_mod.engine
def test_read_hero_not_found(client: TestClient):
response = client.get("/heroes/9000")
assert response.status_code == 404
def test_update_hero_not_found(client: TestClient):
response = client.patch("/heroes/9000", json={"name": "Very-Rusty-Man"})
assert response.status_code == 404
def test_delete_hero_not_found(client: TestClient):
response = client.delete("/heroes/9000")
assert response.status_code == 404

View File

@ -0,0 +1,17 @@
1. Import the `app` from the the `main` module.
2. We create a `TestClient` for the FastAPI `app` and put it in the variable `client`.
3. Then we use use this `client` to **talk to the API** and send a `POST` HTTP operation, creating a new hero.
4. Then we get the **JSON data** from the response and put it in the variable `data`.
5. Next we start testing the results with `assert` statements, we check that the status code of the response is `200`.
6. We check that the `name` of the hero created is `"Deadpond"`.
7. We check that the `secret_name` of the hero created is `"Dive Wilson"`.
8. We check that the `age` of the hero created is `None`, because we didn't send an age.
9. We check that the hero created has an `id` created by the database, so it's not `None`.

View File

@ -0,0 +1,25 @@
1. Import the `get_session` dependency from the the `main` module.
2. Define the new function that will be the new **dependency override**.
3. This function will return a different **session** than the one that would be returned by the original `get_session` function.
We haven't seen how this new **session** object is created yet, but the point is that this is a different session than the original one from the app.
This session is attached to a different **engine**, and that different **engine** uses a different URL, for a database just for testing.
We haven't defined that new **URL** nor the new **engine** yet, but here we already see the that this object `session` will override the one returned by the original dependency `get_session()`.
4. Then, the FastAPI `app` object has an attribute `app.dependency_overrides`.
This attribute is a dictionary, and we can put dependency overrides in it by passing, as the **key**, the **original dependency function**, and as the **value**, the **new overriding dependency function**.
So, here we are telling the FastAPI app to use `get_session_override` instead of `get_session` in all the places in the code that depend on `get_session`, that is, all the parameters with something like:
```Python
session: Session = Depends(get_session)
```
5. After we are done with the dependency override, we can restore the application back to normal, by removing all the values in this dictionary `app.dependency_overrides`.
This way whenever a *path operation function* needs the dependency FastAPI will use the original one instead of the override.

View File

@ -0,0 +1,37 @@
1. Here's a subtle thing to notice.
Remember that [Order Matters](../create-db-and-table.md#sqlmodel-metadata-order-matters){.internal-link target=_blank} and we need to make sure all the **SQLModel** models are already defined and **imported** before calling `.create_all()`.
IN this line, by importing something, *anything*, from `.main`, the code in `.main` will be executed, including the definition of the **table models**, and that will automatically register them in `SQLModel.metadata`.
2. Here we create a new **engine**, completely different from the one in `main.py`.
This is the engine we will use for the tests.
We use the new URL of the database for tests:
```
sqlite:///testing.db
```
And again, we use the connection argument `check_same_thread=False`.
3. Then we call:
```Python
SQLModel.metadata.create_all(engine)
```
...to make sure we create all the tables in the new testing database.
The **table models** are registered in `SQLModel.metadata` just because we imported *something* from `.main`, and the code in `.main` was executed, creating the classes for the **table models** and automatically registering them in `SQLModel.metadata`.
So, by the point we call this method, the **table models** are already registered there. 💯
4. Here's where we create the custom **session** object for this test in a `with` block.
It uses the new custom **engine** we created, so anything that uses this session will be using the testing database.
5. Now, back to the dependency override, it is just returning the same **session** object from outside, that's it, that's the whole trick.
6. By this point, the testing **session** `with` block finishes, and the session is closed, the file is closed, etc.

View File

@ -0,0 +1,26 @@
1. Import `StaticPool` from `sqlmodel`, we will use it in a bit.
2. For the **SQLite URL**, don't write any file name, leave it empty.
So, instead of:
```
sqlite:///testing.db
```
...just write:
```
sqlite://
```
This is enough to tell **SQLModel** (actually SQLAlchemy) that we want to use an **in-memory SQLite database**.
3. Remember that we told the **low-level** library in charge of communicating with SQLite that we want to be able to **access the database from different threads** with `check_same_thread=False`?
Now that we use an **in-memory database**, we need to also tell SQLAlchemy that we want to be able to use the **same in-memory database** object from different threads.
We tell it that with the `poolclass=StaticPool` parameter.
!!! info
You can read more details in the <a href="https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#using-a-memory-database-in-multiple-threads" class="external-link" target="_blank">SQLAlchemy documentation about Using a Memory Database in Multiple Threads</a>

View File

@ -0,0 +1,41 @@
1. Import `pytest`.
2. Use the `@pytest.fixture()` decorator on top of the function to tell pytest that this is a **fixture** function (equivalent to a FastAPI dependency).
We also give it a name of `"session"`, this will be important in the testing function.
3. Create the fixture function. This is equivalent to a FastAPI dependency function.
In this fixture we create the custom **engine**, with the in-memory database, we create the tables, and we create the **session**.
Then we `yield` the `session` object.
4. The thing that we `return` or `yield` is what will be available to the test function, in this case, the `session` object.
Here we use `yield` so that **pytest** comes back to execute "the rest of the code" in this function once the testing function is done.
We don't have any more visible "rest of the code" after the `yield`, but we have the end of the `with` block that will close the **session**.
By using `yield`, pytest will:
* run the first part
* create the **session** object
* give it to the test function
* run the test function
* once the test function is done, it will continue here, right after the `yield`, and will correctly close the **session** object in the end of the `with` block.
5. Now, in the test function, to tell **pytest** that this test wants to get the fixture, instead of declaring something like in FastAPI with:
```Python
session: Session = Depends(session_fixture)
```
...the way we tell pytest what is the fixture that we want is by using the **exact same name** of the fixture.
In this case, we named it `session`, so the parameter has to be exactly named `session` for it to work.
We also add the type annotation `session: Session` so that we can get autocompletion and inline error checks in our editor.
6. Now in the dependency override function, we just return the same `session` object that came from outside it.
The `session` object comes from the parameter passed to the test function, and we just re-use it and return it here in the dependency override.

View File

@ -0,0 +1,23 @@
1. Create the new fixture named `"client"`.
2. This **client fixture**, in turn, also requires the **session fixture**.
3. Now we create the **dependency override** inside the client fixture.
4. Set the **dependency override** in the `app.dependency_overrides` dictionary.
5. Create the `TestClient` with the **FastAPI** `app`.
6. `yield` the `TestClient` instance.
By using `yield`, after the test function is done, pytest will come back to execute the rest of the code after `yield`.
7. This is the cleanup code, after `yield`, and after the test function is done.
Here we clear the dependency overrides (here it's only one) in the FastAPI `app`.
8. Now the test function requires the **client fixture**.
And inside the test function, the code is quite **simple**, we just use the `TestClient` to make requests to the API, check the data, and that's it.
The fixtures take care of all the **setup** and **cleanup** code.

View File

@ -0,0 +1,104 @@
from fastapi import Depends, FastAPI, HTTPException, Query
from sqlmodel import Field, Session, SQLModel, create_engine, select
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
class Hero(HeroBase, table=True):
id: int | None = Field(default=None, primary_key=True)
class HeroCreate(HeroBase):
pass
class HeroRead(HeroBase):
id: int
class HeroUpdate(SQLModel):
name: str | None = None
secret_name: str | None = None
age: int | None = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def get_session():
with Session(engine) as session:
yield session
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(*, session: Session = Depends(get_session), hero: HeroCreate):
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes(
*,
session: Session = Depends(get_session),
offset: int = 0,
limit: int = Query(default=100, le=100),
):
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroRead)
def read_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero
@app.patch("/heroes/{hero_id}", response_model=HeroRead)
def update_hero(
*, session: Session = Depends(get_session), hero_id: int, hero: HeroUpdate
):
db_hero = session.get(Hero, hero_id)
if not db_hero:
raise HTTPException(status_code=404, detail="Hero not found")
hero_data = hero.dict(exclude_unset=True)
for key, value in hero_data.items():
setattr(db_hero, key, value)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.delete("/heroes/{hero_id}")
def delete_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
session.delete(hero)
session.commit()
return {"ok": True}

View File

@ -0,0 +1,38 @@
from fastapi.testclient import TestClient
from sqlalchemy import Inspector, inspect
from sqlmodel import Session, create_engine
from . import main as app_mod
from .test_main import client_fixture, session_fixture
assert client_fixture, "This keeps the client fixture used below"
assert session_fixture, "This keeps the session fixture used by client_fixture"
def test_startup():
app_mod.engine = create_engine("sqlite://")
app_mod.on_startup()
insp: Inspector = inspect(app_mod.engine)
assert insp.has_table(str(app_mod.Hero.__tablename__))
def test_get_session():
app_mod.engine = create_engine("sqlite://")
for session in app_mod.get_session():
assert isinstance(session, Session)
assert session.bind == app_mod.engine
def test_read_hero_not_found(client: TestClient):
response = client.get("/heroes/9000")
assert response.status_code == 404
def test_update_hero_not_found(client: TestClient):
response = client.patch("/heroes/9000", json={"name": "Very-Rusty-Man"})
assert response.status_code == 404
def test_delete_hero_not_found(client: TestClient):
response = client.delete("/heroes/9000")
assert response.status_code == 404

View File

@ -0,0 +1,125 @@
import pytest
from fastapi.testclient import TestClient
from sqlmodel import Session, SQLModel, create_engine
from sqlmodel.pool import StaticPool
from .main import Hero, app, get_session
@pytest.fixture(name="session")
def session_fixture():
engine = create_engine(
"sqlite://", connect_args={"check_same_thread": False}, poolclass=StaticPool
)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
yield session
@pytest.fixture(name="client")
def client_fixture(session: Session):
def get_session_override():
return session
app.dependency_overrides[get_session] = get_session_override
client = TestClient(app)
yield client
app.dependency_overrides.clear()
def test_create_hero(client: TestClient):
response = client.post(
"/heroes/", json={"name": "Deadpond", "secret_name": "Dive Wilson"}
)
data = response.json()
assert response.status_code == 200
assert data["name"] == "Deadpond"
assert data["secret_name"] == "Dive Wilson"
assert data["age"] is None
assert data["id"] is not None
def test_create_hero_incomplete(client: TestClient):
# No secret_name
response = client.post("/heroes/", json={"name": "Deadpond"})
assert response.status_code == 422
def test_create_hero_invalid(client: TestClient):
# secret_name has an invalid type
response = client.post(
"/heroes/",
json={
"name": "Deadpond",
"secret_name": {"message": "Do you wanna know my secret identity?"},
},
)
assert response.status_code == 422
def test_read_heroes(session: Session, client: TestClient):
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
session.add(hero_1)
session.add(hero_2)
session.commit()
response = client.get("/heroes/")
data = response.json()
assert response.status_code == 200
assert len(data) == 2
assert data[0]["name"] == hero_1.name
assert data[0]["secret_name"] == hero_1.secret_name
assert data[0]["age"] == hero_1.age
assert data[0]["id"] == hero_1.id
assert data[1]["name"] == hero_2.name
assert data[1]["secret_name"] == hero_2.secret_name
assert data[1]["age"] == hero_2.age
assert data[1]["id"] == hero_2.id
def test_read_hero(session: Session, client: TestClient):
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
session.add(hero_1)
session.commit()
response = client.get(f"/heroes/{hero_1.id}")
data = response.json()
assert response.status_code == 200
assert data["name"] == hero_1.name
assert data["secret_name"] == hero_1.secret_name
assert data["age"] == hero_1.age
assert data["id"] == hero_1.id
def test_update_hero(session: Session, client: TestClient):
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
session.add(hero_1)
session.commit()
response = client.patch(f"/heroes/{hero_1.id}", json={"name": "Deadpuddle"})
data = response.json()
assert response.status_code == 200
assert data["name"] == "Deadpuddle"
assert data["secret_name"] == "Dive Wilson"
assert data["age"] is None
assert data["id"] == hero_1.id
def test_delete_hero(session: Session, client: TestClient):
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
session.add(hero_1)
session.commit()
response = client.delete(f"/heroes/{hero_1.id}")
hero_in_db = session.get(Hero, hero_1.id)
assert response.status_code == 200
assert hero_in_db is None

View File

@ -0,0 +1,32 @@
from fastapi.testclient import TestClient
from sqlmodel import Session, SQLModel, create_engine
from .main import app, get_session # (1)!
def test_create_hero():
engine = create_engine(
"sqlite:///testing.db", connect_args={"check_same_thread": False}
)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
def get_session_override():
return session
app.dependency_overrides[get_session] = get_session_override
client = TestClient(app) # (2)!
response = client.post( # (3)!
"/heroes/", json={"name": "Deadpond", "secret_name": "Dive Wilson"}
)
app.dependency_overrides.clear()
data = response.json() # (4)!
assert response.status_code == 200 # (5)!
assert data["name"] == "Deadpond" # (6)!
assert data["secret_name"] == "Dive Wilson" # (7)!
assert data["age"] is None # (8)!
assert data["id"] is not None # (9)!

View File

@ -0,0 +1,32 @@
from fastapi.testclient import TestClient
from sqlmodel import Session, SQLModel, create_engine
from .main import app, get_session # (1)!
def test_create_hero():
engine = create_engine(
"sqlite:///testing.db", connect_args={"check_same_thread": False}
)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
def get_session_override(): # (2)!
return session # (3)!
app.dependency_overrides[get_session] = get_session_override # (4)!
client = TestClient(app)
response = client.post(
"/heroes/", json={"name": "Deadpond", "secret_name": "Dive Wilson"}
)
app.dependency_overrides.clear() # (5)!
data = response.json()
assert response.status_code == 200
assert data["name"] == "Deadpond"
assert data["secret_name"] == "Dive Wilson"
assert data["age"] is None
assert data["id"] is not None

View File

@ -0,0 +1,33 @@
from fastapi.testclient import TestClient
from sqlmodel import Session, SQLModel, create_engine
from .main import app, get_session # (1)!
def test_create_hero():
engine = create_engine( # (2)!
"sqlite:///testing.db", connect_args={"check_same_thread": False}
)
SQLModel.metadata.create_all(engine) # (3)!
with Session(engine) as session: # (4)!
def get_session_override():
return session # (5)!
app.dependency_overrides[get_session] = get_session_override # (4)!
client = TestClient(app)
response = client.post(
"/heroes/", json={"name": "Deadpond", "secret_name": "Dive Wilson"}
)
app.dependency_overrides.clear()
data = response.json()
assert response.status_code == 200
assert data["name"] == "Deadpond"
assert data["secret_name"] == "Dive Wilson"
assert data["age"] is None
assert data["id"] is not None
# (6)!

View File

@ -0,0 +1,35 @@
from fastapi.testclient import TestClient
from sqlmodel import Session, SQLModel, create_engine
from sqlmodel.pool import StaticPool # (1)!
from .main import app, get_session
def test_create_hero():
engine = create_engine(
"sqlite://", # (2)!
connect_args={"check_same_thread": False},
poolclass=StaticPool, # (3)!
)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
def get_session_override():
return session
app.dependency_overrides[get_session] = get_session_override
client = TestClient(app)
response = client.post(
"/heroes/", json={"name": "Deadpond", "secret_name": "Dive Wilson"}
)
app.dependency_overrides.clear()
data = response.json()
assert response.status_code == 200
assert data["name"] == "Deadpond"
assert data["secret_name"] == "Dive Wilson"
assert data["age"] is None
assert data["id"] is not None

View File

@ -0,0 +1,37 @@
import pytest # (1)!
from fastapi.testclient import TestClient
from sqlmodel import Session, SQLModel, create_engine
from sqlmodel.pool import StaticPool
from .main import app, get_session
@pytest.fixture(name="session") # (2)!
def session_fixture(): # (3)!
engine = create_engine(
"sqlite://", connect_args={"check_same_thread": False}, poolclass=StaticPool
)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
yield session # (4)!
def test_create_hero(session: Session): # (5)!
def get_session_override():
return session # (6)!
app.dependency_overrides[get_session] = get_session_override
client = TestClient(app)
response = client.post(
"/heroes/", json={"name": "Deadpond", "secret_name": "Dive Wilson"}
)
app.dependency_overrides.clear()
data = response.json()
assert response.status_code == 200
assert data["name"] == "Deadpond"
assert data["secret_name"] == "Dive Wilson"
assert data["age"] is None
assert data["id"] is not None

View File

@ -0,0 +1,41 @@
import pytest
from fastapi.testclient import TestClient
from sqlmodel import Session, SQLModel, create_engine
from sqlmodel.pool import StaticPool
from .main import app, get_session
@pytest.fixture(name="session")
def session_fixture():
engine = create_engine(
"sqlite://", connect_args={"check_same_thread": False}, poolclass=StaticPool
)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
yield session
@pytest.fixture(name="client") # (1)!
def client_fixture(session: Session): # (2)!
def get_session_override(): # (3)!
return session
app.dependency_overrides[get_session] = get_session_override # (4)!
client = TestClient(app) # (5)!
yield client # (6)!
app.dependency_overrides.clear() # (7)!
def test_create_hero(client: TestClient): # (8)!
response = client.post(
"/heroes/", json={"name": "Deadpond", "secret_name": "Dive Wilson"}
)
data = response.json()
assert response.status_code == 200
assert data["name"] == "Deadpond"
assert data["secret_name"] == "Dive Wilson"
assert data["age"] is None
assert data["id"] is not None

View File

@ -0,0 +1,17 @@
1. Import the `app` from the the `main` module.
2. We create a `TestClient` for the FastAPI `app` and put it in the variable `client`.
3. Then we use use this `client` to **talk to the API** and send a `POST` HTTP operation, creating a new hero.
4. Then we get the **JSON data** from the response and put it in the variable `data`.
5. Next we start testing the results with `assert` statements, we check that the status code of the response is `200`.
6. We check that the `name` of the hero created is `"Deadpond"`.
7. We check that the `secret_name` of the hero created is `"Dive Wilson"`.
8. We check that the `age` of the hero created is `None`, because we didn't send an age.
9. We check that the hero created has an `id` created by the database, so it's not `None`.

View File

@ -0,0 +1,25 @@
1. Import the `get_session` dependency from the the `main` module.
2. Define the new function that will be the new **dependency override**.
3. This function will return a different **session** than the one that would be returned by the original `get_session` function.
We haven't seen how this new **session** object is created yet, but the point is that this is a different session than the original one from the app.
This session is attached to a different **engine**, and that different **engine** uses a different URL, for a database just for testing.
We haven't defined that new **URL** nor the new **engine** yet, but here we already see the that this object `session` will override the one returned by the original dependency `get_session()`.
4. Then, the FastAPI `app` object has an attribute `app.dependency_overrides`.
This attribute is a dictionary, and we can put dependency overrides in it by passing, as the **key**, the **original dependency function**, and as the **value**, the **new overriding dependency function**.
So, here we are telling the FastAPI app to use `get_session_override` instead of `get_session` in all the places in the code that depend on `get_session`, that is, all the parameters with something like:
```Python
session: Session = Depends(get_session)
```
5. After we are done with the dependency override, we can restore the application back to normal, by removing all the values in this dictionary `app.dependency_overrides`.
This way whenever a *path operation function* needs the dependency FastAPI will use the original one instead of the override.

View File

@ -0,0 +1,37 @@
1. Here's a subtle thing to notice.
Remember that [Order Matters](../create-db-and-table.md#sqlmodel-metadata-order-matters){.internal-link target=_blank} and we need to make sure all the **SQLModel** models are already defined and **imported** before calling `.create_all()`.
IN this line, by importing something, *anything*, from `.main`, the code in `.main` will be executed, including the definition of the **table models**, and that will automatically register them in `SQLModel.metadata`.
2. Here we create a new **engine**, completely different from the one in `main.py`.
This is the engine we will use for the tests.
We use the new URL of the database for tests:
```
sqlite:///testing.db
```
And again, we use the connection argument `check_same_thread=False`.
3. Then we call:
```Python
SQLModel.metadata.create_all(engine)
```
...to make sure we create all the tables in the new testing database.
The **table models** are registered in `SQLModel.metadata` just because we imported *something* from `.main`, and the code in `.main` was executed, creating the classes for the **table models** and automatically registering them in `SQLModel.metadata`.
So, by the point we call this method, the **table models** are already registered there. 💯
4. Here's where we create the custom **session** object for this test in a `with` block.
It uses the new custom **engine** we created, so anything that uses this session will be using the testing database.
5. Now, back to the dependency override, it is just returning the same **session** object from outside, that's it, that's the whole trick.
6. By this point, the testing **session** `with` block finishes, and the session is closed, the file is closed, etc.

View File

@ -0,0 +1,26 @@
1. Import `StaticPool` from `sqlmodel`, we will use it in a bit.
2. For the **SQLite URL**, don't write any file name, leave it empty.
So, instead of:
```
sqlite:///testing.db
```
...just write:
```
sqlite://
```
This is enough to tell **SQLModel** (actually SQLAlchemy) that we want to use an **in-memory SQLite database**.
3. Remember that we told the **low-level** library in charge of communicating with SQLite that we want to be able to **access the database from different threads** with `check_same_thread=False`?
Now that we use an **in-memory database**, we need to also tell SQLAlchemy that we want to be able to use the **same in-memory database** object from different threads.
We tell it that with the `poolclass=StaticPool` parameter.
!!! info
You can read more details in the <a href="https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#using-a-memory-database-in-multiple-threads" class="external-link" target="_blank">SQLAlchemy documentation about Using a Memory Database in Multiple Threads</a>

View File

@ -0,0 +1,41 @@
1. Import `pytest`.
2. Use the `@pytest.fixture()` decorator on top of the function to tell pytest that this is a **fixture** function (equivalent to a FastAPI dependency).
We also give it a name of `"session"`, this will be important in the testing function.
3. Create the fixture function. This is equivalent to a FastAPI dependency function.
In this fixture we create the custom **engine**, with the in-memory database, we create the tables, and we create the **session**.
Then we `yield` the `session` object.
4. The thing that we `return` or `yield` is what will be available to the test function, in this case, the `session` object.
Here we use `yield` so that **pytest** comes back to execute "the rest of the code" in this function once the testing function is done.
We don't have any more visible "rest of the code" after the `yield`, but we have the end of the `with` block that will close the **session**.
By using `yield`, pytest will:
* run the first part
* create the **session** object
* give it to the test function
* run the test function
* once the test function is done, it will continue here, right after the `yield`, and will correctly close the **session** object in the end of the `with` block.
5. Now, in the test function, to tell **pytest** that this test wants to get the fixture, instead of declaring something like in FastAPI with:
```Python
session: Session = Depends(session_fixture)
```
...the way we tell pytest what is the fixture that we want is by using the **exact same name** of the fixture.
In this case, we named it `session`, so the parameter has to be exactly named `session` for it to work.
We also add the type annotation `session: Session` so that we can get autocompletion and inline error checks in our editor.
6. Now in the dependency override function, we just return the same `session` object that came from outside it.
The `session` object comes from the parameter passed to the test function, and we just re-use it and return it here in the dependency override.

View File

@ -0,0 +1,23 @@
1. Create the new fixture named `"client"`.
2. This **client fixture**, in turn, also requires the **session fixture**.
3. Now we create the **dependency override** inside the client fixture.
4. Set the **dependency override** in the `app.dependency_overrides` dictionary.
5. Create the `TestClient` with the **FastAPI** `app`.
6. `yield` the `TestClient` instance.
By using `yield`, after the test function is done, pytest will come back to execute the rest of the code after `yield`.
7. This is the cleanup code, after `yield`, and after the test function is done.
Here we clear the dependency overrides (here it's only one) in the FastAPI `app`.
8. Now the test function requires the **client fixture**.
And inside the test function, the code is quite **simple**, we just use the `TestClient` to make requests to the API, check the data, and that's it.
The fixtures take care of all the **setup** and **cleanup** code.

View File

@ -0,0 +1,106 @@
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, Query
from sqlmodel import Field, Session, SQLModel, create_engine, select
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
class Hero(HeroBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
class HeroCreate(HeroBase):
pass
class HeroRead(HeroBase):
id: int
class HeroUpdate(SQLModel):
name: Optional[str] = None
secret_name: Optional[str] = None
age: Optional[int] = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def get_session():
with Session(engine) as session:
yield session
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(*, session: Session = Depends(get_session), hero: HeroCreate):
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes(
*,
session: Session = Depends(get_session),
offset: int = 0,
limit: int = Query(default=100, le=100),
):
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroRead)
def read_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero
@app.patch("/heroes/{hero_id}", response_model=HeroRead)
def update_hero(
*, session: Session = Depends(get_session), hero_id: int, hero: HeroUpdate
):
db_hero = session.get(Hero, hero_id)
if not db_hero:
raise HTTPException(status_code=404, detail="Hero not found")
hero_data = hero.dict(exclude_unset=True)
for key, value in hero_data.items():
setattr(db_hero, key, value)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.delete("/heroes/{hero_id}")
def delete_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
session.delete(hero)
session.commit()
return {"ok": True}

View File

@ -0,0 +1,38 @@
from fastapi.testclient import TestClient
from sqlalchemy import Inspector, inspect
from sqlmodel import Session, create_engine
from . import main as app_mod
from .test_main import client_fixture, session_fixture
assert client_fixture, "This keeps the client fixture used below"
assert session_fixture, "This keeps the session fixture used by client_fixture"
def test_startup():
app_mod.engine = create_engine("sqlite://")
app_mod.on_startup()
insp: Inspector = inspect(app_mod.engine)
assert insp.has_table(str(app_mod.Hero.__tablename__))
def test_get_session():
app_mod.engine = create_engine("sqlite://")
for session in app_mod.get_session():
assert isinstance(session, Session)
assert session.bind == app_mod.engine
def test_read_hero_not_found(client: TestClient):
response = client.get("/heroes/9000")
assert response.status_code == 404
def test_update_hero_not_found(client: TestClient):
response = client.patch("/heroes/9000", json={"name": "Very-Rusty-Man"})
assert response.status_code == 404
def test_delete_hero_not_found(client: TestClient):
response = client.delete("/heroes/9000")
assert response.status_code == 404

View File

@ -0,0 +1,125 @@
import pytest
from fastapi.testclient import TestClient
from sqlmodel import Session, SQLModel, create_engine
from sqlmodel.pool import StaticPool
from .main import Hero, app, get_session
@pytest.fixture(name="session")
def session_fixture():
engine = create_engine(
"sqlite://", connect_args={"check_same_thread": False}, poolclass=StaticPool
)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
yield session
@pytest.fixture(name="client")
def client_fixture(session: Session):
def get_session_override():
return session
app.dependency_overrides[get_session] = get_session_override
client = TestClient(app)
yield client
app.dependency_overrides.clear()
def test_create_hero(client: TestClient):
response = client.post(
"/heroes/", json={"name": "Deadpond", "secret_name": "Dive Wilson"}
)
data = response.json()
assert response.status_code == 200
assert data["name"] == "Deadpond"
assert data["secret_name"] == "Dive Wilson"
assert data["age"] is None
assert data["id"] is not None
def test_create_hero_incomplete(client: TestClient):
# No secret_name
response = client.post("/heroes/", json={"name": "Deadpond"})
assert response.status_code == 422
def test_create_hero_invalid(client: TestClient):
# secret_name has an invalid type
response = client.post(
"/heroes/",
json={
"name": "Deadpond",
"secret_name": {"message": "Do you wanna know my secret identity?"},
},
)
assert response.status_code == 422
def test_read_heroes(session: Session, client: TestClient):
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
session.add(hero_1)
session.add(hero_2)
session.commit()
response = client.get("/heroes/")
data = response.json()
assert response.status_code == 200
assert len(data) == 2
assert data[0]["name"] == hero_1.name
assert data[0]["secret_name"] == hero_1.secret_name
assert data[0]["age"] == hero_1.age
assert data[0]["id"] == hero_1.id
assert data[1]["name"] == hero_2.name
assert data[1]["secret_name"] == hero_2.secret_name
assert data[1]["age"] == hero_2.age
assert data[1]["id"] == hero_2.id
def test_read_hero(session: Session, client: TestClient):
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
session.add(hero_1)
session.commit()
response = client.get(f"/heroes/{hero_1.id}")
data = response.json()
assert response.status_code == 200
assert data["name"] == hero_1.name
assert data["secret_name"] == hero_1.secret_name
assert data["age"] == hero_1.age
assert data["id"] == hero_1.id
def test_update_hero(session: Session, client: TestClient):
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
session.add(hero_1)
session.commit()
response = client.patch(f"/heroes/{hero_1.id}", json={"name": "Deadpuddle"})
data = response.json()
assert response.status_code == 200
assert data["name"] == "Deadpuddle"
assert data["secret_name"] == "Dive Wilson"
assert data["age"] is None
assert data["id"] == hero_1.id
def test_delete_hero(session: Session, client: TestClient):
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
session.add(hero_1)
session.commit()
response = client.delete(f"/heroes/{hero_1.id}")
hero_in_db = session.get(Hero, hero_1.id)
assert response.status_code == 200
assert hero_in_db is None

View File

@ -0,0 +1,32 @@
from fastapi.testclient import TestClient
from sqlmodel import Session, SQLModel, create_engine
from .main import app, get_session # (1)!
def test_create_hero():
engine = create_engine(
"sqlite:///testing.db", connect_args={"check_same_thread": False}
)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
def get_session_override():
return session
app.dependency_overrides[get_session] = get_session_override
client = TestClient(app) # (2)!
response = client.post( # (3)!
"/heroes/", json={"name": "Deadpond", "secret_name": "Dive Wilson"}
)
app.dependency_overrides.clear()
data = response.json() # (4)!
assert response.status_code == 200 # (5)!
assert data["name"] == "Deadpond" # (6)!
assert data["secret_name"] == "Dive Wilson" # (7)!
assert data["age"] is None # (8)!
assert data["id"] is not None # (9)!

View File

@ -0,0 +1,32 @@
from fastapi.testclient import TestClient
from sqlmodel import Session, SQLModel, create_engine
from .main import app, get_session # (1)!
def test_create_hero():
engine = create_engine(
"sqlite:///testing.db", connect_args={"check_same_thread": False}
)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
def get_session_override(): # (2)!
return session # (3)!
app.dependency_overrides[get_session] = get_session_override # (4)!
client = TestClient(app)
response = client.post(
"/heroes/", json={"name": "Deadpond", "secret_name": "Dive Wilson"}
)
app.dependency_overrides.clear() # (5)!
data = response.json()
assert response.status_code == 200
assert data["name"] == "Deadpond"
assert data["secret_name"] == "Dive Wilson"
assert data["age"] is None
assert data["id"] is not None

View File

@ -0,0 +1,33 @@
from fastapi.testclient import TestClient
from sqlmodel import Session, SQLModel, create_engine
from .main import app, get_session # (1)!
def test_create_hero():
engine = create_engine( # (2)!
"sqlite:///testing.db", connect_args={"check_same_thread": False}
)
SQLModel.metadata.create_all(engine) # (3)!
with Session(engine) as session: # (4)!
def get_session_override():
return session # (5)!
app.dependency_overrides[get_session] = get_session_override # (4)!
client = TestClient(app)
response = client.post(
"/heroes/", json={"name": "Deadpond", "secret_name": "Dive Wilson"}
)
app.dependency_overrides.clear()
data = response.json()
assert response.status_code == 200
assert data["name"] == "Deadpond"
assert data["secret_name"] == "Dive Wilson"
assert data["age"] is None
assert data["id"] is not None
# (6)!

View File

@ -0,0 +1,35 @@
from fastapi.testclient import TestClient
from sqlmodel import Session, SQLModel, create_engine
from sqlmodel.pool import StaticPool # (1)!
from .main import app, get_session
def test_create_hero():
engine = create_engine(
"sqlite://", # (2)!
connect_args={"check_same_thread": False},
poolclass=StaticPool, # (3)!
)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
def get_session_override():
return session
app.dependency_overrides[get_session] = get_session_override
client = TestClient(app)
response = client.post(
"/heroes/", json={"name": "Deadpond", "secret_name": "Dive Wilson"}
)
app.dependency_overrides.clear()
data = response.json()
assert response.status_code == 200
assert data["name"] == "Deadpond"
assert data["secret_name"] == "Dive Wilson"
assert data["age"] is None
assert data["id"] is not None

View File

@ -0,0 +1,37 @@
import pytest # (1)!
from fastapi.testclient import TestClient
from sqlmodel import Session, SQLModel, create_engine
from sqlmodel.pool import StaticPool
from .main import app, get_session
@pytest.fixture(name="session") # (2)!
def session_fixture(): # (3)!
engine = create_engine(
"sqlite://", connect_args={"check_same_thread": False}, poolclass=StaticPool
)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
yield session # (4)!
def test_create_hero(session: Session): # (5)!
def get_session_override():
return session # (6)!
app.dependency_overrides[get_session] = get_session_override
client = TestClient(app)
response = client.post(
"/heroes/", json={"name": "Deadpond", "secret_name": "Dive Wilson"}
)
app.dependency_overrides.clear()
data = response.json()
assert response.status_code == 200
assert data["name"] == "Deadpond"
assert data["secret_name"] == "Dive Wilson"
assert data["age"] is None
assert data["id"] is not None

View File

@ -0,0 +1,41 @@
import pytest
from fastapi.testclient import TestClient
from sqlmodel import Session, SQLModel, create_engine
from sqlmodel.pool import StaticPool
from .main import app, get_session
@pytest.fixture(name="session")
def session_fixture():
engine = create_engine(
"sqlite://", connect_args={"check_same_thread": False}, poolclass=StaticPool
)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
yield session
@pytest.fixture(name="client") # (1)!
def client_fixture(session: Session): # (2)!
def get_session_override(): # (3)!
return session
app.dependency_overrides[get_session] = get_session_override # (4)!
client = TestClient(app) # (5)!
yield client # (6)!
app.dependency_overrides.clear() # (7)!
def test_create_hero(client: TestClient): # (8)!
response = client.post(
"/heroes/", json={"name": "Deadpond", "secret_name": "Dive Wilson"}
)
data = response.json()
assert response.status_code == 200
assert data["name"] == "Deadpond"
assert data["secret_name"] == "Dive Wilson"
assert data["age"] is None
assert data["id"] is not None

View File

@ -0,0 +1,97 @@
from fastapi import FastAPI, HTTPException, Query
from sqlmodel import Field, Session, SQLModel, create_engine, select
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
class Hero(HeroBase, table=True):
id: int | None = Field(default=None, primary_key=True)
class HeroCreate(HeroBase):
pass
class HeroRead(HeroBase):
id: int
class HeroUpdate(SQLModel):
name: str | None = None
secret_name: str | None = None
age: int | None = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(hero: HeroCreate):
with Session(engine) as session:
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes(offset: int = 0, limit: int = Query(default=100, le=100)):
with Session(engine) as session:
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroRead)
def read_hero(hero_id: int):
with Session(engine) as session:
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero
@app.patch("/heroes/{hero_id}", response_model=HeroRead)
def update_hero(hero_id: int, hero: HeroUpdate):
with Session(engine) as session:
db_hero = session.get(Hero, hero_id)
if not db_hero:
raise HTTPException(status_code=404, detail="Hero not found")
hero_data = hero.dict(exclude_unset=True)
for key, value in hero_data.items():
setattr(db_hero, key, value)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.delete("/heroes/{hero_id}")
def delete_hero(hero_id: int):
with Session(engine) as session:
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
session.delete(hero)
session.commit()
return {"ok": True}

View File

@ -0,0 +1,99 @@
from typing import Optional
from fastapi import FastAPI, HTTPException, Query
from sqlmodel import Field, Session, SQLModel, create_engine, select
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
class Hero(HeroBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
class HeroCreate(HeroBase):
pass
class HeroRead(HeroBase):
id: int
class HeroUpdate(SQLModel):
name: Optional[str] = None
secret_name: Optional[str] = None
age: Optional[int] = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(hero: HeroCreate):
with Session(engine) as session:
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes(offset: int = 0, limit: int = Query(default=100, le=100)):
with Session(engine) as session:
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroRead)
def read_hero(hero_id: int):
with Session(engine) as session:
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero
@app.patch("/heroes/{hero_id}", response_model=HeroRead)
def update_hero(hero_id: int, hero: HeroUpdate):
with Session(engine) as session:
db_hero = session.get(Hero, hero_id)
if not db_hero:
raise HTTPException(status_code=404, detail="Hero not found")
hero_data = hero.dict(exclude_unset=True)
for key, value in hero_data.items():
setattr(db_hero, key, value)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.delete("/heroes/{hero_id}")
def delete_hero(hero_id: int):
with Session(engine) as session:
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
session.delete(hero)
session.commit()
return {"ok": True}

View File

@ -0,0 +1,65 @@
from fastapi import FastAPI, HTTPException, Query
from sqlmodel import Field, Session, SQLModel, create_engine, select
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
class Hero(HeroBase, table=True):
id: int | None = Field(default=None, primary_key=True)
class HeroCreate(HeroBase):
pass
class HeroRead(HeroBase):
id: int
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(hero: HeroCreate):
with Session(engine) as session:
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes(offset: int = 0, limit: int = Query(default=100, le=100)):
with Session(engine) as session:
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroRead)
def read_hero(hero_id: int):
with Session(engine) as session:
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero

View File

@ -0,0 +1,67 @@
from typing import Optional
from fastapi import FastAPI, HTTPException, Query
from sqlmodel import Field, Session, SQLModel, create_engine, select
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
class Hero(HeroBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
class HeroCreate(HeroBase):
pass
class HeroRead(HeroBase):
id: int
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(hero: HeroCreate):
with Session(engine) as session:
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes(offset: int = 0, limit: int = Query(default=100, le=100)):
with Session(engine) as session:
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroRead)
def read_hero(hero_id: int):
with Session(engine) as session:
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero

View File

@ -0,0 +1,58 @@
from fastapi import FastAPI
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
class HeroCreate(SQLModel):
name: str
secret_name: str
age: int | None = None
class HeroRead(SQLModel):
id: int
name: str
secret_name: str
age: int | None = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(hero: HeroCreate):
with Session(engine) as session:
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes():
with Session(engine) as session:
heroes = session.exec(select(Hero)).all()
return heroes

View File

@ -0,0 +1,60 @@
from typing import Optional
from fastapi import FastAPI
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
class HeroCreate(SQLModel):
name: str
secret_name: str
age: Optional[int] = None
class HeroRead(SQLModel):
id: int
name: str
secret_name: str
age: Optional[int] = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(hero: HeroCreate):
with Session(engine) as session:
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes():
with Session(engine) as session:
heroes = session.exec(select(Hero)).all()
return heroes

View File

@ -0,0 +1,56 @@
from fastapi import FastAPI
from sqlmodel import Field, Session, SQLModel, create_engine, select
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
class Hero(HeroBase, table=True):
id: int | None = Field(default=None, primary_key=True)
class HeroCreate(HeroBase):
pass
class HeroRead(HeroBase):
id: int
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(hero: HeroCreate):
with Session(engine) as session:
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes():
with Session(engine) as session:
heroes = session.exec(select(Hero)).all()
return heroes

View File

@ -0,0 +1,58 @@
from typing import Optional
from fastapi import FastAPI
from sqlmodel import Field, Session, SQLModel, create_engine, select
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
class Hero(HeroBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
class HeroCreate(HeroBase):
pass
class HeroRead(HeroBase):
id: int
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(hero: HeroCreate):
with Session(engine) as session:
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes():
with Session(engine) as session:
heroes = session.exec(select(Hero)).all()
return heroes

View File

@ -0,0 +1,65 @@
from fastapi import FastAPI, HTTPException
from sqlmodel import Field, Session, SQLModel, create_engine, select
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
class Hero(HeroBase, table=True):
id: int | None = Field(default=None, primary_key=True)
class HeroCreate(HeroBase):
pass
class HeroRead(HeroBase):
id: int
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(hero: HeroCreate):
with Session(engine) as session:
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes():
with Session(engine) as session:
heroes = session.exec(select(Hero)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroRead)
def read_hero(hero_id: int):
with Session(engine) as session:
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero

View File

@ -0,0 +1,67 @@
from typing import Optional
from fastapi import FastAPI, HTTPException
from sqlmodel import Field, Session, SQLModel, create_engine, select
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
class Hero(HeroBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
class HeroCreate(HeroBase):
pass
class HeroRead(HeroBase):
id: int
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(hero: HeroCreate):
with Session(engine) as session:
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes():
with Session(engine) as session:
heroes = session.exec(select(Hero)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroRead)
def read_hero(hero_id: int):
with Session(engine) as session:
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero

View File

@ -0,0 +1,199 @@
from fastapi import Depends, FastAPI, HTTPException, Query
from sqlmodel import Field, Relationship, Session, SQLModel, create_engine, select
class TeamBase(SQLModel):
name: str = Field(index=True)
headquarters: str
class Team(TeamBase, table=True):
id: int | None = Field(default=None, primary_key=True)
heroes: list["Hero"] = Relationship(back_populates="team")
class TeamCreate(TeamBase):
pass
class TeamRead(TeamBase):
id: int
class TeamUpdate(SQLModel):
id: int | None = None
name: str | None = None
headquarters: str | None = None
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
team_id: int | None = Field(default=None, foreign_key="team.id")
class Hero(HeroBase, table=True):
id: int | None = Field(default=None, primary_key=True)
team: Team | None = Relationship(back_populates="heroes")
class HeroRead(HeroBase):
id: int
class HeroCreate(HeroBase):
pass
class HeroUpdate(SQLModel):
name: str | None = None
secret_name: str | None = None
age: int | None = None
team_id: int | None = None
class HeroReadWithTeam(HeroRead):
team: TeamRead | None = None
class TeamReadWithHeroes(TeamRead):
heroes: list[HeroRead] = []
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def get_session():
with Session(engine) as session:
yield session
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(*, session: Session = Depends(get_session), hero: HeroCreate):
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes(
*,
session: Session = Depends(get_session),
offset: int = 0,
limit: int = Query(default=100, le=100),
):
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroReadWithTeam)
def read_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero
@app.patch("/heroes/{hero_id}", response_model=HeroRead)
def update_hero(
*, session: Session = Depends(get_session), hero_id: int, hero: HeroUpdate
):
db_hero = session.get(Hero, hero_id)
if not db_hero:
raise HTTPException(status_code=404, detail="Hero not found")
hero_data = hero.dict(exclude_unset=True)
for key, value in hero_data.items():
setattr(db_hero, key, value)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.delete("/heroes/{hero_id}")
def delete_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
session.delete(hero)
session.commit()
return {"ok": True}
@app.post("/teams/", response_model=TeamRead)
def create_team(*, session: Session = Depends(get_session), team: TeamCreate):
db_team = Team.from_orm(team)
session.add(db_team)
session.commit()
session.refresh(db_team)
return db_team
@app.get("/teams/", response_model=list[TeamRead])
def read_teams(
*,
session: Session = Depends(get_session),
offset: int = 0,
limit: int = Query(default=100, le=100),
):
teams = session.exec(select(Team).offset(offset).limit(limit)).all()
return teams
@app.get("/teams/{team_id}", response_model=TeamReadWithHeroes)
def read_team(*, team_id: int, session: Session = Depends(get_session)):
team = session.get(Team, team_id)
if not team:
raise HTTPException(status_code=404, detail="Team not found")
return team
@app.patch("/teams/{team_id}", response_model=TeamRead)
def update_team(
*,
session: Session = Depends(get_session),
team_id: int,
team: TeamUpdate,
):
db_team = session.get(Team, team_id)
if not db_team:
raise HTTPException(status_code=404, detail="Team not found")
team_data = team.dict(exclude_unset=True)
for key, value in team_data.items():
setattr(db_team, key, value)
session.add(db_team)
session.commit()
session.refresh(db_team)
return db_team
@app.delete("/teams/{team_id}")
def delete_team(*, session: Session = Depends(get_session), team_id: int):
team = session.get(Team, team_id)
if not team:
raise HTTPException(status_code=404, detail="Team not found")
session.delete(team)
session.commit()
return {"ok": True}

View File

@ -0,0 +1,201 @@
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, Query
from sqlmodel import Field, Relationship, Session, SQLModel, create_engine, select
class TeamBase(SQLModel):
name: str = Field(index=True)
headquarters: str
class Team(TeamBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
heroes: list["Hero"] = Relationship(back_populates="team")
class TeamCreate(TeamBase):
pass
class TeamRead(TeamBase):
id: int
class TeamUpdate(SQLModel):
id: Optional[int] = None
name: Optional[str] = None
headquarters: Optional[str] = None
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
team_id: Optional[int] = Field(default=None, foreign_key="team.id")
class Hero(HeroBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
team: Optional[Team] = Relationship(back_populates="heroes")
class HeroRead(HeroBase):
id: int
class HeroCreate(HeroBase):
pass
class HeroUpdate(SQLModel):
name: Optional[str] = None
secret_name: Optional[str] = None
age: Optional[int] = None
team_id: Optional[int] = None
class HeroReadWithTeam(HeroRead):
team: Optional[TeamRead] = None
class TeamReadWithHeroes(TeamRead):
heroes: list[HeroRead] = []
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def get_session():
with Session(engine) as session:
yield session
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(*, session: Session = Depends(get_session), hero: HeroCreate):
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes(
*,
session: Session = Depends(get_session),
offset: int = 0,
limit: int = Query(default=100, le=100),
):
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroReadWithTeam)
def read_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero
@app.patch("/heroes/{hero_id}", response_model=HeroRead)
def update_hero(
*, session: Session = Depends(get_session), hero_id: int, hero: HeroUpdate
):
db_hero = session.get(Hero, hero_id)
if not db_hero:
raise HTTPException(status_code=404, detail="Hero not found")
hero_data = hero.dict(exclude_unset=True)
for key, value in hero_data.items():
setattr(db_hero, key, value)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.delete("/heroes/{hero_id}")
def delete_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
session.delete(hero)
session.commit()
return {"ok": True}
@app.post("/teams/", response_model=TeamRead)
def create_team(*, session: Session = Depends(get_session), team: TeamCreate):
db_team = Team.from_orm(team)
session.add(db_team)
session.commit()
session.refresh(db_team)
return db_team
@app.get("/teams/", response_model=list[TeamRead])
def read_teams(
*,
session: Session = Depends(get_session),
offset: int = 0,
limit: int = Query(default=100, le=100),
):
teams = session.exec(select(Team).offset(offset).limit(limit)).all()
return teams
@app.get("/teams/{team_id}", response_model=TeamReadWithHeroes)
def read_team(*, team_id: int, session: Session = Depends(get_session)):
team = session.get(Team, team_id)
if not team:
raise HTTPException(status_code=404, detail="Team not found")
return team
@app.patch("/teams/{team_id}", response_model=TeamRead)
def update_team(
*,
session: Session = Depends(get_session),
team_id: int,
team: TeamUpdate,
):
db_team = session.get(Team, team_id)
if not db_team:
raise HTTPException(status_code=404, detail="Team not found")
team_data = team.dict(exclude_unset=True)
for key, value in team_data.items():
setattr(db_team, key, value)
session.add(db_team)
session.commit()
session.refresh(db_team)
return db_team
@app.delete("/teams/{team_id}")
def delete_team(*, session: Session = Depends(get_session), team_id: int):
team = session.get(Team, team_id)
if not team:
raise HTTPException(status_code=404, detail="Team not found")
session.delete(team)
session.commit()
return {"ok": True}

View File

@ -0,0 +1,44 @@
from fastapi import FastAPI
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=Hero)
def create_hero(hero: Hero):
with Session(engine) as session:
session.add(hero)
session.commit()
session.refresh(hero)
return hero
@app.get("/heroes/", response_model=list[Hero])
def read_heroes():
with Session(engine) as session:
heroes = session.exec(select(Hero)).all()
return heroes

View File

@ -0,0 +1,46 @@
from typing import Optional
from fastapi import FastAPI
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=Hero)
def create_hero(hero: Hero):
with Session(engine) as session:
session.add(hero)
session.commit()
session.refresh(hero)
return hero
@app.get("/heroes/", response_model=list[Hero])
def read_heroes():
with Session(engine) as session:
heroes = session.exec(select(Hero)).all()
return heroes

View File

@ -0,0 +1,104 @@
from fastapi import Depends, FastAPI, HTTPException, Query
from sqlmodel import Field, Session, SQLModel, create_engine, select
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
class Hero(HeroBase, table=True):
id: int | None = Field(default=None, primary_key=True)
class HeroCreate(HeroBase):
pass
class HeroRead(HeroBase):
id: int
class HeroUpdate(SQLModel):
name: str | None = None
secret_name: str | None = None
age: int | None = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def get_session():
with Session(engine) as session:
yield session
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(*, session: Session = Depends(get_session), hero: HeroCreate):
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes(
*,
session: Session = Depends(get_session),
offset: int = 0,
limit: int = Query(default=100, le=100),
):
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroRead)
def read_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero
@app.patch("/heroes/{hero_id}", response_model=HeroRead)
def update_hero(
*, session: Session = Depends(get_session), hero_id: int, hero: HeroUpdate
):
db_hero = session.get(Hero, hero_id)
if not db_hero:
raise HTTPException(status_code=404, detail="Hero not found")
hero_data = hero.dict(exclude_unset=True)
for key, value in hero_data.items():
setattr(db_hero, key, value)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.delete("/heroes/{hero_id}")
def delete_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
session.delete(hero)
session.commit()
return {"ok": True}

View File

@ -0,0 +1,106 @@
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, Query
from sqlmodel import Field, Session, SQLModel, create_engine, select
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
class Hero(HeroBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
class HeroCreate(HeroBase):
pass
class HeroRead(HeroBase):
id: int
class HeroUpdate(SQLModel):
name: Optional[str] = None
secret_name: Optional[str] = None
age: Optional[int] = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def get_session():
with Session(engine) as session:
yield session
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(*, session: Session = Depends(get_session), hero: HeroCreate):
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes(
*,
session: Session = Depends(get_session),
offset: int = 0,
limit: int = Query(default=100, le=100),
):
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroRead)
def read_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero
@app.patch("/heroes/{hero_id}", response_model=HeroRead)
def update_hero(
*, session: Session = Depends(get_session), hero_id: int, hero: HeroUpdate
):
db_hero = session.get(Hero, hero_id)
if not db_hero:
raise HTTPException(status_code=404, detail="Hero not found")
hero_data = hero.dict(exclude_unset=True)
for key, value in hero_data.items():
setattr(db_hero, key, value)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.delete("/heroes/{hero_id}")
def delete_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
session.delete(hero)
session.commit()
return {"ok": True}

View File

@ -0,0 +1,44 @@
from fastapi import FastAPI
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/")
def create_hero(hero: Hero):
with Session(engine) as session:
session.add(hero)
session.commit()
session.refresh(hero)
return hero
@app.get("/heroes/")
def read_heroes():
with Session(engine) as session:
heroes = session.exec(select(Hero)).all()
return heroes

View File

@ -0,0 +1,190 @@
from fastapi import Depends, FastAPI, HTTPException, Query
from sqlmodel import Field, Relationship, Session, SQLModel, create_engine, select
class TeamBase(SQLModel):
name: str = Field(index=True)
headquarters: str
class Team(TeamBase, table=True):
id: int | None = Field(default=None, primary_key=True)
heroes: list["Hero"] = Relationship(back_populates="team")
class TeamCreate(TeamBase):
pass
class TeamRead(TeamBase):
id: int
class TeamUpdate(SQLModel):
name: str | None = None
headquarters: str | None = None
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
team_id: int | None = Field(default=None, foreign_key="team.id")
class Hero(HeroBase, table=True):
id: int | None = Field(default=None, primary_key=True)
team: Team | None = Relationship(back_populates="heroes")
class HeroRead(HeroBase):
id: int
class HeroCreate(HeroBase):
pass
class HeroUpdate(SQLModel):
name: str | None = None
secret_name: str | None = None
age: int | None = None
team_id: int | None = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def get_session():
with Session(engine) as session:
yield session
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(*, session: Session = Depends(get_session), hero: HeroCreate):
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes(
*,
session: Session = Depends(get_session),
offset: int = 0,
limit: int = Query(default=100, le=100),
):
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroRead)
def read_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero
@app.patch("/heroes/{hero_id}", response_model=HeroRead)
def update_hero(
*, session: Session = Depends(get_session), hero_id: int, hero: HeroUpdate
):
db_hero = session.get(Hero, hero_id)
if not db_hero:
raise HTTPException(status_code=404, detail="Hero not found")
hero_data = hero.dict(exclude_unset=True)
for key, value in hero_data.items():
setattr(db_hero, key, value)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.delete("/heroes/{hero_id}")
def delete_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
session.delete(hero)
session.commit()
return {"ok": True}
@app.post("/teams/", response_model=TeamRead)
def create_team(*, session: Session = Depends(get_session), team: TeamCreate):
db_team = Team.from_orm(team)
session.add(db_team)
session.commit()
session.refresh(db_team)
return db_team
@app.get("/teams/", response_model=list[TeamRead])
def read_teams(
*,
session: Session = Depends(get_session),
offset: int = 0,
limit: int = Query(default=100, le=100),
):
teams = session.exec(select(Team).offset(offset).limit(limit)).all()
return teams
@app.get("/teams/{team_id}", response_model=TeamRead)
def read_team(*, team_id: int, session: Session = Depends(get_session)):
team = session.get(Team, team_id)
if not team:
raise HTTPException(status_code=404, detail="Team not found")
return team
@app.patch("/teams/{team_id}", response_model=TeamRead)
def update_team(
*,
session: Session = Depends(get_session),
team_id: int,
team: TeamUpdate,
):
db_team = session.get(Team, team_id)
if not db_team:
raise HTTPException(status_code=404, detail="Team not found")
team_data = team.dict(exclude_unset=True)
for key, value in team_data.items():
setattr(db_team, key, value)
session.add(db_team)
session.commit()
session.refresh(db_team)
return db_team
@app.delete("/teams/{team_id}")
def delete_team(*, session: Session = Depends(get_session), team_id: int):
team = session.get(Team, team_id)
if not team:
raise HTTPException(status_code=404, detail="Team not found")
session.delete(team)
session.commit()
return {"ok": True}

View File

@ -0,0 +1,192 @@
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, Query
from sqlmodel import Field, Relationship, Session, SQLModel, create_engine, select
class TeamBase(SQLModel):
name: str = Field(index=True)
headquarters: str
class Team(TeamBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
heroes: list["Hero"] = Relationship(back_populates="team")
class TeamCreate(TeamBase):
pass
class TeamRead(TeamBase):
id: int
class TeamUpdate(SQLModel):
name: Optional[str] = None
headquarters: Optional[str] = None
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
team_id: Optional[int] = Field(default=None, foreign_key="team.id")
class Hero(HeroBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
team: Optional[Team] = Relationship(back_populates="heroes")
class HeroRead(HeroBase):
id: int
class HeroCreate(HeroBase):
pass
class HeroUpdate(SQLModel):
name: Optional[str] = None
secret_name: Optional[str] = None
age: Optional[int] = None
team_id: Optional[int] = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def get_session():
with Session(engine) as session:
yield session
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(*, session: Session = Depends(get_session), hero: HeroCreate):
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes(
*,
session: Session = Depends(get_session),
offset: int = 0,
limit: int = Query(default=100, le=100),
):
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroRead)
def read_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero
@app.patch("/heroes/{hero_id}", response_model=HeroRead)
def update_hero(
*, session: Session = Depends(get_session), hero_id: int, hero: HeroUpdate
):
db_hero = session.get(Hero, hero_id)
if not db_hero:
raise HTTPException(status_code=404, detail="Hero not found")
hero_data = hero.dict(exclude_unset=True)
for key, value in hero_data.items():
setattr(db_hero, key, value)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.delete("/heroes/{hero_id}")
def delete_hero(*, session: Session = Depends(get_session), hero_id: int):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
session.delete(hero)
session.commit()
return {"ok": True}
@app.post("/teams/", response_model=TeamRead)
def create_team(*, session: Session = Depends(get_session), team: TeamCreate):
db_team = Team.from_orm(team)
session.add(db_team)
session.commit()
session.refresh(db_team)
return db_team
@app.get("/teams/", response_model=list[TeamRead])
def read_teams(
*,
session: Session = Depends(get_session),
offset: int = 0,
limit: int = Query(default=100, le=100),
):
teams = session.exec(select(Team).offset(offset).limit(limit)).all()
return teams
@app.get("/teams/{team_id}", response_model=TeamRead)
def read_team(*, team_id: int, session: Session = Depends(get_session)):
team = session.get(Team, team_id)
if not team:
raise HTTPException(status_code=404, detail="Team not found")
return team
@app.patch("/teams/{team_id}", response_model=TeamRead)
def update_team(
*,
session: Session = Depends(get_session),
team_id: int,
team: TeamUpdate,
):
db_team = session.get(Team, team_id)
if not db_team:
raise HTTPException(status_code=404, detail="Team not found")
team_data = team.dict(exclude_unset=True)
for key, value in team_data.items():
setattr(db_team, key, value)
session.add(db_team)
session.commit()
session.refresh(db_team)
return db_team
@app.delete("/teams/{team_id}")
def delete_team(*, session: Session = Depends(get_session), team_id: int):
team = session.get(Team, team_id)
if not team:
raise HTTPException(status_code=404, detail="Team not found")
session.delete(team)
session.commit()
return {"ok": True}

View File

@ -0,0 +1,86 @@
from fastapi import FastAPI, HTTPException, Query
from sqlmodel import Field, Session, SQLModel, create_engine, select
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
class Hero(HeroBase, table=True):
id: int | None = Field(default=None, primary_key=True)
class HeroCreate(HeroBase):
pass
class HeroRead(HeroBase):
id: int
class HeroUpdate(SQLModel):
name: str | None = None
secret_name: str | None = None
age: int | None = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(hero: HeroCreate):
with Session(engine) as session:
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes(offset: int = 0, limit: int = Query(default=100, le=100)):
with Session(engine) as session:
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroRead)
def read_hero(hero_id: int):
with Session(engine) as session:
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero
@app.patch("/heroes/{hero_id}", response_model=HeroRead)
def update_hero(hero_id: int, hero: HeroUpdate):
with Session(engine) as session:
db_hero = session.get(Hero, hero_id)
if not db_hero:
raise HTTPException(status_code=404, detail="Hero not found")
hero_data = hero.dict(exclude_unset=True)
for key, value in hero_data.items():
setattr(db_hero, key, value)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero

View File

@ -0,0 +1,88 @@
from typing import Optional
from fastapi import FastAPI, HTTPException, Query
from sqlmodel import Field, Session, SQLModel, create_engine, select
class HeroBase(SQLModel):
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
class Hero(HeroBase, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
class HeroCreate(HeroBase):
pass
class HeroRead(HeroBase):
id: int
class HeroUpdate(SQLModel):
name: Optional[str] = None
secret_name: Optional[str] = None
age: Optional[int] = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, echo=True, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/", response_model=HeroRead)
def create_hero(hero: HeroCreate):
with Session(engine) as session:
db_hero = Hero.from_orm(hero)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero
@app.get("/heroes/", response_model=list[HeroRead])
def read_heroes(offset: int = 0, limit: int = Query(default=100, le=100)):
with Session(engine) as session:
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes
@app.get("/heroes/{hero_id}", response_model=HeroRead)
def read_hero(hero_id: int):
with Session(engine) as session:
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero
@app.patch("/heroes/{hero_id}", response_model=HeroRead)
def update_hero(hero_id: int, hero: HeroUpdate):
with Session(engine) as session:
db_hero = session.get(Hero, hero_id)
if not db_hero:
raise HTTPException(status_code=404, detail="Hero not found")
hero_data = hero.dict(exclude_unset=True)
for key, value in hero_data.items():
setattr(db_hero, key, value)
session.add(db_hero)
session.commit()
session.refresh(db_hero)
return db_hero

View File

@ -0,0 +1,49 @@
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.commit()
def select_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.name == "Deadpond")
results = session.exec(statement)
for hero in results:
print(hero)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,57 @@
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
hero_4 = Hero(name="Tarantula", secret_name="Natalia Roman-on", age=32)
hero_5 = Hero(name="Black Lion", secret_name="Trevor Challa", age=35)
hero_6 = Hero(name="Dr. Weird", secret_name="Steve Weird", age=36)
hero_7 = Hero(name="Captain North America", secret_name="Esteban Rogelios", age=93)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.add(hero_4)
session.add(hero_5)
session.add(hero_6)
session.add(hero_7)
session.commit()
def select_heroes():
with Session(engine) as session:
statement = select(Hero).where(Hero.age <= 35)
results = session.exec(statement)
for hero in results:
print(hero)
def main():
create_db_and_tables()
create_heroes()
select_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,43 @@
from sqlmodel import Field, Session, SQLModel, create_engine
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str
secret_name: str
age: int | None = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
session = Session(engine)
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.commit()
session.close()
def main():
create_db_and_tables()
create_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,40 @@
from sqlmodel import Field, Session, SQLModel, create_engine
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str
secret_name: str
age: int | None = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson")
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
with Session(engine) as session:
session.add(hero_1)
session.add(hero_2)
session.add(hero_3)
session.commit()
def main():
create_db_and_tables()
create_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,41 @@
from sqlmodel import Field, Session, SQLModel, create_engine
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str
secret_name: str
age: int | None = None
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes(): # (1)!
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson") # (2)!
hero_2 = Hero(name="Spider-Boy", secret_name="Pedro Parqueador")
hero_3 = Hero(name="Rusty-Man", secret_name="Tommy Sharp", age=48)
with Session(engine) as session: # (3)!
session.add(hero_1) # (4)!
session.add(hero_2)
session.add(hero_3)
session.commit() # (5)!
# (6)!
def main(): # (7)!
create_db_and_tables() # (8)!
create_heroes() # (9)!
if __name__ == "__main__": # (10)!
main() # (11)!

View File

@ -0,0 +1,78 @@
from sqlmodel import Field, Relationship, Session, SQLModel, create_engine
class HeroTeamLink(SQLModel, table=True):
team_id: int | None = Field(default=None, foreign_key="team.id", primary_key=True)
hero_id: int | None = Field(default=None, foreign_key="hero.id", primary_key=True)
class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
heroes: list["Hero"] = Relationship(back_populates="teams", link_model=HeroTeamLink)
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
teams: list[Team] = Relationship(back_populates="heroes", link_model=HeroTeamLink)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
hero_deadpond = Hero(
name="Deadpond",
secret_name="Dive Wilson",
teams=[team_z_force, team_preventers],
)
hero_rusty_man = Hero(
name="Rusty-Man",
secret_name="Tommy Sharp",
age=48,
teams=[team_preventers],
)
hero_spider_boy = Hero(
name="Spider-Boy", secret_name="Pedro Parqueador", teams=[team_preventers]
)
session.add(hero_deadpond)
session.add(hero_rusty_man)
session.add(hero_spider_boy)
session.commit()
session.refresh(hero_deadpond)
session.refresh(hero_rusty_man)
session.refresh(hero_spider_boy)
print("Deadpond:", hero_deadpond)
print("Deadpond teams:", hero_deadpond.teams)
print("Rusty-Man:", hero_rusty_man)
print("Rusty-Man Teams:", hero_rusty_man.teams)
print("Spider-Boy:", hero_spider_boy)
print("Spider-Boy Teams:", hero_spider_boy.teams)
def main():
create_db_and_tables()
create_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,84 @@
from typing import Optional
from sqlmodel import Field, Relationship, Session, SQLModel, create_engine
class HeroTeamLink(SQLModel, table=True):
team_id: Optional[int] = Field(
default=None, foreign_key="team.id", primary_key=True
)
hero_id: Optional[int] = Field(
default=None, foreign_key="hero.id", primary_key=True
)
class Team(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
heroes: list["Hero"] = Relationship(back_populates="teams", link_model=HeroTeamLink)
class Hero(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
teams: list[Team] = Relationship(back_populates="heroes", link_model=HeroTeamLink)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
hero_deadpond = Hero(
name="Deadpond",
secret_name="Dive Wilson",
teams=[team_z_force, team_preventers],
)
hero_rusty_man = Hero(
name="Rusty-Man",
secret_name="Tommy Sharp",
age=48,
teams=[team_preventers],
)
hero_spider_boy = Hero(
name="Spider-Boy", secret_name="Pedro Parqueador", teams=[team_preventers]
)
session.add(hero_deadpond)
session.add(hero_rusty_man)
session.add(hero_spider_boy)
session.commit()
session.refresh(hero_deadpond)
session.refresh(hero_rusty_man)
session.refresh(hero_spider_boy)
print("Deadpond:", hero_deadpond)
print("Deadpond teams:", hero_deadpond.teams)
print("Rusty-Man:", hero_rusty_man)
print("Rusty-Man Teams:", hero_rusty_man.teams)
print("Spider-Boy:", hero_spider_boy)
print("Spider-Boy Teams:", hero_spider_boy.teams)
def main():
create_db_and_tables()
create_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,101 @@
from sqlmodel import Field, Relationship, Session, SQLModel, create_engine, select
class HeroTeamLink(SQLModel, table=True):
team_id: int | None = Field(default=None, foreign_key="team.id", primary_key=True)
hero_id: int | None = Field(default=None, foreign_key="hero.id", primary_key=True)
class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
heroes: list["Hero"] = Relationship(back_populates="teams", link_model=HeroTeamLink)
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
teams: list[Team] = Relationship(back_populates="heroes", link_model=HeroTeamLink)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
hero_deadpond = Hero(
name="Deadpond",
secret_name="Dive Wilson",
teams=[team_z_force, team_preventers],
)
hero_rusty_man = Hero(
name="Rusty-Man",
secret_name="Tommy Sharp",
age=48,
teams=[team_preventers],
)
hero_spider_boy = Hero(
name="Spider-Boy", secret_name="Pedro Parqueador", teams=[team_preventers]
)
session.add(hero_deadpond)
session.add(hero_rusty_man)
session.add(hero_spider_boy)
session.commit()
session.refresh(hero_deadpond)
session.refresh(hero_rusty_man)
session.refresh(hero_spider_boy)
print("Deadpond:", hero_deadpond)
print("Deadpond teams:", hero_deadpond.teams)
print("Rusty-Man:", hero_rusty_man)
print("Rusty-Man Teams:", hero_rusty_man.teams)
print("Spider-Boy:", hero_spider_boy)
print("Spider-Boy Teams:", hero_spider_boy.teams)
def update_heroes():
with Session(engine) as session:
hero_spider_boy = session.exec(
select(Hero).where(Hero.name == "Spider-Boy")
).one()
team_z_force = session.exec(select(Team).where(Team.name == "Z-Force")).one()
team_z_force.heroes.append(hero_spider_boy)
session.add(team_z_force)
session.commit()
print("Updated Spider-Boy's Teams:", hero_spider_boy.teams)
print("Z-Force heroes:", team_z_force.heroes)
hero_spider_boy.teams.remove(team_z_force)
session.add(team_z_force)
session.commit()
print("Reverted Z-Force's heroes:", team_z_force.heroes)
print("Reverted Spider-Boy's teams:", hero_spider_boy.teams)
def main():
create_db_and_tables()
create_heroes()
update_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,107 @@
from typing import Optional
from sqlmodel import Field, Relationship, Session, SQLModel, create_engine, select
class HeroTeamLink(SQLModel, table=True):
team_id: Optional[int] = Field(
default=None, foreign_key="team.id", primary_key=True
)
hero_id: Optional[int] = Field(
default=None, foreign_key="hero.id", primary_key=True
)
class Team(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
heroes: list["Hero"] = Relationship(back_populates="teams", link_model=HeroTeamLink)
class Hero(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
teams: list[Team] = Relationship(back_populates="heroes", link_model=HeroTeamLink)
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
hero_deadpond = Hero(
name="Deadpond",
secret_name="Dive Wilson",
teams=[team_z_force, team_preventers],
)
hero_rusty_man = Hero(
name="Rusty-Man",
secret_name="Tommy Sharp",
age=48,
teams=[team_preventers],
)
hero_spider_boy = Hero(
name="Spider-Boy", secret_name="Pedro Parqueador", teams=[team_preventers]
)
session.add(hero_deadpond)
session.add(hero_rusty_man)
session.add(hero_spider_boy)
session.commit()
session.refresh(hero_deadpond)
session.refresh(hero_rusty_man)
session.refresh(hero_spider_boy)
print("Deadpond:", hero_deadpond)
print("Deadpond teams:", hero_deadpond.teams)
print("Rusty-Man:", hero_rusty_man)
print("Rusty-Man Teams:", hero_rusty_man.teams)
print("Spider-Boy:", hero_spider_boy)
print("Spider-Boy Teams:", hero_spider_boy.teams)
def update_heroes():
with Session(engine) as session:
hero_spider_boy = session.exec(
select(Hero).where(Hero.name == "Spider-Boy")
).one()
team_z_force = session.exec(select(Team).where(Team.name == "Z-Force")).one()
team_z_force.heroes.append(hero_spider_boy)
session.add(team_z_force)
session.commit()
print("Updated Spider-Boy's Teams:", hero_spider_boy.teams)
print("Z-Force heroes:", team_z_force.heroes)
hero_spider_boy.teams.remove(team_z_force)
session.add(team_z_force)
session.commit()
print("Reverted Z-Force's heroes:", team_z_force.heroes)
print("Reverted Spider-Boy's teams:", hero_spider_boy.teams)
def main():
create_db_and_tables()
create_heroes()
update_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,117 @@
from sqlmodel import Field, Relationship, Session, SQLModel, create_engine, select
class HeroTeamLink(SQLModel, table=True):
team_id: int | None = Field(default=None, foreign_key="team.id", primary_key=True)
hero_id: int | None = Field(default=None, foreign_key="hero.id", primary_key=True)
is_training: bool = False
team: "Team" = Relationship(back_populates="hero_links")
hero: "Hero" = Relationship(back_populates="team_links")
class Team(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
hero_links: list[HeroTeamLink] = Relationship(back_populates="team")
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: int | None = Field(default=None, index=True)
team_links: list[HeroTeamLink] = Relationship(back_populates="hero")
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
hero_deadpond = Hero(
name="Deadpond",
secret_name="Dive Wilson",
)
hero_rusty_man = Hero(
name="Rusty-Man",
secret_name="Tommy Sharp",
age=48,
)
hero_spider_boy = Hero(
name="Spider-Boy",
secret_name="Pedro Parqueador",
)
deadpond_team_z_link = HeroTeamLink(team=team_z_force, hero=hero_deadpond)
deadpond_preventers_link = HeroTeamLink(
team=team_preventers, hero=hero_deadpond, is_training=True
)
spider_boy_preventers_link = HeroTeamLink(
team=team_preventers, hero=hero_spider_boy, is_training=True
)
rusty_man_preventers_link = HeroTeamLink(
team=team_preventers, hero=hero_rusty_man
)
session.add(deadpond_team_z_link)
session.add(deadpond_preventers_link)
session.add(spider_boy_preventers_link)
session.add(rusty_man_preventers_link)
session.commit()
for link in team_z_force.hero_links:
print("Z-Force hero:", link.hero, "is training:", link.is_training)
for link in team_preventers.hero_links:
print("Preventers hero:", link.hero, "is training:", link.is_training)
def update_heroes():
with Session(engine) as session:
hero_spider_boy = session.exec(
select(Hero).where(Hero.name == "Spider-Boy")
).one()
team_z_force = session.exec(select(Team).where(Team.name == "Z-Force")).one()
spider_boy_z_force_link = HeroTeamLink(
team=team_z_force, hero=hero_spider_boy, is_training=True
)
team_z_force.hero_links.append(spider_boy_z_force_link)
session.add(team_z_force)
session.commit()
print("Updated Spider-Boy's Teams:", hero_spider_boy.team_links)
print("Z-Force heroes:", team_z_force.hero_links)
for link in hero_spider_boy.team_links:
if link.team.name == "Preventers":
link.is_training = False
session.add(hero_spider_boy)
session.commit()
for link in hero_spider_boy.team_links:
print("Spider-Boy team:", link.team, "is training:", link.is_training)
def main():
create_db_and_tables()
create_heroes()
update_heroes()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,123 @@
from typing import Optional
from sqlmodel import Field, Relationship, Session, SQLModel, create_engine, select
class HeroTeamLink(SQLModel, table=True):
team_id: Optional[int] = Field(
default=None, foreign_key="team.id", primary_key=True
)
hero_id: Optional[int] = Field(
default=None, foreign_key="hero.id", primary_key=True
)
is_training: bool = False
team: "Team" = Relationship(back_populates="hero_links")
hero: "Hero" = Relationship(back_populates="team_links")
class Team(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
headquarters: str
hero_links: list[HeroTeamLink] = Relationship(back_populates="team")
class Hero(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
secret_name: str
age: Optional[int] = Field(default=None, index=True)
team_links: list[HeroTeamLink] = Relationship(back_populates="hero")
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def create_heroes():
with Session(engine) as session:
team_preventers = Team(name="Preventers", headquarters="Sharp Tower")
team_z_force = Team(name="Z-Force", headquarters="Sister Margarets Bar")
hero_deadpond = Hero(
name="Deadpond",
secret_name="Dive Wilson",
)
hero_rusty_man = Hero(
name="Rusty-Man",
secret_name="Tommy Sharp",
age=48,
)
hero_spider_boy = Hero(
name="Spider-Boy",
secret_name="Pedro Parqueador",
)
deadpond_team_z_link = HeroTeamLink(team=team_z_force, hero=hero_deadpond)
deadpond_preventers_link = HeroTeamLink(
team=team_preventers, hero=hero_deadpond, is_training=True
)
spider_boy_preventers_link = HeroTeamLink(
team=team_preventers, hero=hero_spider_boy, is_training=True
)
rusty_man_preventers_link = HeroTeamLink(
team=team_preventers, hero=hero_rusty_man
)
session.add(deadpond_team_z_link)
session.add(deadpond_preventers_link)
session.add(spider_boy_preventers_link)
session.add(rusty_man_preventers_link)
session.commit()
for link in team_z_force.hero_links:
print("Z-Force hero:", link.hero, "is training:", link.is_training)
for link in team_preventers.hero_links:
print("Preventers hero:", link.hero, "is training:", link.is_training)
def update_heroes():
with Session(engine) as session:
hero_spider_boy = session.exec(
select(Hero).where(Hero.name == "Spider-Boy")
).one()
team_z_force = session.exec(select(Team).where(Team.name == "Z-Force")).one()
spider_boy_z_force_link = HeroTeamLink(
team=team_z_force, hero=hero_spider_boy, is_training=True
)
team_z_force.hero_links.append(spider_boy_z_force_link)
session.add(team_z_force)
session.commit()
print("Updated Spider-Boy's Teams:", hero_spider_boy.team_links)
print("Z-Force heroes:", team_z_force.hero_links)
for link in hero_spider_boy.team_links:
if link.team.name == "Preventers":
link.is_training = False
session.add(hero_spider_boy)
session.commit()
for link in hero_spider_boy.team_links:
print("Spider-Boy team:", link.team, "is training:", link.is_training)
def main():
create_db_and_tables()
create_heroes()
update_heroes()
if __name__ == "__main__":
main()

Some files were not shown because too many files have changed in this diff Show More