一、前言
最近,项目(Java编写的Spring boot API)引入了RXJava,使用了它的异步性操作以及它的一些流式写法。
使用过程中,有过疑惑,有过释然,有过吐槽,有过赞赏。学习应该是一个追根溯源的过程,而不是随便了解一下大致的用法就不了了之,故以此记之。
二、响应式编程
响应式编程是一种面向数据流和变化传播的编程范式,数据更新是相关联的。
在交互式编程中,A = B + C这样的表达式意味着将B与C之和赋给A,而此后B与C的改变都与A无关。
而在响应式编程中,A会去”响应”B或C的变化,即一旦B或C改变之后,A的值也会随之变化。
三、定义
我的理解是,RxJava本质上是一个异步操作库,是一个能让你用极其简洁的逻辑去处理繁琐复杂任务的异步事件库。(其实初学者把握两点就好了:观察者模式和异步)
四、背景
- Rx的全称是Reactive Extensions。直译过来就是响应式扩展。
- Rx基于观察者模式,是一种编程模型。其目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。
- Rx最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源。
- Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。
- RxJava是 ReactiveX 在JVM上的一个实现。
五、为什么?
- 函数式风格
- 简化代码
- 异步错误处理
- 轻松使用并发
六、基本概念及使用介绍
Hello World
说了很多,其实对RxJava是什么,其实还不是很明确,我们先来一个Hello world。
Observable
.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello World");
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println(s);
}
});
简化下来就是:
Observable.create(subscriber -> {
subscriber.onNext("Hello World!");
}).subscribe(System.out::println);
执行的过程就是创建、监听、订阅。
相关概念
- Observable:被观察者
- Observer: 观察者
- Subscriber:观察者,implements Observer。
- OnSubscribe:一个接口类,是连接被观察者和观察者的桥梁,另外要说明的是onSubscribe是Observable的一个局部变量。
使用
第1步:创建观察者Observer
Observer<Object> observer = new Observer<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object s) {
}
};
第2步:创建被观察者Observable
Observable.create()方法可以创建一个Observable, 使用create()创建Observable需要一个OnSubscribe对象,这个对象继承Action1。 当观察者订阅我们的Observable时,它作为一个参数传入并执行call()函数。
Observable<Object> observable = Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
}
});
除了create(),just()和from()同样可以创建Observable。看看下面两个例子:
just(T…)将传入的参数依次发送
Observable observable = Observable.just("One", "Two", "Three");
//上面这行代码会依次调用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();
from(T[])/from(Iterable<? extends T>)将传入的数组或者Iterable拆分成Java对象依次发送
String[] parameters = {"One", "Two", "Three"};
Observable observable = Observable.from(parameters);
//上面这行代码会依次调用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();
第3步:被观察者Observable订阅观察者Observer
你没看错,不同于普通的观察者模式,这里是被观察者订阅观察者:
observable.subscribe(observer);
连起来就是:
Observable.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello World!");
subscriber.onCompleted();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("Done");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String t) {
System.out.println(t);
}
});
这里表面是被观察者订阅观察者,是为了更好的链式结构。 实际上,过程如下:
- 创建一个被观察者,并传入一个OnSubcribe。
- 创建一个观察者Subscriber。
- 一旦发生subscribe时,会自动触发上述的OnSubcribe的call方法。
- 而OnSubcribe的call方法中的参数subscriber就是第二步中的被观察者Subscriber。
- 然后就调用上述代码中的
subscriber.onNext("Hello World!");
- 也就是真实上述代码中
Override
的onNext()
方法了。 所以本质上来讲,还是被观察者执行订阅操作的。
七、操作符
有了对上一步调用的理解,再去看源代码中操作符的实质,就很简单了。
RxJava的操作符分为如下几类:
- 转换类操作符 map、flatMap、concatMap等等
- 过滤类操作符 filter、find等等
- 组合类操作符 concat、merge、zip等等
- 数学类操作符 count、max等等
- 布尔类操作符 includes、some等等
对于这些操作符的调用分析,读者可以自己查看源代码。 对于操作符的用法,可以参考Github TDD 练习。 对于操作符形象化的理解,可以参考一个交互式的动画网站http://rxmarbles.com/