- 内容提要
- 序
- 前言
- 第一部分 背景知识
- 第 1 章 Spring Data 项目
- 第 2 章 Repository:便利的数据访问层
- 第 3 章 使用 Querydsl 实现类型安全的查询
- 第二部分 关系型数据库
- 第 4 章 JPA Repository
- 第 5 章 借助 Querydsl SQL 实现类型安全的 JDBC 编程
- 第三部分 NoSQL
- 第 6 章 MongoDB: 文档存储
- 第 7 章 Neo4j:图数据库
- 第 8 章 Redis:键/值存储
- 第四部分 快速应用开发
- 第 9 章 使用 Spring Roo 实现持久层
- 第 10 章 REST Repository 导出器
- 第五部分 大数据
- 第 11 章 Spring for Apache Hadoop
- 第 12 章 使用 Hadoop 分析数据
- 第 13 章 使用 Spring Batch 和 Spring Integration 创建大数据管道
- 第六部分 数据网格
- 第 14 章 分布式数据网格:GemFire
- 关于封面
13.1 收集并将数据加载到 HDFS
截至目前,我们所介绍的示例都是将一组固定存储在本地目录的数据文件复制到 HDFS 中。实际上,被加载到 HDFS 的文件会由其他程序持续生成,例如 Web 服务器。本地目录将会被不断产生的日志文件所填充,日志文件一般遵循如 myapp-timestamp.log 的命名规范。日志文件通常会由远程的机器持续产生,如 Web 农场(Web Farm),这些日志文件需要传送到另外一台机器并加载到 HDFS 中。我们可以通过 Spring Integration 结合 Spring for Apache Hadoop 来实现这些场景。
本章首先简要地介绍了 Spring Integration,然后针对刚才所描述的每个场景实现应用程序。除此之外,本章还将介绍如何使用 Spring Integration 对来自事件流的数据进行处理并加载到 HDFS 之中。最后,本章介绍 Spring Integration 的特性:通过 JMX(Java Management Extensions,Java 管理扩展)和 HTTP 对这些应用程序启用丰富的运行时管理功能。
13.1.1 Spring Integration 介绍
Spring Integration 是一个基于 Apache 2 许可的开源项目,该项目起源于 2007 年,基于已建立的企业集成模式(Enterprise Integration Pattern, http://www.eaipatterns.com/ )来编写应用程序。这些模式提供了关键的构件块来结合新系统和现有系统开发集成应用。这些模式是基于消息传递模型而建立的,消息在应用内部或外部系统之间进行交换。采用消息模型可带来许多好处,例如,可让组件之间实现逻辑和物理的解耦,消息的消费者不需要了解生产者。这种解耦使得构建集成应用变得更为容易,因为应用开发时,可以将这些独立的组件装配起来。消息模型也使得应用程序的测试变得更简便,因为可以先针对单个构件进行测试,这样在开发初期就可以发现错误,而不需要等到在分布式系统测试时才发现,到那个时候追踪错误根源将会变得非常困难。图 13-1 展示了 Spring Integration 应用的关键构件以及它们之间如何是进行通信的。
图 13-1 Spring Integration 应用的构件
端点(endpoint)是消息的生产者或是消费者,它们通过通道(channel)进行连接。消息是一个简单的数据结构,头信息(header)中包含了键/值对,负载(payload)中可以包含任意的对象类型。端点可能是与外部系统通信的适配器(adapter),例如 Email、FTP、TCP、JMS、RabbitMQ 或者 syslog,也可能是消息从一个通道转移到另一个通道时对消息所采取的操作。
Spring Integration 支持通用的消息操作,包括:根据消息头路由到一个或多个通道、将负载数据由字符串类型转换成复杂的数据类型并且支持对消息进行过滤,这样只有符合过滤条件的消息才会传递到下游通道。如图 13-2 所示的示例来自于金融服务行业的 Spring/C24( http://www.c24.biz/ )合作项目,展示了 Spring Integration 可构建的数据处理通道类型。
图 13-2 Spring Integration 处理管道
这张图的左半部分展示了金融交易消息会被 3 个 RabbitMQ 适配器接收,它们分别对应 3 个外部的交易数据来源。然后这些消息会被解析、验证并且转换成规范的数据格式。需要注意的是,这些格式并不一定是 XML,通常会是 POJO。接着消息头会被扩充,然后将这条交易数据储存到关系型数据库中,同时也传递到过滤器中。过滤器只会选择高额的交易,随后将其放入可进行实时处理的 GemFire 数据网格中。我们可以通过 XML 或者 Scala 以声明式定义这个处理管道,尽管大部分应用程序可以采用声明式配置,但是一些组件为了便于进行单元测试,需要编写成 POJO。
除了端点、通道以及消息,Spring Integration 的另一个重要组件是它的管理功能。可以很容易地通过 JMX 暴露位于数据管道中的所有组件,并且执行诸如停止与启动适配器的操作。控制总线(control bus)组件可以发送一小段代码(例如使用 Groovy)来进行复杂的操作以修改系统的状态,例如改变过滤规则或启动与停止适配器。控制总线随后会被连接到一个中间件适配器,这样它就可以接收要执行的代码,常见的可选方案是 HTTP 和面向消息的中间件适配器。
我们不会对 Spring Integration 的内部原理进行太深入的探讨,也不会涉及适配器所提供的每一个功能,但是你应该深刻体会如何组合使用 Spring Integration 与 Spring for Apache Hadoop 创建复杂的数据管道解决方案。这里的示例程序中包含了使用 HDFS 时的一些自定义代码,根据计划这些代码将纳入 Spring Integration 项目中。如果想进一步了解 Spring Integration 的其他信息,可以访问项目网站( http://www. springsource.org/spring-integration ),在这里包含了大量的参考文档链接、示例程序以及 Spring Integration 相关参考书籍的链接。
13.1.2 复制日志文件
在日志文件持续产生时,将它们复制到 Hadoop 中是一项很常见的任务。创建两个将持续生成的日志文件加载到 HDFS 的应用程序,其中一个应用程序会使用入站文件适配器(inbound file adapter)来轮询目录里的文件,另一个则轮询 FTP 站点。出站适配器(Outbound Adapter)会对 HDFS 进行写操作,它是使用 Spring for Apache Hadoop 所提供的 FsShell 类实现的,关于这个类的介绍请参考 11.5 小节的“在 JVM 中编写 HDFS 脚本”。这个数据管道图如图 13-3 所示。
图 13-3 Spring Integration 数据管道轮询目录中的文件并将它们复制到 HDSF 中
在文件入站适配器中配置了用于轮询文件的目录以及确定哪些文件会被适配器检测到的文件名模式。这些值会被外部化到属性文件中,因此这些值可以很容易地改变以适应不同的运行时环境。由于文件系统不是事件驱动源,所以适配器使用轮询器来检测目录。可以使用多种方式配置轮询器,其中最常见的方式是使用固定延时(fixed delay)、固定速率(fixed rate)或 cron 表达式。在这个示例中,没对这两个适配器之间的管道进行任何额外操作,但是如果需要的话,也可以很容易加上所需要的功能。示例 13-1 是配置这个数据管道的配置文件。
示例 13-1 轮询目录中的文件并加载到 HDFS 的数据管道
这个管道的相关配置参数已外部化到 polling.properties 文件中,如示例 13-2 所示。
示例 13-2 轮询目录并将其加载到 HDFS 中的外部化属性配置
这个配置会每隔 5 秒轮询一次 /opt/application/logs 目录,并搜索匹配 *.txt 模式的文件。在默认情况下,指定了 filename-pattern 时可避免复制文件,这个状态会被保存在内存中。之后会对这个文件适配器进行增强,使它能持久化保存应用程序状态。FsShellWritingMessageHandler 类负责将文件复制到 HDFS 中,它使用了 FsShell 的 copyFromLocal 方法。如果想在传输结束之后将文件从轮询目录中删除的话,需要将 FsShellWritingMessageHandler 类的 deleteSourceFiles 属性设置为 true。也可以锁住文件,这样当多个进程并发访问同一目录时,能够阻止它们同时读取这些文件。更多信息可以阅读 Spring Integration 参考指南。
要构建和运行这个应用,可以使用如示例 13-3 所示的命令。
示例 13-3 构建并运行文件轮询示例的命令
相关的输出如示例 13-4 所示。
示例 13-4 文件轮询示例运行时的输出
在这个日志中,可以看到第一次轮询时检测到目录中有一个文件,接着会把该文件当作已处理过的文件,所以在第二次轮询的时候,文件入站适配器将不会再对它进行处理。FsShellWritingMessageHandler 额外的可选项还包括能够生成一个带有内嵌日期或 UUID(通用唯一标识码)的目录路径。将属性 generateDestinationDirectory 设置为 true,会使用默认的路径格式(年/月/日/小时/分钟/秒),也就是输出带有日期的路径。若将 generateDestinationDirectory 设置为 true,文件将被写入到 HDFS 中,如示例 13-5 所示。
示例 13-5 将 generateDestinationDirectory 设置为 true 来运行文件轮询示例的部分输出
将文件移入到 HDFS 的另外一种方法是通过 FTP 从远程机器来采集它们,如图 13-4 所示。
图 13-4 轮询 FTP 站点文件,并将它们复制到 HDFS 的 Spring Integration 数据管道
示例 13-6 的配置类似于文件轮询的配置,唯一不同的是入站适配器的配置发生了变化。
示例 13-6 定义在 FTP 站点轮询文件并将其加载到 HDFS 的数据管道
可以使用命令构建并运行这个应用程序,如示例 13-7 所示。
示例 13-7 构建并运行文件轮询示例的命令
这个配置文件假定在 FTP 主机上有一个 testuser 账户。一旦在 FTP 的传出目录中放置了文件,就会看到数据管道开始活动,首先会将文件复制到本地目录,然后会复制到 HDFS 中。
13.1.3 事件流
流 (Stream)是另一种常见的数据来源,你可能希望将其存储到 HDFS 中,并在它流入系统时执行实时的分析。为了满足这个需求,Spring Integration 提供了几个入站适配器来处理数据流。一旦数据进入 Spring Integration,它们会通过处理链(processing chain)传送并存储到 HDFS 中。管道也可以参与数据流的处理并将数据写入到其他数据库,包括关系型数据库和 NoSQL,并且可以使用众多出站适配器中的一个来实现将数据流转发给其他系统。图 13-2 是这类数据管道的示例。接下来通过 TCP(Transmission Control Protocol,传输控制协议)和 UDP(User Datagram Protocol,用户数据报协议)入站适配器来使用 syslog( http://en.wikipedia.org/wiki/Syslog )产生的数据,并将其写入到 HDFS。
示例 13-8 展示了如何搭建处理链,使其能够通过 TCP 将 syslog 的数据转移到 HDFS 中。
示例 13-8 定义数据管道,通过 TCP 接收 syslog 文件并将它加载到 HDFS 之中
这个通道的相关配置参数外部化到 streaming.properties 文件,如示例 13-9 所示。
示例 13-9 从 syslog 到 HDFS 的数据流所需要的外部化属性
图 13-5 是这个数据管道的示意图。
图 13-5 数据流从 syslog 流向 HDFS 的 Spring Integration 数据管道
这个配置将会创建连接工厂,它会在 1514 端口监听外部传入的 TCP 连接。为了将传入的 syslog 流分解为事件,序列化程序会根据换行符来将传入的字节流分段。需要注意的是,为了简化配置,未来底层的序列化配置将会被封装在 syslog XML 命名空间中。入站通道适配器会从 TCP 数据流中读取 syslog 消息,并把它解析成字节数组,数组会被设定为传入消息的有效负载。
Spring Integration 的链组件会自动将一系列端点(endpoint)组合在一起,我们不需要显式声明连接它们的管道。链中的第一个元素会解析 byte[]数组,并将其转换成 java.util.Map 对象,这个 Map 中包含了 syslog 消息的键/值对。在这个阶段,可以对数据执行其他的操作,例如过滤、加工、实时分析或者路由到其他数据库。在这个示例中,使用内置的对象到字符串(object-to-string)转换器将负载数据(也就是 Map)转换成字符串。接着字符串会被传入到 HdfsWritingMessageHandler 中,并将其写入到 HDFS。HdfsWritingMessageHandler 可以让你直接配置写入文件的 HDFS 目录、文件命名规则,以及文件大小滚动(rollover)策略。在这个示例中,文件滚动的阈值设置较低(500 字节,默认值为 10MB),以便于在简单的测试示例中展现文件滚动的效果。
可以使用如示例 13-10 所示的命令来构建并运行这个应用程序。
示例 13-10 构建并运行 syslog 流示例的命令
若要发送测试消息,可以使用日志工具,如示例 13-11 所示。
示例 13-11 发送消息到 syslog
由于我们已经将 HdfsWritingMessageHandler 的 rolloverThresholdInBytes 属性值设得很低,所以在发送几条消息或者等待消息从操作系统进入后,在 HDFS 中可以看到如示例 13-12 所示的文件。
示例 13-12 HDFS 内的 syslog 数据
使用 UDP 取代 TCP,需删除 TCP 相关定义,并添加如示例 13-13 所示的命令。
示例 13-13 配置以 UDP 来使用 syslog 数据
13.1.4 事件转发
当需要处理来自不同机器的大量数据时,将数据从生成者转发到另一个服务器来处理是非常实用的(与在本地处理相比较)操作。TCP 入站和出站适配器可以成对出现在应用程序中,以便将数据从某个服务器转发到另一个服务器。连接两个适配器的通道可以与多个持久化消息的存储组合使用。在 Spring Integration 中,消息存储以 MessageStore 接口来表示,可使用 JDBC、Redis、MongoDB 和 GemFire 的实现。因为入站和出站适配器成对出现在应用程序中会影响消息处理流程,所以消息在发送到接收的应用程序前,会先保存在发送者所在应用程序的消息存储中。一旦确认消息接收者已接收到消息,消息发送者就会将消息删除。接收者成功将接收到的消息放到自己的消息存储支撑(message-store-backed)通道中,然后发送确认消息。这个配置通过 TCP 保证了额外级别的“存储转发(store and forward)”,这种机制通常出现在像 JMS 或 RabbitMQ 这样的消息中间件之中。
示例 13-14 简单演示了 TCP 流量信息转发,并使用 Spring 所提供的支持功能启动了嵌入式 HSQL 数据库作为消息存储。
示例 13-14 使用 TCP 适配器跨进程存储并转发数据
13.1.5 管理
Spring Integration 提供了两项重要功能,能够在运行时管理数据管道:将通道和端点导出到 JMX 以及控制总线。非常类似于 JMX,控制总线允许你调用操作指令并查看每个组件相关的信息,但是它的用途更加广泛,因为它允许在运行中的应用程序内执行一段小程序来改变其状态与行为。
将通道和端点导出到 JMX 很简单,只需要添加如示例 13-15 所示的 XML 配置即可。
示例 13-15 将通道和端点导出到 JMX
请运行前一节的 TCP 流示例,之后启动 JConsole 将会显示 JMX 度量(JMX metrics)信息以及可用的操作,如图 13-6 所示。这些例子是用来启动和停止 TCP 适配器,并且取得 MessageHandler 的最小、最大以及平均处理时间的。
图 13-6 JConsole JMX 应用程序截屏界面,展示了在 TopAdapter、通道和 HdfsWritingMessageHandler 中可用的操作和属性
控制总线可以执行 Groovy 脚本或者 Spring 表达式语言(SpEL),可以在应用程序中以编程的方式来管理组件的状态。在默认情况下,Spring Integration 会通过控制总线暴露所有可存取的组件。停止 TCP 入站适配器的 SpEL 表达式语法为 @tcpAdapter.stop()。@前缀是一个可从 Spring 应用上下文中根据名称检索对象的操作符。在这个例子中,名称为 tcpAdapter,要调用的方法是 stop。执行相同动作的 Groovy 脚本不需要 @前缀符。为了声明控制总线,请加上如示例 13-16 所示的配置。
示例 13-16 配置基于 Groovy 的控制总线
通过将入站通道适配器或网关连接到控制总线的输入通道,可以远程执行脚本。也可以创建 Spring MVC 应用程序并让控制器发送消息到控制总线的输入通道,如示例 13-17 所示,如果想要提供更多诸如安全管理或者其他视图的 Web 应用功能,这是一种更为自然的方法。示例 13-17 展示了 Spring MVC 控制器将传入的 Web 请求体转发给控制总线并返回 String 类型的响应。
示例 13-17 在 Spring MVC 控制器中发送消息到控制总线
重新运行示例应用程序,可以通过 HTTP 使用 curl 与控制总线交互并查询和修改入站 TCP 适配器的状态,如示例 13-18 所示。
示例 13-18 配置控制总线
13.1.6 Spring Batch 简介
Spring Batch 项目起源于 2007 年,它是由 SpringSource 和埃森哲(Accenture)合作开发的,它有一个综合的批处理框架用来开发健壮的批处理应用程序。这些批处理应用程序需要对大量的数据进行处理,这些数据对业务操作至关重要。Spring Batch( http://static.springsource.org/spring-batch/ )已经在世界各地成千上万的企业应用中广泛使用。批处理 Job 有它自己的最佳实践和领域概念,这是埃森哲长年累月的咨询业务所构建起来的,并且已经封装到 Spring Batch 项目中。因此,Spring Batch 可以通过许多功能来支持大量数据的处理,例如失败后自动重试、跳过记录、从最后一次失败地方重新启动工作、定期批量提交给事务型数据库、可重用组件(如解析器、映射器、读取器、处理器、写入器和校验器)以及工作流定义。作为 Spring 生态系统一部分,Spring Batch 项目基于 Spring 框架的核心功能构建,例如 Spring 表达式语言的使用。Spring Batch 也延续了 Spring 框架的设计理念,强调基于 POJO 的开发方式并且促进创建可维护、可测试的代码。
Spring Batch 中工作流的概念在 Spring Batch 中转换成了 Job(注意不要和 MapReduce Job 混淆)。批处理 Job 是一个有向图,图中每个节点代表一个数据处理的 Step。Step 可以串行或者并行执行,这取决于 Step 的配置。Job 可以被启动、停止和重新启动。Job 已执行的 Step 进度将通过 JobRepository 存储在数据库中,所以 Job 可以重新启动。Job 也可以进行组合,所以可以在 Job 中包含 Job。图 13-7 展示了 Spring Batch 应用中的基本组件。JobLauncher 负责启动工作,通常是通过调度器(scheduler)来触发。Spring 框架提供了基本的调度功能,并支持与 Quartz 集成,但企业通常会采用它们自己的调度引擎,例如 Tivoli 或者 Control-M。其他启动 Job 的方式有:通过 RESTful 管理 API、Web 应用程序或者通过编程的方式对外部事件进行响应。最后一种方式,通常会使用 Spring Integration 项目以及它的众多通道适配器与外部系统通信。可以在《 Spring Integration in Action 》[Fisher12]这本书中看到更多关于 Spring Integration 结合 Spring Batch 使用的资料。如果想更多地了解 Spring Batch,可以访问项目网站( http://static.spring.source.org/spring-batch ),这里包含各种参考文档的链接、应用示例以及一些参考书籍的链接。
图 13-7 Spring Batch 概览
在每个 Step 中数据处理的执行可以分为 3 个阶段:ItemReader、ItemProcessor 和 ItemWriter,如图 13-8 所示,其中 ItemProcessor 是可选的。
图 13-8 Spring Batch Step 组件
Spring Batch 的主要使用场景之一就是处理大型文件的内容并将数据加载到关系型数据库中。在这个示例中,FlatFileItemReader 和 JdbcItemWriter 与自定义逻辑一起使用,既可以以声明的方式来配置也可以直接在 ItemProcessor 里编写代码。为了提高性能,Step 会被“分块(chunked)”,即有一个数据块,例如 100 行数据,会被聚集在一起然后传送到 ItemWriter,这样就可以高效地使用许多数据库所提供的批处理 API 来插入数据。示例 13-19 展示了使用 Spring Batch XML 命名空间的配置片段,它可以从文件中读取数据并将它们写入到数据库中。在下文中,我们将深入探讨读取器、写入器和处理器的配置。
示例 13-19 配置 Spring Batch Step 来处理 Flat 文件数据并将其复制到数据库中
Spring Batch 的其他特性可以扩展(scale up and out)执行 Job,以处理高容量和高效率的批处理 Job 需求。这些主题的相关信息,可以参考 Spring Batch 参考指南或者 Spring Batch 相关书籍([CoTeGreBa11], [Minella11])。
非常重要的一点是,Spring Batch 应用的执行模型独立于 Hadoop 集群。Spring Batch 应用程序在并发处理不同文件时可通过使用不同的线程数来扩展,或者使用 Spring Batch 自带的主从远程分区模型(Master-Slave Remote Partitioning Model)来实现扩展。实际上,加大线程数足以满足大多数用户的性能需求。在使用远程分区前,应该先试着将增加线程数作为扩展的首选策略。另外一个将要开发的执行模型是在 Hadoop 集群内部运行 Spring Batch Job,利用集群资源管理功能的优势将数据处理扩展到集群中的各个节点,并且兼顾 HDFS 的本地数据存储。这两种模型都各有优劣,在这里性能并不是决定选择使用哪个执行模型的唯一标准。在 Hadoop 集群外部执行批处理 Job,通常更便于数据在不同系统以及多个 Hadoop 集群之间进行移动。
在下一节使用 Spring Batch 框架来处理数据,并且从关系型数据库中将数据加载到 HDFS 中。在 13.3 小节的“从 HDFS 中导出数据”中,会将数据从 HDFS 导出到关系型数据库和 MongoDB 文档数据库。
13.1.7 从数据库中加载并处理数据
为了对数据进行处理,并将其从关系型数据库加载到 HDFS 中,需要使用 JdbcItemReader 和 HdfsTextItemWriter 来配置 Spring Batch Tasklet。本章节的示例应用程序位于 ./hadoop/batch-import 目录中,它基于《 Spring Batch in Action 》( http://code.google.com/p/springbatch-in-action/ )一书中的示例应用程序。示例应用的领域是一个在线商店,需要维护所销售产品的目录。我们已经对原有示例进行了一些微调,以取代 Flat 文件系统来对 HDFS 进行写入操作。Spring Batch Tasklet 的配置如示例 13-20 所示。
示例 13-20 从数据库中读取数据并写入到 HDFS 的 Spring Batch Step 配置
我们使用标准的 JDBC DataSource 以及从 product 表中查询数据的 SQL 语句来配置 JdbcCursorItemReader,这些数据会被加载到 HDFS 中。执行示例 13-21 中的命令来启动数据库并初始化数据库的样例数据,将会弹出浏览器以浏览数据库内容,其中包含了 product 表和 Spring Batch 用来实现 Job Repository 的表。
示例 13-21 初始化并运行 H2 数据库的命令,以提供给 Spring Batch 应用程序使用
提交间隔(interval)设置为 100 条,这已经超过了这个简单示例中可用的数据量,但通常推荐使用这个数量。每从数据库中读取 100 条记录,更新 Job 执行元数据的事务就会被提交到数据库。这样当执行失败时可以在停止的地方重新启动 Job。
JdbcCursorItemReader 的 rowMapper 属性是 Spring 的 RowMapper 接口实现,它也是 Spring JDBC 功能集合的一部分。当 ResultSet 中单条记录与某个 POJO 实例匹配时,RowMapper 接口可便捷地将 JDBC ResultSet 转换成 POJO 对象。Spring 封装了 ResultSet 的迭代和异常处理(通常这很冗长且容易出错),你只需专注于映射代码的编写。在示例 13-22 中,应用程序中使用 ProductRowMapper 将 ResultSet 对象的每一条记录转换为 Product Java 对象。Product 类是一个简单的 POJO,包含 product 表中所选择的列对应的 getter 和 setter 方法。
示例 13-22 将 ResultSet 中的一行记录转换成 Product 对象的 ProductRowMapper
JdbcCursorItemReader 类依赖于底层 JDBC 驱动的流功能,从而能够以高效的方式遍历结果集。可以设置 fetchSize 属性,指定驱动程序只加载一定数量的数据,这些数据将会被加载到运行在客户端进程的驱动之中。fetchSize 值的设置取决于 JDBC 驱动。例如,在 MySQL 中,官方文档建议将 fetchSize 的值设置为 Integer.MIN_VALUE,但对于大数据结果集的处理这样设置对于效率提升并不是很明显。值得注意的是,Spring Batch 也提供了 JdbcPagingItemReader 类,作为另一种策略来控制由数据库加载到客户端程序的数据量,并且具有从存储过程加载数据的功能。
这个应用程序的最后一部分配置是 hdfsWriter,如示例 13-23 所示。
示例 13-23 HdfsTextItemWriter 的配置
Hadoop 的配置和我们在前一节中所看到的差不多,但新增了<hdp:file-system/>,它负责根据 Hadoop 的配置创建相应的 org.apache.hadoop.fs.FileSystem。可选的实现会通过( hdfs:// )、HFTP( hftp:// )或 WebHDFS( webhdfs:// )与 HDFS 通信。HdfsTextItemWriter 使用 FileSystem 将纯文本文件写入到 HDFS。HdfsTextItemWriter 的属性配置 basePath、baseFileName 和 file Suffix 会将文件以类似 product-0.txt 和 product-1.txt 的命名方式写入到 /import/data/products 目录。为了展示滚动的效果,我们将 rolloverThresholdInBytes 设置成非常低的值。
ItemWriters 通常需要一个协作对象,即 LineAggreator 接口的实现类,用来将正在处理的条目转换成字符串。在这个示例中,使用 Spring Batch 提供的 PassThroughFieldExtractor,它会委托 Product 类的 toString() 方法来创建字符串。Product 类的 toString() 方法用逗号分割来连接 ID、名称、说明以及价格。
为了运行应用程序并将数据由数据库导入到 HDFS,应执行如示例 13-24 所示的命令。
示例 13-24 将数据由数据库导入到 HDFS 的命令
示例 13-25 显示了在 HDFS 中的结果内容。
示例 13-25 HDFS 中已导入 product 数据
在 Spring Batch 中还有其他大量的 LineAggregator 接口实现类来提供声明式控制,用以确定要将哪些字段写入到文件中以及使用什么字符来分割每个字段。示例 13-26 展示了其中一种实现。
示例 13-26 对于 Product 对象,指定 JavaBean 的属性名以创建写入到 HDFS 的字符串
在默认情况下,DelimitedLineAggregator 会使用逗号分隔字段,JavaBean 属性名称会传入到 BeanWrapperFieldExtractor,这样 BeanWrapperFieldExtractor 将选取的 Product 对象写成一行输出到 HDFS。
使用 LineAggregator 的配置再次运行应用程序,将会在 HDFS 中创建文件,文件内容如示例 13-27 所示。
示例 13-27 使用另一种格式导入 HDFS 的 Product 数据
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论