我承认这篇文章有标题党的嫌疑,看完这篇文章并不会让你月薪两万。如果想月薪两万甚至更多,并不是靠一篇文章,一本书,一个项目来实现的。
但是一个合格的程序员对响应式编程多少都应该有些了解,甚至有个清楚的认识。
希望这篇文章能够让你对响应式编程有个基本的认识,以及响应式编程会带来哪些好处,解决哪些问题,或者说为什么响应式编程如此重要。
响应式编程的概念是微软最开始提出并且在.NET平台上实现的一个库。后来这个模型被大家接受并认可,ReactiveX 就实现了很多其它语言对应的库,大名鼎鼎的RXJAVA就是针对Java语言实现的。
后来ReactiveX 和 Reactor共同制定了Reactive Stream标准,ReactiveX和Reactor都是在这个标准下实现的框架。Spring5 正式引入Reactor 并基于该框架实现了WEB-FLUX。
此外Java8引进了Stream流以及lamada表达式,Java9引入了Flow,也是对响应式编程的一种支持。
reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change
这是维基百科对响应式编程给出的定义。我对这个定义的评价是,懂得自然懂,不懂的还是不懂。
我提炼一下这个定义的关键词 声明式,数据流, 传递变化(响应),我自己再给加一个异步,因为实际上生产级别代码都是进行异步响应的,几乎很少进行同步响应。
在具体介绍响应式编程之前,先简单解释一下这几个关键词的含义。
声明式和指令式对应。指令式大家比较熟悉,就是依次写出完成某个任务的每条指令。
比如从一个苹果列表里,找出所有红色的苹果,指令式编程是这样做的。
List<Apple> apples = new ArrayList<Apple>();
for (Apple apple : apples){
if (apple.getColor() == "red"){
System.out.println(apple);
}
}
声明式编程,只要写出你想要什么就OK了。
典型的声明式语言的就是sql,对应上面的找红色苹果的需求,应该是这样的 select * from apple where color = red。
简单的讲,声明式编程就是聊天式编程,和计算机说你想要什么就OK了。
再说说数据流,其实数据流可以把它想象成水流,里面流淌的是数据,事件,信号等内容。如果大家对Java8引入的Stream流有一定了解的话,就会好理解。如果不了解的可以通过我这篇文章做个入门。
传递变化(响应),其实就是对响应二字的体现。所谓的响应就是你和某个人打了招呼,然后某人回应你了。某人对你的回应就是响应。
将上面的场景对应到面向对象的编程里面,就是观察者(订阅)模式。观察者对被观察者的某些行为做出对应的动作。
有些前端程序员对观察者模式可能比较陌生,那么大家比较熟悉的Ajax回调函数也是响应式编程的一种体现,比如如下JS代码
$.ajax("example.action")
.done(function(){
console.log("success")
})
.fail(function(){
console.log("error")
})
.always(function(){
console.log("complete")
});
这就是典型的异步回调,当请求成功的时候会有一种响应动作,请求失败的时候会有另一种响应动作。
关于响应的方式,有同步响应和异步响应。实际应用中大部分都会采用异步响应。
同步:你给旅行社打电话预定一张机票,接线员接到你的电话后,开始查询航班信息,然后进行预订,这期间你一直拿着电话等他的结果。
异步:你给旅行社打电话预定一张机票,接线员接到电话后,记录下你要预定的航班信息,然后就挂掉电话。等他预定好之后,把预定结果打电话告诉你。这就是异步。
很明显异步操作对你来说效率更高,因为你不用一直等接线员的操作,你可以干其他事情。
上面的场景也被很多人称为好莱坞规则。很多好演员去好莱坞报名拍戏,经纪公司会登记下演员的姓名,等有合适的机会的时候,经纪公司会给演员打电话,而不用演员一直在现场等,或者不断的给经纪公司打电话询问。 don't call me I will call you。
其实介绍完上面那些东西,可能对响应式编程的理解还是模糊的。那么我们就以Reactor框架为例子做一个简单的说明。毕竟程序员都喜欢show me the code。
上面提到了响应式编程的核心是基于观察者(订阅)模式的。观察者观察被观察者的行为,根据不同的行为做出不同的响应行为。
在Reactor框架中用两个类来表示Publisher,分别是Flux和Mono。Flux表示0...N个元素序列;Mono表示零或一个元素序列。
Flux/Mono可以发布三类值 正常数值,异常信号,完成信号。三类信号不会同时存在,最多同时发布两类信号。
举个例子,我们假设让Flux发射一个1-6的6个整数的数字流,6个数字流发送完成后,会紧跟着发送一个完成信号,告诉订阅者或者观察者,数据流完成。同样的,如果发送正常数据的过程中出现异常,也可以发送一个异常信号给订阅者或者观察者,表示出现异常,将停止发送。异常信号和完成信号不能同时存在,因为出现任何一个该数据流都将结束。但是信号流里面可以即没有异常信号也没有完成信号,这表示该流是一个无限流。
Flux.just(1,2,3,4,5,6)
上面这行代码表示发布者发布了6条消息,下面我们订阅者6条消息,也就是对这6条消息进行响应。
Flux.just(1,2,3,4,5,6).subscribe(System.out::print)
在控制台将会打印出1,2,3,4,5,6。
注意,只有订阅的时候才会对事件或者元素进行响应。
上面的例子,我们对元素或者事件没有做任何操作,仅仅是将它们原封不动地打印了出来,这显然不是我们想要的。接下来我们对元素做一些有意义的操作。
map
对数据流里面的每个元素执行一次map里面的函数。示意图如下
代码示例
Flux.range(1,6).map(i -> i*i).subscribe(System.out::println);
将会输出 1 4 9 16 25 36
flatmap
该操作符逻辑上包含两个操作,第一个操作是map操作,第二个是flatten,flatten类似于merge操作,将对每个元素进行映射之后,合并成一个新的流。示意图如下。
代码示例
Flux.just("apple-1","pear-2").flatMap(i ->
Flux.fromArray(i.split("-"))).subscribe(System.out::println);
以上代码将会输出 apple 1 pear 2;
filter
过滤出符合条件的元素。
代码示例
Flux.range(1,6).filter(i -> i>3).subscribe(System.out::println)
以上代码将会输出 4,5,6
zip zip英文单词有拉链的意思,在Reactor中,表示将两个数据流合并到一起。示意图如下。
示例代码
Flux.zip(
Flux.just("A","B","C"),
Flux.just("1","2","3"),
(x,y) -> x + y
).subscribe(System.out::println);
以上代码输出 A1 B2 C3
还有很多操作符这里不一一介绍了,感兴趣的可以看官网。
Reactor自然也是支持多线程的。而且多线程调度很简单。 Reactor中创建线程是通过Scheduler接口来表示的。
//创建一个线程
Scheduler single = Schedulers.single();
//创建等于CPU核心数量的线程
Scheduler parallel = Schedulers.parallel();
//创建有界限的线程池,不传参数的默认创建10倍于CPU核心数量
Scheduler elastic = Schedulers.boundedElastic();
创建了线程,自然要分配线程,也就是线程调度。 切换线程上下文主要通过publishOn()和subscribeOn()两个函数实现。
publishOn()会影响调用该函数之后的操作。而subscribeOn()会从源头影响整个操作链,无论subscribeOn()调用发生在何处。
举个例子:
Flux.just("hello")
.map(s -> {
System.out.println("[map] Thread name: " + Thread.currentThread().getName());
return s.concat(" world!");
})
//只改变publishOn()之后的操作的线程。
.publishOn(Schedulers.newSingle("thread-publishOn"))
.filter(s -> {
System.out.println("[filter] Thread name: " + Thread.currentThread().getName());
return s.startsWith("h");
})
//从源头变整个操作链的线程
.subscribeOn(Schedulers.newSingle("thread-subscribeOn"))
.subscribe(s -> {
System.out.println("[subscribe] Thread name: " + Thread.currentThread().getName());
System.out.println(s);
});
上面的代码输出结果是这样的
[map] Thread name: thread-subscribeOn-1
[filter] Thread name: thread-publishOn-2
[subscribe] Thread name: thread-publishOn-2
hello world!
建议各位把上面这段代码复制到本地运行一下,同时可以把subscribeOn()和publishOn()分别注释掉,感受一下区别。
这两个函数经常用于有阻塞操作的时候,将阻塞操作调度到新的线程,以便提高效率。
响应式编程可以帮助解决两类棘手问题,第一个问题就是大家熟悉的callback hell,第二个问题就是同步阻塞效率低的问题。
先说第一个问题,这里拿reactor官方的例子做个说明,找出某个用户最喜爱的五个爱好。通过Callback的方式实现是这样的。
采用Reactor 响应式编程代码大概应该是这个样子的
可以看到通过采用响应式编程,大大提高了代码的可读性,逻辑表达也更清晰。
再来看第二个问题,同步阻塞通常被认为是低效率的。而异步非阻塞被认为是高效率的。而响应式编程,天生就是异步非阻塞的。
来举个简单例子说明一下,为什么同步阻塞是低效率的而异步非阻塞是高效率的。
同步和异步描述的是服务提供者提供服务的能力。当调用者向服务者发起请求后,服务提供者能够立即返回,并且在处理完后通过某种方式通知调用者,那么就是异步的。相反如果服务提供者只在处理完之后才返回,或者要求调用者主动去查询处理结果,就是同步。
阻塞和非阻塞描述的是调用者的状态。当调用者向服务提供者发起请求后,一直等待处理结果返回,否则无法执行后续操作,就是阻塞状态。如果调用后直接返回,继续执行后续操作就是非阻塞状态。
上面提到的打电话的例子就是异步非阻塞的例子,你给旅行社打电话,预定一张机票。旅行社接线员收到你的请求,就立刻给你回复(异步),告诉你请求已经收到,稍后会通知你。然后你就挂掉电话,去处理其他事情(非阻塞),等旅行社预定好之后,会立刻给你打电话通知你结果。
如果是同步阻塞的话,场景应该是这样的,你给旅行社打电话预定机票,接线员接听你的电话,然后处理订票请求,你在电话另一端一直在等待,什么都做不了。更可怕的是,其他旅客的订票请求一直打不进来,因为线路资源一直被你占用。这将是多么低效的处理方式。
响应式编程虽好,但并不是包治百病,首先掌握起来就有一定难度,同时Debug也需要有一定的相关经验。更主要的是,我们要根据业务场景来决定响应式编程是否能给我们带来真正的好处。记住软件工程里面,没有银弹。