SQLalchemy补充

发布时间 2023-08-22 20:24:45作者: 星空看海

七 更多查询方式

#1  查询: filer:写条件     filter_by:等于的值
#  查询所有  是list对象
res = session.query(User).all()  # 是个普通列表
print(type(res))
print(len(res))

# 2 只查询某几个字段
# select name as xx,email from user;
res = session.query(User.name.label('xx'), User.email)
print(res)  # 打出原生sql
print(res.all())
for item in res.all():
    print(item[0])


# 3 filter传的是表达式,filter_by传的是参数
res = session.query(User).filter(User.name == "lqz").all()
res = session.query(User).filter(User.name != "lqz").all()
res = session.query(User).filter(User.name != "lqz", User.email == '3@qq.com').all() 

#4 django 中使用 Q查询显示 或 非条件 如果是逗号 就是and条件
# flask中,逗号也表示and条件
res = session.query(User).filter_by(name='lqz099').all()
res = session.query(User).filter_by(name='lqz099',email='47@qq.com').all()


# 5 取一个 all了后是list,list 没有first方法
res = session.query(User).first()



# 6查询所有,使用占位符(了解)  :value     :name
# select * from user where id <20 or name=lqz
res = session.query(User).filter(text("id<:value or name=:name")).params(value=20, name='lqz').all()


# 7 自定义查询(了解)
# from_statement 写纯原生sql

res=session.query(User).from_statement(text("SELECT * FROM users where email=:email")).params(email='3@qq.com').all()
print(type(res[0]))  # 是book的对象,但是查的是User表   不要这样写
print(res[0].name)  #

# 8 高级查询
# 条件
# 表达式,and条件连接
res = session.query(User).filter(User.id > 1, User.name == 'lqz099').all() # and条件

# between
res = session.query(User).filter(User.id.between(1, 9), User.name == 'lqz099').all()
res = session.query(User).filter(User.id.between(1, 9)).all()

# in
res = session.query(User).filter(User.id.in_([1,3,4])).all()
res = session.query(User).filter(User.email.in_(['3@qq.com','r@qq.com'])).all()

# ~非,除..外
res = session.query(User).filter(~User.id.in_([1,3,4])).all()
print(res)

# 二次筛选
res = session.query(User).filter(~User.id.in_(session.query(User.id).filter_by(name='lqz'))).all()
print(res)


# and or条件
from sqlalchemy import and_, or_

# or_包裹的都是or条件,and_包裹的都是and条件
res = session.query(User).filter(and_(User.id >= 3, User.name == 'lqz099')).all()  #  and条件
res = session.query(User).filter(User.id < 3, User.name == 'lqz099').all()  #  等同于上面
res = session.query(User).filter(or_(User.id < 2, User.name == 'eric')).all()
res = session.query(User).filter(or_(User.id < 2,and_(User.name == 'lqz099', User.id > 3),User.extra != ""))


# 通配符,模糊查询,以e开头,不以e开头
res = session.query(User).filter(User.email.like('%@%')).all()
# select user.id from user where  user.name not like e%;
res = session.query(User.id).filter(~User.name.like('e%'))
res = session.query(User).filter(~User.name.like('e%')).all()

# 分页
# 一页2条,查第5页
res = session.query(User)[2*5:2*5+2]

# 排序,根据name降序排列(从大到小)
res = session.query(User).order_by(User.email.desc()).all()
res = session.query(Book).order_by(Book.price.desc()).all()
res = session.query(Book).order_by(Book.price.asc()).all()
# 第一个条件重复后,再按第二个条件升序排
res = session.query(User).order_by(User.name.desc(), User.id.asc())



# 分组查询  5个聚合函数
from sqlalchemy.sql import func
# 分组后,只能拿分组字段和聚合函数字典,如果拿别的,是严格模式,会报错
res = session.query(User).group_by(User.extra)  # 如果是严格模式,就报错
# 分组之后取最大id,id之和,最小id  和分组的字段
# 5个聚合函数(最大、最小、平均、求和、计数)
from sqlalchemy.sql import func
res = session.query(
    User.name,
    func.max(User.id),
    func.sum(User.id),
    func.min(User.id),
    func.avg(User.id)).group_by(User.name).all()
for item in res:
    print(item)

# 分组后筛选:having
# select name,max(id),sum(id),min(id) from  user group by  user.name   having id_max>2;


from sqlalchemy.sql import func
res = session.query(
    User.name,
    func.max(User.id),
    func.sum(User.id),
    func.min(User.id)).group_by(User.name).having(func.max(User.id) > 2).all()

print(res)



八 连表查询

### 关联关系,基于连表的跨表查询
from models1 import Person,Hobby
# 连表操作
select * from person,hobby where person.hobby_id=hobby.id;
res = session.query(Person, Hobby).filter(Person.hobby_id == Hobby.id).all()

# 自己连表查询
# join表,默认是inner join,自动按外键关联
# select * from Person inner join Hobby on Person.hobby_id=Hobby.id;
# res = session.query(Person).join(Hobby).all()

#isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可
# select * from Person left join Hobby on Person.hobby_id=Hobby.id;
# res = session.query(Person).join(Hobby, isouter=True).all()
# 没有right join,通过这个实现
# res = session.query(Hobby).join(Person, isouter=True).all()

# # 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上
# select * from Person left join Hobby on Person.id=Hobby.id;
# res = session.query(Person).join(Hobby, Person.hobby_id == Hobby.id, isouter=True) #  sql本身有问题,只是给你讲, 自己指定链接字段
# 右链接
# print(res)



# 多对多关系连表
# 多对多关系,基于链表的跨表查
# 多表链接
#方式一:直接连
#select * FROM boy, girl, boy2girl WHERE boy.id = boy2girl.boy_id AND girl.id = boy2girl.girl_id
# res = session.query(Boy, Girl,Boy2Girl).filter(Boy.id == Boy2Girl.boy_id,Girl.id == Boy2Girl.girl_id)

# 方式二:join连
# SELECT* FROM boy INNER JOIN boy2girl ON boy.id = boy2girl.boy_id INNER JOIN girl ON girl.id = boy2girl.girl_id WHERE boy.id >= %(id_1)s
res = session.query(Boy).join(Boy2Girl).join(Girl).filter(Boy.id>=2)
print(res)

九 原生sql(django-orm如何执行原生sql)

9.1 sqlalchemy执行原生sql

# 有的复杂sql 用orm写不出来--->用原生sql查询

# 原生sql查询,查出的结果是对象
# 原生sql查询,查询结果列表套元组

from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/db001", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
#### 执行原生sql方式一:
# 查询:
# cursor = session.execute('select * from users')
# result = cursor.fetchall()
# print(result) #列表套元组

# 添加
# cursor = session.execute('insert into users(name,email) values(:name,:email)',
#                          params={"name": 'lqz', 'email': '3333@qq.com'})
# session.commit()
# print(cursor.lastrowid)


###执行原生sql方式二(以后都用session操作--->socpe_session线程安全)一般不用
# conn = engine.raw_connection()
# cursor = conn.cursor()
# cursor.execute(
#     "select * from app01_book"
# )
# result = cursor.fetchall()


# 执行原生sql方式三:结果是对象
# res = session.query(User).from_statement(text("SELECT * FROM boy where name=:name")).params(name='lqz').all()


session.close()

9.2 django执行原生sql

# 执行完的结果映射到对象中--->上面讲的  方式三:
from model import Book
books_obj_list = Book.objects.raw('select distinct id, book_name from test_book')
for book_obj in books_obj_list:
	print(book_obj.id, book_obj.book_name)


# 纯原生sql 
from django.db import connection
cur=connection.cursor() 
cur.execute('select distinct id, book_name from test_book')
print(cur.fetch_all())
cur.close()


with connection.cursor() as cur:
    cur.execute('select distinct id, book_name from test_book')

十 flask-sqlalchemy使用

10.1 sqlalchemy自己操作

目录结构

-src
	-__init__.py
    -models.py
    -session_sql.py
    -settings.py
    -views.py
-manage.py

src/init.py

from flask import Flask

app = Flask(__name__)
app.config.from_pyfile('settings.py')
from .views import user

app.register_blueprint(user)

src/models.py

from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.ext.declarative import declarative_base
import datetime
from sqlalchemy.orm import relationship

Base = declarative_base()


# print(type(Base))
class User(Base):
    # 以__开头的是配置
    __tablename__ = 'users'  # 数据库表名称,如果不写,以类名作为表名

    id = Column(Integer, primary_key=True)  # 主键索引,聚簇索引
    name = Column(String(64), index=True, nullable=False)  # name字段加辅助索引
    email = Column(String(32), unique=True)
    # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    ctime = Column(DateTime, default=datetime.datetime.now)
    extra = Column(Text, nullable=True)

    __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'),  # 联合唯一
        Index('ix_id_name', 'name', 'email'),  # 索引
    )

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name


class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')

    def __str__(self):
        return self.caption

    def __repr__(self):
        return self.caption


class Person(Base):
    __tablename__ = 'person'
    id = Column(Integer, primary_key=True)  # 不会自动生成id
    name = Column(String(32), index=True, nullable=True)
    # hobby指的是tablename而不是类名
    # 一对多关系一旦确立,关联关系写在多的一方---》物理外键
    hobby_id = Column(Integer, ForeignKey("hobby.id"))

    # 跟数据库无关,不会新增字段,只用于快速链表操作
    # 类名,backref用于反向查询
    hobby = relationship('Hobby', backref='pers')  # 以后 person.hobby 就是hobby对象

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name


# 多对多关系
# 中间表  手动创建
class Boy2Girl(Base):
    __tablename__ = 'boy2girl'
    id = Column(Integer, primary_key=True, autoincrement=True)
    girl_id = Column(Integer, ForeignKey('girl.id'))
    boy_id = Column(Integer, ForeignKey('boy.id'))


class Girl(Base):
    __tablename__ = 'girl'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name


class Boy(Base):
    __tablename__ = 'boy'
    id = Column(Integer, primary_key=True, autoincrement=True) #autoincrement 默认就是true
    name = Column(String(64), unique=True, nullable=False)

    # 就是咱们之前的ManyToMany,不会在表中生成字段---》因为它是个表----》这个字段可以放在Girl表
    girls = relationship('Girl', secondary='boy2girl', backref='boys')

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name

src/session_sql.py

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session

from . import app
# engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/db001")
engine = create_engine(app.config['DB_URL'])
Session = sessionmaker(bind=engine)
session = scoped_session(Session)

src/settings.py

DEBUG=True
DB_URL='mysql+pymysql://root:123@127.0.0.1:3306/db001'

src/views.py

from flask import Blueprint
from .models import User

user = Blueprint('user', __name__)

from .session_sql import session


@user.route('/')
def index():
    # 查询数据
    res = session.query(User).filter_by(name='lqz').all()

    print(res)
    return 'asdfasdf'

manage.py

from src import app

if __name__ == '__main__':
    app.run()

10.2 使用flask-sqlalchemy

# sqlalchemy 集成到flask中


# 第三方: flask-sqlalchemy 封装了用起来,更简洁
#  使用flask-sqlalchemy集成
	1 导入 from flask_sqlalchemy import SQLAlchemy
    2 实例化得到对象
    	db = SQLAlchemy()
    3  将db注册到app中
    	db.init_app(app)
    4 视图函数中使用session
    	全局的db.session  # 线程安全的
    5 models.py 中继承Model
    	db.Model
    6 写字段 
    	username = db.Column(db.String(80), unique=True, nullable=False)
    7 配置文件中加入
    SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root@127.0.0.1:3306/ddd?charset=utf8"
    SQLALCHEMY_POOL_SIZE = 5
    SQLALCHEMY_POOL_TIMEOUT = 30
    SQLALCHEMY_POOL_RECYCLE = -1
    # 追踪对象的修改并且发送信号
    SQLALCHEMY_TRACK_MODIFICATIONS = False

init.py文件

from flask import Flask

# 第一步:导入
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 第二步:实例化得到对象
db = SQLAlchemy()
app.config.from_pyfile('settings.py')
# 第三步:注册到app中
db.init_app(app)
from .views import user

app.register_blueprint(user)

models.py

# 第四步:在models中,找所有东西 都从db中拿
from src import db

class User(db.Model):
    # 以__开头的是配置
    __tablename__ = 'users'  # 数据库表名称,如果不写,以类名作为表名
    id = db.Column(db.Integer, primary_key=True)  # 主键索引,聚簇索引
    name = db.Column(db.String(64), index=True, nullable=False)  # name字段加辅助索引
    email = db.Column(db.String(32), unique=True)
    __table_args__ = (
        db.UniqueConstraint('id', 'name', name='uix_id_name'),  # 联合唯一
        db.Index('ix_id_name', 'name', 'email'),  # 索引
    )

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name

views.py

from flask import Blueprint
from .models import User

user = Blueprint('user', __name__)

from src import db


@user.route('/')
def index():
    res = db.session.query(User).filter_by(name='lqz').all()

    print(res)
    return 'asdfasdf'

settings.py

DEBUG=True
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:123@127.0.0.1:3306/db001?charset=utf8"
SQLALCHEMY_POOL_SIZE = 5
SQLALCHEMY_POOL_TIMEOUT = 30
SQLALCHEMY_POOL_RECYCLE = -1
# 追踪对象的修改并且发送信号
SQLALCHEMY_TRACK_MODIFICATIONS = False