返回介绍

键值对数据库 Redis

发布于 2025-04-20 18:52:15 字数 13327 浏览 0 评论 0 收藏

Redis 是一个基于内存的键值对存储系统,常用作数据库、缓存和消息代理。它支持字符串、字典、列表、集合、有序集合、位图(Bitmaps)、地理位置、HyperLogLog 等多种数据结构,所以常常被称为数据结构服务器。Redis 支持事务、分片、主从复制,支持 RDB(将内存中的数据保存在文件中)和 AOF(类似于 MySQL 的 binlog)两种持久化方式,还支持订阅分发、Lua 脚本、集群(Redis 3.0 加入的功能)等特性。在用作缓存时 Redis 和 Memcached 功能类似,但它还能做到 Memcached 不能做到的几点:

1.Web 应用中常需要将一些重要数据持久化到硬盘,避免宕机等原因导致数据丢失。Redis 会周期性把更新的数据写入磁盘或者追加到命令日志中,并且在此基础上实现了主从同步。而 Memcached 在进程关闭之后数据就会丢失。

2.一些业务为了简化工作,需要使用列表、集合这样只有 Redis 才支持的数据结构。相对于 Memcahced,Redis 有更多的应用场景。

3.Redis 提供了丰富的命令。比如,可以通过通配符查看线上已经存在的键、判断一个键是否存在(Memcached 这点很不方便,没有设置缓存和设置的缓存为 None 不好区分),方便地获取服务器信息和统计数值(通过 INFO)等。在 Python 客户端中这些功能可以直接集成到项目中,能帮助运维收集服务状态监控数据,绘制性能图表等。

我们先安装 Redis:

> sudo apt-get install redis-server-yq

安装完毕 Redis 已经启动了,可以验证一下:

> redis-cli
127.0.0.1:6379>quit

安装 Redis 的 Python 客户端:

> pip install redis

操作 Redis

看看 Redis 操作列表的用法:

In:import redis
In:conn=redis.Redis()
In:conn.rpush('a', '1') # a 就是要操作的键
In:conn.lrange('a', 0,-1) #表示从索引为 0 的元素到最后一个元素
Out:['1']
In:conn.rpush('a', '2')
Out:2L
In:conn.lrange('a', 0,-1)
Out:['1', '2']

可以看到 Redis 直接进行原子操作。Redis 还支持对于列表的其他类型的操作,我们接着演示。

In:conn.lpush('a', '3') #将值推入到列表的左端
Out:3L
In:conn.lindex('a', 0) #返回列表中第 0 个元素的值
Out:'3'
In:conn.rpush('a',*[4, 5, 6]) #为了测试效果,一次性推入 3 个元素
Out:6L
In:conn.lrange('a', 0,-1)
Out:['3', '1', '2', '4', '5', '6']
In:conn.ltrim('a', 1, 4) #对列表进行修剪,只保留索引从 1 到 4 的值
Out:True
In:conn.lrange('a', 0,-1)
Out:['1', '2', '4', '5']
In:conn.lpop('a') #移除并返回列表最左端的元素
Out:'1'
In:conn.rpop('a') #移除并返回列表最右端的元素
Out:'5'

除了列表,另外一个常用的数据结构为字典,在 Redis 中这样使用:

In:conn.hset('d', 'a', 1) #给名字为 d 的键添加一个名字为 a,值为 1 的字段
Out:1L
In:conn.hmset('d',{'b':2, 'c':3}) #一次性的添加多个字段
Out:True
In:conn.hget('d', 'b') #获取字段 b 的值
Out:'2'
In:conn.hmget('d', ['a', 'b']) #一次性的获取多个字段的值
Out:['1', '2']
In:conn.hgetall('d') #获取 d 的全部字段和值的对应关系(字典)
Out:{'a':'1', 'b':'2', 'c':'3'}

这只是众多数据结构中的最常用的 2 种数据结构提供的主要操作功能,你可以从命令页(http://bit.ly/28VOVO2 )找到全部命令列表,本书就不一一列举了。但是可以想象,Redis 提供了非常多便利的方法来完成工作。

Redis 应用场景

有哪些场景可以使用 Redis 代替数据库呢?其实原则很简单:当不需要数据库的高级功能(比如事务提供的回滚、关联查询、UPDATE 操作等),且 Redis 能满足此应用场景时就可以选择 Redis。除了缓存,我们举例说明常用的几种场景。

取最新 N 个数据的操作

现在看一下使用 SQLAlchemy、Flask 和 Redis 的例子(lastest_files.py)。首先定义一个简单的 PasteFile 模型:

class PasteFile(db.Model):
    __tablename__='files'
    id=db.Column(db.Integer, primary_key=True)
    name=db.Column(db.String(5000), nullable=False)
    uploadtime=db.Column(db.DateTime, nullable=False)

    def__init__(self, name='', uploadtime=None):
        self.uploadtime=datetime.now() if uploadtime is None else uploadtime
        self.name=name

接着定义两个视图,第一个是更新的视图:

r=redis.StrictRedis(host='localhost', port=6379, db=0)
MAX_FILE_COUNT=50


@app.route('/upload', methods=['POST'])
def upload():
    name=request.form.get('name')

    pastefile=PasteFile(name)
    db.session.add(pastefile)
    db.session.commit()
    r.lpush('latest.files', pastefile.id)
    r.ltrim('latest.files', 0, MAX_FILE_COUNT-1)
 
    return jsonify({'r':0})

r.lpush 表示对列表左侧放入新的数据库条目的 id,r.ltrim 用来修剪列表,只保留最近的 50 个结果。

第二个是获取最近上传文件列表的视图:

@app.route('/lastest_files')
def get_lastest_files():
    start=request.args.get('start', default=0, type=int)
    limit=request.args.get('limit', default=20, type=int)
    ids=r.lrange('latest.files', start, start+limit-1)
    files=PasteFile.query.filter(PasteFile.id.in_(ids)).all()
    return json.dumps([{'id':f.id, 'filename':f.name}for f in files])

先获得最近上传的文件 id 列表,再获得这些模型对象。

验证效果之前先创建一些数据:

In:from chapter6.section4.lastest_files import app, PasteFile, r
In:import time
In:import random
In:import string
In:with app.test_client() as client:
...:   for_in range(100):
...:      client.post('/upload', data={'name':''.join(random.sample(string.
   ascii_letters, 10))})
...:     time.sleep(0.5)
...:

使用 r.lrange 获取最近的 5 个条目:

In:start=0
In:limit=5
In:ids=r.lrange('latest.files', start, start+limit-1)
In:ids
Out:['100', '99', '98', '97', '96']

获取条目就是在内存中完成的。使用 SQLAlchemy,就需要用如下方式:

In:from sqlalchemy import desc
In:[id for id, in PasteFile.query.with_entities(PasteFile.id).order_by(desc(PasteFile
    .id)).offset(start).limit(limit).all()]
Out:[100L, 99L, 98L, 97L, 96L]

虽然 id 的类型不同,但效果是一样的:

In:PasteFile.query.get('1').name
Out:u'gtRQMBePAm'
In:PasteFile.query.get(1).name
Out:u'gtRQMBePAm'

这个时候可能你会有疑问,之前使用 Libmc 是直接缓存模型对象的。这里比较曲折,通过 id 列表再去获得对应的模型列表。这是因为 Redis 并没有内置序列化工作,如果直接缓存对象,缓存的对象会变成字符串:

In:p=PasteFile.query.get(100)
In:p
Out:<lastest_files.PasteFile at 0x7f8ad0017350>
In:r.set('a', p)
Out:True
In:a=r.get('a')
In:a
Out:'<lastest_files.PasteFile object at 0x7f8ad0017350>'
In:type(a)
Out:str

这涉及序列化/反序列化。将对象的状态信息转换为可以存储或传输的形式的过程就是序列化;把这个存储的内容还原成对象就是反序列化。

我们使用 MessagePack 来做序列化和反序列化的工作。MessagePack 是一个基于二进制的高效的对象序列化类库,可用于跨语言通信。它可以像 JSON 那样,在许多种语言之间交换结构对象,但是它比 JSON 更快速也更轻巧。不选择 cPickle 就是希望序列化的数据可以被跨语言使用。

我们先安装 MessagePack:

> pip install msgpack-python

PasteFile 基于 SQLAlchemy,而且属性 uploadtime 是 Datetime 类型,它们都不支持 Mes-sagePack,需要自定义序列化和反序列化的方法,也就是给 PasteFile 添加两个方法(lastest_files_with_msgpack.py):

import ast

import msgpack


class PasteFile(db.Model):
    ...
    def to_dict(self):
        #不一定整个对象的全部方法都要缓存,缓存对象中有用的属性就可以了
        d={k:v for k, v in vars(self).items() if not k.startswith('_')}
        # datetime 格式不能被序列化,需要先转换成对应的时间字符串
        d['uploadtime']=d['uploadtime'].strftime('%Y%m%dT%H:%M:%S.%f')
        return str(d) #序列化的数据必须是字符串类型

    @classmethod
    def from_dict(cls, data):
        data=ast.literal_eval(data)
        # id 是自增长的字段,没有在__init__中传入,需要在生成对象之后再赋值 id 这个属性
        id=data.pop('id')
        data['uploadtime']=datetime.strptime(
            data['uploadtime'], '%Y%m%dT%H:%M:%S.%f')
        p=cls(**data)
        p.id=id
        return p

其中 to_dict 用来序列化,from_dict 用来反序列化。封装之后就是如下函数:

def default(obj):
   if isinstance(obj, PasteFile):
       return msgpack.ExtType(42, obj.to_dict())
   raise TypeError('Unknown type:%r'%(obj,))


def ext_hook(code, data):
    if code==42:
        p=PasteFile.from_dict(data)
        return p
    return msgpack.ExtType(code, data)

接下来试验一下:

In:p=PasteFile.query.get(100)
In:p
Out:<chapter6.section4.lastest_files_with_msgpack.PasteFile at 0x7eff3030d790>
In:p.name
Out:u'aIMrwLqdAi'
In:packed=msgpack.packb(p, default=default)
In:packed
Out:"\xc7M*{'uploadtime':'20160602T12:34:01.000000', 'name':u'aIMrwLqdAi', 'id':100
   L}"
In:unpacked=msgpack.unpackb(packed, ext_hook=ext_hook)
In:unpacked
Out:<chapter6.section4.lastest_files_with_msgpack.PasteFile at 0x7eff3030d3d0>
In:unpacked.name
Out:u'aIMrwLqdAi'

可以看到序列化和反序列话之后,重要的属性都可以找到。

是否序列化以及如何序列化,根据业务会有不同的考虑。序列化的优点是可以把对象存放起来,在对应的位置再反序列化回来,这也是消息队列能实现的原因。如果想序列化缓存对象,关键要看对缓存的对象反序列化获得模型和只缓存 id 再获得对应模型的效率孰高孰低。

取 TOP N 操作(排行榜应用)

豆瓣东西曾经做过一些微信社会化营销的小游戏,在小游戏结束时会立刻列出来分数和排名,还可以分享给朋友。如果使用传统的 MySQL+Memcached,思路如下。

1.创建一张游戏表,表结构如下:

CREATE TABLE`game_score`(
`id`int(10) unsigned NOT NULL AUTO_INCREMENT,
`game_id`smallint(6) unsigned NOT NULL,
`user_id`int(11) DEFAULT 0,
`create_time`timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`score`float NOT NULL,
PRIMARY KEY(`id`),
KEY`idx_score`(`game_id`,`score`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

2.每当用户完成一次小游戏时,就把对应的游戏 id、用户和得分插入数据库:

insert into game_score(game_id, user_id, score) values (GAME_ID, USER_ID, SCORE)

3.通过如下两个 SQL 语句计算得分的排名情况:

select count(1) from game_score where game_id=GAME_ID #获取完成游戏总次数
select count(1) from game_score where game_id=%s and score<SCORE

4.由于插入和查询操作太频繁,对 COUNT 语句添加缓存,插入语句的执行放到异步队列,插入完成后会更新对应的缓存。

这看起来有点复杂,而且插入之后看到的排名由于缓存可能会稍微的延迟。如果使用 Redis 实现,将会比较简单(top_n.py):

import string
import random

import redis

r=redis.StrictRedis(host='localhost', port=6379, db=0)
GAME_BOARD_KEY='game.board'

#插入 1000 条随机用户名和分数组成的记录。zadd 方法表示我们操作的是一个有序列表
for_in range(1000):
    score=round((random.random()*100), 2)
    user_id=''.join(random.sample(string.ascii_letters, 6))
    r.zadd(GAME_BOARD_KEY, score, user_id)

#随机获得一个用户和他的得分,zrevrange 表示从高到低对列表排序
user_id, score=r.zrevrange(GAME_BOARD_KEY, 0,-1,
                           withscores=True)[random.randint(0, 200)]
print user_id, score

#获取全部记录条目数
board_count=r.zcount(GAME_BOARD_KEY, 0, 100)
#这个用户分数超过了多少用户
current_count=r.zcount(GAME_BOARD_KEY, 0, score)

print current_count, board_count

print 'TOP 10'
print '-'*20

#获取排行榜前 10 位的用户名和得分
for user_id, score in r.zrevrangebyscore(GAME_BOARD_KEY, 100, 0, start=0,
                                         num=10, withscores=True):
    print user_id, score

一个有序集合的元素数量可以达到 232 -1。使用 zadd 的复杂度是 O(log(N)),zrevrange 和 zrevrangebyscore 的复杂度是 O(log(N)+M) (N 是 Set 大小,M 是结果/操作元素的个数)。可见使用 Redis 多么方便,效率还很高。

计数器

Redis 非常适合用来做计数器:

In:COUNT_KEY='id:{id}'.format(id=100)
In:COUNT_KEY
Out:'id:100'
In:r.get(COUNT_KEY)
In:r.incr(COUNT_KEY)
Out:1
In:r.incr(COUNT_KEY)
Out:2
In:r.decr(COUNT_KEY)
Out:1
In:count=r.incr(COUNT_KEY)
In:count
Out:2
In:r.incrby(COUNT_KEY, 2)
Out:4
In:r.incrby(COUNT_KEY, 5)
Out:9

实时统计

Redis 的位图提供了二进制的操作,非常适合存储布尔类型的值。常见场景是记录用户登录状态,用于日后计算一段时间内的活跃用户量。

位图可以对值进行基于二进制位的置位操作。我们可以把值的每一位当作一个用户,登录了就置为 1,否则还是默认的 0。假设现有 10 个用户,如果位图的值为 0100100001,表示第 2、5 和 10 个用户是活跃的。用这样的方式来存储非常省内存,而且计算起来很方便,否则需要记录专门的登录日志或者使用表来记录用户的登录情况,这也给数据库服务增加了压力。

我们看一下计算活跃用户数的例子(user_active.py)。首先创建一些登录数据:

import redis

ACCOUNT_ACTIVE_KEY='account:active'

r.flushall() #为了测试方便,每次启动后先清理 Redis
now=datetime.utcnow()

def record_active(account_id, t=None):
    if t is None:
        t=datetime.utcnow()
    p=r.pipeline() #使用 Redis 提供的事务
    key=ACCOUNT_ACTIVE_KEY
    for arg in ('year', 'month', 'day'):
        key='{}:{}'.format(key, getattr(t, arg))
        p.setbit(key, account_id, 1) #设置年、月、日三种键
    p.execute() #事务提交


def gen_records(max_days, population, k):#随机生成一些数据
    for day in range(1, max_days):#日期需要从 1 开始
        time_=datetime(now.year, now.month, day)
        accounts=random.sample(range(population), k)
        for account_id in accounts:
            record_active(account_id, time_)

gen_records(29, 10000, 2000)

最后会添加从 1 号到 28 号的数据,每天都从 10,000 个用户中随机选 2000 个用户登录。

在 IPython 交互模式下试验:

> ipython-i user_active.py
#这个月总的活跃用户数
In:r.bitcount('{}:{}:{}'.format(ACCOUNT_ACTIVE_KEY, now.year, now.month))
Out:9991 #总用户为 10000,但是有 9 个用户没有登录过
#今天的活跃用户数,因为当时设置的是每天 2000,这里的值也就是 2000
In:r.bitcount('{}:{}:{}:{}'.format(ACCOUNT_ACTIVE_KEY, now.year, now.month, now.day))
Out:2000
In:account_id=1200 #随机找一个用户
#查看这个随机用户是否曾经登录过
In:r.getbit('{}:{}:{}'.format(ACCOUNT_ACTIVE_KEY, now.year, now.month), account_id)
Out:1 #这个用户在活跃用户中
In:r.getbit('{}:{}:{}'.format(ACCOUNT_ACTIVE_KEY, now.year, now.month), 10001)
Out:0 #这个用户肯定不在活跃用户中
#获取当月 1 号和 2 号的键
In:keys=['{}:{}:{}:{}'.format(ACCOUNT_ACTIVE_KEY, now.year, now.month, day)
...:for day in range(1, 3)]
#获取在 1 号或者在 2 号的活跃用户数
In:r.bitop('or', 'destkey:or',*keys)
Out:1250L
#查询在 1 号或者在 2 号的活跃用户数
In:r.bitcount('destkey:or')
Out:3584
#获取 1 号和 2 号的都活跃的用户数
In:r.bitop('and', 'destkey:and',*keys)
Out:1250L
#查询在 1 号和在 2 号都活跃用户数
In:r.bitcount('destkey:and')
Out:416

如果位的位置超过了当前字符串的长度,会自动扩充这个字符串。看一下内存占用情况,先编写计算内存使用的函数:

def calc_memory():
    r.flushall()

    print 'USED_MEMORY:{}'.format(r.info()['used_memory_human']) #执行前先看当前内存的占用情况

    start=time.time()
    # 20*100000 次(100 万中选择 10 万)
    gen_records(21, 1000000, 100000)

    print 'COST:{}'.format(time.time()-start) #记录花费时间
    print 'USED_MEMORY:{}'.format(r.info()['used_memory_human']) #记录添加记录之后的内存占用情况

In:calc_memory()
USED_MEMORY:495.86K
COST:377.873290062
USED_MEMORY:4.54M

通过 calc_memory 的执行结果可以看到使用位图存储非常省空间,200 万的用户活跃计数只占用了 4 MB 多一点的空间,而且需要强调的是,每次计数我们都是设置了 3 个键。

多了解自己的产品需求和技术难度,当对 Redis 数据结构和数据操作都非常了解熟悉之后,就可以想到非常多的 Redis 应用场景了。

分片和集群管理

之前我们讨论的都是单机 Redis,而在大型网站应用中,热点数据量往往非常大,一个实例可能会放不下。无论是物理主机还是云主机,内存资源都是有限的,这个时候就要考虑横向扩展:由多台主机协同提供服务。现在多核 CPU、几十 GB 内存、SSD 都非常普遍,硬件资源成本却越来越低。当单机的 Redis 实例不能满足需要时,我们可以通过一致性哈希(Consistent Hashing)算法将 Redis 数据的键进行散列,通过哈希函数,让特定的键映射到特定的 Redis 节点上:这就是分片。在 Redis 3.0 之前,需要借助客户端或者代理实现分片。遗憾的是,Redis 的 Python 客户端 redis-py 目前还没有实现这个分片功能。

常见的分片和集群管理方式有如下三种。

1.Twemproxy:它是在 Redis 3.0 之前通用的方式,它是 Twitter 开发的一个支持 Mem-cached ASCII 和 Redis 协议的、单线程,使用 C 编写的代理。一般一个 Redis 应用会由多个 Twemproxy 来管理,少数 Twemproxy 负责写,多数负责读。通常使用 Redis 自带的 Sentinel 来实现故障的自动切换以达到高可用。Twemproxy 可以定时向 Redis Sentinel 拉取信息,从而替换出现异常的节点。它最大的缺点是无法平滑地扩容/缩容,不便于运维;其次是没有友好的控制面板。需要注意的是,看上去 Twitter 已经不再继续维护它了(http://bit.ly/28W8aHV )。

2.Redis Cluster:它是 Redis 3.0 添加的集群方式,也是未来自动分片和高可用的首选方式。这种方式使用数据分片而非一致性哈希来实现,简单地说,就是一个 Redis 集群包含 16,384 个哈希槽,数据库中的每个键都属于这 16,384 个哈希槽的其中一个(使用 CRC16 函数对键获得校检值,对 16384 取余来计算键属于哪个槽)。集群中的每个节点负责处理一部分哈希槽,当添加新的节点时,只需要将对应的某些槽移动到新添加的节点上;当移出节点时,就把这个被移出节点上面的槽分配到其他节点上,而且这个添加和下线的过程不会阻塞整个集群。这个思路很好,只是目前还没有看到大型网站成功的案例,需要继续关注。

3.Codis:豌豆荚在生产环境使用的 Redis 分布式集群解决方案。对于上层的应用来说,连接到 Codis 代理和连接原生的 Redis Server 没有明显的区别(有少量还不支持的命令(http://bit.ly/28Y4TX3 )),上层应用可以像使用单机的 Redis 一样使用,Codis 底层会处理请求的转发,不停机的数据迁移等工作,所有后边的一切事情,对于前面的客户端来说是透明的,可以简单地认为后边连接的是一个内存无限大的 Redis 服务。它还兼容 Twemproxy,而且能很容易地把数据从 Twemproxy 迁到 Codis 上,让运维和监控更方便。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。