12.1 Reactive 编程
本节首先介绍响应式编程范式,以及基于该范式的 Reactive Streams 标准 API,最后给出基于 Reactive Streams 实现的 JDK 9 的 Flow 代码示例。
12.1.1 响应式宣言
说到响应式编程,就不能不提 The Reactive Manifesto(响应式宣言)。几年前,一个大型应用系统可能会部署在几百台服务器上,响应时间为秒级,每天产生 GB 级的数据。随着移动设备的普及,应用程序需要部署在数以千计或万计的云端集群上,用户对响应时间的需求也提高到了毫秒级,每天产生的数据也达到了 PB 级,这对当今的系统架构提出了新的挑战。基于此,一些组织开发出了响应式系统。响应式系统具有 4 个特性,如图 12.1 所示。

图 12.1 响应式系统的特性
- 可响应:系统尽可能地响应。
- 可恢复:系统出错的情况下也可以响应。
- 可伸缩:系统在各种负载下都可以响应。
- 消息驱动:系统通过异步传递消息。
以上 4 个特性就组成了响应式宣言,为响应式编程指明了方向。响应式系统就是以事件驱动,打造可伸缩、可恢复、能实时响应的应用程序。
12.1.2 Reactive 编程简介
关于响应式编程,百度百科中是这样解释的:
在计算机领域,响应式编程是一个专注于数据流和变化传递的异步编程范式。这意味着可以使用编程语言很容易地表示静态(如数组)或动态(如事件发射器)数据流,在执行过程中数据流之间有一定的关系,关系的存在有利于数据流的自动变更。
上面的解释是不是不太好理解?我们具体分析一下。首先,响应式编程是一个编程范式,是一种编程规范,和我们平时开发中的声明式编程、命令式编程、函数式编程一样。其次,从过去的面向过程开发到 Java 提出的面向对象开发,响应式编程代表未来的发展方向——面向流开发。因此我们总结出响应式编程的定义是:一种面向数据流的响应式编码方式。
注意:Reactive Programming = Streams + Operations。其中,Streams 代表被处理的数据节点,Operations 代表那些异步处理函数。
12.1.3 Reactive Streams 标准
既然有了编程规范,就需要定义一套 API 协议标准。2013 年,Netflix、Pivotal 和 Lightbend 的工程师们启动了 Reactive Streams 项目。Reactive Stream(响应式流)是一套标准,是一套基于发布/订阅模式的数据流处理规范。对于开发人员来说,它其实就是一个 API 规范,具有异步非阻塞背压特性,异步非阻塞可以在同等资源下给出更快的响应。
举个直观的例子可以帮助读者更好地理解响应式数据流。现代前端开发框架如 Vue.js 和 React 等实现了双向数据绑定,在一个输入框内修改数据,可以同步在另一个组件中展示。也就是一个组件的数据值可以基于另一个组件的数据变化做出响应,这就是响应式。
在传统的命令式编程中,假设定义 c = a*b,那么当 a=1、b=2 时,c 的值就是 2。之后 a 变量的改变不会引起 c 变量的变化,因为它们都是确定的。如果 a、b 的值是不确定的,即 c=a*b,这个语句仅仅是定义了变量 c 与变量 a、b 的计算关系,那么 c 的值就是可变的。例如:
a=1,b=1,c=1 a=2,b=2,c=4 a=3,b=2,c=6 ...
简而言之,c 需要动态地由 a、b 共同来决定,当 a、b 的值发生变化时,c 的结果需要及时地做出响应(或者叫反应),以此来保证正确性。变化的 a、b 相当于数据流,c 要根据数据流的变化做出正确的响应,这就是 Reactive Streams(响应式流)。
12.1.4 Java Flow API 简介
基于 Reactive Streams 实现的响应式框架有 RxJava、Reactor、Akka 和 Vert.x 等。2017 年,Java JDK 9 发布,其中一个特性就是引入了基于 Reactive Streams 的 Flow 类。
Flow API 是基于发布者/订阅者模式提供的推(push)和拉(pull)的模型,如图 12.2 所示。

图 12.2 发布者/订阅者模型
基于发布/订阅模型的 Flow 更像是迭代器模式与观察者模式的组合。迭代器模式是拉(pull)模型,告诉数据源要拉取多少数据,观察者模式是推(push)模型,将数据推送给订阅者。Flow 订阅者最初请求(拉)N 个数据,然后发布者将最多 N 个数据推送给订阅者。
Flow 类中定义了 4 个嵌套的静态接口,如表 12.1 所示。
表 12.1 Flow 中定义的 4 个静态接口

下面介绍 Flow 的相关 API,并给出一些实际的例子。
1. 基于 Publisher 与 Subscriber 的示例
Flow.Subscriber 有 4 个抽象方法:
- onSubscribe():发布者调用该方法异步传递订阅。
- onNext():发布者调用该方法传递数据。
- onError():发生错误时调用。
- onComplete():数据发送完成后调用。
Sbscription 的 request() 和 cancel() 方法提供的背压特性,让订阅者可以告诉发布者能接收的最大数据量,还可以取消订阅,这样不至于因发布者速度过快而导致订阅系统崩溃。示例如下:
public class PublisherAndSubscriberDemo {
public static void main(String[] args) throws InterruptedException {
//发布者
SubmissionPublisher<String> publisher=new SubmissionPublisher<>();
//订阅者
Flow.Subscriber<String> subscriber=new Flow.Subscriber<String>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription=subscription;
subscription.request(1);
}
//传递数据
@Override
public void onNext(String item) {
System.out.println("【订阅者】接收消息: " + item);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.subscription.request(1);
}
//异常处理
@Override
public void onError(Throwable throwable) {
System.out.println("【订阅者】数据接收出现异常," + throwable);
this.subscription.cancel();
}
//发送结束处理
@Override
public void onComplete() {
System.out.println("【订阅者】数据接收完毕");
}
};
publisher.subscribe(subscriber);
for (int i=0;i<5;i++){
String message = "hello flow api " + i;
System.out.println("【发布者】发布消息: " + message);
publisher.submit(message);
}
publisher.close();
Thread.currentThread().join(20000);
}
}控制台的打印结果如下:
【发布者】发布消息: hello flow api 0 【发布者】发布消息: hello flow api 1 【发布者】发布消息: hello flow api 2 【发布者】发布消息: hello flow api 3 【发布者】发布消息: hello flow api 4 【订阅者】接收消息: hello flow api 0 【订阅者】接收消息: hello flow api 1 【订阅者】接收消息: hello flow api 2 【订阅者】接收消息: hello flow api 3 【订阅者】接收消息: hello flow api 4 【订阅者】数据接收完毕
2. Processor 示例
Processor 扩展了 Publisher 和 Subscriber,因此它可以在 Publisher 和 Subscriber 之间来回切换。Processor 的示例如下:
public class MyProcessor extends SubmissionPublisher<Integer>
implements Flow.Processor<Integer, Integer>{
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Processor 收到订阅请求");
this.subscription = subscription;
this.subscription.request(1);
}
//传递数据
@Override
public void onNext(Integer item) {
System.out.println("onNext 收到发布者数据: "+item);
if (item % 2 == 0) {
this.submit(item);
}
this.subscription.request(1);
}
//处理异常
@Override
public void onError(Throwable throwable) {
this.subscription.cancel();
}
//结束处理
@Override
public void onComplete() {
System.out.println("处理器处理完毕");
this.close();
}
}ProcessorDemo 代码如下:
public class ProcessorDemo {
public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<Integer> publisher = new Submission
Publisher<>();
MyProcessor myProcessor = new MyProcessor();
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
//数据处理
@Override
public void onNext(Integer item) {
System.out.println("onNext 从 Processor 接收到过滤后的数
据 item : "+item);
this.subscription.request(1);
}
//处理异常
@Override
public void onError(Throwable throwable) {
System.out.println("onError 出现异常");
subscription.cancel();
}
//结束处理
@Override
public void onComplete() {
System.out.println("onComplete 所有数据接收完成");
}
};
publisher.subscribe(myProcessor); //发布
myProcessor.subscribe(subscriber); //订阅
publisher.submit(1);
publisher.submit(2);
publisher.submit(3);
publisher.submit(4);
publisher.close();
TimeUnit.SECONDS.sleep(2);
}
}最终打印结果如下:
Processor 收到订阅请求
onNext 收到发布者数据: 1
onNext 收到发布者数据: 2
onNext 收到发布者数据: 3
onNext 收到发布者数据: 4
处理器处理完毕
onNext 从 Processor 接收到过滤后的数据 item : 2
onNext 从 Processor 接收到过滤后的数据 item : 4
onComplete 所有数据接收完成绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论