- 内容提要
- 序
- 前言
- 第一部分 背景知识
- 第 1 章 Spring Data 项目
- 第 2 章 Repository:便利的数据访问层
- 第 3 章 使用 Querydsl 实现类型安全的查询
- 第二部分 关系型数据库
- 第 4 章 JPA Repository
- 第 5 章 借助 Querydsl SQL 实现类型安全的 JDBC 编程
- 第三部分 NoSQL
- 第 6 章 MongoDB: 文档存储
- 第 7 章 Neo4j:图数据库
- 第 8 章 Redis:键/值存储
- 第四部分 快速应用开发
- 第 9 章 使用 Spring Roo 实现持久层
- 第 10 章 REST Repository 导出器
- 第五部分 大数据
- 第 11 章 Spring for Apache Hadoop
- 第 12 章 使用 Hadoop 分析数据
- 第 13 章 使用 Spring Batch 和 Spring Integration 创建大数据管道
- 第六部分 数据网格
- 第 14 章 分布式数据网格:GemFire
- 关于封面
12.2 使用 Pig
要编写 MapReduce 应用来分析存储在 HDFS 中的数据,Pig 是另外一个可选方案。Pig 应用程序使用 Pig Latin 语言编写,Pig Latin 是一种高级数据处理语言,它更多地秉持了 sed 或 awk 的精神而不是像 Hive 那样提供类 SQL 的语言。Pig Latin 脚本描述了一系列的步骤,每一个步骤都会对集合中的数据项(item)进行转换。简单的步骤可以包括加载、过滤以及保存数据,也支持更复杂的操作,如基于相同的值对两个数据项进行连接操作。Pig 可以使用用户自定义函数(User-Defined Function,UDF)来扩展,它们封装了常用的功能,如特定的算法或读写常用的数据格式,如 Apache HTTPD 日志文件。PigServer 负责将 Pig Latin 脚本转换成基于 MapReduce API 的多个 Job 并执行它们。
通常 Pig Latin 脚本开发的入门方式是使用 Pig 附带的 Grunt 控制台进行交互。可以采用两种不同的运行模式来执行脚本。第一种是 LOCAL 模式,它会使用存储在本地文件系统的数据以及嵌入式版本的 Hadoop 在本地执行 MapReduce Job。第二种是 MAPREDUCE 模式,使用 HDFS 并且在 Hadoop 集群上运行 MapReduce Job。使用本地文件系统,可以使用少量的数据集以迭代的方式开发脚本。对脚本功能比较满意时,可以轻松地切换到集群中并用完整的数据集运行同样的脚本。除了使用交互式控制台或者在命令行运行 Pig 外,也可以将 Pig 嵌入到应用程序中。PigServer 类封装了以编程式连接 Pig、执行脚本以及注册函数的方法。
Spring for Apache Hadoop 能够很容易地将 PigServer 嵌入到应用程序中并以编码的方式运行 Pig Latin 脚本。因为 Pig Latin 没有像条件分支(if-else)或循环这样的控制流语句,因此 Java 可以用来填充这个空白。以编程的方式使用 Pig 时,也可以使用 Spring Integration 来执行 Pig 脚本以响应事件驱动的活动,或是使用 Spring Batch 处理较大型的工作流程。
为了熟悉 Pig,我们首先将编写一个基础的应用程序,它会使用 Pig 的命令行工具来分析 UNIX password 文件。然后我们将介绍如何借助 Spring for Apache Hadoop 开发使用 Pig 的 Java 应用程序。要获取 Pig 和 Pig Latin 安装、运行以及开发的更详细信息,可以参考项目的站点( http://pig.apache.org/ )以及《 Programming Pig 》(O’Reilly 出版)一书。
12.2.1 Hello World
作为 Hello World 级别的练习,我们将对 UNIX password 文件进行一个小型的分析,分析的目标在于建立一个特定 Shell(例如 bash 或 sh)使用者数量的报表。通过熟悉的 UNIX 工具,可以很容易地看到正在使用 bash Shell 的人数,如示例 12-15 所示。
示例 12-15 使用 UNIX 工具统计 bash Shell 用户数
为了使用 Pig 进行类似的分析,我们先将/etc/password 文件加载到 HDFS 中,如示例 12-16 所示。
示例 12-16 复制/etc/password 文件到 HDFS 中
…
要安装 Pig,首先要从 Pig 主页下载( http://pig.apache.org/ )。安装完发布版后,需要将 Pig 发布版的 bin 目标加入到路径之中,并将环境变量 PIG_CLASSPATH 指向 Hadoop 配置目录(例如,export PIG_CLASSPATH = $HADOOP_INSTALL/conf/)。
现在以 LOCAL 模式启动 Pig 交互控制台 Grunt,并执行一些 Pig Latin 命令,如示例 12-17 所示。
示例 12-17 使用 Grunt 执行 Pig Latin 命令
由于示例的数据集比较小,所有结果会被调整成一个用制表符分隔的文件,如示例 12-18 所示。
示例 12-18 命令行执行结果
在 Pig Latin 脚本中进行的数据转换流程如下,脚本的第一行把 HDFS 文件/test/passwd 中的数据加载到 passwd 变量中。LOAD 命令会获取文件在 HDFS 中的位置以及格式,文件中的行会使用这个格式进行分解进而创建数据集(也称为 Pig 关联)。在这个示例中,我们使用 PigStorage 函数来加载文本文件并用冒号来分割文件中的字段。Pig 可以将模式应用到文件中的列上,这是通过定义每列的名称与数据类型实现的。将输出结果集指定给 passwd 变量后,可以操作数据集以将其转换为其他衍生的数据集了。我们可使用 GROUP 操作来创建数据集 grouped_by_shell。grouped_by_shell 数据集会使用 Shell 的名字作为 key,并且还会包含 passwd 数据集中所有具有该 Shell 值的记录所组成的集合。如果使用 DUMP 操作查看以/bin/bash 为键的 grouped_by_shell 数据集中的内容,将会看到如示例 12-19 所示的结果。
示例 12-19 group-by-shell 结果集
Shell 名称作为键,值是具有相同键的 password 记录所组成集合或 bag。在下一行,表达式 FOREACH grouped_by_shell GENER ATE 会操作 grouped_by_shell 数据集的每一行记录并且生成新的记录。所形成的结果集会基于相同的键值对所有记录进行分组并且进行计数。
我们也可以将脚本参数化来避免硬编码,例如,输入和输出的位置。在示例 12-20 中,把所有输入到控制台的命令放入名为 password-analysis.pig 的文件中,通过 inputFile 和 outputDir 变量实现参数化。
示例 12-20 Pig 脚本参数化
使用 run 命令能够在交互式控制台运行该脚本,如示例 12-21 所示。
示例 12-21 在 Grunt 中运行参数化的 Pig 脚本
或者直接在命令行中运行脚本,如示例 12-22 所示。
示例 12-22 在命令行中运行参数化的 Pig 脚本
12.2.2 运行 PigServer
现在我们将以更结构化和程序化的方式来运行 Pig 脚本。通过 Spring for Apache Hadoop,可以很容易地在 Java 应用程序中以声明的方式配置和创建 PigServer,与 Grunt Shell 的底层实现类似。在 Spring 容器中运行 PigServer 时,能够参数化和外部化配置一些属性,这些属性会控制哪个 Hadoop 集群执行脚本、PigServer 的配置以及传入脚本的参数等。Spring 针对 Hadoop 的 XML 命名空间简化了 PigServer 的创建和配置。如示例 12-23 所示,它的配置方式与其他应用的配置方式相同。可选的 Pig 初始化脚本位置可以是任意的 Spring 资源 URL,可位于文件系统、类路径、HDFS 或 HTTP 之中。
示例 12-23 配置 PigServer
在示例 12-24 中,脚本通过类路径来定位,要替换的变量包含在属性文件 hadoop.properties 中。
示例 12-24 hadoop.properties
<pig-factory/>命名空间的其他一些属性包括:properties-location,它引用一个属性文件以配置 PigServer 的属性;job-tracker,当与 Hadoop 配置不同的时候,可使用该属性设置 Job Tracker 的位置;job-name,它可以设置 Pig 所创建的 MapReduce Job 的根名称,这样就能很容易地识别它们属于该脚本。
由于 PigServer 类不是线程安全的对象而且在每次执行之后都需要清除一个已建立的状态,<pig-factory/>命名空间会创建一个 PigServerFactory 实例,这样在需要时可以很容易地创建 PigServer 实例。类似于 JobRunner 和 HiveRunner 的用途,PigRunner 帮助类提供了一个便捷的方法,可以重复执行 Pig Job,也可在执行它们之前或之后执行 HDFS 脚本。示例 12-25 为 PigRunner 的配置。
示例 12-25 配置 PigRunner
我们将 run-at-startup 元素设置为 true,让 Pig 脚本在 Spring 应用上下文启动时执行(默认为 false)。示例应用程序位于目录 hadoop/pig 之中。要运行示例 password 文件分析程序,可运行如示例 12-26 所示的命令。
示例 12-26 构建并运行 Pig 示例脚本的命令
因为 PigServerFactory 和 PigRunner 是由 Spring 管理的对象,它们也可以注入其他由 Spring 管理的对象中。通常会注入 PigRunner 帮助类来确保每次执行脚本时都会建立新的 PigServer 实例,并且在执行结束后将它的所有资源进行清理。例如,为了在应用的服务层异步运行 Pig Job,我们注入 PigRunner 并使用 Spring 的 @Async 注解,如示例 12-27 所示。
示例 12-27 依赖注入 PigRunner 并异步执行 Pig Job
12.2.3 控制运行期脚本的执行
为了可以更多地在运行期控制要执行的 Pig 脚本与传入的参数,可以使用 PigTemplate 类。类似于 Spring 中的其他模板类,PigTemplate 会管理底层资源,一旦配置完成它就是线程安全的,并且会将 Pig 的错误和异常转换为 Spring 便利的 DAO 异常体系( http://static.springsource.org/spring/docs/current/spring-framework-reference/html/dao.html )。通过 Spring 的 DAO 异常体系,可以轻松地使用不同的数据访问技术,不需要捕获特定技术的异常或者查找返回码。Spring DAO 异常体系也会帮助你分别出暂时性的异常和非暂时性的异常。如果出现暂时性的异常,重新执行失败的操作有可能会成功。这个功能是通过在数据访问层使用 Spring AOP(面向切面编程)的重试通知(advice)实现的。因为 Spring 的 JDBC 帮助类也会执行相同的异常转换,由 Spring JDBC 所支持的基于 Hive 的数据访问也会映射到 DAO 体系之中。在 Hive 和 Pig 之间切换不是一个轻松的任务,因为分析脚本需要重写,但至少可以按照 Hive 以及 Pig 数据访问层的差异来隔离调用的代码,也可以把调用 Hive 和 Pig 的数据访问层类混合在一起,并且使用一致的错误处理机制。
要配置 PigTemplate,可以和之前一样创建一个 PigServerFactory 定义并且加入<pig-template/>元素,如示例 12-28 所示。可以在 properties-location 元素指定的属性文件中定义 PigServer 的通用配置属性,然后在 DAO 或 Repository 类中引用模板 - 在本例中为 PigPasswordRepository。
示例 12-28 配置 PigTemplate
PigPasswordRepository(如示例 12-29 所示)可以在运行期传入输入文件。processPasswordFiles 方法展现了如何以编程的方式在 Java 中处理多个文件。例如,或许你想基于一套复杂的规则来选择一组输入文件,而这套规则又不能以 Pig Latin 或 Pig 用户自定义函数来指定。注意,PigTemplate 类实现了 PigOperations 接口。这是 Spring 模板类通用的实现方式,因为接口很容易模拟或提供存根实现,便于进行单元测试。
示例 12-29 基于 Pig 的 PasswordRepository
Pig 脚本 password-analysis.pig 是通过 Spring 的资源抽象功能加载的,在这个例子中由类路径加载。若要运行使用 PigPasswordRepository 的应用程序,可使用如示例 12-30 所示的命令。
示例 12-30 构建并运行使用 PigPasswordRepository 的 Pig 脚本示例
这个应用程序实际所执行的代码片段如示例 12-31 所示。
示例 12-31 使用 PigPasswordRepository
12.2.4 在 Spring Integration 数据管道中调用 Pig 脚本
要在 Spring Integration 数据管道中运行 Pig Latin 脚本,可以参考 Spring Integration 服务催化器(service activator)定义的 PigPasswordRepository,如示例 12-32 所示。
示例 12-32 在 Spring Integration 数据管道中调用 Pig 脚本
以同步模式还是异步模式来执行服务催化器,取决于所使用的输入通道(channel)类型。如果它是 DirectChannel(默认)将会以同步方式执行;如果它是 ExecutorChannel,则以异步方式执行,并由 TaskExecutor 代理执行。服务类 PasswordService 如示例 12-33 所示。
示例 12-33 执行 Pig 分析 Job 的 Spring Integration 服务催化器
这个处理方法的参数将从 Spring Integration 消息头中获取,方法参数的值是在消息头中与键 hdfs_path 相关联的值。如同之前所使用的 FsShellWritingMessageHandler,消息值由 MessageHandler 实现进行填充,而且必须在数据处理管道中的服务催化器执行之前调用。
从创建一个简单的基于 Pig 的应用程序来执行脚本,到使用运行期参数替换来执行脚本,再到使用 Spring Integration 数据管道来执行脚本,本节的示例展示了一个循序渐进的演进过程。13.2 小节“Hadoop Workflows”中会介绍如何在步骤较多时使用 Spring Batch 来协调 Pig 脚本的执行过程。
12.2.5 使用 Pig 分析 Apache 日志文件
下面对 Apache HTTPD 日志文件执行简单的分析,并且展示如何为 Apache 日志文件使用自定义的加载器。如示例 12-34 所示,运行这个分析的配置类似于之前分析 password 文件时的配置。
示例 12-34 使用 Pig 分析 Apache HTTD 日志文件的配置
copy-files.groovy 脚本的职责是将示例日志文件复制到 HDFS 中,并删除输出路径的内容。
Pig 脚本会产生一个文件,里面包含了每个 URL 的累计点击数,供更全面的分析使用。脚本会提取最少与最多的点击数并形成一个简单的表格用来显示简单的点击数分布图。Pig 简化了在各种条件下的数据过滤和预处理。例如,只保留成功的 GET 请求并删除针对图片的 GET 请求。Pig 脚本与 Spring 应用上下文启动时运行脚本的 PigRunner 配置分别如示例 12-35 与示例 12-36 所示。
示例 12-35 基本 Apache HTTPD 日志分析的 Pig 脚本
示例 12-36 用于分析 Apache HTTPD 文件的 PigRunner 配置
Pig 脚本的参数指定了 jar 文件的位置,这个 jar 文件包含了用来读取与解析 Apache 日志文件的自定义加载器以及输入与输出的路径位置。为了解析 Apache HTTPD 日志文件,我们使用 Pig 发布版提供的自定义加载器,它作为 Piggybank 项目的一部分以源码的方式发布,编译后的 Piggybank jar 文件位于示例应用程序的 lib 目录。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论