键值对数据库 Redis
Redis 是一个基于内存的键值对存储系统,常用作数据库、缓存和消息代理。它支持字符串、字典、列表、集合、有序集合、位图(Bitmaps)、地理位置、HyperLogLog 等多种数据结构,所以常常被称为数据结构服务器。Redis 支持事务、分片、主从复制,支持 RDB(将内存中的数据保存在文件中)和 AOF(类似于 MySQL 的 binlog)两种持久化方式,还支持订阅分发、Lua 脚本、集群(Redis 3.0 加入的功能)等特性。在用作缓存时 Redis 和 Memcached 功能类似,但它还能做到 Memcached 不能做到的几点:
1.Web 应用中常需要将一些重要数据持久化到硬盘,避免宕机等原因导致数据丢失。Redis 会周期性把更新的数据写入磁盘或者追加到命令日志中,并且在此基础上实现了主从同步。而 Memcached 在进程关闭之后数据就会丢失。
2.一些业务为了简化工作,需要使用列表、集合这样只有 Redis 才支持的数据结构。相对于 Memcahced,Redis 有更多的应用场景。
3.Redis 提供了丰富的命令。比如,可以通过通配符查看线上已经存在的键、判断一个键是否存在(Memcached 这点很不方便,没有设置缓存和设置的缓存为 None 不好区分),方便地获取服务器信息和统计数值(通过 INFO)等。在 Python 客户端中这些功能可以直接集成到项目中,能帮助运维收集服务状态监控数据,绘制性能图表等。
我们先安装 Redis:
> sudo apt-get install redis-server-yq
安装完毕 Redis 已经启动了,可以验证一下:
> redis-cli 127.0.0.1:6379>quit
安装 Redis 的 Python 客户端:
> pip install redis
操作 Redis
看看 Redis 操作列表的用法:
In:import redis In:conn=redis.Redis() In:conn.rpush('a', '1') # a 就是要操作的键 In:conn.lrange('a', 0,-1) #表示从索引为 0 的元素到最后一个元素 Out:['1'] In:conn.rpush('a', '2') Out:2L In:conn.lrange('a', 0,-1) Out:['1', '2']
可以看到 Redis 直接进行原子操作。Redis 还支持对于列表的其他类型的操作,我们接着演示。
In:conn.lpush('a', '3') #将值推入到列表的左端 Out:3L In:conn.lindex('a', 0) #返回列表中第 0 个元素的值 Out:'3' In:conn.rpush('a',*[4, 5, 6]) #为了测试效果,一次性推入 3 个元素 Out:6L In:conn.lrange('a', 0,-1) Out:['3', '1', '2', '4', '5', '6'] In:conn.ltrim('a', 1, 4) #对列表进行修剪,只保留索引从 1 到 4 的值 Out:True In:conn.lrange('a', 0,-1) Out:['1', '2', '4', '5'] In:conn.lpop('a') #移除并返回列表最左端的元素 Out:'1' In:conn.rpop('a') #移除并返回列表最右端的元素 Out:'5'
除了列表,另外一个常用的数据结构为字典,在 Redis 中这样使用:
In:conn.hset('d', 'a', 1) #给名字为 d 的键添加一个名字为 a,值为 1 的字段 Out:1L In:conn.hmset('d',{'b':2, 'c':3}) #一次性的添加多个字段 Out:True In:conn.hget('d', 'b') #获取字段 b 的值 Out:'2' In:conn.hmget('d', ['a', 'b']) #一次性的获取多个字段的值 Out:['1', '2'] In:conn.hgetall('d') #获取 d 的全部字段和值的对应关系(字典) Out:{'a':'1', 'b':'2', 'c':'3'}
这只是众多数据结构中的最常用的 2 种数据结构提供的主要操作功能,你可以从命令页(http://bit.ly/28VOVO2 )找到全部命令列表,本书就不一一列举了。但是可以想象,Redis 提供了非常多便利的方法来完成工作。
Redis 应用场景
有哪些场景可以使用 Redis 代替数据库呢?其实原则很简单:当不需要数据库的高级功能(比如事务提供的回滚、关联查询、UPDATE 操作等),且 Redis 能满足此应用场景时就可以选择 Redis。除了缓存,我们举例说明常用的几种场景。
取最新 N 个数据的操作
现在看一下使用 SQLAlchemy、Flask 和 Redis 的例子(lastest_files.py)。首先定义一个简单的 PasteFile 模型:
class PasteFile(db.Model): __tablename__='files' id=db.Column(db.Integer, primary_key=True) name=db.Column(db.String(5000), nullable=False) uploadtime=db.Column(db.DateTime, nullable=False) def__init__(self, name='', uploadtime=None): self.uploadtime=datetime.now() if uploadtime is None else uploadtime self.name=name
接着定义两个视图,第一个是更新的视图:
r=redis.StrictRedis(host='localhost', port=6379, db=0) MAX_FILE_COUNT=50 @app.route('/upload', methods=['POST']) def upload(): name=request.form.get('name') pastefile=PasteFile(name) db.session.add(pastefile) db.session.commit() r.lpush('latest.files', pastefile.id) r.ltrim('latest.files', 0, MAX_FILE_COUNT-1) return jsonify({'r':0})
r.lpush 表示对列表左侧放入新的数据库条目的 id,r.ltrim 用来修剪列表,只保留最近的 50 个结果。
第二个是获取最近上传文件列表的视图:
@app.route('/lastest_files') def get_lastest_files(): start=request.args.get('start', default=0, type=int) limit=request.args.get('limit', default=20, type=int) ids=r.lrange('latest.files', start, start+limit-1) files=PasteFile.query.filter(PasteFile.id.in_(ids)).all() return json.dumps([{'id':f.id, 'filename':f.name}for f in files])
先获得最近上传的文件 id 列表,再获得这些模型对象。
验证效果之前先创建一些数据:
In:from chapter6.section4.lastest_files import app, PasteFile, r In:import time In:import random In:import string In:with app.test_client() as client: ...: for_in range(100): ...: client.post('/upload', data={'name':''.join(random.sample(string. ascii_letters, 10))}) ...: time.sleep(0.5) ...:
使用 r.lrange 获取最近的 5 个条目:
In:start=0 In:limit=5 In:ids=r.lrange('latest.files', start, start+limit-1) In:ids Out:['100', '99', '98', '97', '96']
获取条目就是在内存中完成的。使用 SQLAlchemy,就需要用如下方式:
In:from sqlalchemy import desc In:[id for id, in PasteFile.query.with_entities(PasteFile.id).order_by(desc(PasteFile .id)).offset(start).limit(limit).all()] Out:[100L, 99L, 98L, 97L, 96L]
虽然 id 的类型不同,但效果是一样的:
In:PasteFile.query.get('1').name Out:u'gtRQMBePAm' In:PasteFile.query.get(1).name Out:u'gtRQMBePAm'
这个时候可能你会有疑问,之前使用 Libmc 是直接缓存模型对象的。这里比较曲折,通过 id 列表再去获得对应的模型列表。这是因为 Redis 并没有内置序列化工作,如果直接缓存对象,缓存的对象会变成字符串:
In:p=PasteFile.query.get(100) In:p Out:<lastest_files.PasteFile at 0x7f8ad0017350> In:r.set('a', p) Out:True In:a=r.get('a') In:a Out:'<lastest_files.PasteFile object at 0x7f8ad0017350>' In:type(a) Out:str
这涉及序列化/反序列化。将对象的状态信息转换为可以存储或传输的形式的过程就是序列化;把这个存储的内容还原成对象就是反序列化。
我们使用 MessagePack 来做序列化和反序列化的工作。MessagePack 是一个基于二进制的高效的对象序列化类库,可用于跨语言通信。它可以像 JSON 那样,在许多种语言之间交换结构对象,但是它比 JSON 更快速也更轻巧。不选择 cPickle 就是希望序列化的数据可以被跨语言使用。
我们先安装 MessagePack:
> pip install msgpack-python
PasteFile 基于 SQLAlchemy,而且属性 uploadtime 是 Datetime 类型,它们都不支持 Mes-sagePack,需要自定义序列化和反序列化的方法,也就是给 PasteFile 添加两个方法(lastest_files_with_msgpack.py):
import ast import msgpack class PasteFile(db.Model): ... def to_dict(self): #不一定整个对象的全部方法都要缓存,缓存对象中有用的属性就可以了 d={k:v for k, v in vars(self).items() if not k.startswith('_')} # datetime 格式不能被序列化,需要先转换成对应的时间字符串 d['uploadtime']=d['uploadtime'].strftime('%Y%m%dT%H:%M:%S.%f') return str(d) #序列化的数据必须是字符串类型 @classmethod def from_dict(cls, data): data=ast.literal_eval(data) # id 是自增长的字段,没有在__init__中传入,需要在生成对象之后再赋值 id 这个属性 id=data.pop('id') data['uploadtime']=datetime.strptime( data['uploadtime'], '%Y%m%dT%H:%M:%S.%f') p=cls(**data) p.id=id return p
其中 to_dict 用来序列化,from_dict 用来反序列化。封装之后就是如下函数:
def default(obj): if isinstance(obj, PasteFile): return msgpack.ExtType(42, obj.to_dict()) raise TypeError('Unknown type:%r'%(obj,)) def ext_hook(code, data): if code==42: p=PasteFile.from_dict(data) return p return msgpack.ExtType(code, data)
接下来试验一下:
In:p=PasteFile.query.get(100) In:p Out:<chapter6.section4.lastest_files_with_msgpack.PasteFile at 0x7eff3030d790> In:p.name Out:u'aIMrwLqdAi' In:packed=msgpack.packb(p, default=default) In:packed Out:"\xc7M*{'uploadtime':'20160602T12:34:01.000000', 'name':u'aIMrwLqdAi', 'id':100 L}" In:unpacked=msgpack.unpackb(packed, ext_hook=ext_hook) In:unpacked Out:<chapter6.section4.lastest_files_with_msgpack.PasteFile at 0x7eff3030d3d0> In:unpacked.name Out:u'aIMrwLqdAi'
可以看到序列化和反序列话之后,重要的属性都可以找到。
是否序列化以及如何序列化,根据业务会有不同的考虑。序列化的优点是可以把对象存放起来,在对应的位置再反序列化回来,这也是消息队列能实现的原因。如果想序列化缓存对象,关键要看对缓存的对象反序列化获得模型和只缓存 id 再获得对应模型的效率孰高孰低。
取 TOP N 操作(排行榜应用)
豆瓣东西曾经做过一些微信社会化营销的小游戏,在小游戏结束时会立刻列出来分数和排名,还可以分享给朋友。如果使用传统的 MySQL+Memcached,思路如下。
1.创建一张游戏表,表结构如下:
CREATE TABLE`game_score`( `id`int(10) unsigned NOT NULL AUTO_INCREMENT, `game_id`smallint(6) unsigned NOT NULL, `user_id`int(11) DEFAULT 0, `create_time`timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, `score`float NOT NULL, PRIMARY KEY(`id`), KEY`idx_score`(`game_id`,`score`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
2.每当用户完成一次小游戏时,就把对应的游戏 id、用户和得分插入数据库:
insert into game_score(game_id, user_id, score) values (GAME_ID, USER_ID, SCORE)
3.通过如下两个 SQL 语句计算得分的排名情况:
select count(1) from game_score where game_id=GAME_ID #获取完成游戏总次数 select count(1) from game_score where game_id=%s and score<SCORE
4.由于插入和查询操作太频繁,对 COUNT 语句添加缓存,插入语句的执行放到异步队列,插入完成后会更新对应的缓存。
这看起来有点复杂,而且插入之后看到的排名由于缓存可能会稍微的延迟。如果使用 Redis 实现,将会比较简单(top_n.py):
import string import random import redis r=redis.StrictRedis(host='localhost', port=6379, db=0) GAME_BOARD_KEY='game.board' #插入 1000 条随机用户名和分数组成的记录。zadd 方法表示我们操作的是一个有序列表 for_in range(1000): score=round((random.random()*100), 2) user_id=''.join(random.sample(string.ascii_letters, 6)) r.zadd(GAME_BOARD_KEY, score, user_id) #随机获得一个用户和他的得分,zrevrange 表示从高到低对列表排序 user_id, score=r.zrevrange(GAME_BOARD_KEY, 0,-1, withscores=True)[random.randint(0, 200)] print user_id, score #获取全部记录条目数 board_count=r.zcount(GAME_BOARD_KEY, 0, 100) #这个用户分数超过了多少用户 current_count=r.zcount(GAME_BOARD_KEY, 0, score) print current_count, board_count print 'TOP 10' print '-'*20 #获取排行榜前 10 位的用户名和得分 for user_id, score in r.zrevrangebyscore(GAME_BOARD_KEY, 100, 0, start=0, num=10, withscores=True): print user_id, score
一个有序集合的元素数量可以达到 232 -1。使用 zadd 的复杂度是 O(log(N)),zrevrange 和 zrevrangebyscore 的复杂度是 O(log(N)+M) (N 是 Set 大小,M 是结果/操作元素的个数)。可见使用 Redis 多么方便,效率还很高。
计数器
Redis 非常适合用来做计数器:
In:COUNT_KEY='id:{id}'.format(id=100) In:COUNT_KEY Out:'id:100' In:r.get(COUNT_KEY) In:r.incr(COUNT_KEY) Out:1 In:r.incr(COUNT_KEY) Out:2 In:r.decr(COUNT_KEY) Out:1 In:count=r.incr(COUNT_KEY) In:count Out:2 In:r.incrby(COUNT_KEY, 2) Out:4 In:r.incrby(COUNT_KEY, 5) Out:9
实时统计
Redis 的位图提供了二进制的操作,非常适合存储布尔类型的值。常见场景是记录用户登录状态,用于日后计算一段时间内的活跃用户量。
位图可以对值进行基于二进制位的置位操作。我们可以把值的每一位当作一个用户,登录了就置为 1,否则还是默认的 0。假设现有 10 个用户,如果位图的值为 0100100001,表示第 2、5 和 10 个用户是活跃的。用这样的方式来存储非常省内存,而且计算起来很方便,否则需要记录专门的登录日志或者使用表来记录用户的登录情况,这也给数据库服务增加了压力。
我们看一下计算活跃用户数的例子(user_active.py)。首先创建一些登录数据:
import redis ACCOUNT_ACTIVE_KEY='account:active' r.flushall() #为了测试方便,每次启动后先清理 Redis now=datetime.utcnow() def record_active(account_id, t=None): if t is None: t=datetime.utcnow() p=r.pipeline() #使用 Redis 提供的事务 key=ACCOUNT_ACTIVE_KEY for arg in ('year', 'month', 'day'): key='{}:{}'.format(key, getattr(t, arg)) p.setbit(key, account_id, 1) #设置年、月、日三种键 p.execute() #事务提交 def gen_records(max_days, population, k):#随机生成一些数据 for day in range(1, max_days):#日期需要从 1 开始 time_=datetime(now.year, now.month, day) accounts=random.sample(range(population), k) for account_id in accounts: record_active(account_id, time_) gen_records(29, 10000, 2000)
最后会添加从 1 号到 28 号的数据,每天都从 10,000 个用户中随机选 2000 个用户登录。
在 IPython 交互模式下试验:
> ipython-i user_active.py #这个月总的活跃用户数 In:r.bitcount('{}:{}:{}'.format(ACCOUNT_ACTIVE_KEY, now.year, now.month)) Out:9991 #总用户为 10000,但是有 9 个用户没有登录过 #今天的活跃用户数,因为当时设置的是每天 2000,这里的值也就是 2000 In:r.bitcount('{}:{}:{}:{}'.format(ACCOUNT_ACTIVE_KEY, now.year, now.month, now.day)) Out:2000 In:account_id=1200 #随机找一个用户 #查看这个随机用户是否曾经登录过 In:r.getbit('{}:{}:{}'.format(ACCOUNT_ACTIVE_KEY, now.year, now.month), account_id) Out:1 #这个用户在活跃用户中 In:r.getbit('{}:{}:{}'.format(ACCOUNT_ACTIVE_KEY, now.year, now.month), 10001) Out:0 #这个用户肯定不在活跃用户中 #获取当月 1 号和 2 号的键 In:keys=['{}:{}:{}:{}'.format(ACCOUNT_ACTIVE_KEY, now.year, now.month, day) ...:for day in range(1, 3)] #获取在 1 号或者在 2 号的活跃用户数 In:r.bitop('or', 'destkey:or',*keys) Out:1250L #查询在 1 号或者在 2 号的活跃用户数 In:r.bitcount('destkey:or') Out:3584 #获取 1 号和 2 号的都活跃的用户数 In:r.bitop('and', 'destkey:and',*keys) Out:1250L #查询在 1 号和在 2 号都活跃用户数 In:r.bitcount('destkey:and') Out:416
如果位的位置超过了当前字符串的长度,会自动扩充这个字符串。看一下内存占用情况,先编写计算内存使用的函数:
def calc_memory(): r.flushall() print 'USED_MEMORY:{}'.format(r.info()['used_memory_human']) #执行前先看当前内存的占用情况 start=time.time() # 20*100000 次(100 万中选择 10 万) gen_records(21, 1000000, 100000) print 'COST:{}'.format(time.time()-start) #记录花费时间 print 'USED_MEMORY:{}'.format(r.info()['used_memory_human']) #记录添加记录之后的内存占用情况 In:calc_memory() USED_MEMORY:495.86K COST:377.873290062 USED_MEMORY:4.54M
通过 calc_memory 的执行结果可以看到使用位图存储非常省空间,200 万的用户活跃计数只占用了 4 MB 多一点的空间,而且需要强调的是,每次计数我们都是设置了 3 个键。
多了解自己的产品需求和技术难度,当对 Redis 数据结构和数据操作都非常了解熟悉之后,就可以想到非常多的 Redis 应用场景了。
分片和集群管理
之前我们讨论的都是单机 Redis,而在大型网站应用中,热点数据量往往非常大,一个实例可能会放不下。无论是物理主机还是云主机,内存资源都是有限的,这个时候就要考虑横向扩展:由多台主机协同提供服务。现在多核 CPU、几十 GB 内存、SSD 都非常普遍,硬件资源成本却越来越低。当单机的 Redis 实例不能满足需要时,我们可以通过一致性哈希(Consistent Hashing)算法将 Redis 数据的键进行散列,通过哈希函数,让特定的键映射到特定的 Redis 节点上:这就是分片。在 Redis 3.0 之前,需要借助客户端或者代理实现分片。遗憾的是,Redis 的 Python 客户端 redis-py 目前还没有实现这个分片功能。
常见的分片和集群管理方式有如下三种。
1.Twemproxy:它是在 Redis 3.0 之前通用的方式,它是 Twitter 开发的一个支持 Mem-cached ASCII 和 Redis 协议的、单线程,使用 C 编写的代理。一般一个 Redis 应用会由多个 Twemproxy 来管理,少数 Twemproxy 负责写,多数负责读。通常使用 Redis 自带的 Sentinel 来实现故障的自动切换以达到高可用。Twemproxy 可以定时向 Redis Sentinel 拉取信息,从而替换出现异常的节点。它最大的缺点是无法平滑地扩容/缩容,不便于运维;其次是没有友好的控制面板。需要注意的是,看上去 Twitter 已经不再继续维护它了(http://bit.ly/28W8aHV )。
2.Redis Cluster:它是 Redis 3.0 添加的集群方式,也是未来自动分片和高可用的首选方式。这种方式使用数据分片而非一致性哈希来实现,简单地说,就是一个 Redis 集群包含 16,384 个哈希槽,数据库中的每个键都属于这 16,384 个哈希槽的其中一个(使用 CRC16 函数对键获得校检值,对 16384 取余来计算键属于哪个槽)。集群中的每个节点负责处理一部分哈希槽,当添加新的节点时,只需要将对应的某些槽移动到新添加的节点上;当移出节点时,就把这个被移出节点上面的槽分配到其他节点上,而且这个添加和下线的过程不会阻塞整个集群。这个思路很好,只是目前还没有看到大型网站成功的案例,需要继续关注。
3.Codis:豌豆荚在生产环境使用的 Redis 分布式集群解决方案。对于上层的应用来说,连接到 Codis 代理和连接原生的 Redis Server 没有明显的区别(有少量还不支持的命令(http://bit.ly/28Y4TX3 )),上层应用可以像使用单机的 Redis 一样使用,Codis 底层会处理请求的转发,不停机的数据迁移等工作,所有后边的一切事情,对于前面的客户端来说是透明的,可以简单地认为后边连接的是一个内存无限大的 Redis 服务。它还兼容 Twemproxy,而且能很容易地把数据从 Twemproxy 迁到 Codis 上,让运维和监控更方便。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论