SQLAlchemy is the Python SQL toolkit and ORM that sits on the boundary between Python objects and a live relational database. Everything starts with one Engine built from a URL, and from there you either run SQL by hand with text() or map classes to tables and let the ORM write the SQL for you. The recurring mental model in this sheet is one picture: a Python object on the left (a dict, a mapped instance, a select()) flows along a gray arrow through the Engine and is rendered to a SQL string in the middle, which hits the database on the right and flows back as rows or objects, with the commit step in green and the rollback step in red. Where this overlaps with the pandas sheet, the split is the point: SQLAlchemy owns the connection, the dialect, and the SQL, while pandas owns the analysis. The conventional imports are from sqlalchemy import create_engine, text, select and from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session, and everything here is SQLAlchemy 2.0 style (legacy 1.x spellings are flagged per section).
Engine and Connection URLs
create_engine builds an Engine from a URL of the form dialect+driver://user:password@host:port/database, and that single object holds a connection pool you share across your whole program. The engine is lazy (it does not connect until you ask), so you open short-lived blocks with engine.connect() to read or engine.begin() to write and commit on exit, and the with statement returns the connection to the pool when you are done.
```python
from sqlalchemy import create_engine, URL

engine = create_engine("sqlite+pysqlite:///app.db")                # lazy, pooled
create_engine("postgresql+psycopg://user:pw@localhost:5432/mydb")  # PostgreSQL
create_engine("sqlite://")                                         # in-memory, for tests
create_engine(url, echo=True)                                      # log the SQL it emits
URL.create("postgresql+psycopg", username="u", password="pw",      # build a URL safely
           host="h", database="d")
with engine.connect() as conn:                                     # borrow + return a connection
    ...
```

See the engine tutorial. The engine is lazy and pooled; one per application is enough.
Raw SQL with text()
When you want to run SQL by hand, wrap the string in text() and pass values as a bound-parameter dict (:name placeholders, never f-strings), which lets the driver escape them and shuts the door on SQL injection. The call returns a Result you can drain with .scalar_one() for a single value, .mappings().all() for dict-like rows, or by iterating; wrap writes in an engine.begin() block so they commit on success and roll back on error.
```python
from sqlalchemy import text

conn.execute(text("SELECT * FROM users"))                                # execute a query
conn.execute(text("SELECT * FROM users WHERE id = :id"), {"id": 7})      # bound params
conn.execute(text("SELECT count(*) FROM users")).scalar_one()            # one scalar value
conn.execute(stmt).mappings().all()                                      # rows as {col: value} dicts
conn.execute(text("INSERT INTO t VALUES (:x)"), [{"x": 1}, {"x": 2}])    # executemany
with engine.begin() as conn:                                             # auto COMMIT, ROLLBACK on error
    ...
```

See working with transactions. Always bind values; never f-string them into the SQL.
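To make the injection point concrete, here is a minimal sketch against an in-memory SQLite database. The users table and the hostile input string are invented for the demonstration, and the f-string query is included only as the anti-pattern to avoid.

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")  # in-memory database for the demo

with engine.begin() as conn:  # commits on clean exit
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"))
    conn.execute(
        text("INSERT INTO users (name) VALUES (:name)"),
        [{"name": "ada"}, {"name": "grace"}],
    )

hostile = "ada' OR '1'='1"  # hypothetical attacker-controlled input

with engine.connect() as conn:
    # Bound parameter: the whole string is compared as a single value,
    # so no row matches.
    safe_rows = conn.execute(
        text("SELECT name FROM users WHERE name = :name"), {"name": hostile}
    ).all()

    # Anti-pattern, shown for contrast only: the f-string splices the input
    # into the SQL text, the OR clause becomes part of the query, and every
    # row matches.
    unsafe_rows = conn.execute(
        text(f"SELECT name FROM users WHERE name = '{hostile}'")
    ).all()

print(len(safe_rows), len(unsafe_rows))
```

The same input yields no rows when bound and the whole table when interpolated, which is the entire case for :name placeholders.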
ORM Models (declarative, 2.0)
In SQLAlchemy 2.0 you map a Python class to a table by subclassing DeclarativeBase, setting __tablename__, and declaring each column as a typed Mapped[...] annotation paired with mapped_column(); the Python type (int, str, str | None) drives the SQL type and nullability. Calling Base.metadata.create_all(engine) issues the matching CREATE TABLE statements, so your class definitions are the single source of truth for the schema.
```python
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):                               # the mapped base class
    pass

class User(Base):
    __tablename__ = "user_account"                         # class maps to a table
    id: Mapped[int] = mapped_column(primary_key=True)      # typed primary key
    name: Mapped[str] = mapped_column(String(30))          # typed column with length
    fullname: Mapped[str | None]                           # nullable, may be NULL

Base.metadata.create_all(engine)                           # emit CREATE TABLE ...
```

See declarative mapping styles. Mapped[...] + mapped_column() replaces 1.x Column(...).
Session and Unit of Work
A Session is a workspace that tracks the objects you load and create; you add new instances, mutate loaded ones by plain attribute assignment, and then commit() flushes every pending change as one transaction. This is the unit-of-work pattern: nothing hits the database permanently until commit, and rollback() discards the whole batch, so a with Session(engine) as session: block (or a sessionmaker factory) keeps the lifecycle tidy.
```python
from sqlalchemy.orm import Session, sessionmaker

with Session(engine) as session:      # open a session block
    ...
Session = sessionmaker(engine)        # a reusable session factory
session.add(user)                     # stage one new object (pending)
session.add_all([u1, u2])             # stage several at once
session.commit()                      # flush pending changes, then COMMIT
session.rollback()                    # discard the whole batch
```

See using the Session. Nothing is permanent until commit(); rollback() undoes the unit of work.
Querying with select()
You build a query as a select(...) object and refine it with .where(), .order_by(), .limit(), and friends, then execute it through the session: session.scalars(stmt) returns mapped objects, while session.execute(stmt) returns rows of whatever columns you asked for. Reach for .all(), .one() (which raises if there is not exactly one match), or session.get(Model, pk) for a fast primary-key lookup that checks the identity-map cache first.
```python
from sqlalchemy import select, func

stmt = select(User).where(User.name == "ada")              # build a SELECT object
session.scalars(stmt).all()                                # list of User objects
session.scalars(stmt).one()                                # exactly one (or raises)
select(User).order_by(User.name).limit(10)                 # order and limit
session.get(User, 7)                                       # fast primary-key lookup
session.scalar(select(func.count()).select_from(User))     # aggregate / count
```

See selecting data. Use scalars() for mapped objects, execute() for column rows.
Insert, Update, Delete
The ORM way is to let the session do the SQL: session.add(obj) inserts, assigning to a loaded object’s attribute marks it dirty for an UPDATE, and session.delete(obj) removes it, all flushed on the next commit(). For set-based changes across many rows without loading them, build Core insert(), update(), or delete() statements and run them with session.execute(stmt), which compiles to a single bulk SQL statement.
```python
from sqlalchemy import update, delete

session.add(User(name="ada")); session.commit()        # ORM insert
user.fullname = "Ada L."; session.commit()             # ORM update (just assign)
session.delete(user); session.commit()                 # ORM delete
update(User).where(User.id == 7).values(name="x")      # Core bulk update
delete(User).where(User.name == "spam")                # Core bulk delete
session.execute(stmt); session.commit()                # run the Core statement
```

See ORM data manipulation. ORM tracks per-object changes; Core DML hits many rows at once.
Relationships and Joins
A ForeignKey column ties rows together at the database level, and a relationship() exposes that link as a Python attribute, so user.addresses is a list you can read and append to; SQLAlchemy fills in the foreign key when it flushes, and back_populates keeps the two Python sides of the link in sync. To pull related data in one query, join() across the relationship, and use loader options like selectinload() to fetch children in a second batched query, which avoids the classic N+1 storm of one query per parent.
```python
from sqlalchemy import ForeignKey, select
from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload

user_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))        # foreign key
addresses: Mapped[list["Address"]] = relationship(back_populates="user")   # one-to-many
user: Mapped["User"] = relationship(back_populates="addresses")            # many-to-one
user.addresses.append(Address(email="a@x.com"))                            # traverse, FK set for you
select(User.name, Address.email).join(User.addresses)                      # JOIN across the relationship
select(User).options(selectinload(User.addresses))                         # eager-load, dodge N+1
```

See working with related objects. back_populates keeps both sides of the link in sync.
Read straight into pandas
SQLAlchemy and pandas split the labor cleanly: you hand pd.read_sql (or read_sql_query / read_sql_table) a SQLAlchemy Connection or Engine plus a text() string or a select() object, SQLAlchemy handles the dialect, parameter binding, and pooling, and pandas returns a DataFrame. Pass values through params= (still bound, still injection-safe), use chunksize= to stream large results without exhausting memory, and df.to_sql(...) to write a DataFrame back to a table.
```python
import pandas as pd
from sqlalchemy import select, text

pd.read_sql(text("SELECT * FROM user_account"), conn)                # read a SQL string
pd.read_sql_query(select(User.id, User.name), conn)                  # read a select() statement
pd.read_sql_table("user_account", engine)                            # read a whole table
pd.read_sql(text("... WHERE id = :id"), conn, params={"id": 7})      # bound params
pd.read_sql(stmt, conn, chunksize=10_000)                            # stream big tables in chunks
df.to_sql("scratch", engine, if_exists="replace", index=False)       # write a DataFrame back
```

See pandas read_sql. SQLAlchemy owns the connection and SQL; pandas owns the table.
Quick Reference
| Command | What it does | Area |
|---|---|---|
| `create_engine(url)` | Build a pooled gateway from a URL | Engine |
| `engine.connect()` | Open a read connection block | Engine |
| `engine.begin()` | Open a connection that commits on clean exit | Engine |
| `text("SELECT ... :p")` | Wrap a raw SQL string with bound params | Raw SQL |
| `.scalar_one()` / `.mappings().all()` | One value / list of dict rows | Raw SQL |
| `class X(DeclarativeBase)` | The mapped base class | ORM model |
| `Mapped[int]` + `mapped_column(...)` | Declare a typed column (2.0 style) | ORM model |
| `Base.metadata.create_all(engine)` | Emit CREATE TABLE for all models | ORM model |
| `Session(engine)` / `sessionmaker(engine)` | Open a session / factory | Session |
| `session.add(obj)` / `add_all([...])` | Stage inserts | Session |
| `session.commit()` / `rollback()` | Flush the unit of work / undo | Session |
| `select(Model).where(...)` | Build a SELECT object | Query |
| `session.scalars(stmt).all()` | Run it, get mapped objects | Query |
| `session.get(Model, pk)` | Fast primary-key lookup | Query |
| `update(M)` / `delete(M)` + `session.execute` | Bulk set-based DML | DML |
| `relationship(back_populates=...)` | Link two mapped classes | Relationships |
| `select(...).join(M.rel)` | JOIN across a relationship | Relationships |
| `selectinload(M.rel)` | Eager-load children, dodge N+1 | Relationships |
| `pd.read_sql(stmt, conn)` | Query straight into a DataFrame | pandas |
| `df.to_sql(name, engine)` | Write a DataFrame to a table | pandas |

| Database | Example URL | Driver to install |
|---|---|---|
| SQLite (file) | `sqlite+pysqlite:///app.db` | built in |
| SQLite (memory) | `sqlite://` | built in |
| PostgreSQL | `postgresql+psycopg://u:pw@host:5432/db` | psycopg |
| MySQL / MariaDB | `mysql+pymysql://u:pw@host:3306/db` | pymysql |
| Microsoft SQL Server | `mssql+pyodbc://u:pw@dsn` | pyodbc |
| Oracle | `oracle+oracledb://u:pw@host:1521/?service_name=svc` | oracledb |

| Call | Returns |
|---|---|
| `.all()` | list of Row tuples |
| `.first()` | first Row or None |
| `.one()` | exactly one Row (else raises) |
| `.scalar_one()` | the single value of the single row |
| `.scalars().all()` | first column of each row (or mapped objects) |
| `.mappings().all()` | list of {column: value} dicts |

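The result-draining calls are easiest to tell apart when run against the same query. A minimal sketch, assuming a two-row table t in an in-memory SQLite database:

```python
from sqlalchemy import create_engine, text
from sqlalchemy.exc import MultipleResultsFound

engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE t (id INTEGER, name TEXT)"))
    conn.execute(
        text("INSERT INTO t VALUES (:id, :name)"),
        [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}],
    )

query = text("SELECT id, name FROM t ORDER BY id")

with engine.connect() as conn:
    # A Result can be drained only once, so the query is re-run per call.
    all_rows = [tuple(r) for r in conn.execute(query).all()]
    first = tuple(conn.execute(query).first())
    ids = conn.execute(query).scalars().all()          # first column only
    dicts = [dict(m) for m in conn.execute(query).mappings().all()]
    total = conn.execute(text("SELECT count(*) FROM t")).scalar_one()
    empty = conn.execute(text("SELECT id FROM t WHERE id = -1")).first()

    # one() insists on exactly one row; two rows raise.
    try:
        conn.execute(query).one()
        too_many = False
    except MultipleResultsFound:
        too_many = True

print(all_rows, first, ids, dicts, total, empty, too_many)
```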
| Do this (2.0) | Not this (1.x legacy) |
|---|---|
| `select(User)` + `session.scalars(...)` | `session.query(User)` |
| `with engine.connect() as conn: conn.execute(...)` | `engine.execute(...)` |
| `Mapped[int] = mapped_column(...)` | `Column(Integer, ...)` |
| `engine.begin()` or `conn.commit()` (explicit commit) | implicit autocommit |
| `session.get(User, pk)` | `session.query(User).get(pk)` |

Appendix: Sample Code
Engine, connect, and raw SQL
```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite+pysqlite:///:memory:")  # lazy, pooled

# Read in a connection block
with engine.connect() as conn:
    total = conn.execute(text("SELECT 1 + 1 AS total")).scalar_one()
    print(total)  # 2

# Write inside a begin() block: commits on clean exit, rolls back on error
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE t (id INTEGER, name TEXT)"))
    conn.execute(
        text("INSERT INTO t VALUES (:id, :name)"),
        [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}],  # executemany
    )

with engine.connect() as conn:
    rows = conn.execute(text("SELECT * FROM t")).mappings().all()
    print(rows)  # [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]
```

Declarative models (2.0 style)
```python
from sqlalchemy import create_engine, ForeignKey, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

engine = create_engine("sqlite://")

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "user_account"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))
    fullname: Mapped[str | None]  # nullable
    addresses: Mapped[list["Address"]] = relationship(back_populates="user")

    def __repr__(self) -> str:
        return f"User(id={self.id!r}, name={self.name!r})"

class Address(Base):
    __tablename__ = "address"
    id: Mapped[int] = mapped_column(primary_key=True)
    email_address: Mapped[str]
    user_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    user: Mapped["User"] = relationship(back_populates="addresses")

Base.metadata.create_all(engine)  # CREATE TABLE ...
```

The Session: insert, query, update, delete
```python
from sqlalchemy import select, func
from sqlalchemy.orm import Session

with Session(engine) as session:
    # insert
    session.add_all([
        User(name="spongebob", fullname="Spongebob Squarepants"),
        User(name="sandy", fullname="Sandy Cheeks"),
    ])
    session.commit()  # INSERT ...; COMMIT

    # query: build a select(), run via scalars() to get objects
    sb = session.scalars(select(User).where(User.name == "spongebob")).one()
    print(sb)  # User(id=1, name='spongebob')

    # update: just assign, then commit
    sb.fullname = "SpongeBob SquarePants"
    session.commit()  # UPDATE ... SET fullname=...

    # primary-key lookup (identity-map cache)
    print(session.get(User, 2))  # User(id=2, name='sandy')

    # aggregate
    n = session.scalar(select(func.count()).select_from(User))
    print(n)  # 2

    # delete
    session.delete(sb)
    session.commit()  # DELETE FROM ...
```

Relationships and a JOIN (with N+1 avoided)
```python
from sqlalchemy import select
from sqlalchemy.orm import Session, selectinload

with Session(engine) as session:
    user = session.scalars(select(User).where(User.name == "sandy")).one()
    user.addresses.append(Address(email_address="sandy@example.com"))
    session.commit()  # FK set automatically

    # JOIN across the relationship
    pairs = session.execute(
        select(User.name, Address.email_address).join(User.addresses)
    ).all()
    print(pairs)  # [('sandy', 'sandy@example.com')]

    # Eager-load children in one extra batched query (no N+1)
    users = session.scalars(
        select(User).options(selectinload(User.addresses))
    ).all()
```

Straight into pandas (and back)
This is the boundary pattern: SQLAlchemy owns the connection and the SQL, pandas owns the table.
```python
import pandas as pd
from sqlalchemy import select, text

# read a raw SQL string with bound params
with engine.connect() as conn:
    df = pd.read_sql(
        text("SELECT id, name FROM user_account WHERE id = :id"),
        conn,
        params={"id": 2},
    )

# read a select() statement (column names come from the model)
with engine.connect() as conn:
    df2 = pd.read_sql_query(select(User.id, User.name), conn)

# stream a big table in chunks without exhausting memory
with engine.connect() as conn:
    for chunk in pd.read_sql(text("SELECT * FROM user_account"), conn,
                             chunksize=10_000):
        ...  # process each DataFrame chunk

# write a DataFrame back to a new table
small = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
small.to_sql("scratch", engine, if_exists="replace", index=False)
```

Behavior notes
- The engine is lazy and pooled. `create_engine(url)` does not connect; it builds a connection pool you reuse. One `Engine` per application is the norm, and `engine.connect()` borrows a connection that the `with` block returns to the pool on exit.
- `connect()` reads, `begin()` commits. Use `engine.connect()` for reads and `engine.begin()` for writes; the `begin()` block commits on clean exit and rolls back on any exception, so you do not call `commit()` by hand.
- Always bind parameters, never f-string them. Pass values as `:name` placeholders plus a dict so the driver escapes them; building SQL with f-strings opens the door to SQL injection.
- 2.0 style end to end. Use `select(...)` + `session.scalars(...)` instead of `session.query(...)`, `Mapped[int]` + `mapped_column()` instead of `Column(Integer)`, and an explicit `begin()` instead of autocommit. `session.query()` and `Column()` still work as legacy API, while `engine.execute()` and implicit autocommit were removed in 2.0.
- `Mapped[str | None]` needs Python 3.10 or newer. The PEP 604 union is the modern way to declare a nullable column; on older Pythons use `Mapped[Optional[str]]` instead.
- `selectinload` dodges N+1. Loading parents and then touching `parent.children` in a loop fires one query per parent; `select(...).options(selectinload(...))` fetches the children in one extra batched query instead.
References
SQLAlchemy documentation (2.0)
- Documentation home and the unified tutorial
- ORM Quick Start, Engine configuration, Connections and Result
- Using the ORM Session, declarative mapping styles, relationship configuration
- Relationship loading techniques and the v1 to v2 migration guide
Project and related