使用 DPark
DPark 是一个基于 Mesos 的集群计算框架,是 Spark 的 Python 实现版本,类似于 MapReduce,但是比其更灵活。DPark 有如下特点:
- 它是基于 Python 实现的数据处理平台。
- 快速开发,快速运算,高吞吐量,可以轻松处理海量数据。
- 支持循环迭代计算。DPark 摆脱了传统 MapReduce 平台对迭代计算的限制,利用内存计算资源加速计算。
- 高健壮性,能容忍局部错误。DPark 实现了 RDD(Resilient Distributed Datasets),在出现局部故障的时候可以自动重试计算,避免整个任务失败。
- 原生对 MooseFS 的支持。DPark 可以智能分析数据分布,来调度计算资源分布,尽可能实现 I/O 本地化。
分布式文件系统 MooseFS
NFS(Network File System)即网络文件系统,允许网络中的计算机之间通过 TCP/IP 网络共享资源。也就是说,本地 NFS 的客户端应用可以透明地读写位于远端 NFS 服务器上的文件,就像访问本地文件一样。这样做的好处是,既节省本地存储空间,又达到数据共享。但是 NFS 性能很差,当用户访问量大的时候会让 NFS 不堪重负,而且存在单点故障:一旦 NFS 服务器发生故障,所有靠共享提供数据的应用就不再可用。这个时候应使用分布式文件系统:服务器之间的数据访问不再是一对多的关系(1 个 NFS 服务器,多个 NFS 客户端),而是多对多的关系,性能得到提升的同时也解决了单点故障。MooseFS(以下简称 MFS)是分布式文件系统中非常知名的一个方案,选择它是出于以下的原因:
- 使用简单。MFS 的安装、部署和配置都很容易,很快就可以跑起来服务。
- 支持在线扩容。无须停止服务就可以扩容。
- 使用方便。通过挂载映射就能像访问本地文件一样来访问文件服务器资源。
MFS 系统由 4 部分组成。
1.元数据服务器(Master):负责管理所有文件的系统,保管每一个文件的元数据(如大小、文件存放位置等)。
2.数据存储服务器(Chunkserver):真正存储用户数据的服务器。存储文件时,首先把文件分成块,这些块在数据存储服务器之间复制,一般有多个数据服务器。
3.元数据日志服务器(Metalogger):负责备份 Master 的变化日志文件,文件类型为 changelog_ml.*.mfs,以便于在 Master 出问题的时候接替其进行工作。
4.客户端(Client):使用 MFS 文件系统来存储和访问的主机称为 MFS 的客户端。成功挂接 MFS 文件系统以后,就可以像以前使用 NFS 一样共享这个虚拟的存储了。
Mesos
Mesos 是一个集群管理器,用来作为资源统一管理与调度平台,让你就像使用一台服务器一样使用整个集群。Mesos 有如下特性:
- 可扩展到 10,000 个节点。
- 支持多种应用,如 Hadoop、Spark、Kafka、Elastic Search 等。
- 具备多资源调度能力,可调度内存、CPU、磁盘、端口等。
- 提供 Java、Python、C++等多种语言的 API。
- 提供一个 Web 界面查看集群状态。
Mesos 由如下 5 个组件组成。
- Mesos-Master:负责管理各个 Framework 和 Slave,并将 Slave 上的资源分配给各个 Framework。
- Mesos-Slave:负责管理本节点上的各个任务(Task),比如为各个 Executor 分配资源。
- Framework:计算框架,如 Hadoop,Spark 等,本节将使用 DPark。
- Executor:执行器,安装到 Mesos-Slave 上,用于启动计算框架中的任务。
- Scheduler:调度器,通过注册到 Mesos-Master 来获取集群资源。
一般 Framework 调度运行一个任务,遵循如下的流程:
1.SlaveX 向 Master 报告自己的资源状况,比如有 24 个 CPU 和 64 GB 内存可用。
2.Master 使用 Resource Offers(资源供给)实现跨应用细粒度资源共享,如 CPU、内存、磁盘、网络等。Master 根据公平共享、优先级等策略来决定分配多少资源给 Framework,发送给 FrameworkX,其中描述 SlaveX 有多少可用资源。
3.FrameworkX 中的 Scheduler 会答复 Master 有什么任务需要运行在 SlaveX,以及每个任务需要什么样的资源,比如需要 2 个 CPU、1GB 内存。
4.Master 发送这些任务给 SlaveX。如果 SlaveX 还有未使用的 CPU/内存资源,还可以把这些资源提供给 FrameworkY,重复第 2~4 步。
配置 DPark 环境
基于 MooseFS+Mesos 搭建一个 Dpark 环境,服务器架构如表 11.1 所示。
表 11.1 服务器架构
Server | MooseFS 角色 | Mesos 角色 |
192.168.1.230 | Master/Client | Mesos-Master |
192.168.1.231 | Chunkserver/Client | Mesos-Slave |
测试环境,没有用到 Metalogger。
在 DPark 项目源码的 docker 目录下存放了一些 Docker 配置,但是为了增加通用性,使用最新的 MooseFS 和 Mesos。由于编译安装 Mesos 会占用大量内存,所以需要开启交换分区充当内存。
首先搭建 DPark 通用环境:
> sudo apt-get-y install build-essential python-dev python-boto libcurl4-nss-dev libsasl2-dev libsasl2-modules maven libapr1-dev libsvn-dev > cd/srv
安装最新版 MooseFS:
> wget http://ppa.moosefs.com/src/moosefs-3.0.74-1.tar.gz > tar zxf moosefs-3.0.74-1.tar.gz > cd moosefs-3.0.74 > ./configure > sudo make install > cd-
安装最新版 Mesos:
> wget http://mirrors.cnnic.cn/apache/mesos/0.28.1/mesos-0.28.1.tar.gz > tar zxf mesos-0.28.1.tar.gz > cd mesos-0.28.1 > ./bootstrap > mkdir build > cd build > ../configure--disable-java > sudo make install > cd -
克隆 DPark 代码:
> git clone https://github.com/douban/dpark
设置 mfs 用户及其他:
> sudo pip install -r dpark/docker/base/scripts/requirements.txt > sudo mkdir/mfs > sudo useradd -r moosefs > sudo mkdir -p/var/run/mfs > sudo chown moosefs.moosefs/var/run/mfs > sudo ldconfig > sudo mkdir -p/var/{log,lib}/mesos
Mesos 集群管理可以使用 Mesos 提供的方法,集群设置的文件都存在/usr/local/etc/mesos 目录下,我们可以改 230 服务器上的设置,然后这个目录同步到其他 Mesos 服务器上。
> cat/usr/local/etc/mesos/masters 192.168.1.230 # 指定 Mesos-Master 的 IP > cat/usr/local/etc/mesos/slaves 192.168.1.231 # 指定 Mesos-Slave 的 IP,当前环境下只有一个 Slave > cat/usr/local/etc/mesos/mesos-master-env.sh # 启动 Mesos-Master 用到的设置 IP=`ifconfig eth1|head-2|tail-1|awk '{print$2}'|cut-c 6-` # eth0 是虚拟机 默认的 NAT 网卡,这里新增网卡 eth1 用来和其他服务器通信 export MESOS_ip=${IP} export MESOS_log_dir=/var/log/mesos export MESOS_work_dir=/var/lib/mesos > cat/usr/local/etc/mesos/mesos-slave-env.sh # 启动 Mesos-Slave 用到的设置 IP=`ifconfig eth1|head-2|tail-1|awk '{print$2}'|cut-c 6-` export MESOS_ip=${IP} export MESOS_master=192.168.1.230:5050 export MESOS_log_dir=/var/log/mesos export MESOS_work_dir=/var/lib/mesos
修改/etc/hosts 文件,添加绑定主机名部分。每个服务器上都需要添加:
192.168.1.230 dpark1 192.168.1.231 dpark2
现在是 192.168.1.230 的设置:
> sudo cp dpark/docker/master/etc/* /etc > sudo cp dpark/docker/master/mfs/* /usr/local/etc/mfs/ > sudo cp /usr/local/var/mfs/metadata.mfs.empty /var/run/mfs/metadata.mfs > mknod /dev/fuse c 10 229 > mfsmaster -d > mfsmount /mfs-H 192.168.1.230
作为 Mesos-Master,先设置与其他 Mesos-Slave 的 SSH 信任。需要注意,Ubuntu 默认不允许 root 用户使用 SSH 登录,所以需要修改/etc/ssh/sshd_config 文件,设置“PermitRootLogin yes”,然后重启 sshd。
非常重要的一步,修改主机名,每个服务器都要保证主机名是正确的:
> sudo hostname dpark2 # 退出,重新登录后生效,但是需要注意,这样修改的话,重启 后会失效 > sudo sh-c"echo 'dpark2'>/etc/hostname" # 保证重启后永久生效
如果重启没有生效,还需要添加或者更新/etc/hosts:
127.0.1.1 dpark2
再使用“sudo reboot”重启就可以了。
然后是 192.168.1.231 的设置:
> sudo cp dpark/docker/slave/mfs/* /usr/local/etc/mfs > sudo mkdir /mfsdata > sudo chown moosefs.moosefs/mfsdata > MASTER_IP=192.168.1.230 > mknod /dev/fuse c 10 229 > mfschunkserver -d # 启动 mfschunkserver,可以使用 Supervisor 管理 > sed -i "s/#MASTER#/${MASTER_IP}/g" /usr/local/etc/mfs/mfschunkserver.cfg > sudo sh -c "echo 'dpark2' > /etc/hostname" > mfsmount /mfs-H${MASTER_IP} > sudo sh -c "echo 'dpark2' > /etc/hostname" > sudo hostname dpark2
同样需要退出,再重新登录,至此配置完成。
在 Mesos-Master 上就可以使用 Mesos 自带的脚本管理集群了。启动集群:
> sudo mesos-start-cluster.sh
如果想关闭的话就使用:
> sudo mesos-stop-cluster.sh
需要注意,每个想要运行的 DPark 的服务器上都要有/etc/dpark.conf 配置。配置如下:
> cat/etc/dpark.conf MESOS_MASTER="192.168.1.230:5050" MOOSEFS_MOUNT_POINTS={ '/mfs':'192.168.1.230' } MOOSEFS_DIR_CACHE=True MEM_PER_TASK=200.0 # 默认一个任务使用 1GB 内存,但是通过配置文件可以修改这个值, 现在设置为 200 MB
从 WordCount 开始
假设现在有一个几 GB 的文本文件,想统计文件中出现的频率最高的前三个单词。使用 Python 可以这样实现:
result=defaultdict(int) with open('/mfs/sample.txt') as f: for line in f: for word in filter(None, line.rstrip().split(' ')): result[word]+=1 print sorted(result.items(), key=lambda (x,y):y, reverse=True)[:3]
如果使用 DPark,则可以用如下代码实现(wordcount.py):
from dpark import DparkContext dpark=DparkContext() def parse(line): for word in filter(None, line.rstrip().split(' ')): yield word, 1 print dpark.textFile('/mfs/sample.txt')\ .flatMap(parse)\ .reduceByKey(lambda x,y:x+y)\ .top(3, lambda (x,y):y)
假设这个文件的内容是:
Hello World Hello Python
我们来解析下这 4 行代码。
1.dpark.textFile 是从 MFS 上找到文件,读取文件或者目录。它可以并行读取已经分块压缩的文件。
2.flatMap 把内容根据设置的解析函数(这里是 parse 函数)拆成了单词:
In : f=open('/mfs/sample.txt') # makeRDD/parallelize 将本地内存中的 list 变成一个 RDD In : b=dpark.parallelize(f.readlines()) In : f=b.flatMap(parse) In : f.collect() # 使用 LocalScheduler 来本地执行 Out : [('Hello', 1), ('World', 1), ('Hello', 1), ('Python', 1)]
3.reduceByKey:根据键对结果合并。
# 相当于从一个 RDD 变为另外一个 RDD 的函数 In : r=f.reduceByKey(lambda x,y:x+y) In : r.collect() Out : [('Python', 1), ('World', 1), ('Hello', 2)]
4.top:按值的数量排序,取数量最多的前 N 个。
In : r.top(2, lambda (x, y):y) # 取前 2 个 Out : [('Hello', 2), ('Python', 1)]
使用 Mesos 集群运行它:
> python wordcount.py -m mesos
其中“-m/--master”用来指定计算集群的位置。它支持 5 种模式。
- local:使用本地模式,不加参数的默认模式。
- process:使用本地多进程模式。
- mesos:根据配置文件或 MESOS_MASTER 环境变量寻找计算集群。export MESOS_MASTER=zk://zk1:2181,zk2:2181,zk3:2181/mesos_master
- host[:port]:根据 ip 和 port 来指定集群位置。
- zk://hosts/path:根据 Zookeeper 集群地址和路径来寻找计算集群。
其他常用参数如表 11.2 所示。
表 11.2 其他常用的参数
参数 | 含义 |
-M/–mem | 指定每个任务默认使用的内存大小。如不指定,则默认是 200 MB |
-p/–parallel | 指定每个计算节点上最多可以同时执行多少任务,可以用来限制任务并发程度 |
-g/–group | 只在指定的计算节点组上运行 |
你可能遇到浮点数问题,比如 CPU 要求 1.01 个,超出了真实的 1 个,造成任务卡住,那么可以添加-c 参数,设定需要使用的 CPU 数,如:
> python wordcount.py -m dpark1:5050 -c 0.5
还可以通过 Dpark 使用指南(http://bit.ly/23gduHc )获取其他常用函数的用法,其中还有开发的注意事项、性能调优方面的指导。DPark 项目的 examples 下有一些常见需求的例子,可以用来作为进一步资料了解。dgrep 和 tools 目录下的 drun 是非常好用的工具。drun 把任务分配到各计算节点来运行,避免大家的程序都拥堵在同一台机器运行。如果任务和服务器节点无关,可以开启。至于 dgrep,如果 grep 的是目标是共享内容(如在 MFS 上),可以让各计算节点一起来运行。
PV&UV 统计
PV/UV 是网站分析中最基础、最常见的指标。PV 是 Page View 的缩写,表示页面浏览量,UV 是 Unique Visitor 的缩写,表示独立访客量。
网站都会记录多种类型的日志,如用来做数据分析、算法推荐、产品决策等。假设现在有一种类型的日志,记录了用户 ID、访问的 URL、浏览器类型、访问时间等。日志的格式如下:
2016/06/08\t11:39:59\t31632288\twww.dongwm.com/movie/12345\t22\t200\t1349387474\twww. google.com\t2
字段之间用 Tab 分开,其含义如表 11.3 所示。
表 11.3 日志中的字段及其含义
字段 | 含义 |
date | 访问日期 |
time | 访问时间 |
uid | 用户 ID |
url | 访问的 URL |
browser_type | 浏览器类型,它由用户代理转换获得 |
statue_code | 访问返回的状态码 |
encrypted_ip | 转换为整数的 IP |
url_referer | 本次访问的 Referer 来源 |
access_type | 访问类型,类型包含移动设备(1)、PC(2)、爬虫(3)等 |
通常每次都会有多个 PV/UV 需求,日期一般是一个范围,假如每个需求都使用 DPark 运行一遍的话未免效率太低下了(因为这是在重复地解析相同的日志)。可以通过如下方式一次性获得多种 PV/UV 的数据。
首先定义一个去除异常日志的函数,去掉状态码大于或等于 400、访问类型大于或等于 3 的访问记录:
def exclude_unusual(f): @wraps(f) def wrapper(log,*a,**kw): if int(log.access_type)<3 and int(log.statue_code)<400: return f(log,*a,**kw) return [] return wrapper
为了方便地使用日志属性,使用 collections.namedtuple 定义了一个 Weblog 的类:
import collections _Weblog=collections.namedtuple('Weblog', [ 'date', 'time', 'uid', 'url', 'browser_type', 'statue_code', 'encrypted_ip', 'url_referer', 'access_type'] ) class Weblog(_Weblog): @classmethod def from_line(cls, line): fields=line.strip().split('\t') return cls(*fields)
每条日志都可以转化为 Weblog 实例,如上面的那条记录,log.date 的值就是“2016/06/08”,log.browser_type 的值就是 22。
先给 UV 和 PV 定义 Runner 基类,添加一些共用的方法:
class BaseRunner(object): def __init__(self, log_path=None, date_=None, match_rules=None): if log_path is None: log_path='/mfs/log/web_log' self.date=[] if date_is None else date_ self.match_rules={}if match_rules is None else match_rules
通过 self.date 传入日期范围,而通过 self.match_rules 传入多个需要执行的规则。
_filter_func 方法收集了全部的过滤规则,但是作为装饰器传入,在 Dpark 中才执行:
def _filter_func(self): filter_func_list=self.match_rules.values() def wrapper(log,*args,**kwargs): return any(func(log,*args,**kwargs) for func in filter_func_list) return wrapper
通过下面两个方法获得日期范围内的全部日志文件列表:
import glob import itertools def _get_paths_by_date(self, date_): return glob.glob(os.path.join( self.log_path, '*', '*', '*', date_.strftime('%Y%m%d') )) @property def paths(self): return itertools.chain(self._get_paths_by_date(date_) for date_in self.date)
然后把 Dpark 的用法封装起来:
def get_rdd(self): dpark=DparkContext() return dpark.union( [dpark.textFile(path, splitSize=64<<20) for path in self.paths] ).map(Weblog.from_line) def get_flat_mapped_rdd(self): filter_func=self._filter_func() map_func=self._map_func() return (self.get_rdd() .filter(filter_func) .map(map_func) .flatMap(lambda x:x))
其中_map_func 由于 PV/UV 的归纳方式不同,需要在子类中实现;get_flat_mapped_rdd 方法抽象了 PV/UV 使用 DPark 的相同部分。
PVRunner/UVRunner 的实现如下:
class PVRunner(BaseRunner): def _map_func(self): def wrapper(log,*args,**kw): values=[] for k, v in self.match_rules.iteritems(): if v(log,*args,**kw): values.append((k, 1)) return values return wrapper def get_result(self): flat_mapped_rdd=self.get_flat_mapped_rdd() return (flat_mapped_rdd.reduceByKey(lambda x, y:x+y) .collectAsMap()) class UVRunner(BaseRunner): def _map_func(self): def wrapper(log,*args,**kw): values=[] for k, v in self.match_rules.iteritems(): if v(log,*args,**kw): values.append((k, log.uid, 1)) return values return wrapper def get_result(self): flat_mapped_rdd=self.get_flat_mapped_rdd() return (flat_mapped_rdd.uniq() .map(lambda x:(x[0], 1)) .reduceByKey(lambda x, y:x+y) .collectAsMap())
现在需要统计访问/subject/、/item/、去掉异常访问的/subject/这三种数据,在 2016 年 5 月 22 日的 PV 数,以及 2016 年 5 月 22 日到 2016 年 5 月 27 日(包含)的 UV 数。可以这样写:
from tyrion import PVRunner, UVRunner, exclude_unusual def is_subject(log): return 'show/' in log.url def is_item(log): return 'item/' in log.url match_rules={ 'item':is_item, 'subject':is_subject, 'excluded_subject':exclude_unusual(is_subject) } runner=PVRunner(date(2016, 5, 22)) runner.match_rules=match_rules pv=runner.get_result() date_range=[date(2016, 5, 22)+timedelta(days=i) for i in range(6)] runner=UVRunner(date_range) runner.match_rules=match_rules uv=runner.get_result()
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论