- Apache Flink 文档
- 概念
- 数据流编程模型
- 分布式运行时环境
- 教程
- API 教程
- DataStream API 教程
- Setup 教程
- 本地安装教程
- 在 Windows 上运行 Flink
- 例子
- 批处理示例
- 应用开发
- 项目构建设置
- Java 项目模板
- Scala 的项目模板
- 配置依赖关系,连接器,库
- 基础 API 概念
- Scala API 扩展
- Java Lambda 表达式
- Flink DataStream API 编程指南
- 活动时间
- 事件时间/处理时间/摄取时间
- 活动时间和水印
- 状态与容错
- 算子
- DataStream 转换
- 物理分区
- 任务链和资源组
- 流连接器
- 旁路输出
- Python 编程指南(流)Beta
- 测试
- 实验特点
- Flink DataSet API 编程指南
- 数据集转换
- 容错
- 在数据集中压缩数据元
- 迭代
- Python 编程指南 Beta
- 连接器
- Hadoop 兼容性测试版
- 本地执行
- 群集执行
- Table API 和 SQL
- 概念和通用 API
- 流处理概念
- 连接到外部系统
- Table API
- SQL
- 内置函数
- 用户定义的源和接收器
- 用户定义的函数
- SQL 客户端测试版
- 数据类型和序列化
- 为 Flink 程序注册自定义序列化程序
- 管理执行
- 执行配置
- 程序打包和分布式执行
- 并行执行
- 执行计划
- 重启策略
- 类库
- FlinkCEP - Flink 的复杂事件处理
- 风暴兼容性 Beta
- 项目配置
- 执行 Storm 拓扑
- 在 Flink 流程序中嵌入 Storm 算子
- Flink Extensions
- Storm 兼容性示例
- Gelly:Flink Graph API
- 图 API
- FlinkML - Flink 的机器学习
- 最佳实践
- API 迁移指南
- 部署和运营
- 集群和部署
- 独立群集
- YARN 设置
- Mesos 设置
- Kubernetes 设置
- Docker 设置
- 亚马逊网络服务(AWS)
- Google Compute Engine 设置
- 先决条件
- 在 Google Compute Engine 上部署 Flink
- MapR 设置
- Hadoop 集成
- JobManager 高可用性(HA)
- 状态和容错
- 检查点
- 保存点
- 状态后台
- 调整检查点和大状态
- 配置
- 生产准备清单
- 命令行界面
- Scala REPL
- Kerberos 身份验证设置和配置
- SSL 设置
- 文件系统
- 升级应用程序和 Flink 版本
- 调试和监控
- 度量
- 如何使用日志记录
- 历史服务器
- 监控检查点
- 监测背压
- 监控 REST API
- 调试 Windows 和事件时间
- 调试类加载
- 应用程序分析
- 使用 Java Flight Recorder 进行性能分析
- 使用 JITWatch 进行分析
- Flink Development
- 将 Flink 导入 IDE
- 从 Source 建立 Flink
- 内幕
- 组件堆栈
- 数据流容错
- 工作和调度
- 任务生命周期
- 文件系统
- 实现
- 坚持保证
- 更新文件内容
- 覆盖文件
- 线程安全
FlinkCEP - Flink 的复杂事件处理
FlinkCEP 是在 Flink 之上实现的复杂事件处理(CEP)库。它允许您在无休止的事件流中检测事件模式,让您有机会掌握数据中重要的事项。
此页面描述了 Flink CEP 中可用的 API 调用。我们首先介绍 Pattern API ,它允许您指定要在流中检测的模式,然后介绍如何 检测匹配事件序列并对其进行 算子操作 。然后,我们将介绍 CEP 库在 处理 事件时间 延迟 时所做的假设,以及如何 将您的工作 从较旧的 Flink 版本 迁移 到 Flink-1.3。
入门
如果要直接进入,请 设置 Flink 程序 并将 FlinkCEP 依赖项添加到 pom.xml
项目中。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
信息 FlinkCEP 不是二进制分发的一部分。 在此处 了解如何与集群执行相关联。
现在,您可以使用 Pattern API 开始编写第一个 CEP 程序。
注意在事件 DataStream
要应用模式匹配,必须实施适当的到 equals()
和 hashCode()
方法,因为 FlinkCEP 用它们来比较和匹配的事件。
DataStream<Event> input = ...
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getId() == 42;
}
}
).next("middle").subtype(SubEvent.class).where(
new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent subEvent) {
return subEvent.getVolume() >= 10.0;
}
}
).followedBy("end").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("end");
}
}
);
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.select(
new PatternSelectFunction<Event, Alert>() {
@Override
public Alert select(Map<String, List<Event>> pattern) throws Exception {
return createAlertFrom(pattern);
}
}
});
val input: DataStream[Event] = ...
val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
.next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
.followedBy("end").where(_.getName == "end")
val patternStream = CEP.pattern(input, pattern)
val result: DataStream[Alert] = patternStream.select(createAlert(_))
Pattern API
模式 API 允许您定义要从输入流中提取的复杂模式序列。
每个复杂模式序列由多个简单模式组成,即模式查找具有相同属性的单个事件。从现在开始,我们将调用这些简单的模式 模式 ,以及我们在流中搜索的最终复杂模式序列,即 模式序列 。您可以将模式序列视为此类模式的图形,其中从一个模式到下一个模式的转换基于用户指定的 条件发生 ,例如 event.getName().equals("end")
。一个 匹配 是访问的复数图案图的所有模式,通过有效模式转换的序列的输入事件的序列。
注意每个模式必须具有唯一的名称,稍后您可以使用该名称来标识匹配的事件。
注意模式名称 不能 包含该字符 ":"
。
在本节的其余部分,我们将首先介绍如何定义 单个模式 ,然后如何将各个模式组合到 复杂模式中 。
个人模式
一个 模式 可以是 单 或一个 循环 模式。单例模式接受单个事件,而循环模式可以接受多个事件。在模式匹配的符号,图案 "a b+ c? d"
(或 "a"
,随后 一个或多个 "b"
的,任选接着一个 "c"
,接着是 "d"
), , a
, c?
和 d
是单模式,同时 b+
是一个循环的一个。默认情况下,模式是单例模式,您可以使用 Quantifiers 将其转换为循环模式。每个模式可以有一个或多个 条件, 基于它接受事件。
量词
在 FlinkCEP 中,您可以使用以下方法指定循环模式: pattern.oneOrMore()
对于期望一个或多个事件发生的模式(例如 b+
前面提到的); 并且 pattern.times(#ofTimes)
,对于期望特定类型事件的特定出现次数的模式,例如 4 a
; 并且 pattern.times(#fromTimes, #toTimes)
,对于期望特定最小出现次数和给定类型事件的最大出现次数的模式,例如 2-4s a
。
您可以使用该 pattern.greedy()
方法使循环模式变得贪婪,但您还不能使组模式变得贪婪。您可以使用该 pattern.optional()
方法创建所有模式,循环与否,可选。
对于命名的模式 start
,以下是有效的量词:
// expecting 4 occurrences
start.times(4);
// expecting 0 or 4 occurrences
start.times(4).optional();
// expecting 2, 3 or 4 occurrences
start.times(2, 4);
// expecting 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).greedy();
// expecting 0, 2, 3 or 4 occurrences
start.times(2, 4).optional();
// expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).optional().greedy();
// expecting 1 or more occurrences
start.oneOrMore();
// expecting 1 or more occurrences and repeating as many as possible
start.oneOrMore().greedy();
// expecting 0 or more occurrences
start.oneOrMore().optional();
// expecting 0 or more occurrences and repeating as many as possible
start.oneOrMore().optional().greedy();
// expecting 2 or more occurrences
start.timesOrMore(2);
// expecting 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).greedy();
// expecting 0, 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).optional().greedy();
// expecting 4 occurrences
start.times(4)
// expecting 0 or 4 occurrences
start.times(4).optional()
// expecting 2, 3 or 4 occurrences
start.times(2, 4)
// expecting 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).greedy()
// expecting 0, 2, 3 or 4 occurrences
start.times(2, 4).optional()
// expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).optional().greedy()
// expecting 1 or more occurrences
start.oneOrMore()
// expecting 1 or more occurrences and repeating as many as possible
start.oneOrMore().greedy()
// expecting 0 or more occurrences
start.oneOrMore().optional()
// expecting 0 or more occurrences and repeating as many as possible
start.oneOrMore().optional().greedy()
// expecting 2 or more occurrences
start.timesOrMore(2)
// expecting 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).greedy()
// expecting 0, 2 or more occurrences
start.timesOrMore(2).optional()
// expecting 0, 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).optional().greedy()
条件
对于每个模式,您可以指定传入事件必须满足的条件,以便“接受”到模式中,例如,其值应大于 5,或大于先前接受的事件的平均值。您可以指定经由事件属性条件 pattern.where()
, pattern.or()
或 pattern.until()
方法。这些可以是 IterativeCondition
s 或 SimpleCondition
s。
迭代条件: 这是最常见的条件类型。这是您可以如何指定一个条件,该条件基于先前接受的事件的属性或其子集的统计信息来接受后续事件。
下面是迭代条件的代码,如果名称以“foo”开头,则接受名为“middle”的模式的下一个事件,并且如果该模式的先前接受的事件的价格总和加上当前的价格事件不要超过 5.0 的值。迭代条件可能很强大,特别是与循环模式结合使用,例如 oneOrMore()
。
middle.oneOrMore()
.subtype(SubEvent.class)
.where(new IterativeCondition<SubEvent>() {
@Override
public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
if (!value.getName().startsWith("foo")) {
return false;
}
double sum = value.getPrice();
for (Event event : ctx.getEventsForPattern("middle")) {
sum += event.getPrice();
}
return Double.compare(sum, 5.0) < 0;
}
});
middle.oneOrMore()
.subtype(classOf[SubEvent])
.where(
(value, ctx) => {
lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
value.getName.startsWith("foo") && sum + value.getPrice < 5.0
}
)
注意调用以 ctx.getEventsForPattern(...)
查找给定潜在匹配的所有先前接受的事件。此 算子操作的成本可能会有所不同,因此在实施您的条件时,请尽量 Reduce 其使用。
简单条件: 此类条件扩展了上述 IterativeCondition
类,并 仅 基于事件本身的属性决定是否接受事件。
start.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return value.getName().startsWith("foo");
}
});
start.where(event => event.getName.startsWith("foo"))
最后,您还可以 Event
通过 pattern.subtype(subClass)
方法将接受事件的类型限制为初始事件类型(此处)的子类型。
start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent value) {
return ... // some condition
}
});
start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)
组合条件: 如上所示,您可以将 subtype
条件与其他条件组合。这适用于所有条件。您可以通过顺序调用任意组合条件 where()
。最终结果将是各个条件的结果的逻辑 AND 。要使用 OR 组合条件,可以使用该 or()
方法,如下所示。
pattern.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ... // some condition
}
}).or(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) {
return ... // or condition
}
});
pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)
停止条件: 在循环模式( oneOrMore()
和 oneOrMore().optional()
)的情况下,您还可以指定停止条件,例如,接受值大于 5 的事件,直到值的总和小于 50。
为了更好地理解它,请看下面的示例。特定
- 模式
"(a+ until b)"
(一个或多个"a"
直到"b"
) - 一系列传入事件
"a1" "c" "a2" "b" "a3"
- 该库将输出结果:
{a1 a2} {a1} {a2} {a3}
。
由于停止条件,您可以看到 {a1 a2 a3}
或未 {a2 a3}
返回。
模式操作: where(condition)
描述:定义当前模式的条件。要匹配模式,事件必须满足条件。多个连续的 where()子句导致其条件为 AND:
pattern.where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // some condition
}
});
模式操作: or(condition)
描述:添加与现有条件进行 OR 运算的新条件。只有在至少通过其中一个条件时,事件才能匹配该模式:
pattern.where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // some condition
}
}).or(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // alternative condition
}
});
模式操作: until(condition)
描述:指定循环模式的停止条件。意味着如果匹配给定条件的事件发生,则不再接受该模式中的事件。仅适用于 oneOrMore()
注意: 它允许在基于事件的条件下清除相应模式的状态。
pattern.oneOrMore().until(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context ctx) throws Exception {
return ... // alternative condition
}
});
模式操作: subtype(subClass)
描述:定义当前模式的子类型条件。如果事件属于此子类型,则事件只能匹配该模式:
pattern.subtype(SubEvent.class);
模式操作: oneOrMore()
描述:指定此模式至少发生一次匹配事件。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅 连续 。 注意: 建议使用 until()
或 within()
启用状态清除
pattern.oneOrMore();
模式操作: timesOrMore(#times)
描述:指定此模式至少需要 #times 出现匹配事件。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅 连续 。
pattern.timesOrMore(2);
模式操作: 次(#ofTimes)
描述:指定此模式需要匹配事件的确切出现次数。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅 连续 。
pattern.times(2);
模式操作: times(#fromTimes,#toTimes)
描述:指定此模式期望在匹配事件的 #fromTimes 和 #toTimes 之间出现。默认情况下,使用宽松的内部连续性(在后续事件之间)。有关内部连续性的更多信息,请参阅 连续 。
pattern.times(2, 4);
模式操作: Optional()
描述:指定此模式是可选的,即根本不会发生。这适用于所有上述量词。
pattern.oneOrMore().optional();
模式操作: greedy()
描述:指定此模式是贪婪的,即它将尽可能多地重复。这仅适用于量词,目前不支持组模式。
pattern.oneOrMore().greedy();
结合模式
现在你已经看到了单个模式的样子,现在是时候看看如何将它们组合成一个完整的模式序列。
模式序列必须以初始模式开始,如下所示:
Pattern<Event, ?> start = Pattern.<Event>begin("start");
val start : Pattern[Event, _] = Pattern.begin("start")
接下来,您可以通过指定它们之间所需的 连续条件 ,为模式序列添加更多模式。FlinkCEP 支持事件之间的以下形式的邻接:
- 严格连续性 :预期所有匹配事件一个接一个地出现,中间没有任何不匹配的事件。
- 轻松连续性 :忽略匹配的事件之间出现的不匹配事件。
- 非确定性轻松 连续性:进一步放宽连续性,允许忽略某些匹配事件的其他匹配。
要在连续模式之间应用它们,您可以使用:
next()
, 严格来说 ,followedBy()
, 放松 ,和followedByAny()
,对于 非确定性放松 连续性。
要么
notNext()
,如果您不希望事件类型直接跟随另一个事件类型notFollowedBy()
,如果您不希望事件类型在两个其他事件类型之间的任何位置。
注意模式序列无法结束 notFollowedBy()
。
注意一个 NOT
图案不能由一个可选的一个之后。
// strict contiguity
Pattern<Event, ?> strict = start.next("middle").where(...);
// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);
// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);
// NOT pattern with strict contiguity
Pattern<Event, ?> strictNot = start.notNext("not").where(...);
// NOT pattern with relaxed contiguity
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
// strict contiguity val strict: Pattern[Event, _] = start.next("middle").where(...)
// relaxed contiguity val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)
// non-deterministic relaxed contiguity val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)
// NOT pattern with strict contiguity val strictNot: Pattern[Event, _] = start.notNext("not").where(...)
// NOT pattern with relaxed contiguity val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)
宽松的连续性意味着仅匹配第一个匹配事件,而具有非确定性的松弛连续性,将针对同一开始发出多个匹配。作为示例, "a b"
给定事件序列的模式 "a", "c", "b1", "b2"
将给出以下结果:
"a"
和之间的严格连续性"b"
:({}
不匹配),"c"
后"a"
原因"a"
被丢弃。- 之间轻松连续性
"a"
和"b"
:{a b1}
作为放松的连续性被视为“跳过不匹配的事件,直到下一个匹配的一个”。 - 非确定性之间轻松连续性
"a"
和"b"
:{a b1}
,{a b2}
,因为这是最普遍的形式。
也可以为模式定义时间约束以使其有效。例如,您可以通过该 pattern.within()
方法定义模式应在 10 秒内发生。 处理和事件时间 都支持时间模式。
注意模式序列只能有一个时间约束。如果在不同的单独模式上定义了多个这样的约束,则应用最小的约束。
next.within(Time.seconds(10));
next.within(Time.seconds(10))
循环模式中的邻接性
您可以在循环模式中应用与上一 节中 讨论的相同的连续条件。连续性将应用于被接受到这种模式中的数据元之间。为了举例说明上述情况,一个模式序列 "a b+ c"
( "a"
后跟一个或多个 "b"
的任何(非确定性松弛)序列,然后 a "c"
)输入 "a", "b1", "d1", "b2", "d2", "b3" "c"
将具有以下结果:
- 严格连续性 :
{a b3 c}
-在"d1"
之后"b1"
的原因"b1"
被丢弃,同样的情况发生了"b2"
,因为"d2"
。 - 宽松的连续性 :
{a b1 c}
,{a b1 b2 c}
,{a b1 b2 b3 c}
,{a b2 c}
,{a b2 b3 c}
,{a b3 c}
-"d"
的被忽略。 - 非确定性轻松连续性 :
{a b1 c}
,{a b1 b2 c}
,{a b1 b3 c}
,{a b1 b2 b3 c}
,{a b2 c}
,{a b2 b3 c}
,{a b3 c}
-注意{a b1 b3 c}
,这是间放松连续性的结果"b"
S”。
对于循环模式(例如 oneOrMore()
和 times()
),默认是 放松的连续性 。如果您想要严格的连续性,则必须使用该 consecutive()
呼叫明确指定它,如果您想要 非确定性的宽松连续性 ,则可以使用该 allowCombinations()
呼叫。
模式操作: continuous()
描述:与匹配事件一起使用 oneOrMore()
并 times()
强制执行严格的连续性,即任何不匹配的数据元都会中断匹配(如 next()
)。如果不应用,则使用放松的连续性(如 followedBy()
)。例如:
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
})
.followedBy("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().consecutive()
.followedBy("end1").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
将为输入序列生成以下匹配项:CD A1 A2 A3 D A4 B.连续申请:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B}没有连续申请:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}
模式操作: allowCombinations()
描述:与匹配事件一起使用 oneOrMore()
并且 times()
在匹配事件之间施加非确定性松弛连续性(如 followedByAny()
)。如果不应用,则使用放松的连续性(如 followedBy()
)。例如:
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
})
.followedBy("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
}).oneOrMore().allowCombinations()
.followedBy("end1").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
});
将为输入序列生成以下匹配项:CD A1 A2 A3 D A4 B.启用组合:{C A1 B},{C A1 A2 B},{C A1 A3 B},{C A1 A4 B},{C A1 A2 A3 B},{C A1 A2 A4 B},{C A1 A3 A4 B},{C A1 A2 A3 A4 B}未启用组合:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}
模式组
它也可以定义一个模式序列作为条件 begin
, followedBy
, followedByAny
和 next
。图案序列将被认为是逻辑上的匹配条件和一个 GroupPattern
将被返回,并且可以应用 oneOrMore()
, times(#ofTimes)
, times(#fromTimes, #toTimes)
, optional()
, consecutive()
, allowCombinations()
的 GroupPattern
。
Pattern<Event, ?> start = Pattern.begin(
Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);
// strict contiguity
Pattern<Event, ?> strict = start.next(
Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);
// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy(
Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();
// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny(
Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
val start: Pattern[Event, _] = Pattern.begin(
Pattern.begin[Event]("start").where(...).followedBy("start_middle").where(...)
)
// strict contiguity val strict: Pattern[Event, _] = start.next(
Pattern.begin[Event]("next_start").where(...).followedBy("next_middle").where(...)
).times(3)
// relaxed contiguity val relaxed: Pattern[Event, _] = start.followedBy(
Pattern.begin[Event]("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore()
// non-deterministic relaxed contiguity val nonDetermin: Pattern[Event, _] = start.followedByAny(
Pattern.begin[Event]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional()
模式操作: 开始(#NAME)
描述:定义一个起始模式:
Pattern<Event, ?> start = Pattern.<Event>begin("start");
模式操作: 开始(#pattern_sequence)
描述:定义一个起始模式:
Pattern<Event, ?> start = Pattern.<Event>begin(
Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
模式操作: 下一个(#NAME)
描述:添加新模式。匹配事件必须直接接替先前的匹配事件(严格连续性):
Pattern<Event, ?> next = start.next("middle");
模式操作: 下一个(#pattern_sequence)
描述:添加新模式。一系列匹配事件必须直接接替先前的匹配事件(严格连续性):
Pattern<Event, ?> next = start.next(
Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
模式操作: followedBy(#NAME)
描述:添加新模式。匹配事件和先前匹配事件(轻松连续)之间可能发生其他事件:
Pattern<Event, ?> followedBy = start.followedBy("middle");
模式操作: followedBy(#pattern_sequence)
描述:添加新模式。在一系列匹配事件和先前匹配事件(轻松连续)之间可能发生其他事件:
Pattern<Event, ?> followedBy = start.followedBy(
Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
模式操作: followedByAny(#NAME)
描述:添加新模式。匹配事件和先前匹配事件之间可能发生其他事件,并且将针对每个备选匹配事件(非确定性放松连续性)呈现替代匹配:
Pattern<Event, ?> followedByAny = start.followedByAny("middle");
模式操作: followedByAny(#pattern_sequence)
描述:添加新模式。在一系列匹配事件和先前匹配事件之间可能发生其他事件,并且将针对匹配事件的每个替代序列(非确定性松弛邻接)呈现替代匹配:
Pattern<Event, ?> followedByAny = start.followedByAny(
Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
模式操作: notNext()
描述:添加新的负面模式。匹配(否定)事件必须直接成功执行先前的匹配事件(严格连续性)才能丢弃部分匹配:
Pattern<Event, ?> notNext = start.notNext("not");
模式操作: notFollowedBy()
描述:添加新的负面模式。即使在匹配(否定)事件和先前匹配事件(松弛连续性)之间发生其他事件,也将丢弃部分匹配事件序列:
Pattern<Event, ?> notFollowedBy = start.notFollowedBy("not");
模式操作: 内(时间)
描述:定义事件序列与模式匹配的最大时间间隔。如果未完成的事件序列超过此时间,则将其丢弃:
pattern.within(Time.seconds(10));
比赛后跳过策略
对于给定模式,可以将同一事件分配给多个成功匹配。要控制将分配事件的匹配数,您需要指定调用的跳过策略 AfterMatchSkipStrategy
。跳过策略有四种类型,如下所示:
- NO_SKIP :将发出每个可能的匹配。
- SKIP_PAST_LAST_EVENT :丢弃在匹配开始后但在结束之前开始的每个部分匹配。
- SKIP_TO_FIRST :丢弃在比赛开始后但在 PatternName 的第一个事件发生之前开始的每个部分匹配。
- SKIP_TO_LAST :放弃在比赛开始之后但在 PatternName 的最后一个事件发生之前开始的每个部分匹配。
请注意,使用 SKIP_TO_FIRST 和 SKIP_TO_LAST 跳过策略时,还应指定有效的 PatternName 。
例如,对于给定模式 b+ c
和数据流 b1 b2 b3 c
,这四种跳过策略之间的差异如下:
跳过策略 | 结果 | 描述 |
---|---|---|
NO_SKIP | b1 b2 b3 c | 找到匹配后 b1 b2 b3 c ,匹配过程不会丢弃任何结果。 |
b2 b3 c | ||
b3 c | ||
SKIP_PAST_LAST_EVENT | b1 b2 b3 c | 找到匹配后 b1 b2 b3 c ,匹配过程将丢弃所有已开始的部分匹配。 |
SKIP_TO_FIRST [ b* ] | b1 b2 b3 c | 找到匹配后 b1 b2 b3 c ,匹配过程将尝试丢弃之前开始的所有部分匹配 b1 ,但是没有这样的匹配。因此,不会丢弃任何东西。 |
b2 b3 c | ||
b3 c | ||
SKIP_TO_LAST [ b ] | b1 b2 b3 c | 找到匹配后 b1 b2 b3 c ,匹配过程将尝试丢弃之前开始的所有部分匹配 b3 。有一个这样的比赛 b2 b3 c |
b3 c |
看看另一个例子,以便更好地看到 NO_SKIP 和 SKIP_TO_FIRST 之间的区别:模式: (a | c) (b | c) c+.greedy d
和序列: a b c1 c2 c3 d
然后结果将是:
跳过策略 | 结果 | 描述 |
---|---|---|
NO_SKIP | a b c1 c2 c3 d | 找到匹配后 a b c1 c2 c3 d ,匹配过程不会丢弃任何结果。 |
b c1 c2 c3 d | ||
c1 c2 c3 d | ||
c2 c3 d | ||
SKIP_TO_FIRST [ b* ] | a b c1 c2 c3 d | 找到匹配后 a b c1 c2 c3 d ,匹配过程将尝试丢弃之前开始的所有部分匹配 c1 。有一个这样的比赛 b c1 c2 c3 d 。 |
c1 c2 c3 d |
要指定要使用的跳过策略,只需 AfterMatchSkipStrategy
通过调用创建:
函数 | 描述 |
---|---|
AfterMatchSkipStrategy.noSkip() | 创建 NO_SKIP 跳过策略 |
AfterMatchSkipStrategy.skipPastLastEvent() | 创建 SKIP_PAST_LAST_EVENT 跳过策略 |
AfterMatchSkipStrategy.skipToFirst(patternName) | 使用引用的模式名称 patternName 创建 SKIP_TO_FIRST 跳过策略 |
AfterMatchSkipStrategy.skipToLast(patternName) | 使用引用的模式名称 patternName 创建 SKIP_TO_LAST 跳过策略 |
然后通过调用将跳过策略应用于模式:
AfterMatchSkipStrategy skipStrategy = ...
Pattern.begin("patternName", skipStrategy);
val skipStrategy = ...
Pattern.begin("patternName", skipStrategy)
检测模式
指定要查找的模式序列后,是时候将其应用于输入流以检测潜在匹配。要针对模式序列运行事件流,您必须创建一个 PatternStream
。给定一个输入流 input
,一个模式 pattern
和一个可选的比较器, comparator
用于在 EventTime 的情况下对具有相同时间戳的事件进行排序,或者在同一时刻到达, PatternStream
通过调用来创建:
DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...
EventComparator<Event> comparator = ... // optional
PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
val input : DataStream[Event] = ...
val pattern : Pattern[Event, _] = ...
var comparator : EventComparator[Event] = ... // optional
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)
输入流可以被 键入 或 不带键的 取决于你的使用情况。
注意在非被 Key 化的数据流上应用模式将导致并行度等于 1 的作业。
从模式中选择
获得 a 后, PatternStream
您可以通过 select
或 flatSelect
方法从检测到的事件序列中进行选择。
该 select()
方法需要 PatternSelectFunction
实现。A PatternSelectFunction
具有 select
为每个匹配事件序列调用的方法。它 Map<String, List<IN>>
以键的形式接收匹配,其中键是模式序列中每个模式的名称,值是该模式的所有已接受事件的列表( IN
是输入数据元的类型)。给定模式的事件按时间戳排序。返回每个模式的接受事件列表的原因是当使用循环模式(例如 oneToMany()
和 times()
)时,对于给定模式可以接受多个事件。选择函数只返回一个结果。
class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
@Override
public OUT select(Map<String, List<IN>> pattern) {
IN startEvent = pattern.get("start").get(0);
IN endEvent = pattern.get("end").get(0);
return new OUT(startEvent, endEvent);
}
}
A PatternFlatSelectFunction
类似于 PatternSelectFunction
,唯一的区别是它可以返回任意数量的结果。为此,该 select
方法有一个附加 Collector
参数,用于将输出数据元向下游转发。
class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
@Override
public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> collector) {
IN startEvent = pattern.get("start").get(0);
IN endEvent = pattern.get("end").get(0);
for (int i = 0; i < startEvent.getValue(); i++ ) {
collector.collect(new OUT(startEvent, endEvent));
}
}
}
The select()
method takes a selection function as argument, which is called for each matching event sequence. It receives a match in the form of Map[String, Iterable[IN]]
where the key is the name of each pattern in your pattern sequence and the value is an Iterable over all accepted events for that pattern ( IN
is the type of your input elements).
The events for a given pattern are ordered by timestamp. The reason for returning an iterable of accepted events for each pattern is that when using looping patterns (e.g. oneToMany()
and times()
), more than one event may be accepted for a given pattern. The selection function returns exactly one result per call.
def selectFn(pattern : Map[String, Iterable[IN]]): OUT = {
val startEvent = pattern.get("start").get.next
val endEvent = pattern.get("end").get.next
OUT(startEvent, endEvent)
}
The flatSelect
method is similar to the select
method. Their only difference is that the function passed to the flatSelect
method can return an arbitrary number of results per call. In order to do this, the function for flatSelect
has an additional Collector
parameter which is used to forward your output elements downstream.
def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = {
val startEvent = pattern.get("start").get.next
val endEvent = pattern.get("end").get.next
for (i <- 0 to startEvent.getValue) {
collector.collect(OUT(startEvent, endEvent))
}
}
处理超时部分模式
每当模式通过 within
关键字附加窗口长度时,部分事件序列可能因为超出窗口长度而被丢弃。为了对这些超时的部分匹配做出反应 select
, flatSelect
API 调用允许您指定超时处理程序。为每个超时的部分事件序列调用此超时处理程序。超时处理程序接收到目前为止由模式匹配的所有事件,以及检测到超时时的时间戳。
为了处理部分模式, select
和 flatSelect
API 调用提供了一个重载版本,它作为参数
PatternTimeoutFunction
/PatternFlatTimeoutFunction
- 将返回与超时匹配的旁路输出的 OutputTag
- 和已知
PatternSelectFunction
/PatternFlatSelectFunction
。 - Java
- Scala
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<ComplexEvent> result = patternStream.select(
outputTag,
new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
new PatternSelectFunction<Event, ComplexEvent>() {...}
);
DataStream<TimeoutEvent> timeoutResult = result.getSideOutput(outputTag);
SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
outputTag,
new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
);
DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
val outputTag = OutputTag[String]("side-output")
val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outputTag){
(pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
} {
pattern: Map[String, Iterable[Event]] => ComplexEvent()
}
val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag)
The flatSelect
API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function. In contrast to the select
functions, the flatSelect
functions are called with a Collector
. You can use the collector to emit an arbitrary number of events.
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
val outputTag = OutputTag[String]("side-output")
val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect(outputTag){
(pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) =>
out.collect(TimeoutEvent())
} {
(pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) =>
out.collect(ComplexEvent())
}
val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag)
处理事件时间的延迟
在 CEP
顺序的元件,其中被处理的事项。为了保证在事件时间工作时按正确的顺序处理数据元,传入的数据元最初放在缓冲区中,其中数据元 根据其时间戳按升序排序 ,当水印到达时,此缓冲区中的所有数据元都包含在处理小于水印的时间戳。这意味着水印之间的数据元按事件时间顺序处理。
注意在事件时间工作时,库假定水印的正确性。
为了保证跨越水印的数据元按事件时间顺序处理,Flink 的 CEP 库假定 水印的正确性 ,并将其视为时间戳小于上次看到的水印的 后期 数据元。后期数据元不会被进一步处理。此外,您可以指定 sideOutput 标记来收集最后看到的水印之后的后期数据元,您可以像这样使用它。
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};
SingleOutputStreamOperator<ComplexEvent> result = patternStream
.sideOutputLateData(lateDataOutputTag)
.select(
new PatternSelectFunction<Event, ComplexEvent>() {...}
);
DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
val lateDataOutputTag = OutputTag[String]("late-data")
val result: SingleOutputStreamOperator[ComplexEvent] = patternStream
.sideOutputLateData(lateDataOutputTag)
.select{
pattern: Map[String, Iterable[ComplexEvent]] => ComplexEvent()
}
val lateData: DataStream<String> = result.getSideOutput(lateDataOutputTag)
例子
以下示例检测 start, middle(name = "error") -> end(name = "critical")
被 Keys 化数据流上的模式 Events
。事件由他们的 id
s 键入,有效模式必须在 10 秒内发生。整个处理是在事件时间完成的。
StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Event> input = ...
DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
@Override
public Integer getKey(Event value) throws Exception {
return value.getId();
}
});
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("error");
}
}).followedBy("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("critical");
}
}).within(Time.seconds(10));
PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
@Override
public Alert select(Map<String, List<Event>> pattern) throws Exception {
return createAlert(pattern);
}
});
val env : StreamExecutionEnvironment = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val input : DataStream[Event] = ...
val partitionedInput = input.keyBy(event => event.getId)
val pattern = Pattern.begin[Event]("start")
.next("middle").where(_.getName == "error")
.followedBy("end").where(_.getName == "critical")
.within(Time.seconds(10))
val patternStream = CEP.pattern(partitionedInput, pattern)
val alerts = patternStream.select(createAlert(_)))
从较旧的 Flink 版本迁移(1.3 之前版本)
迁移到 1.4+
在 Flink-1.4 中,CEP 库与<= Flink 1.2 的向后兼容性被删除。遗憾的是,无法恢复曾经使用 1.2.x 运行的 CEP 作业
迁移到 1.3.x.
Flink-1.3 中的 CEP 库附带了许多新函数,这些函数导致了 API 的一些变化。在这里,我们描述了您需要对旧 CEP 作业进行的更改,以便能够使用 Flink-1.3 运行它们。完成这些更改并重新编译作业后,您将能够从使用旧版本作业的保存点恢复执行, 即 无需重新处理过去的数据。
所需的更改是:
- 更改条件(
where(...)
子句中的条件)以扩展SimpleCondition
类而不是实现FilterFunction
接口。 - 将作为参数提供的函数更改为
select(...)
和flatSelect(...)
方法,以期望与每个模式关联的事件列表(List
inJava
,Iterable
inScala
)。这是因为通过添加循环模式,多个输入事件可以匹配单个(循环)模式。 - 将
followedBy()
在 Flink1.1 和 1.2 隐含的non-deterministic relaxed contiguity
(见 这里 )。在 Flink 1.3 中,这已经改变并followedBy()
隐式relaxed contiguity
,followedByAny()
如果non-deterministic relaxed contiguity
Required 则应该使用。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论