高效处理

参考:

高效 count

参考:

注意:

  1. 尽量避免使用: session.query(table).count()

    • 速度太慢
  2. 推荐使用 func.count()

大批量 INSERT, UPDATE 和 DELETE

传统方法 Legacy

参考:

工具:

  • Insert 插入新行:

    • session.bulk_insert_mappings(Table, list[dict])

      • 插入 dict 类型
      • 性能更好
    • session.bulk_save_objects(list[Table])

      • 插入实例化的表对象
      • 性能略差
  • Update:

    • session.bulk_update_mappings(Table, list[dict])

      • 注意 dict 中要包括主键 primary key
  • Delete

    • 没有 delete 大批量删除传统方法,可以使用 ORM 方式

性能排序,由慢到快:

  1. ORM 逐条插入
  2. session.bulk_save_objects(list[Table])
  3. session.bulk_insert_mappings(Table, list[dict])
  4. 使用 sqlalchemy 底层 api
  5. 直接使用 dbapi, 如 psycopg2 直接 uiys connection.corsor.execute("SQL expression")

基本操作

session.commit() vs session.flush()

  1. flush():

    • 把当前操作放到数据库的内存中,并没有真的保存到数据库中
    • flush() 只能使,当前 sesson 的后续操作可见,而对其它 session 不可见
  2. commit()

    • 永久保存到数据库中

创建链接 engine and session

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
# 导入:
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# 创建对象的基类:
Base = declarative_base()

# 定义User对象:
class User(Base):
    # 表的名字:
    __tablename__ = 'user'

    # 表的结构:
    id = Column(String(20), primary_key=True)
    name_slice = Column(String(20))

    # 初始化数据库连接:
    engine = create_engine('mysql+mysqlconnector://root:password@localhost:3306/test')
    # 创建DBSession类型:
    DBSession = sessionmaker(bind=engine)

使用方法

使用底层 dbapi

参考:

概要:

  • 即使用 engine.connect().execute("SQL Expression") 处理
  • 直接操纵 SQL 表达式

使用 sqlalchemy Core statement 方法, insert() 、select()

参考:

概要:

  • 使用 engine.connect().execute(select().where()) 类似语句
  • 通过 cursor.execute() 执行使用 select() 、insert()等构建的 statement

特点:

  • statement 可以被打印出来

    1
    2
    
    >>> print(insert(user_table))
    INSERT INTO user_account (id, name, fullname) VALUES (:id, :name, :fullname)
  • 可以被用于 ORM 语句中
  • 完整通过 (connect 调用):

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    >>> with engine.connect() as conn:
    ...     result = conn.execute(
        ...         insert(user_table),
        ...         [
            ...             {"name": "sandy", "fullname": "Sandy Cheeks"},
            ...             {"name": "patrick", "fullname": "Patrick Star"},
            ...         ],
        ...     )
    ...     conn.commit()
    BEGIN (implicit)
    INSERT INTO user_account (name, fullname) VALUES (?, ?)
    [...] [('sandy', 'Sandy Cheeks'), ('patrick', 'Patrick Star')]
    COMMIT

ORM

使用 ORM

参考:

流程:

  1. 创建表结构
  2. 使用 session 的方法操作数据库

    • 查询和数据操纵处理

      • session.add()
      • session.get()

        • 通过主键 primary key 查询
      • session.query()
      • session.delete()
      • session.execute(select()..)

        • 执行 select(), insert() 等 sqlalchemy Core statement
    • 业务 transaction 相关

      • session.fush()
      • session.commit()
      • session.rollback()
      • session.close()

sqlalchemy + sqlite

路径设置

参考:

  • Engine Configuration — SQLAlchemy 2.0 Documentation
  • url 路径

    1
    2
    3
    
    # sqlite://<nohostname>/<path>
    # where <path> is relative:
    engine = create_engine("sqlite:///foo.db")
  • 磁盘路径

    1
    2
    3
    4
    5
    6
    7
    8
    
    # Unix/Mac - 4 initial slashes in total
    engine = create_engine("sqlite:////absolute/path/to/foo.db")
    
    # Windows
    engine = create_engine("sqlite:///C:\\path\\to\\foo.db")
    
    # Windows alternative using raw string
    engine = create_engine(r"sqlite:///C:\path\to\foo.db")
    • linux 路径:

      • 绝对路径开头是 4 个斜线 "/"
      • 相对路径是 3 个斜线 "/"

sqlalchemy + postgresql + jsonb

参考:

1
2
3
4
5
6
7
8
from sqlalchemy_json import mutable_json_type

class Document(Base):

    __tablename__ = 'document'

    id = Column(Integer, primary_key=True)
    config = Column(mutable_json_type(dbtype=JSONB, nested=True))

方法二例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import copy

doc.config = copy.deepcopy(doc.config)

doc.config['key2']['key2a'] = 'c'

session.add(doc)
session.commit()

assert doc.config['key2']['key2a'] == 'c'

sqlalchemy + asyncio 教程

参考:

database_url + asyncio

  • sqlite:

    sqlite+aiosqlite://
    
  • postgresql:

    postgresql+asyncpg://scott:tiger@localhost/test
    

注意事项

不同的 asyncio event loop 不可以使用相同的 session maker

参考:

解决方法:

例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from asyncio import current_task

from sqlalchemy.ext.asyncio import (
    async_scoped_session,
    async_sessionmaker,
)

async_session_factory = async_sessionmaker(
    some_async_engine,
    expire_on_commit=False,
)
AsyncScopedSession = async_scoped_session(
    async_session_factory,
    scopefunc=current_task,
)
some_async_session = AsyncScopedSession()

select, update, delete 等用法

  1. seqlalchemy 中的 select 不能选择单个 column, 只能选择一整个 ORM 表格对象

    1
    
    session.execute(select(User).filter_by(name="sandy")).scalar_one()

一次查询查询加载所有 relation 关联对象

1
2
3
4
async with async_session() as session:
    stmt = select(A).order_by(A.id).options(selectinload(A.bs))

    result = await session.execute(stmt)
  • 注意里面的用法

    .options(selectinload(A.bs))
    

插入时间, 更新时间如何让 sqlalchemy 自动生成

参考:

1
2
3
4
from sqlalchemy.sql import func

time_created = Column(DateTime(timezone=True), server_default=func.now())
time_updated = Column(DateTime(timezone=True), onupdate=func.now())