使用 MySQL
数据是动态网站的基础,如果把数据放进数据库,就可以通过数据库管理系统对数据进行管理。MySQL 是一个开源的关系型数据库管理系统。它性能高、免费、配置简单、可靠性好,已经成为最流行的开源数据库。
在 Flask 应用中可以自由使用 MySQL、PostgreSQL、SQLite、Redis、MongoDB 来写原生的语句实现功能,也可以使用更高级别的数据库抽象方式,如 SQLAlchemy 或 MongoEngine 这样的 OR(D)M。本节将演示用 MySQL 的 Python 驱动(MySQLdb)写原生语句,通过 SQLAlchemy 演示 ORM 的使用,让大家熟悉 Web 开发中的数据库开发工作。
安装 MySQL 和驱动
首先安装 MySQL:
> sudo apt-get install mysql-server libmysqlclient-dev-yq > sudo/etc/init.d/mysql start
安装之后默认已经启动了 MySQL。安装 mysql-server 的过程中需要指定 root 账户的密码,生产环境一定要设置强复杂度的密码。
现在安装 MySQLdb:
> pip install mysql-python
安装用的名字是 mysql-python,而不是 MySQLdb。
设置应用账号和权限
root 的权限很大,不应该在 Web 应用中直接使用此用户(root 密码一般都会被写进应用的配置文件中),应该使用一个单独的用户。我们将执行如下三步:
1.创建一个数据库 r。
2.创建一个用户,名为 web,密码为 web。
3.设置用户权限。用户 web 对数据库 r 有全部权限。
下面是执行过程:
> sudo mysql-u root mysql>create database r; Query OK, 1 row affected (0.00 sec) mysql>create user 'web'@'localhost' identified by 'web'; Query OK, 0 rows affected (0.00 sec) mysql>use r; Database changed mysql>grant all on r.*TO 'web'@'localhost'; Query OK, 0 rows affected (0.00 sec) mysql>quit; Bye
用 MySQLdb 写原生语句
下面会编写一系列数据库开发的例子,为了重复利用,把常量放到独立的 consts.py 文件里面:
HOSTNAME='localhost' DATABASE='r' USERNAME='web' PASSWORD='web' DB_URI='mysql://{}:{}@{}/{}'.format( USERNAME, PASSWORD, HOSTNAME, DATABASE)
我们看第一个例子(example1.py):
1 import MySQLdb 2 from consts import HOSTNAME, DATABASE, USERNAME, PASSWORD 3 4 try: 5 con=MySQLdb.connect(HOSTNAME, USERNAME, PASSWORD, DATABASE) 6 cur=con.cursor() 7 cur.execute("SELECT VERSION()") 8 ver=cur.fetchone() 9 print"Database version:%s"%ver 10 except MySQLdb.Error as e: 11 print"Error%d:%s"%(e.args[0], e.args[1]) 12 exit(1) 13 finally: 14 if con: 15 con.close()
分析一下这个程序的细节:
- 第 5 行,MySQLdb.connect 返回的 con 是一个数据库连接实例。使用完通过 con 关闭数据库连接是一个好习惯。
- 第 6 行,con.cursor() 返回一个游标,数据库操作都是在游标实例上执行的。
- 第 7 行,cur.execute 方法传入的就是要执行的 SQL 语句。
- 第 8 行,cur.fetchone() 返回执行结果,这一点一开始可能不适应,因为数据库操作的结果不是在 execute 中直接返回的,而需要使用 fetchone、fetchall、fetchmany 这样的方法获取结果。
数据库操作主要以 CRUD 为主。CRUD 是 Create(创建)、Read(读取)、Update(更新)和 Delete(删除)的缩写。再来看一个 CRUD 的例子(curd.py):
import MySQLdb from consts import HOSTNAME, DATABASE, USERNAME, PASSWORD con=MySQLdb.connect(HOSTNAME, USERNAME, PASSWORD, DATABASE) with con as cur: cur.execute('drop table if exists users') cur.execute('create table users(Id INT PRIMARY KEY AUTO_INCREMENT, ' 'Name VARCHAR(25))') cur.execute("insert into users(Name) values('xiaoming')") cur.execute("insert into users(Name) values('wanglang')") cur.execute('select*from users') rows=cur.fetchall() for row in rows: print row cur.execute('update users set Name=%s where Id=%s', ('ming', 1)) print 'Number of rows updated:', cur.rowcount cur=con.cursor(MySQLdb.cursors.DictCursor) cur.execute('select*from users') rows=cur.fetchall() for row in rows: print row['Id'], row['Name']
这次使用了 with 语句。connect 的__enter__方法返回了游标,在 with 中执行结束,它会判断当前是否有错误,有错误就回滚,没有则进行事务提交,相当于无须自己来写下面这样的异常处理:
try: cur=con.cursor() cur.execute("insert into example(Name) values('xiaoming')") con.commit() except MySQLdb.Error as e: con.rollback()
事务提交和回滚
事务主要用于处理操作量大、复杂度高的数据。如果操作的是一系列的动作,比如删除一个用户,不仅需要删除用户表中对应的记录,也要删除和用户表关联的表中对应的数据,甚至还有其他业务上的需要。这个时候事务处理可以用来维护数据库的完整性,保证成批的 SQL 语句要么全部执行,要么全部不执行。
MySQL 的 InnoDB 引擎支持事务,而默认的 MyISAM 引擎不支持,需要根据业务来取舍。
ORM 简介
随着项目越来越大,采用写原生 SQL 的方式在代码中会出现大量的 SQL 语句,那么问题就出现了:
1.SQL 语句重复利用率不高,越复杂的 SQL 语句条件越多,代码越长。你会看到很多很相近的 SQL 语句。
2.很多 SQL 语句是在业务逻辑中拼出来的,如果有数据库需要更改,就要求开发人员非常了解这些逻辑,否则就很容易漏掉对某些 SQL 语句的修改。
3.写 SQL 时容易忽略 Web 安全问题,给未来造成隐患。
ORM,全称 Object Relational Mapping,中文叫作对象关系映射,通过它我们可以直接使用 Python 的类的方式做数据库开发,而不再直接写原生的 SQL 语句(甚至不需要 SQL 的基础)。通过把表映射成类,把行作为实例,把字段作为属性,ORM 在执行对象操作的时候会把对应的操作转换为数据库原生语句的方式来完成数据库开发工作。
ORM 有如下优点:
- 易用性。使用这种 ORM 数据库抽象封装方式做开发,可以有效减少出现重复 SQL 语句的概率,写出来的模型也更直观、清晰。
- 性能损耗小。ORM 转换成底层数据库操作指令确实会有一些开销。以笔者的经验,性能损耗很少(不足 5%),只要不是对性能有严苛的要求,综合考虑开发效率的提升、代码重复利用率等因素,带来的好处要远大于性能损耗,而且项目越大其作用越明显。
- 设计灵活。可以很轻松地写复杂的查询。
- 可移植。比如 SQLAlchemy,它支持多个关系数据库引擎,包括流行的 MySQL、PostgreSQL 和 SQLite。可以近乎无痛地换数据库,只需要改很少的配置项即可。
使用 SQLAlchemy
SQLAlchemy 是最流行的关系型数据库的 ORM 框架,它由 Mako 的作者 Mike Bayer 创建。我们先安装它:
> pip install SQLAlchemy
连接数据库
首先需要连接到数据库:
In:from sqlalchemy import create_engine In:engine=create_engine('sqlite://', echo=False) In:with engine.connect() as con: ...:rs=con.execute('SELECT 1') ...:print rs.fetchone() ...: (1,)
create_engine 传入了一个数据库的 URI,sqlite://表示使用了一个 SQLite 的内存型数据库。URI 的格式如下:
dialect+driver://username:password@host:port/database
dialect 是数据库的实现,比如 MySQL、PostgreSQL、SQLite。driver 是 Python 对应的驱动,如果不指定,会选择默认的驱动。比如 MySQL 的默认驱动是 MySQLdb:
engine=create_engine('mysql+mysqldb://scott:tiger@localhost/foo')
因为 mysqldb 是默认的驱动,所以可以不写驱动部分:
engine=create_engine('mysql://scott:tiger@localhost/foo')
执行 SQL 时也可以更简略地这样写:
In:rs=engine.execute('select 1') In:rs.fetchone() Out:(1,)
如果你需要详细的输出,可以设置 echo=True。
使用原生 SQL
我们把之前的例子 curd.py 改写成使用 SQLAlchemy 的(raw_sql.py):
from sqlalchemy import create_engine from consts import DB_URI eng=create_engine(DB_URI) with eng.connect() as con: con.execute('drop table if exists users') con.execute('create table users(Id INT PRIMARY KEY AUTO_INCREMENT, ' 'Name VARCHAR(25))') con.execute("insert into users(name) values('xiaoming')") con.execute("insert into users(name) values('wanglang')") rs=con.execute('select*from users') for row in rs: print row
它和之前的 MySQLdb 例子的不同之处在于,结果通过返回值获取,不再需要执行 fetchone 或者 fetchall 也能获取到。
使用表达式
SQLAlchemy 支持使用表达式的方式来操作数据库,这种方式和 Ruby On Rails 中的 Active Record 模式很像(exp_sql.py):
from sqlalchemy import (create_engine, Table, MetaData, Column, Integer, String, tuple_) from sqlalchemy.sql import select, asc, and_ from consts import DB_URI eng=create_engine(DB_URI) meta=MetaData(eng) users=Table( 'Users', meta, Column('Id', Integer, primary_key=True, autoincrement=True), Column('Name', String(50), nullable=False), ) if users.exists(): users.drop() def execute(s): print '-'*20 rs=con.execute(s) for row in rs: print row['Id'], row['Name'] with eng.connect() as con: for username in ('xiaoming', 'wanglang', 'lilei'): user=users.insert().values(Name=username) con.execute(user) stm=select([users]).limit(1) execute(stm) k=[(2,)] stm=select([users]).where(tuple_(users.c.Id).in_(k)) execute(stm) stm=select([users]).where(and_(users.c.Id>2, users.c.Id<4)) execute(stm) stm=select([users]).order_by(asc(users.c.Name)) execute(stm) stm=select([users]).where(users.c.Name.like('%min%')) execute(stm)
分析一下这个例子:
- 使用 users.create() 的方式来创建表,如果需要创建的表比较多,也可以选择使用 meta.create_all(eng)。
- 代码中的 stm 变量就是一个生成好的 SQL 语句。比如“stm=select([users]).order_by(asc (users.c.Name))”,stm 的结果是:
SELECT`Users`.`Id`,`Users`.`Name` FROM`Users`ORDER BY`Users`.`Name`ASC
使用 ORM
ORM 是基于 SQLAlchemy 表达式语言的,看一个例子(orm_sql.py):
from sqlalchemy import create_engine, Column, Integer, String, Sequence from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import and_, or_ from sqlalchemy.orm import sessionmaker from consts import DB_URI eng=create_engine(DB_URI) Base=declarative_base() class User(Base): __tablename__='users' id=Column(Integer, Sequence('user_id_seq'), primary_key=True, autoincrement=True) name=Column(String(50)) Base.metadata.drop_all(bind=eng) #删除表 Base.metadata.create_all(bind=eng) Session=sessionmaker(bind=eng) session=Session() session.add_all([User(name=username) for username in ('xiaoming', 'wanglang', 'lilei')]) session.commit() def get_result(rs): print '-'*20 for user in rs: print user.name rs=session.query(User).all() get_result(rs) rs=session.query(User).filter(User.id.in_([2, ])) get_result(rs) rs=session.query(User).filter(and_(User.id>2, User.id<4)) get_result(rs) rs=session.query(User).filter(or_(User.id==2, User.id==4)) get_result(rs) rs=session.query(User).filter(User.name.like('%min%')) get_result(rs) user=session.query(User).filter_by(name='xiaoming').first() get_result([user])
分析一下这个例子:
- 定义的 User 类会生成一张表,__tablename__的值就是表名。
- 通过 sessionmaker 创建一个会话,会话提供了事务控制的支持。模型实例对象本身独立存在,如果要让其修改(创建)生效,需要把它们加入某个会话;如果不希望对其生效就从会话中去掉由 session 管理的实例对象。执行 session.commit() 时修改被提交到数据库,执行 session.rollback() 可以回滚变更。
例子里在执行开始前会先删除表,但是实际工作请不要这样用。
还可以通过 sqlalchemy.text 写复杂的条件语句来操作数据库(text_sql.py):
rs=session.query(User).filter( text('id>2 and id<4')).order_by(text('id')).all() get_result(rs) rs=session.query(User).filter(text('id<:value and name=:name')).params( value=3, name='xiaoming').all() get_result(rs) rs=session.query(User).from_statement( text('SELECT*FROM users where name=:name')).params(name='wanglang').all() get_result(rs)
数据库关联
InnoDB 类型的表可以使用外键进行多表关联,保证数据的一致性和实现一些级联操作。我们看一个例子(rel_sql.py):
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship from consts import DB_URI eng=create_engine(DB_URI) Base=declarative_base() class User(Base): __tablename__='users' id=Column(Integer, primary_key=True, autoincrement=True) name=Column(String(50)) class Address(Base): __tablename__='address' id=Column(Integer, primary_key=True, autoincrement=True) email_address=Column(String(128), nullable=False) user_id=Column(Integer, ForeignKey('users.id')) # user 字段关联到 User 表 user=relationship('User', back_populates='addresses') #这个字段需要放在 Address 类定义之后 User.addresses=relationship('Address', order_by=Address.id, back_populates='user') Base.metadata.drop_all(bind=eng) Base.metadata.create_all(bind=eng) Session=sessionmaker(bind=eng) session=Session() user=User(name='xiaoming') user.addresses=[Address(email_address='a@gmail.com', user_id=user.id), Address(email_address='b@gmail.com', user_id=user.id)] session.add(user) session.commit()
Address 表的 user_id 字段其实就是 User 的 id 字段,使用 ForeignKey 关联之后,就不需要在 Address 上独立存储一份 user_id 数据。我们通过“ipython-i”执行上面的脚本并继续验证:
> ipython--no-banner-i chapter3/section3/rel_sql.py In:for u, a in session.query(User, Address).\ ...:filter(User.id==Address.user_id).\ ...:filter(Address.email_address=='b@gmail.com').\ ...:all(): ...: print 'User ID:{}'.format(u.id) ...: print 'Email Address:{}'.format(a.email_address) ...: User ID:1 Email Address:b@gmail.com In:session.query(Address).join(User).filter(Address.id.in_([2])).all()[0]. email_address Out:'b@gmail.com' In:session.query(User).join(Address).filter(Address.email_address=='a@gmail.com').one ().name Out:'xiaoming'
虽然使用外键可以降低开发成本,减少数据量,但是在用户量大、并发度高的时候,不推荐使用外键来关联,数据的一致性和完整性问题可以通过事务来保证。
在 Flask 中使用 SQLAlchemy
Flask-SQLAlchemy(https://github.com/mitsuhiko/flask-sqlalchemy )可以帮助我们在 Flask 中很方便地使用 SQLAlchemy,它是 Flask 作者写的扩展。首先安装 Flask-SQLAlchemy:
> pip install Flask-SQLAlchemy
一个大型项目,模型应该对应地存放在不同的模型文件中,我们把 users 这个表的模型存放到 users.py:
from ext import db class User(db.Model): __tablename__='users' id=db.Column(db.Integer, primary_key=True, autoincrement=True) name=db.Column(db.String(50)) def__init__(self, name): self.name=name
db.Model 其实还是基于 declarative_base 实现的,Flask-SQLAlchemy 提供了一个和 Django 风格很像的基类。在这里重新定义了 User 的__init__方法,因为它默认需要传入所有字段,而 id 是一个自增长的字段,不需要传入。
ext 模块存放了 Flask 第三方的扩展:
from flask_sqlalchemy import SQLAlchemy db=SQLAlchemy()
这样的好处是,db 是一个没有依赖的常量,app 也可以“from ext import db”,不会造成循环依赖。
把配置也独立出来,放入 config.py 中:
from consts import DB_URI DEBUG=True SQLALCHEMY_DATABASE_URI=DB_URI SQLALCHEMY_TRACK_MODIFICATIONS=False
现在,看一个能在 Web 上创建用户的应用(app_with_sqlalchemy.py):
1 from flask import Flask, request, jsonify 2 3 from ext import db 4 from users import User 5 6 app=Flask(__name__) 7 app.config.from_object('config') 8 db.init_app(app) 9 10 with app.app_context(): 11 db.drop_all() 12 db.create_all() 13 14 15 @app.route('/users', methods=['POST']) 16 def users(): 17 username=request.form.get('name') 18 19 user=User(username) 20 print 'User ID:{}'.format(user.id) 21 db.session.add(user) 22 db.session.commit() 23 24 return jsonify({'id':user.id}) 25 26 27 if__name__=='__main__': 28 app.run(host='0.0.0.0', port=9000)
上述例子有如下细节:
- 第 7 行,使用 from_object 加载 config.py 里的配置,生产环境中推荐这样管理配置。
- 第 8 行,把第三方扩展放在 ext.py 之后,这里只需要使用 xx.init_app(app) 的方式初始化,这也是推荐的用法。
- 第 10-12 行,drop_all 和 create_all 要在定义 model 之后再执行。Flask-SQLAlchemy 要求执行的时候有应用上下文,但是在这里还没有,所以需要使用“with app.app_context()”创建应用上下文。关于应用上下文稍后会详细介绍,也会介绍正确的用法。
- 第 20 行,print 的 user.id 永远是 None,因为在没有 commit 之前还没有创建它。commit 之后 user.id 会自动改成在表中创建的条目 id。
记录慢查询
数据库性能是开发者必须关注的重点之一,在复杂的业务代码逻辑前提下,如果只是通过 MySQL 的日志去看慢查询的日志很难定位问题,那么可以借用 SQLALCHEMY_RECORD_QUERIES 和 DATABASE_QUERY_TIMEOUT 将慢查询及相关上下文信息记录到日志中。下面展示一个基于 app_with_sqlalchemy.py 的例子(logger_slow_query.py):
import logging from logging.handlers import RotatingFileHandler from flask_sqlalchemy import get_debug_queries ... app.config['DATABASE_QUERY_TIMEOUT']=0.0001 app.config['SQLALCHEMY_RECORD_QUERIES']=True formatter=logging.Formatter( "[%(asctime)s]{%(pathname)s:%(lineno)d}%(levelname)s-%(message)s") handler=RotatingFileHandler('slow_query.log', maxBytes=10000, backupCount=10) handler.setLevel(logging.WARN) handler.setFormatter(formatter) app.logger.addHandler(handler) @app.after_request def after_request(response): for query in get_debug_queries(): if query.duration>=app.config['DATABASE_QUERY_TIMEOUT']: app.logger.warn( ('Context:{}\nSLOW QUERY:{}\nParameters:{}\n' 'Duration:{}\n').format(query.context, query.statement, query.parameters, query.duration)) return response
这个例子主要做了 3 件事:
1.启用查询记录功能。
2.给 app.logger 添加一个记录日志到名为 slow_query.log 的文件的处理器,这个日志会按大小切分。
3.添加 after_request 钩子,每次请求结束后获取执行的查询语句,假如超过阈值则记录日志。
我们看一下效果:
> http-f post http://localhost:9000/users name=xiaoming
执行完成后,除了在启动 Flask 应用的终端之外,还会在 slow_query.log 日志中看到如下记录:
[2016-07-14 20:50:11,145]{chapter3/section3/logger_slow_query.py:48}WARNING- Context:chapter3/section3/logger_slow_query.py:36 (users) SLOW QUERY:INSERT INTO users (name) VALUES (%s) Parameters:('lihang',) Duration:0.00047492980957
日志中包含了出现问题的代码位置以及对应的 SQL 语言,我们就直接知道问题的根源了。
上例中的变量 DATABASE_QUERY_TIMEOUT 的值为 0.0001 只是为了演示,生产环境需要按需调大这个阈值。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论