- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
Python 编程指南 Beta
Flink 中的分析程序是实现数据集转换的常规程序(例如,Filter,映射,连接,分组)。数据集最初是从某些来源创建的(例如,通过读取文件或从集合中创建)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink 程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地 JVM 中执行,也可以在许多计算机的集群上执行。
为了创建自己的 Flink 程序,我们建议您从 程序框架 开始, 逐步添加自己的 转换 。其余部分充当其他 算子操作和高级函数的参考。
示例程序
以下程序是 WordCount 的完整工作示例。您可以复制并粘贴代码以在本地运行它。
from flink.plan.Environment import get_environment
from flink.functions.GroupReduceFunction import GroupReduceFunction
class Adder(GroupReduceFunction):
def reduce(self, iterator, collector):
count, word = iterator.next()
count += sum([x[0] for x in iterator])
collector.collect((count, word))
env = get_environment()
data = env.from_elements("Who's there?",
"I think I hear them. Stand, ho! Who's there?")
data \
.flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
.group_by(1) \
.reduce_group(Adder(), combinable=True) \
.output()
env.execute(local=True)
程序框架
正如我们在示例中看到的那样,Flink 程序看起来像普通的 python 程序。每个程序包含相同的基本部分:
- 获得一个
Environment
, - 加载/创建初始数据,
- 指定此数据的转换,
- 指定计算结果的放置位置,和
- 执行你的程序。
我们现在将概述每个步骤,但请参阅相应部分以获取更多详细信息。
这 Environment
是所有 Flink 计划的基础。你可以在课堂上使用这些静态方法获得一个 Environment
:
get_environment()
为了指定数据源,运行环境有几种从文件中读取的方法。要将文本文件作为一系列行读取,您可以使用:
env = get_environment()
text = env.read_text("file:///path/to/file")
这将为您提供一个 DataSet,然后您可以在其上应用转换。有关数据源和输入格式的更多信息,请参阅 数据源 。
拥有 DataSet 后,您可以应用转换来创建新的 DataSet,然后您可以将其写入文件,再次转换或与其他 DataSet 结合使用。您可以通过使用自己的自定义转换函数调用 DataSet 上的方法来应用转换。例如,Map 转换如下所示:
data.map(lambda x: x*2)
这将通过将原始 DataSet 中的每个值加倍来创建新的 DataSet。有关更多信息和所有转换的列表,请参阅 转换 。
一旦有了需要写入磁盘的 DataSet,就可以在 DataSet 上调用其中一个方法:
data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)
write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)
output()
最后一种方法仅对本地机器上的开发/调试有用,它会将 DataSet 的内容输出到标准输出。(请注意,在集群中,结果将转到集群节点的标准输出流,最终会出现在工作程序的 .out 文件中)。前两个顾名思义。有关写入文件的更多信息,请参阅 数据接收器 。
一旦您指定的完整程序,你需要调用 execute
的 Environment
。这将提交您的程序以在群集上执行。
项目设置
除了设置 Flink 外,无需额外的工作。python 包可以在 Flink 发行版的/ resource 文件夹中找到。flink 包以及计划和可选包在运行作业时通过 HDFS 自动分布到群集中。
Python API 在安装了 Python 2.7 或 3.4 的 Linux / Windows 系统上进行了测试。
默认情况下,Flink 将通过调用“python”来启动 python 进程。通过在 flink-conf.yaml 中设置“python.binary.path”键,您可以修改此行为以使用您选择的二进制文件。
懒惰的评价
所有 Flink 程序都是懒惰地执行:当执行程序的 main 方法时,数据加载和转换不会直接发生。而是创建每个 算子操作并将其添加到程序的计划中。当 execute()
在 Environment 对象上调用其中一个方法时,实际执行这些 算子操作。
懒惰的评估使您可以构建 Flink 作为一个整体计划单元执行的复杂程序。
转换
数据转换将一个或多个 DataSet 转换为新的 DataSet。程序可以将多个转换组合到复杂的程序集中。
本节简要概述了可用的转换。该 转换文档 与示例全部转换的完整描述。
转换: Map
描述:采用一个数据元并生成一个数据元。
data.map(lambda x: x * 2)
转换: FlatMap
描述:采用一个数据元并生成零个,一个或多个数据元。
data.flat_map(
lambda x,c: [(1,word) for word in line.lower().split() for line in x])
转换: MapPartition
描述:在单个函数调用中转换并行分区。该函数将分区作为“迭代器”,并可以生成任意数量的结果值。每个分区中的数据元数量取决于并行度和先前的 算子操作。
data.map_partition(lambda x,c: [value * 2 for value in x])
转换: Filter
描述:计算每个数据元的布尔函数,并保存函数返回 true 的数据元。
data.filter(lambda x: x > 1000)
转换: Reduce
描述:通过将两个数据元重复组合成一个数据元,将一组数据元组合成一个数据元。Reduce 可以应用于完整数据集或分组数据集。
data.reduce(lambda x,y : x + y)
转换: ReduceGroup
描述:将一组数据元组合成一个或多个数据元。ReduceGroup 可以应用于完整数据集或分组数据集。
class Adder(GroupReduceFunction):
def reduce(self, iterator, collector):
count, word = iterator.next()
count += sum([x[0] for x in iterator)
collector.collect((count, word))
data.reduce_group(Adder())
转换: 骨料
描述:在数据集或数据集的每个组中的所有元组的一个字段上执行内置 算子操作(sum,min,max)。聚合可以应用于完整数据集或分组数据集。
# This code finds the sum of all of the values in the first field and the maximum of all of the values in the second field
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
data.aggregate(Aggregation.Sum, 0).and_agg(Aggregation.Max, 1)
# min(), max(), and sum() syntactic sugar functions are also available
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
data.sum(0).and_agg(Aggregation.Max, 1)
转换: Join
描述:通过创建在其键上相等的所有数据元对来连接两个数据集。(可选)使用 JoinFunction 将数据元对转换为单个数据元。见 键 如何定义连接 Keys。
# In this case tuple fields are used as keys.
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
# "0" is the join field on the first tuple
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
# "1" is the join field on the second tuple.
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
result = input1.join(input2).where(0).equal_to(1)
转换: CoGroup
描述:reduce 算子操作的二维变体。将一个或多个字段上的每个输入分组,然后关联组。每对组调用转换函数。见 键 如何定义 CoGroup 键。
data1.co_group(data2).where(0).equal_to(1)
转换: 交叉
描述:构建两个输入的笛卡尔积(交叉乘积),创建所有数据元对。可选择使用 CrossFunction 将数据元对转换为单个数据元。
result = data1.cross(data2)
转换: Union
描述:生成两个数据集的并集。
data.union(data2)
转换: ZipWithIndex
描述:为每个数据元分配连续索引。有关详细信息,请参阅[Zip 数据元指南](zip_elements_guide.html#zip-with-a-dense-index)。
data.zip_with_index()
指定 Keys
某些转换(如 Join 或 CoGroup)要求在其参数 DataSets 上定义键,而其他转换(Reduce,GroupReduce)允许 DataSet 在应用之前在键上进行分组。
DataSet 被分组为
reduced = data \
.group_by(<define key here>) \
.reduce_group(<do something>)
Flink 的数据模型不基于键值对。因此,您无需将数据集类型物理打包到键和值中。键是“虚拟的”:它们被定义为实际数据上的函数,以指导分组算子。
定义元组的键
最简单的情况是在元组的一个或多个字段上对元组的数据集进行分组:
reduced = data \
.group_by(0) \
.reduce_group(<do something>)
数据集分组在元组的第一个字段中。因此,group-reduce 函数将接收第一个字段中具有相同值的元组。
grouped = data \
.group_by(0,1) \
.reduce(/*do something*/)
数据集在由第一个和第二个字段组成的复合键上分组,因此 reduce 函数将接收两个字段具有相同值的组。
关于嵌套元组的注释:如果你有一个带有嵌套元组的 DataSet 指定 group_by(<index of tuple>)
将导致系统使用完整的元组作为键。
将函数传递给 Flink
某些 算子操作需要用户定义的函数,而所有 算子操作都接受 lambda 函数和丰富的函数作为参数。
data.filter(lambda x: x > 5)
class Filter(FilterFunction):
def filter(self, value):
return value > 5
data.filter(Filter())
丰富的函数允许使用导入的函数,提供对广播变量的访问,可以使用 init ()进行参数化,并且是复杂函数的首选。它们也是 combine
为 reduce 算子操作定义可选函数的唯一方法。
Lambda 函数允许轻松插入单线。请注意,如果 算子操作可以返回多个值,则 lambda 函数必须返回 iterable。(所有接收收集器参数的函数)
数据类型
Flink 的 Python API 目前仅提供对原始 python 类型(int,float,bool,string)和字节数组的本机支持。
可以通过将序列化程序,反序列化程序和类型类传递给环境来扩展类型支持。
class MyObj(object):
def __init__(self, i):
self.value = i
class MySerializer(object):
def serialize(self, value):
return struct.pack(">i", value.value)
class MyDeserializer(object):
def _deserialize(self, read):
i = struct.unpack(">i", read(4))[0]
return MyObj(i)
env.register_custom_type(MyObj, MySerializer(), MyDeserializer())
元组/列表
您可以将元组(或列表)用于复合类型。Python 元组映射到 Flink Tuple 类型,它包含各种类型的固定数量的字段(最多 25 个)。元组的每个字段都可以是原始类型 - 包括更多元组,从而产生嵌套元组。
word_counts = env.from_elements(("hello", 1), ("world",2))
counts = word_counts.map(lambda x: x[1])
使用需要 Key 进行分组或匹配记录的 算子时,使用元组可以简单地指定要用作键的字段的位置。您可以指定多个位置以使用复合键(请参见 章节数据转换 )。
wordCounts \
.group_by(0) \
.reduce(MyReduceFunction())
数据源
数据源创建初始数据集,例如来自文件或集合。
基于文件的:
read_text(path)
- 按行读取文件并将其作为字符串返回。read_csv(path, type)
- 解析逗号(或其他字符)分隔字段的文件。返回元组的 DataSet。支持基本 java 类型及其 Value 对应作为字段类型。
基于集合:
from_elements(*args)
- 从 Seq 创建数据集。所有数据元generate_sequence(from, to)
- 并行生成给定间隔中的数字序列。
例子
env = get_environment
\# read text file from local files system
localLiens = env.read_text("file:#/path/to/my/textfile")
\# read text file from a HDFS running at nnHost:nnPort
hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")
\# read a CSV file with three fields, schema defined using constants defined in flink.plan.Constants
csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))
\# create a set from some given elements
values = env.from_elements("Foo", "bar", "foobar", "fubar")
\# generate a number sequence
numbers = env.generate_sequence(1, 10000000)
数据接收
数据接收器使用 DataSet 并用于存储或返回它们:
write_text()
- 按字符串顺序写入数据元。通过调用每个数据元的 str() 方法获得字符串。write_csv(...)
- 将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的 str() 方法。output()
- 打印标准输出上每个数据元的 str() 值。
可以将 DataSet 输入到多个 算子操作。程序可以编写或打印数据集,同时对它们执行其他转换。
例子
标准数据接收方法:
write DataSet to a file on the local file system
textData.write_text("file:///my/result/on/localFS")
write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")
write DataSet to a file and overwrite the file if it exists
textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)
tuples as lines with pipe as the separator "a|b|c"
values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")
this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.write_text("file:///path/to/the/result/file")
广播变量
除了常规的 算子操作输入之外,广播变量还允许您为 算子操作的所有并行实例提供数据集。这对于辅助数据集或与数据相关的参数化非常有用。然后,算子可以将数据集作为集合访问。
- 广播 :广播集通过名称注册
with_broadcast_set(DataSet, String)
- 访问 :可通过
self.context.get_broadcast_variable(String)
目标算子访问
class MapperBcv(MapFunction):
def map(self, value):
factor = self.context.get_broadcast_variable("bcv")[0][0]
return value * factor
# 1\. The DataSet to be broadcast
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
toBroadcast = env.from_elements(1, 2, 3)
data = env.from_elements("a", "b")
# 2\. Broadcast the DataSet
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast)
bcv
注册和访问广播数据集时,请确保名称(在前面的示例中)匹配。
注意 :由于广播变量的内容保存在每个节点的内存中,因此不应该变得太大。对于像标量值这样的简单事物,您可以简单地参数化丰富的函数。
并行执行
本节介绍如何在 Flink 中配置程序的并行执行。Flink 程序由多个任务( 算子,数据源和接收器)组成。任务被分成几个并行实例以供执行,每个并行实例处理任务输入数据的子集。任务的并行实例数称为 并行__度 或 并行__度(DOP) 。
可以在不同级别的 Flink 中指定任务的并行度。
运行环境级别
Flink 程序在 运行环境 的上下文中 执行 。运行环境为其执行的所有算子,数据源和数据接收器定义默认并行性。可以通过显式配置 算子的并行性来覆盖运行环境并行性。
可以通过调用 set_parallelism()
方法来指定运行环境的默认并行性 。要以并行方式执行 WordCount 示例程序的所有 算子,数据源和数据接收器,请 3
按如下方式设置运行环境的默认并行度:
env = get_environment()
env.set_parallelism(3)
text.flat_map(lambda x,c: x.lower().split()) \
.group_by(1) \
.reduce_group(Adder(), combinable=True) \
.output()
env.execute()
系统级别
可以通过设置 parallelism.default
属性来定义所有运行环境的系统范围默认并行度 ./conf/flink-conf.yaml
。有关详细信息,请参阅 配置 文档
执行计划
要使用 Flink 运行计划,请转到 Flink 分发,然后从/ bin 文件夹运行 pyflink.sh 脚本。包含该计划的脚本必须作为第一个参数传递,然后是一些额外的 python 包,最后由 - 将被提供给脚本的其他参数分隔。
./bin/pyflink.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论