SQLAlchemy
认识
ORM:Object Relational Mapping(对象关系映射)将数据库中的表与类构建映射
- 简洁易读:将数据表抽象为对象(数据模型),更直观易读
- 可移植:封装了多种数据库引擎,面对多个数据库,操作基本一致,代码易维护
- 更安全:有效避免 SQL 注入
数据库与 python 对象的映射
- 数据库表 (table)映射为 Python 的类 (class),称为 model
- 表的字段 (field) 映射为 Column
- 表的记录 (record)以类的实例 (instance) 来表示
快速入门
FastAPI 的文件结构
1
2
3
4
5
6
7
8
| .
└── sql_app
├── __init__.py
├── crud.py # 增删查改
├── database.py # 创建引擎
├── main.py
├── models.py # 声明映射
└── schemas.py # 建立 pydantic 模型
|
创建引擎
1
2
3
4
5
6
7
| # database.py
from sqlalchemy import create_engine
# using MySQL
engine = create_engine('mysql+pymysql://user:pwd@localhost/testdb', pool_recycle=3600)
# using SQLite
engine = create_engine("sqlite:///testdb.db")
|
或者将数据库链接分离
1
2
3
4
5
6
7
8
| # database.py
SQLALCHEMY_DATABASE_URL1 = "sqlite:///./sql_app.db"
SQLALCHEMY_DATABASE_URL2 = "postgresql://user:password@postgresserver/db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL1, connect_args={"check_same_thread": False}
) # needed only for SQLite
|
建立会话(Session)工厂
1
2
3
| # database.py
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
声明映射
声明 Base 基类(在 database.py 中)
1
2
3
4
| # database.py
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
|
新建表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| # models.py
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
|
SQLAlchemy 提供了 backref,让我们只需要在一侧定义关系:
1
| items = relationship("Item", backref="owner")
|
添加了这个就可以不用再在 Item 中定义 relationship 了!
建立 Pydantic 模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| # schemas.py
from typing import List, Optional
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Optional[str] = None
class ItemCreate(ItemBase): # 创建时的所用的对象
pass
class Item(ItemBase): # API调用的对象
id: int
owner_id: int
class Config:
orm_mode = True # 允许 Pydantic 直接从 ORM 对象(而不仅是字典)读取数据
class UserBase(BaseModel):
email: str
class UserCreate(UserBase): # 创建时的所用的对象
password: str
class User(UserBase): # API调用的对象
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
|
CRUD
Create, Read, Update, and Delete
读
详细:https://www.cnblogs.com/jingqi/p/8059673.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| from sqlalchemy.orm import Session
from . import models, schemas
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()
def get_items(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Item).offset(skip).limit(limit).all()
|
建
1
2
3
4
5
6
7
8
9
10
11
| # 把描述的表创建出来
Base.metadata.create_all(engine) # Base 即 declarative_base() 返回的基类
# 把多个表数据添加到会话
session.add_all(Users)
# 把一个表数据添加到会话
session.add(dog)
# 提交会话
session.commit()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| def create_user(db: Session, user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db.add(db_user) # 增加实例到数据库中
db.commit() # 提交更改
db.refresh(db_user) # 刷新实例
return db_user
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id) # 使用字典传入
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
|
改
直接改
删
删除整个数据库
创建表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| from typing import List
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine) # 正式新建数据库和表(如果原来没有的话)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=user.email) # 检查用户是否被注册
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
return crud.create_user(db=db, user=user) # 新建用户
@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
|
概念
概念 | 对应 | 说明 |
---|
engine | 连接 | 驱动引擎 |
session | 连接池、事务、会话 | 由此开始查询 |
model | 表 | 类定义 |
column | 列 | |
query | 若干行 | 可以链式添加多个条件 |
Engine
- 位于数据库驱动之上的一个抽象概念,它适配了各种数据库驱动,提供了连接池等功能
- 用法:
engine = create_engine(<数据库连接串>)
数据库连接串的格式是
1
| dialect+driver://username:password@host:port/database?param
|
dialect 可以是 mysql, postgresql, oracle, mssql, sqlite
driver:驱动,比如 MySQL 的驱动 pymysql, 如果不填写,就使用默认驱动
再往后就是用户名、密码、地址、端口、数据库、连接参数
例
1
2
3
4
5
6
7
8
9
10
| # MySQL
engine = create_engine('mysql+pymysql://scott:tiger@localhost/foo?charset=utf8mb4')
# PostgreSQL
engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/mydatabase')
# Oracle
engine = create_engine('oracle+cx_oracle://scott:tiger@tnsname')
# MS SQL
engine = create_engine('mssql+pymssql://scott:tiger@hostname:port/dbname')
# SQLite
engine = create_engine('sqlite:////absolute/path/to/foo.db')
|
Session
Column 参数
Object Name | Description |
---|
BigInteger | A type for bigger int integers. |
Boolean | A bool datatype. |
Date | A type for datetime.date() objects. |
DateTime | A type for datetime.datetime() objects. |
Enum | Generic Enum Type. |
Float | Type representing floating point types, such as FLOAT or REAL . |
Integer | A type for int integers. |
Interval | A type for datetime.timedelta() objects. |
LargeBinary | A type for large binary byte data. |
MatchType | Refers to the return type of the MATCH operator. |
Numeric | A type for fixed precision numbers, such as NUMERIC or DECIMAL . |
PickleType | Holds Python objects, which are serialized using pickle. |
SchemaType | Mark a type as possibly requiring schema-level DDL for usage. |
SmallInteger | A type for smaller int integers. |
String | The base for all string and character types. |
Text | A variably sized string type. |
Time | A type for datetime.time() objects. |
Unicode | A variable length Unicode string type. |
UnicodeText | An unbounded-length Unicode string type. |
类型名 | 说明 |
---|
Integer | 普通整数,一般是32位 |
SmallInteger | 取值范围小的整数,一般是16位 |
Float | 浮点数 |
Numeric | 定点数 |
String | 字符串 |
Text | 文本字符串 |
Boolean | 布尔值 |
Date | 日期 |
Time | 时间 |
DateTime | 日期和时间 |
- primary_key:设置字段是否为主键
- unique:设置字段是否唯一
- index:设置是否为该字段建立索引
- default:设置字段默认值
- nullable:设置字段是否可空,默认为 True(可空)
- autoincrement:设置字段是否自动递增
- comment:设置字段注释
query
Session 的 query 函数会返回一个 Query 对象
1
| db.query(User).filter(User.id == user_id).first()
|
get
根据指定主键查询
filter
1
| query.filter(User.name == 'ed')
|
1
| query.filter(User.name != 'ed')
|
1
| query.filter(User.name.like('%ed%'))
|
1
2
3
4
5
6
| query.filter(User.name.in_(['ed', 'wendy', 'jack']))
# works with query objects too:
query.filter(User.name.in_(
session.query(User.name).filter(User.name.like('%ed%'))
))
|
1
| query.filter(~User.name.in_(['ed', 'wendy', 'jack']))
|
1
2
3
4
| query.filter(User.name == None)
# alternatively, if pep8/linters are a concern
query.filter(User.name.is_(None))
|
1
2
3
4
| query.filter(User.name != None)
# alternatively, if pep8/linters are a concern
query.filter(User.name.isnot(None))
|
1
2
3
4
5
6
7
8
9
| # use and_()
from sqlalchemy import and_
query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones'))
# or send multiple expressions to .filter()
query.filter(User.name == 'ed', User.fullname == 'Ed Jones')
# or chain multiple filter()/filter_by() calls
query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')
|
1
2
| from sqlalchemy import or_
query.filter(or_(User.name == 'ed', User.name == 'wendy'))
|
1
| query.filter(User.name.match('wendy'))
|
filter_by
查询指定字段值的结果
1
| query.filter_by(name='wang_wu').all() # 查询所有名字为wang_wu的实例(filter_by 使用关键字参数,直接写字段名)
|
返回列表 (List) 和单项 (Scalar)
1
2
3
4
| >>> query = session.query(User).filter(User.name.like('%ed')).order_by(User.id)
>>> query.all()
[<User(name='ed', fullname='Ed Jones', password='f8s7ccs')>,
<User(name='fred', fullname='Fred Flinstone', password='blah')>]
|
first()
返回至多一个结果,而且以单项形式,而不是只有一个元素的 tuple 形式返回这个结果.
1
2
| >>> query.first()
<User(name='ed', fullname='Ed Jones', password='f8s7ccs')>
|
one()
返回且仅返回一个查询结果。当结果的数量不足一个或者多于一个时会报错。
1
2
3
4
| >>> user = query.one()
Traceback (most recent call last):
...
MultipleResultsFound: Multiple rows were found for one()
|
没有查找到结果时:
1
2
3
4
| >>> user = query.filter(User.id == 99).one()
Traceback (most recent call last):
...
NoResultFound: No row was found for one()
|
one_or_none():从名称可以看出,当结果数量为 0 时返回 None,多于 1 个时报错
scalar() 和 one() 类似,但是返回单项而不是 tuple
嵌入使用 SQL
你可以在 Query 中通过 text() 使用 SQL 语句。例如:
1
2
3
4
5
6
7
8
9
| >>> from sqlalchemy import text
>>> for user in session.query(User).\
... filter(text("id<224")).\
... order_by(text("id")).all():
... print(user.name)
ed
wendy
mary
fred
|
除了上面这种直接将参数写进字符串的方式外,你还可以通过 params() 方法来传递参数
1
2
3
| >>> session.query(User).filter(text("id<:value and name=:name")).\
... params(value=224, name='fred').order_by(User.id).one()
<User(name='fred', fullname='Fred Flinstone', password='blah')>
|
并且,你可以直接使用完整的 SQL 语句,但是要注意将表名和列名写正确。
1
2
3
4
| >>> session.query(User).from_statement(
... text("SELECT * FROM users where name=:name")).\
... params(name='ed').all()
[<User(name='ed', fullname='Ed Jones', password='f8s7ccs')>]
|
关系
外键 (ForeignKey) 始终定义在多的一方
一对多
Child 为多(一个 Parent 对应多个 Child)
1
2
3
4
5
6
7
8
9
| class Parent(Base):
__tablename__ = 'parent'
id = Column(Integer,primary_key = True)
children = relationship("Child",backref='parent')
class Child(Base):
__tablename__ = 'child'
id = Column(Integer,primary_key = True)
parent_id = Column(Integer,ForeignKey('parent.id'))
|
多对一
Parent 为多(多个 Parent 对应一个 Child)
1
2
3
4
5
6
7
8
9
| class Parent(Base):
__tablename__ = 'parent'
id = Column(Integer, primary_key=True)
child_id = Column(Integer, ForeignKey('child.id'))
child = relationship("Child", backref="parents")
class Child(Base):
__tablename__ = 'child'
id = Column(Integer, primary_key=True)
|
为了建立双向关系,可以在 relationship() 中设置 backref,Child 对象就有 parents 属性。设置 cascade='all',可以级联删除
1
| children = relationship("Child",cascade='all',backref='parent')
|
一对一
“一对一”就是“多对一”和“一对多”的一个特例,只需在 relationship 加上一个参数 uselist=False 替换多的一端即可
从一对多转换到一对一
1
2
3
4
5
6
7
8
9
| class Parent(Base):
__tablename__ = 'parent'
id = Column(Integer, primary_key=True)
child = relationship("Child", uselist=False, backref="parent")
class Child(Base):
__tablename__ = 'child'
id = Column(Integer, primary_key=True)
parent_id = Column(Integer, ForeignKey('parent.id'))
|
从多对一转换到一对一
1
2
3
4
5
6
7
8
9
| class Parent(Base):
__tablename__ = 'parent'
id = Column(Integer, primary_key=True)
child_id = Column(Integer, ForeignKey('child.id'))
child = relationship("Child", backref=backref("parent", uselist=False))
class Child(Base):
__tablename__ = 'child'
id = Column(Integer, primary_key=True)
|
多对多
多对多关系需要一个中间关联表,通过参数 secondary 来指定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| from sqlalchemy import Table,Text
post_keywords = Table('post_keywords',Base.metadata,
Column('post_id',Integer,ForeignKey('posts.id')),
Column('keyword_id',Integer,ForeignKey('keywords.id'))
)
class BlogPost(Base):
__tablename__ = 'posts'
id = Column(Integer,primary_key=True)
body = Column(Text)
keywords = relationship('Keyword',secondary=post_keywords,backref='posts')
class Keyword(Base):
__tablename__ = 'keywords'
id = Column(Integer,primary_key = True)
keyword = Column(String(50),nullable=False,unique=True)
|
技巧
relationship() 中 lazy 属性详解
lazy 决定了 SQLAlchemy 什么时候从数据库中加载数据
select:就是访问到属性的时候,就会全部加载该属性的数据
joined:对关联的两个表使用联接
subquery:与 joined 类似,但使用子查询
dynamic:不加载记录,但提供加载记录的查询,也就是生成 query 对象
- 只可以用在一对多和多对多关系中,不可以用在一对一和多对一中
关于更新数据的讨论
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| # 1 (bad)
user.no_of_logins += 1
# result: UPDATE user SET no_of_logins = 31 WHERE user.id = 6
# 2 (bad)
user.no_of_logins = user.no_of_logins + 1
# result: UPDATE user SET no_of_logins = 31 WHERE user.id = 6
# 3 (bad)
setattr(user, 'no_of_logins', user.no_of_logins + 1)
# result: UPDATE user SET no_of_logins = 31 WHERE user.id = 6
# 4 (ok)
user.no_of_logins = User.no_of_logins + 1
# result: UPDATE user SET no_of_logins = no_of_logins + 1 WHERE user.id = 6
# 5 (ok)
setattr(user, 'no_of_logins', User.no_of_logins + 1)
# result: UPDATE user SET no_of_logins = no_of_logins + 1 WHERE user.id = 6
|
If you are going to increment twice via code that produces SQL like SET no_of_logins = no_of_logins + 1, then you will need to commit or at least flush in between increments, or else you will only get one increment in total:
1
2
3
4
5
6
7
8
9
10
11
12
13
| # 6 (bad)
user.no_of_logins = User.no_of_logins + 1
user.no_of_logins = User.no_of_logins + 1
session.commit()
# result: UPDATE user SET no_of_logins = no_of_logins + 1 WHERE user.id = 6
# 7 (ok)
user.no_of_logins = User.no_of_logins + 1
session.flush()
# result: UPDATE user SET no_of_logins = no_of_logins + 1 WHERE user.id = 6
user.no_of_logins = User.no_of_logins + 1
session.commit()
# result: UPDATE user SET no_of_logins = no_of_logins + 1 WHERE user.id = 6
|
关于插入大量数据的讨论
官方:https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
大体意思:ORM 模型不适合于大量数据的插入,使用 sqlalchemy Core 或者 sqlalchemy 提供的 bulk insert 函数会是更好的选择
1
2
3
4
5
6
| SQLAlchemy ORM: Total time for 100000 records 2.39429616928 secs
SQLAlchemy ORM pk given: Total time for 100000 records 1.51412987709 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.568987131119 secs
SQLAlchemy ORM bulk_insert_mappings(): Total time for 100000 records 0.320806980133 secs
SQLAlchemy Core: Total time for 100000 records 0.206904888153 secs
sqlite3: Total time for 100000 records 0.165791988373 sec
|
Script:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
| import time
import sqlite3
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None
class Customer(Base):
__tablename__ = "customer"
id = Column(Integer, primary_key=True)
name = Column(String(255))
def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
global engine
engine = create_engine(dbname, echo=False)
DBSession.remove()
DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
def test_sqlalchemy_orm(n=100000):
init_sqlalchemy()
t0 = time.time()
for i in xrange(n):
customer = Customer()
customer.name = 'NAME ' + str(i)
DBSession.add(customer)
if i % 1000 == 0:
DBSession.flush()
DBSession.commit()
print(
"SQLAlchemy ORM: Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")
def test_sqlalchemy_orm_pk_given(n=100000):
init_sqlalchemy()
t0 = time.time()
for i in xrange(n):
customer = Customer(id=i + 1, name="NAME " + str(i))
DBSession.add(customer)
if i % 1000 == 0:
DBSession.flush()
DBSession.commit()
print(
"SQLAlchemy ORM pk given: Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")
def test_sqlalchemy_orm_bulk_save_objects(n=100000):
init_sqlalchemy()
t0 = time.time()
for chunk in range(0, n, 10000):
DBSession.bulk_save_objects(
[
Customer(name="NAME " + str(i))
for i in xrange(chunk, min(chunk + 10000, n))
]
)
DBSession.commit()
print(
"SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")
def test_sqlalchemy_orm_bulk_insert(n=100000):
init_sqlalchemy()
t0 = time.time()
for chunk in range(0, n, 10000):
DBSession.bulk_insert_mappings(
Customer,
[
dict(name="NAME " + str(i))
for i in xrange(chunk, min(chunk + 10000, n))
]
)
DBSession.commit()
print(
"SQLAlchemy ORM bulk_insert_mappings(): Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")
def test_sqlalchemy_core(n=100000):
init_sqlalchemy()
t0 = time.time()
engine.execute(
Customer.__table__.insert(),
[{"name": 'NAME ' + str(i)} for i in xrange(n)]
)
print(
"SQLAlchemy Core: Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")
def init_sqlite3(dbname):
conn = sqlite3.connect(dbname)
c = conn.cursor()
c.execute("DROP TABLE IF EXISTS customer")
c.execute(
"CREATE TABLE customer (id INTEGER NOT NULL, "
"name VARCHAR(255), PRIMARY KEY(id))")
conn.commit()
return conn
def test_sqlite3(n=100000, dbname='sqlite3.db'):
conn = init_sqlite3(dbname)
c = conn.cursor()
t0 = time.time()
for i in xrange(n):
row = ('NAME ' + str(i),)
c.execute("INSERT INTO customer (name) VALUES (?)", row)
conn.commit()
print(
"sqlite3: Total time for " + str(n) +
" records " + str(time.time() - t0) + " sec")
if __name__ == '__main__':
test_sqlalchemy_orm(100000)
test_sqlalchemy_orm_pk_given(100000)
test_sqlalchemy_orm_bulk_save_objects(100000)
test_sqlalchemy_orm_bulk_insert(100000)
test_sqlalchemy_core(100000)
test_sqlite3(100000)
|
级联删除
https://www.cnblogs.com/ShanCe/p/15381412.html