• Home
  • About
    • wanziの遇笺 photo

      wanziの遇笺

      一点随笔,一丝感悟,一些记录,一种成长。

    • Learn More
    • Instagram
    • Github
  • Archive
  • Category
  • Tag

RxJava

01 Nov 2016

Reading time ~1 minute

一、前言

  最近,项目(Java编写的Spring boot API)引入了RXJava,使用了它的异步性操作以及它的一些流式写法。
使用过程中,有过疑惑,有过释然,有过吐槽,有过赞赏。学习应该是一个追根溯源的过程,而不是随便了解一下大致的用法就不了了之,故以此记之。

二、响应式编程

  响应式编程是一种面向数据流和变化传播的编程范式,数据更新是相关联的。
在交互式编程中,A = B + C这样的表达式意味着将B与C之和赋给A,而此后B与C的改变都与A无关。
而在响应式编程中,A会去”响应”B或C的变化,即一旦B或C改变之后,A的值也会随之变化。

三、定义

  我的理解是,RxJava本质上是一个异步操作库,是一个能让你用极其简洁的逻辑去处理繁琐复杂任务的异步事件库。(其实初学者把握两点就好了:观察者模式和异步)

四、背景

  • Rx的全称是Reactive Extensions。直译过来就是响应式扩展。
  • Rx基于观察者模式,是一种编程模型。其目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。
  • Rx最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源。
  • Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。
  • RxJava是 ReactiveX 在JVM上的一个实现。

五、为什么?

  • 函数式风格
  • 简化代码
  • 异步错误处理
  • 轻松使用并发

六、基本概念及使用介绍

Hello World

说了很多,其实对RxJava是什么,其实还不是很明确,我们先来一个Hello world。

Observable
    .create(new OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello World");
        }
    })
    .subscribe(new Subscriber<String>() {
        @Override
        public void onNext(String s) {
            System.out.println(s);
        }
    });

简化下来就是:

Observable.create(subscriber -> {
    subscriber.onNext("Hello World!");
}).subscribe(System.out::println);

执行的过程就是创建、监听、订阅。

相关概念

  • Observable:被观察者
  • Observer: 观察者
  • Subscriber:观察者,implements Observer。
  • OnSubscribe:一个接口类,是连接被观察者和观察者的桥梁,另外要说明的是onSubscribe是Observable的一个局部变量。

使用

第1步:创建观察者Observer

Observer<Object> observer = new Observer<Object>() {

    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Object s) {

    }
};

第2步:创建被观察者Observable

  Observable.create()方法可以创建一个Observable, 使用create()创建Observable需要一个OnSubscribe对象,这个对象继承Action1。 当观察者订阅我们的Observable时,它作为一个参数传入并执行call()函数。

Observable<Object> observable = Observable.create(new Observable.OnSubscribe<Object>() {         
    @Override
    public void call(Subscriber<? super Object> subscriber) {

    }
});

除了create(),just()和from()同样可以创建Observable。看看下面两个例子:

just(T…)将传入的参数依次发送

Observable observable = Observable.just("One", "Two", "Three");
//上面这行代码会依次调用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();

from(T[])/from(Iterable<? extends T>)将传入的数组或者Iterable拆分成Java对象依次发送

String[] parameters = {"One", "Two", "Three"};
Observable observable = Observable.from(parameters);
//上面这行代码会依次调用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();

第3步:被观察者Observable订阅观察者Observer

你没看错,不同于普通的观察者模式,这里是被观察者订阅观察者:

observable.subscribe(observer);

连起来就是:

Observable.create(new OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello World!");
        subscriber.onCompleted();
    }

}).subscribe(new Subscriber<String>() {

    @Override
    public void onCompleted() {
        System.out.println("Done");
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onNext(String t) {
        System.out.println(t);
    }
});

这里表面是被观察者订阅观察者,是为了更好的链式结构。 实际上,过程如下:

  • 创建一个被观察者,并传入一个OnSubcribe。
  • 创建一个观察者Subscriber。
  • 一旦发生subscribe时,会自动触发上述的OnSubcribe的call方法。
  • 而OnSubcribe的call方法中的参数subscriber就是第二步中的被观察者Subscriber。
  • 然后就调用上述代码中的subscriber.onNext("Hello World!");
  • 也就是真实上述代码中Override的onNext()方法了。 所以本质上来讲,还是被观察者执行订阅操作的。

七、操作符

有了对上一步调用的理解,再去看源代码中操作符的实质,就很简单了。

RxJava的操作符分为如下几类:

  • 转换类操作符 map、flatMap、concatMap等等
  • 过滤类操作符 filter、find等等
  • 组合类操作符 concat、merge、zip等等
  • 数学类操作符 count、max等等
  • 布尔类操作符 includes、some等等

对于这些操作符的调用分析,读者可以自己查看源代码。   对于操作符的用法,可以参考Github TDD 练习。   对于操作符形象化的理解,可以参考一个交互式的动画网站http://rxmarbles.com/



javaRxJava Share Tweet +1