返回介绍

使用 DPark

发布于 2025-04-20 18:52:17 字数 14222 浏览 0 评论 0 收藏

DPark 是一个基于 Mesos 的集群计算框架,是 Spark 的 Python 实现版本,类似于 MapReduce,但是比其更灵活。DPark 有如下特点:

  • 它是基于 Python 实现的数据处理平台。
  • 快速开发,快速运算,高吞吐量,可以轻松处理海量数据。
  • 支持循环迭代计算。DPark 摆脱了传统 MapReduce 平台对迭代计算的限制,利用内存计算资源加速计算。
  • 高健壮性,能容忍局部错误。DPark 实现了 RDD(Resilient Distributed Datasets),在出现局部故障的时候可以自动重试计算,避免整个任务失败。
  • 原生对 MooseFS 的支持。DPark 可以智能分析数据分布,来调度计算资源分布,尽可能实现 I/O 本地化。

分布式文件系统 MooseFS

NFS(Network File System)即网络文件系统,允许网络中的计算机之间通过 TCP/IP 网络共享资源。也就是说,本地 NFS 的客户端应用可以透明地读写位于远端 NFS 服务器上的文件,就像访问本地文件一样。这样做的好处是,既节省本地存储空间,又达到数据共享。但是 NFS 性能很差,当用户访问量大的时候会让 NFS 不堪重负,而且存在单点故障:一旦 NFS 服务器发生故障,所有靠共享提供数据的应用就不再可用。这个时候应使用分布式文件系统:服务器之间的数据访问不再是一对多的关系(1 个 NFS 服务器,多个 NFS 客户端),而是多对多的关系,性能得到提升的同时也解决了单点故障。MooseFS(以下简称 MFS)是分布式文件系统中非常知名的一个方案,选择它是出于以下的原因:

  • 使用简单。MFS 的安装、部署和配置都很容易,很快就可以跑起来服务。
  • 支持在线扩容。无须停止服务就可以扩容。
  • 使用方便。通过挂载映射就能像访问本地文件一样来访问文件服务器资源。

MFS 系统由 4 部分组成。

1.元数据服务器(Master):负责管理所有文件的系统,保管每一个文件的元数据(如大小、文件存放位置等)。

2.数据存储服务器(Chunkserver):真正存储用户数据的服务器。存储文件时,首先把文件分成块,这些块在数据存储服务器之间复制,一般有多个数据服务器。

3.元数据日志服务器(Metalogger):负责备份 Master 的变化日志文件,文件类型为 changelog_ml.*.mfs,以便于在 Master 出问题的时候接替其进行工作。

4.客户端(Client):使用 MFS 文件系统来存储和访问的主机称为 MFS 的客户端。成功挂接 MFS 文件系统以后,就可以像以前使用 NFS 一样共享这个虚拟的存储了。

Mesos

Mesos 是一个集群管理器,用来作为资源统一管理与调度平台,让你就像使用一台服务器一样使用整个集群。Mesos 有如下特性:

  • 可扩展到 10,000 个节点。
  • 支持多种应用,如 Hadoop、Spark、Kafka、Elastic Search 等。
  • 具备多资源调度能力,可调度内存、CPU、磁盘、端口等。
  • 提供 Java、Python、C++等多种语言的 API。
  • 提供一个 Web 界面查看集群状态。

Mesos 由如下 5 个组件组成。

  • Mesos-Master:负责管理各个 Framework 和 Slave,并将 Slave 上的资源分配给各个 Framework。
  • Mesos-Slave:负责管理本节点上的各个任务(Task),比如为各个 Executor 分配资源。
  • Framework:计算框架,如 Hadoop,Spark 等,本节将使用 DPark。
  • Executor:执行器,安装到 Mesos-Slave 上,用于启动计算框架中的任务。
  • Scheduler:调度器,通过注册到 Mesos-Master 来获取集群资源。

一般 Framework 调度运行一个任务,遵循如下的流程:

1.SlaveX 向 Master 报告自己的资源状况,比如有 24 个 CPU 和 64 GB 内存可用。

2.Master 使用 Resource Offers(资源供给)实现跨应用细粒度资源共享,如 CPU、内存、磁盘、网络等。Master 根据公平共享、优先级等策略来决定分配多少资源给 Framework,发送给 FrameworkX,其中描述 SlaveX 有多少可用资源。

3.FrameworkX 中的 Scheduler 会答复 Master 有什么任务需要运行在 SlaveX,以及每个任务需要什么样的资源,比如需要 2 个 CPU、1GB 内存。

4.Master 发送这些任务给 SlaveX。如果 SlaveX 还有未使用的 CPU/内存资源,还可以把这些资源提供给 FrameworkY,重复第 2~4 步。

配置 DPark 环境

基于 MooseFS+Mesos 搭建一个 Dpark 环境,服务器架构如表 11.1 所示。

表 11.1 服务器架构

ServerMooseFS 角色Mesos 角色
192.168.1.230Master/ClientMesos-Master
192.168.1.231Chunkserver/ClientMesos-Slave

测试环境,没有用到 Metalogger。

在 DPark 项目源码的 docker 目录下存放了一些 Docker 配置,但是为了增加通用性,使用最新的 MooseFS 和 Mesos。由于编译安装 Mesos 会占用大量内存,所以需要开启交换分区充当内存。

首先搭建 DPark 通用环境:

> sudo apt-get-y install build-essential python-dev python-boto libcurl4-nss-dev
     libsasl2-dev libsasl2-modules maven libapr1-dev libsvn-dev
> cd/srv

安装最新版 MooseFS:

> wget http://ppa.moosefs.com/src/moosefs-3.0.74-1.tar.gz
> tar zxf moosefs-3.0.74-1.tar.gz
> cd moosefs-3.0.74
> ./configure
> sudo make install
> cd-

安装最新版 Mesos:

> wget http://mirrors.cnnic.cn/apache/mesos/0.28.1/mesos-0.28.1.tar.gz
> tar zxf mesos-0.28.1.tar.gz
> cd mesos-0.28.1
> ./bootstrap
> mkdir build
> cd build
> ../configure--disable-java
> sudo make install
> cd -

克隆 DPark 代码:

> git clone https://github.com/douban/dpark

设置 mfs 用户及其他:

> sudo pip install -r dpark/docker/base/scripts/requirements.txt
> sudo mkdir/mfs
> sudo useradd -r moosefs
> sudo mkdir -p/var/run/mfs
> sudo chown moosefs.moosefs/var/run/mfs
> sudo ldconfig
> sudo mkdir -p/var/{log,lib}/mesos

Mesos 集群管理可以使用 Mesos 提供的方法,集群设置的文件都存在/usr/local/etc/mesos 目录下,我们可以改 230 服务器上的设置,然后这个目录同步到其他 Mesos 服务器上。

> cat/usr/local/etc/mesos/masters
192.168.1.230 # 指定 Mesos-Master 的 IP
> cat/usr/local/etc/mesos/slaves
192.168.1.231 # 指定 Mesos-Slave 的 IP,当前环境下只有一个 Slave
> cat/usr/local/etc/mesos/mesos-master-env.sh # 启动 Mesos-Master 用到的设置
IP=`ifconfig eth1|head-2|tail-1|awk '{print$2}'|cut-c 6-` # eth0 是虚拟机
    默认的 NAT 网卡,这里新增网卡 eth1 用来和其他服务器通信
export MESOS_ip=${IP}
export MESOS_log_dir=/var/log/mesos
export MESOS_work_dir=/var/lib/mesos
> cat/usr/local/etc/mesos/mesos-slave-env.sh # 启动 Mesos-Slave 用到的设置
IP=`ifconfig eth1|head-2|tail-1|awk '{print$2}'|cut-c 6-`
export MESOS_ip=${IP}
export MESOS_master=192.168.1.230:5050
export MESOS_log_dir=/var/log/mesos
export MESOS_work_dir=/var/lib/mesos

修改/etc/hosts 文件,添加绑定主机名部分。每个服务器上都需要添加:

192.168.1.230 dpark1
192.168.1.231 dpark2

现在是 192.168.1.230 的设置:

> sudo cp dpark/docker/master/etc/* /etc
> sudo cp dpark/docker/master/mfs/* /usr/local/etc/mfs/
> sudo cp /usr/local/var/mfs/metadata.mfs.empty /var/run/mfs/metadata.mfs
> mknod /dev/fuse c 10 229
> mfsmaster -d
> mfsmount /mfs-H 192.168.1.230

作为 Mesos-Master,先设置与其他 Mesos-Slave 的 SSH 信任。需要注意,Ubuntu 默认不允许 root 用户使用 SSH 登录,所以需要修改/etc/ssh/sshd_config 文件,设置“PermitRootLogin yes”,然后重启 sshd。

非常重要的一步,修改主机名,每个服务器都要保证主机名是正确的:

> sudo hostname dpark2 # 退出,重新登录后生效,但是需要注意,这样修改的话,重启
     后会失效
> sudo sh-c"echo 'dpark2'>/etc/hostname" # 保证重启后永久生效

如果重启没有生效,还需要添加或者更新/etc/hosts:

127.0.1.1 dpark2

再使用“sudo reboot”重启就可以了。

然后是 192.168.1.231 的设置:

> sudo cp dpark/docker/slave/mfs/* /usr/local/etc/mfs
> sudo mkdir /mfsdata
> sudo chown moosefs.moosefs/mfsdata
> MASTER_IP=192.168.1.230
> mknod /dev/fuse c 10 229
> mfschunkserver -d # 启动 mfschunkserver,可以使用 Supervisor 管理
> sed -i "s/#MASTER#/${MASTER_IP}/g" /usr/local/etc/mfs/mfschunkserver.cfg
> sudo sh -c "echo 'dpark2' > /etc/hostname"
> mfsmount /mfs-H${MASTER_IP}
  
> sudo sh -c "echo 'dpark2' > /etc/hostname"
> sudo hostname dpark2

同样需要退出,再重新登录,至此配置完成。

在 Mesos-Master 上就可以使用 Mesos 自带的脚本管理集群了。启动集群:

> sudo mesos-start-cluster.sh

如果想关闭的话就使用:

> sudo mesos-stop-cluster.sh

需要注意,每个想要运行的 DPark 的服务器上都要有/etc/dpark.conf 配置。配置如下:

> cat/etc/dpark.conf
MESOS_MASTER="192.168.1.230:5050"
MOOSEFS_MOUNT_POINTS={
     '/mfs':'192.168.1.230'
}
MOOSEFS_DIR_CACHE=True
MEM_PER_TASK=200.0 # 默认一个任务使用 1GB 内存,但是通过配置文件可以修改这个值,
    现在设置为 200 MB

从 WordCount 开始

假设现在有一个几 GB 的文本文件,想统计文件中出现的频率最高的前三个单词。使用 Python 可以这样实现:

result=defaultdict(int)
with open('/mfs/sample.txt') as f:
     for line in f:
         for word in filter(None, line.rstrip().split(' ')):
             result[word]+=1
     print sorted(result.items(), key=lambda (x,y):y, reverse=True)[:3]

如果使用 DPark,则可以用如下代码实现(wordcount.py):

from dpark import DparkContext
    
dpark=DparkContext()
    
def parse(line):
    for word in filter(None, line.rstrip().split(' ')):
        yield word, 1
    
print dpark.textFile('/mfs/sample.txt')\
                    .flatMap(parse)\
                    .reduceByKey(lambda x,y:x+y)\
                    .top(3, lambda (x,y):y)

假设这个文件的内容是:

Hello World
Hello Python

我们来解析下这 4 行代码。

1.dpark.textFile 是从 MFS 上找到文件,读取文件或者目录。它可以并行读取已经分块压缩的文件。

2.flatMap 把内容根据设置的解析函数(这里是 parse 函数)拆成了单词:

In : f=open('/mfs/sample.txt')
# makeRDD/parallelize 将本地内存中的 list 变成一个 RDD
In : b=dpark.parallelize(f.readlines())
In : f=b.flatMap(parse)
In : f.collect() # 使用 LocalScheduler 来本地执行
Out : [('Hello', 1), ('World', 1), ('Hello', 1), ('Python', 1)]

3.reduceByKey:根据键对结果合并。

# 相当于从一个 RDD 变为另外一个 RDD 的函数
In : r=f.reduceByKey(lambda x,y:x+y)
In : r.collect()
Out : [('Python', 1), ('World', 1), ('Hello', 2)]

4.top:按值的数量排序,取数量最多的前 N 个。

In : r.top(2, lambda (x, y):y) # 取前 2 个
Out : [('Hello', 2), ('Python', 1)]

使用 Mesos 集群运行它:

> python wordcount.py -m mesos

其中“-m/--master”用来指定计算集群的位置。它支持 5 种模式。

  • local:使用本地模式,不加参数的默认模式。
  • process:使用本地多进程模式。
  • mesos:根据配置文件或 MESOS_MASTER 环境变量寻找计算集群。export MESOS_MASTER=zk://zk1:2181,zk2:2181,zk3:2181/mesos_master
  • host[:port]:根据 ip 和 port 来指定集群位置。
  • zk://hosts/path:根据 Zookeeper 集群地址和路径来寻找计算集群。

其他常用参数如表 11.2 所示。

表 11.2 其他常用的参数

参数含义
-M/–mem指定每个任务默认使用的内存大小。如不指定,则默认是 200 MB
-p/–parallel指定每个计算节点上最多可以同时执行多少任务,可以用来限制任务并发程度
-g/–group只在指定的计算节点组上运行

你可能遇到浮点数问题,比如 CPU 要求 1.01 个,超出了真实的 1 个,造成任务卡住,那么可以添加-c 参数,设定需要使用的 CPU 数,如:

> python wordcount.py -m dpark1:5050 -c 0.5

还可以通过 Dpark 使用指南(http://bit.ly/23gduHc )获取其他常用函数的用法,其中还有开发的注意事项、性能调优方面的指导。DPark 项目的 examples 下有一些常见需求的例子,可以用来作为进一步资料了解。dgrep 和 tools 目录下的 drun 是非常好用的工具。drun 把任务分配到各计算节点来运行,避免大家的程序都拥堵在同一台机器运行。如果任务和服务器节点无关,可以开启。至于 dgrep,如果 grep 的是目标是共享内容(如在 MFS 上),可以让各计算节点一起来运行。

PV&UV 统计

PV/UV 是网站分析中最基础、最常见的指标。PV 是 Page View 的缩写,表示页面浏览量,UV 是 Unique Visitor 的缩写,表示独立访客量。

网站都会记录多种类型的日志,如用来做数据分析、算法推荐、产品决策等。假设现在有一种类型的日志,记录了用户 ID、访问的 URL、浏览器类型、访问时间等。日志的格式如下:

2016/06/08\t11:39:59\t31632288\twww.dongwm.com/movie/12345\t22\t200\t1349387474\twww.
   google.com\t2

字段之间用 Tab 分开,其含义如表 11.3 所示。

表 11.3 日志中的字段及其含义

字段含义
date访问日期
time访问时间
uid用户 ID
url访问的 URL
browser_type浏览器类型,它由用户代理转换获得
statue_code访问返回的状态码
encrypted_ip转换为整数的 IP
url_referer本次访问的 Referer 来源
access_type访问类型,类型包含移动设备(1)、PC(2)、爬虫(3)等

通常每次都会有多个 PV/UV 需求,日期一般是一个范围,假如每个需求都使用 DPark 运行一遍的话未免效率太低下了(因为这是在重复地解析相同的日志)。可以通过如下方式一次性获得多种 PV/UV 的数据。

首先定义一个去除异常日志的函数,去掉状态码大于或等于 400、访问类型大于或等于 3 的访问记录:

def exclude_unusual(f):
    @wraps(f)
    def wrapper(log,*a,**kw):
    if int(log.access_type)<3 and int(log.statue_code)<400:
            return f(log,*a,**kw)
    return []
    return wrapper

为了方便地使用日志属性,使用 collections.namedtuple 定义了一个 Weblog 的类:

import collections
     
_Weblog=collections.namedtuple('Weblog', [
    'date', 'time', 'uid', 'url', 'browser_type', 'statue_code',
    'encrypted_ip', 'url_referer', 'access_type']
)
    
class Weblog(_Weblog):
    @classmethod
    def from_line(cls, line):
        fields=line.strip().split('\t')
        return cls(*fields)

每条日志都可以转化为 Weblog 实例,如上面的那条记录,log.date 的值就是“2016/06/08”,log.browser_type 的值就是 22。

先给 UV 和 PV 定义 Runner 基类,添加一些共用的方法:

class BaseRunner(object):
    def __init__(self, log_path=None, date_=None, match_rules=None):
        if log_path is None:
            log_path='/mfs/log/web_log'
        self.date=[] if date_is None else date_
        self.match_rules={}if match_rules is None else match_rules

通过 self.date 传入日期范围,而通过 self.match_rules 传入多个需要执行的规则。

_filter_func 方法收集了全部的过滤规则,但是作为装饰器传入,在 Dpark 中才执行:

def _filter_func(self):
    filter_func_list=self.match_rules.values()
    
    def wrapper(log,*args,**kwargs):
        return any(func(log,*args,**kwargs)
                  for func in filter_func_list)
     
    return wrapper

通过下面两个方法获得日期范围内的全部日志文件列表:

import glob
import itertools
    
def _get_paths_by_date(self, date_):
    return glob.glob(os.path.join(
        self.log_path, '*', '*', '*', date_.strftime('%Y%m%d')
    ))
    
@property
def paths(self):
    return itertools.chain(self._get_paths_by_date(date_)
                          for date_in self.date)

然后把 Dpark 的用法封装起来:

def get_rdd(self):
    dpark=DparkContext()
        
    return dpark.union(
        [dpark.textFile(path, splitSize=64<<20)
        for path in self.paths]
    ).map(Weblog.from_line)
     
def get_flat_mapped_rdd(self):
    filter_func=self._filter_func()
    map_func=self._map_func()
    return (self.get_rdd()
           .filter(filter_func)
           .map(map_func)
           .flatMap(lambda x:x))

其中_map_func 由于 PV/UV 的归纳方式不同,需要在子类中实现;get_flat_mapped_rdd 方法抽象了 PV/UV 使用 DPark 的相同部分。

PVRunner/UVRunner 的实现如下:

class PVRunner(BaseRunner):
    
    def _map_func(self):
        def wrapper(log,*args,**kw):
            values=[]
            for k, v in self.match_rules.iteritems():
                if v(log,*args,**kw):
                    values.append((k, 1))
            return values
    return wrapper
    
    def get_result(self):
        flat_mapped_rdd=self.get_flat_mapped_rdd()
        return (flat_mapped_rdd.reduceByKey(lambda x, y:x+y)
               .collectAsMap())
    
class UVRunner(BaseRunner):
    def _map_func(self):
        def wrapper(log,*args,**kw):
            values=[]
            for k, v in self.match_rules.iteritems():
                if v(log,*args,**kw):
                    values.append((k, log.uid, 1))
            return values
    return wrapper
    
    def get_result(self):
        flat_mapped_rdd=self.get_flat_mapped_rdd()
        return (flat_mapped_rdd.uniq()
               .map(lambda x:(x[0], 1))
        .reduceByKey(lambda x, y:x+y)
        .collectAsMap())

现在需要统计访问/subject/、/item/、去掉异常访问的/subject/这三种数据,在 2016 年 5 月 22 日的 PV 数,以及 2016 年 5 月 22 日到 2016 年 5 月 27 日(包含)的 UV 数。可以这样写:

from tyrion import PVRunner, UVRunner, exclude_unusual
    
def is_subject(log):
    return 'show/' in log.url
    
def is_item(log):
    return 'item/' in log.url
    
match_rules={
    'item':is_item,
    'subject':is_subject,
    'excluded_subject':exclude_unusual(is_subject)
}
runner=PVRunner(date(2016, 5, 22))
runner.match_rules=match_rules
pv=runner.get_result()
    
date_range=[date(2016, 5, 22)+timedelta(days=i) for i in range(6)]
runner=UVRunner(date_range)
runner.match_rules=match_rules
uv=runner.get_result()

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。