- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
Python 编程指南(流)Beta
Flink 中的分析流程序是实现流数据集转换(例如,Filter,映射,Join,分组)的常规程序。流数据集最初是从某些源创建的(例如,通过读取文件或从集合中创建)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink 流处理程序可在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地 JVM 中执行,也可以在许多计算机的集群上执行。
为了创建自己的 Flink 流处理程序,我们建议您从 程序框架 开始, 逐步添加自己的 转换 。其余部分充当其他 算子操作和高级函数的参考。
Jython 框架
Flink Python 流 API 使用 Jython 框架(请参阅 http://www.jython.org/archive/21/docs/whatis.html )来驱动给定脚本的执行。Python 流层实际上是现有 Java 流 API 的薄打包层。
约束
使用 Jython 有两个主要限制:
- 最新的 Python 支持版本是 2.7
- 使用 Python C 扩展并不简单
流程序示例
以下流处理程序是 WordCount 的完整工作示例。您可以复制并粘贴代码以在本地运行它(请参阅本节后面的注释)。它计算句子流中每个单词的数量(不区分大小写),窗口大小为 50 毫秒,并将结果打印到标准输出中。
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
class Generator(SourceFunction):
def __init__(self, num_iters):
self._running = True
self._num_iters = num_iters
def run(self, ctx):
counter = 0
while self._running and counter < self._num_iters:
ctx.collect('Hello World')
counter += 1
def cancel(self):
self._running = False
class Tokenizer(FlatMapFunction):
def flatMap(self, value, collector):
for word in value.lower().split():
collector.collect((1, word))
class Selector(KeySelector):
def getKey(self, input):
return input[1]
class Sum(ReduceFunction):
def reduce(self, input1, input2):
count1, word1 = input1
count2, word2 = input2
return (count1 + count2, word1)
def main(factory):
env = factory.get_execution_environment()
env.create_python_source(Generator(num_iters=1000)) \
.flat_map(Tokenizer()) \
.key_by(Selector()) \
.time_window(milliseconds(50)) \
.reduce(Sum()) \
.output()
env.execute()
笔记:
- 在多节点群集上执行需要共享介质存储,需要预先配置(.eg HDFS)。
- 给定脚本的输出定向到标准输出。因此,输出将写入相应的工作
.out
文件。如果脚本在 IntelliJ IDE 中执行,则输出将显示在控制台选项卡中。
程序框架
正如我们在示例中看到的那样,Flink 流程序看起来像普通的 Python 程序。每个程序包含相同的基本部分:
- 一个
main(factory)
函数定义,带有一个环境工厂参数 - 程序入口点, Environment
从工厂获得,- 加载/创建初始数据,
- 指定此数据的转换,
- 指定计算结果的放置位置,和
- 执行你的程序。
我们现在将概述每个步骤,但请参阅相应部分以获取更多详细信息。
该 main(factory)
函数是必须的,Flink 执行层使用它来运行给定的 Python 流程序。
这 Environment
是所有 Flink 计划的基础。您可以使用工厂提供的工厂方法获得一个:
factory.get_execution_environment()
为了指定数据源,流运行环境有几种方法。要将文本文件作为一系列行读取,您可以使用:
env = factory.get_execution_environment()
text = env.read_text_file("file:///path/to/file")
这将为您提供一个 DataStream,然后您可以在其上应用转换。有关数据源和输入格式的更多信息,请参阅 数据源 。
拥有 DataStream 后,您可以应用转换来创建新的 DataStream,然后可以将其写入文件,再次转换或与其他 DataStream 结合使用。您可以通过使用自己的自定义转换函数调用 DataStream 上的方法来应用转换。例如,Map 转换如下所示:
class Doubler(MapFunction):
def map(self, value):
return value * 2
data.map(Doubler())
这将通过将原始 DataStream 中的每个值加倍来创建新的 DataStream。有关更多信息和所有转换的列表,请参阅 转换 。
一旦有了需要写入磁盘的 DataStream,就可以在 DataStream 上调用其中一个方法:
data.write_as_text("<file-path>")
data.write_as_text("<file-path>", mode=WriteMode.OVERWRITE)
data.output()
最后一种方法仅对本地机器上的开发/调试有用,它会将 DataSet 的内容输出到标准输出。(请注意,在集群中,结果将转到集群节点的标准输出流,最终会出现在工作程序的 .out 文件中)。前两个顾名思义。有关写入文件的更多信息,请参阅 数据接收器 。
一旦您指定的完整程序,你需要调用 execute
的 Environment
。这将在本地计算机上执行或提交程序以在群集上执行,具体取决于 Flink 的启动方式。
项目设置
除了设置 Flink 外,无需额外的工作。使用 Jython 执行 Python 脚本意味着不需要外部包,程序就像是一个 jar 文件一样执行。
Python API 在 Windows / Linux / OSX 系统上进行了测试。
懒惰的评价
所有 Flink 程序都是懒惰地执行:当执行程序的 main 方法时,数据加载和转换不会直接发生。而是创建每个 算子操作并将其添加到程序的计划中。当 execute()
在 Environment 对象上调用其中一个方法时,实际执行这些 算子操作。程序是在本地执行还是在集群上执行取决于程序的环境。
懒惰的评估使您可以构建 Flink 作为一个整体计划单元执行的复杂程序。
转换
数据转换将一个或多个 DataStream 转换为新的 DataStream。程序可以将多个转换组合到复杂的程序集中。
本节简要概述了可用的转换。该 转换文档 与示例全部转换的完整描述。
转换: Map PythonDataStream→PythonDataStream
描述:采用一个数据元并生成一个数据元。
class Doubler(MapFunction):
def map(self, value):
return value * 2
data_stream.map(Doubler())
转换: FlatMap PythonDataStream→PythonDataStream
描述:采用一个数据元并生成零个,一个或多个数据元。
class Tokenizer(FlatMapFunction):
def flatMap(self, word, collector):
collector.collect((1, word))
data_stream.flat_map(Tokenizer())
转换: Filter PythonDataStream→PythonDataStream
描述:计算每个数据元的布尔函数,并保存函数返回 true 的数据元。
class GreaterThen1000(FilterFunction):
def filter(self, value):
return value > 1000
data_stream.filter(GreaterThen1000())
转换: KeyBy PythonDataStream→PythonKeyedStream
描述:逻辑上将流分区为不相交的分区,每个分区包含相同 Keys 的数据元。在内部,这是通过散列分区实现的。见 键 如何指定键。此转换返回 PythonKeyedDataStream。
class Selector(KeySelector):
def getKey(self, input):
return input[1] # Key by the second element in a tuple
data_stream.key_by(Selector()) // Key by field "someKey"
转换: Reduce PythonKeyedStream→PythonDataStream
描述:被 Keys 化数据流上的“滚动”Reduce。将当前数据元与最后一个 Reduce 的值组合并发出新值。
class Sum(ReduceFunction):
def reduce(self, input1, input2):
count1, val1 = input1
count2, val2 = input2
return (count1 + count2, val1)
data.reduce(Sum())
转换: Window PythonKeyedStream→PythonWindowedStream
描述:可以在已经分区的 KeyedStream 上定义 Windows。Windows 根据某些特征(例如,在最后 5 秒内到达的数据)对每个 Keys 中的数据进行分组。有关 窗口 的完整说明,请参见 windows。
keyed_stream.count_window(10, 5) # Last 10 elements, sliding (jumping) by 5 elements
keyed_stream.time_window(milliseconds(30)) # Last 30 milliseconds of data
keted_stream.time_window(milliseconds(100), milliseconds(20)) # Last 100 milliseconds of data, sliding (jumping) by 20 milliseconds
转换: Window Apply PythonWindowedStream→PythonDataStream
描述:将一般函数应用于整个窗口。下面是一个手动求和窗口数据元的函数。
class WindowSum(WindowFunction):
def apply(self, key, window, values, collector):
sum = 0
for value in values:
sum += value[0]
collector.collect((key, sum))
windowed_stream.apply(WindowSum())
转换: Window Reduce PythonWindowedStream→PythonDataStream
描述:将函数缩减函数应用于窗口并返回缩小的值。
class Sum(ReduceFunction):
def reduce(self, input1, input2):
count1, val1 = input1
count2, val2 = input2
return (count1 + count2, val1)
windowed_stream.reduce(Sum())
转换: Union PythonDataStream *→PythonDataStream
描述:两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次数据元。
data_stream.union(other_stream1, other_stream2, ...);
转换: Split PythonDataStream→PythonSplitStream
描述:根据某些标准将流拆分为两个或更多个流。
class StreamSelector(OutputSelector):
def select(self, value):
return ["even"] if value % 2 == 0 else ["odd"]
splited_stream = data_stream.split(StreamSelector())
转换: Select SplitStream→DataStream
描述:从拆分流中选择一个或多个流。
even_data_stream = splited_stream.select("even")
odd_data_stream = splited_stream.select("odd")
all_data_stream = splited_stream.select("even", "odd")
转换: Iterate PythonDataStream→PythonIterativeStream→PythonDataStream
描述:通过将一个 算子的输出重定向到某个先前的 算子,在流中创建“反馈”循环。这对于定义不断更新模型的算法特别有用。以下代码以流开头并连续应用迭代体。大于 0 的数据元将被发送回反馈通道,其余数据元将向下游转发。有关完整说明,请参阅 迭代 。
class MinusOne(MapFunction):
def map(self, value):
return value - 1
class PositiveNumber(FilterFunction):
def filter(self, value):
return value > 0
class LessEquelToZero(FilterFunction):
def filter(self, value):
return value <= 0
iteration = initial_stream.iterate(5000)
iteration_body = iteration.map(MinusOne())
feedback = iteration_body.filter(PositiveNumber())
iteration.close_with(feedback)
output = iteration_body.filter(LessEquelToZero())
将函数传递给 Flink
某些 算子操作需要用户定义的函数作为参数。所有函数都应该定义为派生自相关 Flink 函数的 Python 类。用户定义的函数被序列化并发送到 TaskManagers 以供执行。
class Filter(FilterFunction):
def filter(self, value):
return value > 5
data_stream.filter(Filter())
丰富的函数(.eg RichFilterFunction
)允许定义(覆盖)可选 算子操作: open
& close
。用户可以使用这些函数进行初始化和清理。
class Tokenizer(RichMapFunction):
def open(self, config):
pass
def close(self):
pass
def map(self, value):
pass
data_stream.map(Tokenizer())
open
在启动流式传输管道之前,Worker 会调用该函数。 close
在流管道停止后,Worker 调用该函数。
数据类型
Flink 的 Python Streaming API 支持原始 Python 类型(int,float,bool,string),以及字节数组和用户定义的类。
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
class Tokenizer(MapFunction):
def map(self, value):
return (1, Person(*value))
data_stream.map(Tokenizer())
元组/列表
您可以将元组(或列表)用于复合类型。Python 元组映射到 Jython 本机对应类型,这些类型由 Python 打包器薄层处理。
word_counts = env.from_elements(("hello", 1), ("world",2))
class Tokenizer(MapFunction):
def map(self, value):
return value[1]
counts = word_counts.map(Tokenizer())
数据源
数据源创建初始数据流,例如来自文件或集合。
基于文件的:
read_text_file(path)
- 按行读取文件并将其作为字符串流返回。
基于集合:
from_elements(*args)
- 从所有数据元创建数据流。generate_sequence(from, to)
- 并行生成给定间隔中的数字序列。
例子
env = factory.get_execution_environment()
\# read text file from local files system
localLiens = env.read_text("file:///path/to/my/textfile")
\# read text file from a HDFS running at nnHost:nnPort
hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")
\# create a set from some given elements
values = env.from_elements("Foo", "bar", "foobar", "fubar")
\# generate a number sequence
numbers = env.generate_sequence(1, 10000000)
数据接收
数据接收器使用 DataStream 并用于存储或返回它们:
write_as_text()
- 按字符串顺序写入数据元。通过调用每个数据元的 str() 方法获得字符串。output()
- 打印标准输出上每个数据元的 str() 值。write_to_socket()
- 将 DataStream 作为字节数组写入套接字[host:port]。
可以将 DataStream 输入到多个 算子操作。程序可以编写或打印数据流,同时对它们执行其他转换。
例子
标准数据接收方法:
write DataStream to a file on the local file system
textData.write_as_text("file:///my/result/on/localFS")
write DataStream to a file on a HDFS with a namenode running at nnHost:nnPort
textData.write_as_text("hdfs://nnHost:nnPort/my/result/on/localFS")
write DataStream to a file and overwrite the file if it exists
textData.write_as_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)
this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.write_as_text("file:///path/to/the/result/file")
并行执行
本节介绍如何在 Flink 中配置程序的并行执行。Flink 程序由多个任务( 算子,数据源和接收器)组成。任务被分成几个并行实例以供执行,每个并行实例处理任务输入数据的子集。任务的并行实例数称为 并行__度 或 并行__度(DOP) 。
可以在不同级别的 Flink 中指定任务的并行度。
运行环境级别
Flink 程序在 运行环境 的上下文中 执行 。运行环境为其执行的所有算子,数据源和数据接收器定义默认并行性。可以通过显式配置 算子的并行性来覆盖运行环境并行性。
可以通过调用 set_parallelism()
方法来指定运行环境的默认并行性 。要以并行方式执行 WordCount 示例程序的所有 算子,数据源和数据接收器,请 3
按如下方式设置运行环境的默认并行度:
env = factory.get_execution_environment()
env.set_parallelism(3)
text.flat_map(Tokenizer()) \
.key_by(Selector()) \
.time_window(milliseconds(30)) \
.reduce(Sum()) \
.print()
env.execute()
系统级别
可以通过设置 parallelism.default
属性来定义所有运行环境的系统范围默认并行度 ./conf/flink-conf.yaml
。有关详细信息,请参阅 配置 文档
执行计划
要使用 Flink 运行计划,请转到 Flink 分发,然后从/ bin 文件夹运行 pyflink-stream.sh 脚本。包含该计划的脚本必须作为第一个参数传递,然后是许多其他 Python 包,最后由 -
将被提供给脚本的其他参数分隔。
./bin/pyflink-stream.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论