RxJava
创建被观察者
Observable
1 | public abstract class Observable<T> implements ObservableSource<T> {...} |
Observable是一个抽象类,实现了ObservableSource接口
ObservableSource
1 | public interface ObservableSource<T> { |
ObservableSource接口中只有一个subscribe方法。
Observable#create()
1 | public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { |
observableOnSubscribe接口
1 | public interface ObservableOnSubscribe<T> { |
将参数source传入ObservableCreate的构造方法中创建ObservableCreate类型对象,并将该对象传入RxJavaPlugins.onAssembly(Observable
ObservableCreate
1 | public final class ObservableCreate<T> extends Observable<T> { |
可以看出,ObservableCreate的构造函数仅仅是将传入的参数赋值给成员变量进行保存。
RxJavaPlugins#onAssembly()
1 | public static <T> Observable<T> onAssembly( Observable<T> source){ |
可见,在一般情况下,这个方法仅仅是将参数返回。
小结
至此,我们已经知道了创建被观察者的代码流程。在create方法中,传入的是ObservableOnSubscribe接口的实现,该接口只有一个方法。而返回的是Observable的子类,实际上也就是ObservableCreate对象。
可以发现,在这里使用了适配器模式。将ObservableOnSubscribe的实现传入ObservableCreate类中,使其转换为目标类,且同时增添了功能。单从增加功能这一个角度看,很像装饰模式,但是装饰模式的前提是不改变接口。而从ObservableOnSubscribe接口到ObservableCreate类的转换已经将接口改变了,所以不符合装饰模式的定义,而是更加符合适配器模式,即在不改变原先代码的前提下,使得现有的代码适配新的接口。
订阅过程
具体逻辑是:被观察者调用subscribe方法订阅,这时应先传入观察者。然后被观察者会调用观察者的onSubscribe方法通知观察者订阅成功,同时,被观察者也会调用我们自己实现的ObservableOnSubscribe接口的subscribe方法,这个方法持有观察者的引用,可以对观察者进行操作。
具体实现如下:
Observable#subscribe()
1 | public final void subscribe(Observer<? super T> observer) { |
注释1处
RxJavaPlugins#onSubscribe()
1 |
|
应用了hook技术,在一般情况下,仅仅是将传入的参数返回。
注释2处
Observable#subscribeActual()
这是一个抽象方法。具体的实现在Observable的子类,在本例中具体的实现在ObservableCreate类对象中。
1 | protected abstract void subscribeActual(Observer<? super T> observer); |
ObservableCreate#subscribeActual()
1 |
|
注释1处
CreateEmitter
1 | static final class CreateEmitter<T> |
CreateEmitter继承了AtomicReference
Atomic家族主要是保证多线程环境下的原子性,相比synchronized而言更加轻量级。比较常用的是AtomicInteger,作用是对Integer类型操作的封装,而AtomicReference作用是对普通对象的封装。
且CreateEmitter实现了ObservableEmitter
CreateEmitter
Disposable
1 | public interface Disposable { |
注释2处
observer.onSubscribe(parent);
observer是Observable的subscribe调用方法subscribeActual传进来的观察者参数,实际上是我们自定义的观察者对象。
observer的onSubscribe也是我们自定义observer时需要实现的。
1 | void onSubscribe(@NonNull Disposable d); |
其参数为Disposable类型,CreateEmitter实现了Disposable接口,所以可以进一步看出,我们向CreateEmitter构造函数传入observer,使其可以适配Disposable接口。
该方法是在被观察者的subscribe方法里的观察者的回调方法,可以用来通知观察者订阅成功。CreateEmitter类型的参数parent,是对observer功能的扩展,使其具备一些如中断处理的功能,这样就可以在观察者收到订阅成功的消息后,进行进一步的操作。
注释3处
source.subscribe(parent)
source也就是ObservableCreate类保存的observableOnSubscribe接口实例。
observableOnSubscribe#subscribe
1 | void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception; |
这个方法中需要传入ObservableEmitter类型的参数,即实现了ObservableEmitter的CreateEmitter类型对象parent。
observableOnSubscribe的subscribe方法也是我们自己定义的,通常会在其中调用emitter.onNext()和 emitter.onComplete()方法。
注意区分subscribe()方法和onSubscribe()方法,前者属于被观察者,后者属于观察者。
emitter.onNext()方法具体的实现在CreateEmitter类中。
这里的observer就是我们在构造CreateEmitter对象时传入的我们自定义的观察者。它的onNext()和onComplete()方法是我们自定义观察者时实现的。
1 | public void onNext(T t) { |
1 | public void onComplete() { |
可以看到这两个方法中都有isDisposed()的判断
CreateEmitter#isDisposed()
1 | public boolean isDisposed() { |
AtomicReference#get()
1 | private volatile V value; |
由于我们没有设置value,所以返回值为null
DisposableHelper#isDisposed()
1 | public enum DisposableHelper implements Disposable { |
此例中d == null,所以默认返回false。
isDisposed()方法是用来判断当前事件是否被切断,在未进行任何设置的情况下,事件默认是不会被切断的。
在CreateEmitter类中,onNext()和onComplete()方法会先判断事件是否被切断,如果被切断了,就不会继续执行方法中的代码。如果事件没有被切断,onNext()和onComplete()方法会正常执行,onError()和onComplete()方法最后会执行dispose()方法。这个方法会将事件切断。
CreateEmitter#dispose()
1 | public void dispose() { |
DisposableHelper#dispose()
1 | public static boolean dispose(AtomicReference<Disposable> field) { |
AtomicReference#getAndSet()
1 | public final V getAndSet(V newValue) { |
AtomicReference的getAndSet方法通过调用unsafe类的getAndSetObject的CAS操作,保证了操作的原子性,解决了并发操作的读写问题。
线程切换
线程切换主要分为subscribeOn()和observeOn()方法
Observable#subscribeOn()
1 | public final Observable<T> subscribeOn(Scheduler scheduler) { |
如果对上面讲过的内容有印象,可以发现,这个方法和Observable的create方法的结构和形式十分相似。
这里需要传入Scheduler类型的参数。我们通常通过Schedulers类来获取Scheduler类型的子类。
常见的Scheduler类型如下
Scheduler类型 | 使用方式 | 含义 | 使用场景 |
---|---|---|---|
IoScheduler | Schedulers.io() |
io操作线程 | 读写SD卡文件,查询数据库,访问网络等IO密集型操作 |
NewThreadScheduler | Schedulers.newThread() |
创建新线程 | 耗时操作等 |
SingleScheduler | Schedulers.single() |
单例线程 | 只需一个单例线程时 |
ComputationScheduler | Schedulers.computation() |
CPU计算操作线程 | 图片压缩取样、xml,json解析等CPU密集型计算 |
TrampolineScheduler | Schedulers.trampoline() |
当前线程 | 需要在当前线程立即执行任务时 |
HandlerScheduler | AndroidSchedulers.mainThread() |
Android主线程 | 更新UI等 |
Schedulers.io()是我们常用的调度器类型,所以接下来看一下它的具体实现
Schedulers#io()
1 | public static Scheduler io() { |
Schedulers#IO
1 | static { |
Schedulers#IOTask
1 | static final class IOTask implements Callable<Scheduler> { |
Schedulers#IoHolder.DEFAULT
1 | static final class IoHolder { |
RxJavaPlugins主要是执行一些hook操作,可以忽视。
所以Schedulers.io()实际是返回了一个IoScheduler类型的对象。
注释1处
new ObservableSubscribeOn
ObservableSubscribeOn
1 | public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { |
在Observable的subscribeOn方法中,将其自身传入ObservableSubscribeOn方法作为第一个参数。因为Observable实现了ObservableSource接口。
**super(source)**是调用父类构造函数,将source保存在父类的成员变量中。
可以看到这个ObservableSubscribeOn类存储了调度器的类型,以及ObservableSource接口的实例。
最开始有讲过RxJavaPlugins.onAssembly()方法,它需要接收一个Observable类型的对象。而ObservableSubscribeOn类也是Observable的子类,所以可以将ObservableSubscribeOn类型的对象传入RxJavaPlugins.onAssembly()方法中。一般情况下,RxJavaPlugins.onAssembly()方法只是将传入的参数返回。
Observable.subscribeOn()方法返回的是一个ObservableSubscribeOn类型的对象。它也是Observable的子类。
所以,我们可以用它调用subscribe()订阅方法。它的具体实现在Observable类中。之前已经讲过。
但与之前不同的是,这个方法中调用的这个方法:**subscribeActual(observer);**,它的具体实现要交由子类完成,而在这里,实际是调用了ObservableSubscribeOn类的该方法。
ObservableSubscribeOn#subscribeActual()
1 | public void subscribeActual(final Observer<? super T> observer) { |
SubscribeOnObserver是ObservableSubscribeOn的内部类
SubscribeOnObserver
1 | static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable { |
可以看出,这个类和CreateEmitter类形式很相似。但最大的区别是,这个类具有AtomicReference
注释2
observer.onSubscribe(parent)
依然是一个回调方法。可以通知观察者订阅成功
注释3
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
这里的逻辑较为复杂,先给出一个结论,它最后会执行SubscribeTask类的run()方法。
1 | public void run() { |
要理解 source.subscribe(parent),先要理解下面具体流程的逻辑图:
注意区分,ObservableSubscribeOn的成员变量source指的是ObservableCreate对象,而ObservableCreate对象也有一个成员变量source,这个source指的是Observable在create时传进来的我们自定义的ObservableOnSubscribe。
ObservableSubscribeOn和ObservableCreate都没有重写subscribe方法,所以我们在调用subscribe时,其实会调用从Observable类继承来的subscribe方法。
这样看来,其实有点像递归操作,在一个方法中调用它本身。
但这个递归应该是有出口的,否则它就是一个死循环。那它的出口是什么呢?其实就是当source为我们实例的ObservableOnSubscribe接口。因为ObservableOnSubscribe中也有subscribe方法,我们无需再去调用Observable的subscribe方法,而是调用我们自定义的subscribe方法。
所以图中的1处,还会调用Observable的subscribe方法,直到碰到source为ObservableOnSubscribe时,便可以不再调用Observable的subscribe方法,即图中的2处。
ObservableSubscribeOn中持有Scheduler类型的成员变量,它将会在subscribeActual()方法的最后执行线程的切换操作。即source.subscribe(parent);会在Scheduler指定的线程中执行。这就是为什么subscribeOn()可以改变被观察者发送消息的线程。
若我们多次调用subscribeOn()方法会出现什么情况?
由于每一个ObservableSubscribeOn中持有Scheduler类型的成员变量,所以它的subscribeActual()方法会根据Scheduler进行相应的线程切换。
图中的圆圈1,2表示线程1,2。
最外层的ObservableSubscribeOn(即拥有线程2的对象)执行subscribe方法时,会将线程切换到线程2,然后继续调用倒数第二层ObservableSubscribeOn方法,依次类推,它也会执行subscribe方法时,会将线程切换到线程1。然后继续执行倒数第三层的ObservableCreate的subscribe方法。但ObservableCreate不持有Scheduler类对象,也不会对线程进行切换,所以,此时的线程为线程1。
所以多次调用subscribeOn()方法,只有第一个subscribeOn()会对被观察者造成影响,其他的subscribeOn()也会造成线程切换,只是使用者感受不到。
subscribeOn()只影响其最近上游所在的线程。
它主要影响的是自定义被观察者的subscribe方法。
doOnSubscribe()
doOnSubscribe()方法执行所在线程由其后面的subscribeOn()来指定;若没有subscribeOn()指定,则执行的线程和本身所在线程一致。
它和subscribeOn方法执行逻辑很像,只是它不会切换线程。
1 | public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe) { |
subscribeOn 作用于该操作符之前的 Observable 的创建操符作以及 doOnSubscribe 操作符 ,换句话说就是 doOnSubscribe 以及 Observable 的创建操作符总是被其之后最近的 subscribeOn 控制
Observable#observeOn()
通常用第一个方法,向其中传入调度器
1 | public final Observable<T> observeOn(Scheduler scheduler) { |
可以看出,最终会将其传入ObservableObserveOn中保存
ObservableObserveOn#subscribe
它的subscribe逻辑和之前提到的ObservableSubscribeOn的subscribe逻辑很像。都会先调用Observable的subscribe方法。而对于subscribeActual方法,不同Observable子类有不同逻辑。
ObservableObserveOn#subscribeActual
1 | protected void subscribeActual(Observer<? super T> observer) { |
source.subscribe(new ObserveOnObserver
主要是将observer进行进一步的传递与保存。
然后就是调用source(在这里也就是ObservableCreate)的subscribe方法。这个流程和前面讲过的一样。继续将observer进行进一步的传递与保存。
最终会调用自定义被观察者的subscribe方法。在这个方法中,假设我们调用了observer的onNext()方法。将会对之前进行层层包裹的observer进行调用。
首先,将会调用CreateEmitter的onNext()方法
CreateEmitter#onNext()
1 | public void onNext(T t) { |
正常情况下,会调用它保存的observer的onNext(),在此处是ObserveOnObserver的onNext()
ObserveOnObserver#onNext()
1 | public void onNext(T t) { |
这个方法较为复杂。结果是它会先切换线程,然后调用它保存的observer的onNext()。此处也就是调用我们自定义的observer的onNext()。
这个过程的具体流程图如下:
若我们多次调用observeOn()方法会出现什么情况?
相比一次调用observeOn(),多次调用只是进行了更多的线程切换,但只有离自定义的observe最近的会对它造成影响。
Retrofit与RxJava
RxJava和Retrofit的组合是安卓开发里很常用的网络请求方法。
基本用法如下:
1 | Api service = retrofit.create(Api.class); |
1 | observable.subscribeOn(Schedulers.io()) |
这里的observable是CallExecuteObservable类型的。作为Observable的子类,我们依旧关注它对subscribeActual方法的实现。
CallExecuteObservable#subscribeActual()
1 | protected void subscribeActual(Observer<? super Response<T>> observer) { |
注释1
Response
又见到了熟悉的call.execute(),这个是我们在不用RxJava时,常用的开启网络请求的方法。它只是被包装在Observable中,最终还是会被调用来进行网络请求。