使用 MapReduce 做日志分析
使用 MapReduce
MapReduce 是谷歌提出的一个编程模型,它把对数据集的大规模操作拆分后分发给网络上的多个节点,每个节点会周期性地把完成的工作和状态的更新报告回来,以实现并行计算的目的。Map 表示映射,一个映射函数读取被分配的一小部分数据任务,计算之后输出中间的键值对的集合;Reduce 表示归纳,一个归纳函数收集具有相同中间键的值,合并这些值,形成一个较小的值的集合。
在开始使用 MapReduce 之前,先看一个使用 multiprocessing 实现并发计算的例子(paral-lel.py):
import os import time import multiprocessing def task(args): time.sleep(1) pid=os.getpid() return pid, args start=time.time() pool=multiprocessing.Pool(processes=4) result=pool.map(task, range(10)) print result print 'Cost:{}'.format(time.time()-start)
把 0~9 这 10 个数作为参数传给函数 task,每个任务都 sleep 1 秒,完成 10 个任务需要多久呢?执行一下看看:
> python ~/web_develop/chapter11/section1/parallel.py [(8568, 0), (8569, 1), (8570, 2), (8571, 3), (8568, 4), (8569, 5), (8570, 6), (8571, 7) , (8568, 8), (8569, 9)] Cost:3.05770802498
虚拟机是双核 CPU,上例开启了 4 个进程,从执行的结果可以发现,执行任务的就是这 4 个进程,更好地利用了虚拟机的 CPU 资源;同时也可以看到使用 multiprocessing.Pool 的 map 方法实现了带有队列功能的并发。
有一个从某段日期的日志中获得符合条件的记录的需求,其实现步骤如下:
1.解压缩日志。为了节省空间,早期的日志会压缩后再存储,每天压缩后的日志约 200 GB,由非常多的小文件组成,单是解压缩这些文件就需要很长时间。
2.遍历每一行记录,记录的字段按\t 分隔,需要找到符合条件的记录。
3.在符合条件的全部记录中,统计符合不同条件的记录数量。
这个需求使用单进程串行的方式运行,完成一次耗时超过 24 小时,且占用大量内存。笔者接手之后改进了一版:
1.遍历过程中定期执行垃圾回收。
2.修改为多进程的方式,启动与 CPU 核数(24 核)相同的进程,每个进程通过“hash(filename)%24==0”的方式只执行对应的 1/24。每个进程把符合条件的记录存进 SQLite 数据库。全部执行完毕后查询 SQLite 获得结果。
这种方法可以把时间缩短到 2 个多小时,比之前的版本效率提高了 10 倍。但还是有不合理的地方:有些日志文件相对较大,需要的时间要多一些,累积起来的结果就是有些进程先跑完了,但是还要等那些跑得慢的进程完成,等待的过程中先跑完的进程是闲置的,并没有被充分利用。
这个时候当然可以使用队列实现进程间通信,把日志文件放入队列,其他进程从队列中取数据执行,但这样做会增加代码的复杂度。我们依据 pymotw(http://bit.ly/1WQsuvq)上的例子完成最后一版(map_reduce.py)。
首先定义抽象的 MapReduce 类:
import collections import itertools import multiprocessing class MapReduce(object): def __init__(self, map_func, reduce_func, num_workers=None): self.map_func=map_func # map 函数 self.reduce_func=reduce_func # reduce 函数 # num_workers 为 None 会使用和 CPU 核数相同数量的进程 self.pool=multiprocessing.Pool(num_workers) def partition(self, mapped_values): partitioned_data=collections.defaultdict(list) for key, value in mapped_values: partitioned_data[key].append(value) return partitioned_data.items() def __call__(self, inputs, chunksize=1): # inputs 是一个需要处理的列表,chunksize 表示每次给 mapper 的量,根据需求调整 这个值 # 第一次 pool.map 是为了把大任务分组 map_responses=self.pool.map( self.map_func, inputs, chunksize=chunksize) # chain 把 mapper 的结果链接为一个可迭代的对象 partitioned_data=self.partition(itertools.chain(*map_responses)) # 第二次 pool.map 是为了聚合结果实现 reduce,map 方法继续用来实现并行计算 reduced_values=self.pool.map(self.reduce_func, partitioned_data) return reduced_values
看完这个类就可以明白,Map 和 Reduce 是借用 map 方法实现的。MapReduce 接收两个必选参数:map 函数和 reduce 函数。map 函数接收的参数是需要处理的数据,返回过滤后的结果:
import import bz2 def mapper_match(one_file): '''第一次的 map 函数,从每个文件里面获取符合的条目''' output=[] for line in bz2.BZ2File(one_file).readlines(): l=line.rstrip().split() if l[3]=='web' and l[5]=='0': output.append((l[4], 1))
mapper_match 函数的返回值是“[(a, 1), (b, 1)]”这样的第二个值为 1 的元组组成的列表。
reduce 函数用来对函数返回的结果进行归纳:
def reducer_match(item): cookie, occurances=item return (cookie, sum(occurances))
归纳听起来不好理解,举个例子,假设 mapper 返回了“[('a', 1), ('b', 1), ('a', 1)]”,reducer 函数对这三个元素遍历,对相同的第一个元素聚合,reducer_match 函数的返回值就是“[('a', 2), ('b', 1)]”。
我们还会基于第一次的结果进行第二次 MapReduce 操作,这次把符合条件的元素的数量作为键:
def mapper_count(item): _, count=item return [(count, 1)] def reducer_count(item): freq, occurances=item return (freq, sum(occurances))
假设第一次 MapReduce 的返回值是“[('a', 2), ('b', 1), ('c', 2), ('d', 3)]”,第二次的结果就是“[(1, 1), (2, 2), (3, 1)]”,表示出现 1 次的键(b)有 1 个,出现 2 次的键(a 和 c)有 2 个,出现 3 次的键(d)也只有 1 个。
完整的日志处理过程如下:
import glob import operator input_files=glob.glob( '/home/ubuntu/web_develop/chapter11/section1/data/*.bz2') mapper=MapReduce(mapper_match, reducer_match) cookie_feq=mapper(input_files) print 'Result:{}'.format(cookie_feq) mapper=MapReduce(mapper_count, reducer_count) cookie_feq=mapper(cookie_feq) cookie_feq.sort(key=operator.itemgetter(1), reverse=True) for freq, count in cookie_feq: print '{0}\t{1}'.format(freq, count)
这个 MapReduce 的方式依赖服务器的 CPU 核数。理论上核数越多运行越快。使用这个方案,有 24 核的单个服务器完成一次计算只需要 40 几分钟,利用后面提到的 Cython 还可以再提速 30%左右。其实很多时候不是 Python 太慢,而是没有选对正确的方法。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论