Observable的变种

Observable及其变种简介

类型 描述
Observable 能够发射0或n个数据,并以成功或错误事件终止。
Flowable 能够发射0或n个数据,并以成功或错误事件终止。 支持Backpressure,可以控制数据源发射的速度。
Single 只发射单个数据或错误事件。
Completable 它从来不发射数据,只处理 onComplete 和 onError 事件。
Maybe 能够发射0(onCompleted())或者1(onSuccess())个数据,要么成功,要么失败。

观察者的回调方法

onNext: 普通事件,将要处理的事件添加到事件队列中

**onCompleted:**事件队列完结。RxJava不仅把每个事件单独处理,还会把它们看成一个队列。当不再有新的onNext时,需调用onCompleted()作为完成的标志

**onError:**事件队列异常。当事件处理过程出现异常时,会调用onError(),同时队列自动终止,不再有事件发出

onCompleted只是一个通知方法,无法进行参数传递

而onNext方法是含有参数的,所以可以进行被观察者到观察者的消息传递

Flowable

Backpressure

在 RxJava 中, 有一种场景,被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息,这样就会产生很多下游没来得及处理的数据,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出。

这就是反压的应用场景。

具体的可以看我的另一篇关于流控的文章。

Flowable#create()

Flowable 的异步缓存池不同于 ObservableObservable的异步缓存池没有大小限制,可以无限制向里添加数据,直至OOM,而 Flowable 的异步缓存池有个固定容量,其大小为128。

1
2
3
4
5
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
ObjectHelper.requireNonNull(source, "source is null");
ObjectHelper.requireNonNull(mode, "mode is null");
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}

BackpressureStrategy 的作用便是用来设置 Flowable 异步缓存池中的存储数据超限时的策略。
BackpressureStrategy 提供了一下几种背压策略:

  • MISSING:这种策略模式下相当于没有指定任何的背压策略,不会对数据做缓存或丢弃处理,需要下游通过背压操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背压策略。
  • ERROR:这种策略模式下如果缓存池中的数据超限了,则会抛出 MissingBackpressureException 异常
  • BUFFER:这种策略模式下没有为异步缓存池限制大小,可以无限制向里添加数据,不会抛出 MissingBackpressureException 异常,但会导致OOM。
  • DROP:这种策略模式下如果异步缓存池满了,会丢掉将要放入缓存池中的数据。
  • LATEST:这种策略模式下与 Drop 策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中。

RxJava 提供了下面的操作符来指定背压策略。

  • onBackpressureBuffer():对应BUFFER策略
  • onBackpressureDrop():对应DROP策略
  • onBackpressureLatest():对应LATEST策略

https://www.heqiangfly.com/2017/10/14/open-source-rxjava-guide-flowable/

Single

Single#create()

创建的方式和Observable很相似

1
2
3
4
public static <T> Single<T> create(SingleOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new SingleCreate<T>(source));
}

SingleOnSubscribe#subscribe()

subscribe的不同在于它的参数类型

1
void subscribe(@NonNull SingleEmitter<T> emitter) throws Exception;

SingleEmitter

它并没有onNext或onComplete方法。所以它只能发送一个事件或错误事件。

1
2
3
4
5
6
7
void onSuccess(@NonNull T t);

/**
* Signal an exception.
* @param t the exception, not null
*/
void onError(@NonNull Throwable t);

使用场景

有时候需要发射的数据并不是数据流的形式,而只是一条单一的数据,比如发起一次网络请求。在这种情况下,如果我们使用 Observable,onComplete会紧跟着onNext被调用,为什么不能将这连个方法合二为一呢。如果再这种情况下我们再使用Observable 就显得有点大材小用,因为我们不需要处理onNext() 的数据。于是,为了满足这种单一数据的使用场景,便出现了 Single。

Completable

没有数据处理的方法,只有通知相关的方法。它只发射一条完成通知,或者一条异常通知,不能发射数据,其中完成通知与异常通知只能发射一个

Completable#create()

1
2
3
4
public static Completable create(CompletableOnSubscribe source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new CompletableCreate(source));
}

CompletableOnSubscribe#subscribe()

1
void subscribe(@NonNull CompletableEmitter var1) throws Exception;

CompletableEmitter

没有onNext和onSuccess方法

1
2
3
void onComplete();

void onError(@NonNull Throwable var1);

使用场景

我们向服务器发起一个更新数据的请求,服务器更新数据以后是返回的是更新的结果。这个时候我们或许只是关心的是服务器更新数据是否成功,而不需要对数据进行处理,那么这个时候用 Completable 就可以

Maybe

思想

该流可以发出单个值,要么成功,要么失败

Maybe是一种特殊的Observable,它只能发出0或1个项目,并且在失败时报告错误。

在这方面,它就像是Single和Completable的结合。所有这些简化类型,包括Maybe,提供了一个Flowable操作符的子集。这意味着我们可以像使用Flowable一样使用Maybe,只要操作对0或1个条目有意义。

因为它只能释放一个值,所以它不像Flowable那样支持反压处理

Maybe#create()

1
2
3
4
public static <T> Maybe<T> create(MaybeOnSubscribe<T> onSubscribe) {
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
return RxJavaPlugins.onAssembly(new MaybeCreate(onSubscribe));
}

MaybeOnSubscribe#subscribe()

1
void subscribe(@NonNull MaybeEmitter<T> var1) throws Exception;

MaybeEmitter

没有onNext方法

1
2
3
4
5
void onSuccess(@NonNull T var1);

void onError(@NonNull Throwable var1);

void onComplete();

使用方法

1
2
3
4
5
Maybe.just(1)
.map(x -> x + 7)
.filter(x -> x > 0)
.test()
.assertResult(8);

onSuccess、onError和onComplete

1
2
3
4
5
6
Maybe.just(1)
.subscribe(
x -> System.out.print("Emitted item: " + x),
ex -> System.out.println("Error: " + ex.getMessage()),
() -> System.out.println("Completed. No items.")
);

上面的代码将print: Emitted item: 1

因为source发出的是onSuccess信号

若是Maybe.empty().subscribe(…)

将 print “Completed. No items.”

若是Maybe.error(new Exception(“error”)).subscribe(…)

将 print “Error: error”

这些事件对Maybe来说是相互排斥的。也就是说,**onComplete不会在onSuccess之后被调用**

这与Flowable略有不同,因为onComplete将在流完成时被调用,甚至可能在一些onNext调用之后。

Single没有像Maybe那样的onComplete信号

Completable缺少onSuccess,因为它只用于处理完成/失败的情况。

与Flowable

Maybe类型的另一个用例是将它与Flowable结合使用。firstElement()方法可以用来从Flowable中创建Maybe:

1
2
3
4
5
6
7
8
Flowable<String> visitors = ...
visitors
.skip(1000)
.firstElement()
.subscribe(
v -> System.out.println("1000th visitor: " + v + " won the prize"),
ex -> System.out.print("Error: " + ex.getMessage()),
() -> System.out.print("We need more marketing"));

使用场景

可发射一条单一的数据,以及发射一条完成通知,或者一条异常通知,其中完成通知和异常通知只能发射一个,发射数据只能在发射完成通知或者异常通知之前,否则发射数据无效。