RxJava的流控

FlowControl(流控)

简介

在RxJava中,遇到这样一种情况并不困难:Observable发出条目的速度比操作符或订阅者消费它们的速度要快。这就提出了一个问题,即如何处理这种不断增长的未消耗项目的积压。

例如,想象一下使用zip操作符将两个无限的observable压缩到一起,其中一个发出条目的频率是另一个的两倍。zip操作符的幼稚实现将不得不维护一个不断扩展的缓冲区,以保存由更快的Observable发出的项,最终与由更慢的Observable发出的项合并。这会导致RxJava占用大量的系统资源。

在RxJava中,您可以使用多种策略来实施流量控制和反压,以缓解快速生成的Observable遇到缓慢消耗的观察者时所造成的问题。本页面解释了其中一些策略,并向您展示了如何设计自己的Observable和Observable操作符来满足流量控制的请求

Hot and cold Observables, and multicasted Observables

cold Observable发射一个特定的项目序列,但当其Observer觉得方便时,可以开始发射该序列,并且以Observer希望的任何速率发射,而不会破坏序列的完整性。例如,如果您将静态Iterable转换为Observable,则该Observable将发出相同的项序列,无论稍后何时以何频率订阅或观察这些项。cold Observable的示例主要包括数据库查询、文件检索。

A hot Observable 在创建时开始生成要立即发射的项。Observer通常开始观察从序列中间的某个地方观察到的热点发出的项目序列,从在订阅建立之后可观察到的第一个项目开始。Observable以其自身的速度发射消息。对消息的接收情况取决于它的观察者能否跟上。A hot Observable示例主要包括鼠标和键盘事件、系统事件或股票价格

当a cold Observable是multicast时(当它被转换为ConnectableObservable并调用其connect()方法时),它实际上变为a hot Observable,并且出于反压和流量控制的目的,它应该被视为a hot Observable。

Cold Observables 是一种理想的模型,它很少出现上游流速远大于下游消费速度的问题。Hot Observables通常不能很好地处理反压的情况。

所以Hot Observables更适合用下文讲述的onBackpressureBuffer或onBackpressureDrop操作符、节流、缓冲或窗口等来处理反压问题。

Backpressure

Backpressure常被翻译为背压,但我觉得翻译成反压更加形象。

Backpressure可以理解为将消费者的压力反馈给生产者,这就是反压的含义。

使用一些操作符代替反压

应对Observable生产过剩问题的第一道防线是使用一些普通的Observable操作符集合,将发出的项的数量减少到一个更易于管理的数量。

本节的例子将展示如何使用这些操作符来处理一个数据突发式的Observable,就像下面的图中所示的那样:

img

通过微调这些操作符的参数,你可以确保一个慢速消耗的观察者不会被一个快速生成的Observable所淹没。

节流

简介

像sample()或throttleLast()、throttleFirst()和throttleWithTimeout()或debounce()这样的操作符允许你调节Observable发出条目的速率。

它不是限制发送的数量,而是对发送的内容进行选择性接收,没有被接收的则丢弃。

以下的bursty为自定义的Observable子类

所以这些操作符是用于限制Observable的发送,或者说限制对Observe的onNext方法的调用时机。

sample (or throttleLast)

使用方法:
1
Observable<Integer> burstySampled = bursty.sample(500, TimeUnit.MILLISECONDS);
思想

根据用户设置的时间间隔,隔一段时间去发送一次数据,所发送的数据为这个时间间隔内,最后发送的数据。所以无论这个时间间隔内产生了多少数据,观察者只接收到最后一个。

图示:

img

下图中,用黄色和蓝色高光表示的是时间的间隔。在这段时间间隔内发送的最后一个数据即为观察者收到的数据。

IMG_0660(20211202-213543).PNG

throttleFirst

使用方法
1
Observable<Integer> burstyThrottled = bursty.throttleFirst(500, TimeUnit.MILLISECONDS);
思想

与上面说的sample (or throttleLast)相反,它发送的是一段时间间隔内的第一个数据

图示

img

debounce (or throttleWithTimeout)

使用方法
1
Observable<Integer> burstyDebounced = bursty.debounce(10, TimeUnit.MILLISECONDS);
思想

只发出Observable中在指定的持续时间内后面没有其他item的那些消息

图示

img

图中粉色高亮的部分,为大于设置的时间间隔的一段时间,在这段时间前的最后一个消息,即为要发送的消息。也就是说,在一定的时间间隔内,将要发送的消息后面没有其他消息。

IMG_0664(20211204-211630).PNG

打包发送

简介

可以使用像buffer()或window()这样的操作符来从Observable中收集消息,然后将它们以集合(或Observable)的形式发出。然后,速度慢的使用者可以决定是只处理每个集合中的一个特定消息,还是处理这些消息的某些组合,还是根据需要安排对集合中的每个消息执行的工作。

buffer

使用方法
1
Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
思想

在有规律的时间间隔内,周期性地关闭并从突发的Observable中发送一个条目缓冲区的消息

打包后的消息是用List存储起来

图示

img

进一步使用
使用方法
1
2
3
4
5
6
7
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
思想

将buffer和之前提到的debounce结合。

首先用户设置一段时间间隔。

在一个缓冲区后面若间隔了设置的时间依然没有消息,则将这个缓冲区的消息一起发送。

图示

img

window

思想

将一段连续的消息打包成新的Observable发送。

一段消息的长度确定有两种方式:一段时间间隔内的所有消息,或者自行设置一段消息中共有多少消息。

时间间隔内的消息图示

在用户设置的时间间隔内的所有消息被打包成一个新的Observable进行发送

img

使用方法
1
Observable<Observable<Integer>> burstyWindowed = bursty.window(500, TimeUnit.MILLISECONDS);
自定义数量消息的图示

自定义一次打包多少消息。在达到了设置的个数时,将会打包成新的Observable进行发送。

img

使用方法
1
Observable<Observable<Integer>> burstyWindowed = bursty.window(5);

Callstack blocking

如果Observable、所有对它进行操作的操作符以及订阅它的观察者都在同一个线程中操作,这将通过调用Callstack blocking(堆栈阻塞)有效地建立一种形式的背压。

即整个调用链处于同一个线程,不能用常用的线程切换方法切换线程。这属于同步的调用。

所以,Observable发送的消息会被观察者按顺序接收,若观察者在接收、处理消息时过慢,便会阻塞后面的消息。

这种情况其实和RxJava的主要功能相违背,属于很少见的情况。

Subscriber如何建立响应式获取的反压策略

思想

当你用Subscriber订阅一个Observable时,你可以通过在Subscriber的onStart()方法中调用Subscriber.request(n)来请求响应式获取消息(n是你希望Observable在下一次request()调用之前发出的最大条目数)。

然后,在onNext()中处理这个消息(或这些消息)之后,您可以再次调用request()来指示Observable发出另一个消息(或多个消息)。

使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
someObservable.subscribe(new Subscriber<t>() {
@Override
public void onStart() {
request(1);
}

@Override
public void onCompleted() {
// gracefully handle sequence-complete
}

@Override
public void onError(Throwable e) {
// gracefully handle error
}

@Override
public void onNext(t n) {
// do something with the emitted item "n"
// request another item:
request(1);
}
});

request(Long.MAX_VALUE),可以禁用响应式获取数据,并要求Observable以自己的速度发出条目。request(0)是一个合法调用,但没有效果。将小于0的值传递给request()将导致抛出异常。

Reactive pull backpressure isn’t magic

onBackpressureBuffer
思想

维护Observable的所有发射的缓冲区,并根据Subscribers生成的requests将数据发送给它们。

图示

img

图中虚线部分是原先缓冲区的消息,每次Subscribers进行request(n)时,都会将缓冲区的前n个消息发送给Subscribers

所以在进行了五次request(1)后,原先缓冲区中前五个红色的消息就被发送给Subscribers了。

IMG_0668(20211205-193751).PNG

这个操作符的一个实验版本(RxJava 1.0中不可用)允许您设置缓冲区的容量;如果缓冲区溢出,应用此运算符将导致生成的Observable终止并出现错误

onBackpressureDrop
思想

在收到request(n)请求后,将这个请求之后产生的n条消息发送给Subscribers。其余的不属于这个范围的消息都将会被丢弃。

图示

img

即将图中蓝色虚线之后的一条消息发送给Subscribers

IMG_0670(20211205-195733).PNG

onBackpressureLatest
思想

默认会缓存最新的n条数据,当接收到request(n)的时候,会把缓存的数据发送给Subscribers

图示

sample

onBackpressureBlock
思想

调用onBackpressureBlock(n)时传入的参数n,在Observable产生了n条未被Subscribers进行request的数据后,Observable所在的线程将被阻塞,直到Subscribersr进行request(x)的操作,此时Observable所在的线程将不被阻塞,Observable又可以生成x条数据。

简而言之,在Observable生成了n条未被消费的消息后,它所在的线程将被阻塞。

图示

img

由于这里onBackpressureBlock传入的参数是2,所以Observable在生成了2条未被消费的消息后,所在线程将被阻塞。

图中,刚开始生成了2个红色的1和2,所以Observable所在线程被block。直到Subscribers进行了request(1),获得了两个数据的第一个,即红色的1,所以Observable中只有红色的2,所在线程被unblock,之后又生产了一个橙色的1,现有红色的2和橙色的1,直到Subscribers进行了request(1)后进行如上重复操作。IMG_0676(20211205-205825).PNG

可以理解为堵住了接收者的接收入口

Flowable

Observable 与 Flowable

在RxJava的前一个版本中,只有一个基类用于处理反压和非反压的情况——Observable

而RxJava 2引入了Flowable来处理反压的情况。

Flowable的创建

简单的Flowable

我们可以像创建Observable一样,使用just()方法创建一个Flowable:

1
Flowable<Integer> integerFlowable = Flowable.just(1, 2, 3, 4);

尽管just()的使用非常简单,但从静态数据创建一个Flowable并不常见,它仅仅用于测试。

通过Observable创建Flowable

当我们有一个Observable时,我们可以很容易地使用toFlowable()方法将其转换为Flowable:

1
2
3
Observable<Integer> integerObservable = Observable.just(1, 2, 3);
Flowable<Integer> integerFlowable = integerObservable
.toFlowable(BackpressureStrategy.BUFFER);

这里的BackpressureStrategy将会在下文讲述

通过FlowableOnSubscribe创建Flowable

RxJava 2引入了一个接口FlowableOnSubscribe,它表示一个在用户订阅后开始发出事件的Flowable。

因此,所有客户端都将收到相同的事件集,这使得FlowableOnSubscribe反压更加安全。

当我们有了FlowableOnSubscribe,我们可以使用它来创建Flowable

1
2
3
4
FlowableOnSubscribe<Integer> flowableOnSubscribe
= flowable -> flowable.onNext(1);
Flowable<Integer> integerFlowable = Flowable
.create(flowableOnSubscribe, BackpressureStrategy.BUFFER);

其他的创建Flowable方法这里就不一一介绍了

Flowable的Backpressure策略

在RxJava 2.x中,Observable不再支持Backpressure,而是改用Flowable来专门支持Backpressure。上面提到的四种operator(onBackpressureBuffer、onBackpressureDrop、onBackpressureLatest、onBackpressureBlock)的前三种分别对应Flowable的三种Backpressure策略:

  • BackpressureStrategy.BUFFER
  • BackpressureStrategy.DROP
  • BackpressureStrategy.LATEST

BackpressureStrategy

BackpressureStrategy是一个枚举类型,它定义了Flowable的反压行为。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Represents the options for applying backpressure to a source sequence.
*/
public enum BackpressureStrategy {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}

Buffer

思想

如果我们使用该反压策略,source将缓冲所有事件,直到订阅者可以使用它们。也就是缓冲所有onNext()的值,直到下游使用它。

使用方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void thenAllValuesAreBufferedAndReceived() {
List testList = IntStream.range(0, 100000)
.boxed()
.collect(Collectors.toList());

Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable
.toFlowable(BackpressureStrategy.BUFFER)
.observeOn(Schedulers.computation()).test();

testSubscriber.awaitTerminalEvent();

List<Integer> receivedInts = testSubscriber.getEvents()
.get(0)
.stream()
.mapToInt(object -> (int) object)
.boxed()
.collect(Collectors.toList());

assertEquals(testList, receivedInts);
}

它类似于在Flowable上调用onBackpressureBuffer()方法,但它不允许指定缓冲区大小或onOverflow。

Drop

思想

我们可以用以丢弃不能消费的onNext()值,而不是缓冲它们。

这类似于在Flowable上使用onBackpressureDrop():

使用方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void whenDropStrategyUsed_thenOnBackpressureDropped() {

Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable
.toFlowable(BackpressureStrategy.DROP)
.observeOn(Schedulers.computation())
.test();
testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents()
.get(0)
.stream()
.mapToInt(object -> (int) object)
.boxed()
.collect(Collectors.toList());

assertThat(receivedInts.size() < testList.size());
assertThat(!receivedInts.contains(100000));
}

Latest

思想

使将强制source仅保留最新的onNext()值。因此如果消费者不能跟上,将覆盖以前的任何值

使用方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void whenLatestStrategyUsed_thenTheLastElementReceived() {

Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(Schedulers.computation())
.test();

testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents()
.get(0)
.stream()
.mapToInt(object -> (int) object)
.boxed()
.collect(Collectors.toList());

assertThat(receivedInts.size() < testList.size());
assertThat(receivedInts.contains(100000));
}

BackpressureStrategy.LATEST和BackpressureStrategy.DROP看起来非常相似。
但是,BackPressureStragey.LATEST将覆盖订阅者无法处理的元素,并仅保留最新的元素。
BackpressureStragy.DROP将丢弃无法处理的元素。这意味着不一定会发射最新的元素。

Error

思想

当我们使用backpressureStragy.ERROR时,表示我们不希望出现反压。因此,如果消费者无法跟上source,则应抛出MissingBackpressureException

使用方法
1
2
3
4
5
6
7
8
9
10
public void whenErrorStrategyUsed_thenExceptionIsThrown() {
Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable
.toFlowable(BackpressureStrategy.ERROR)
.observeOn(Schedulers.computation())
.test();

subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class);
}

Missing

思想

source在不丢弃或缓冲的情况下发送消息。

在这种情况下,下游必须处理溢出

使用方法
1
2
3
4
5
6
7
8
9
public void whenMissingStrategyUsed_thenException() {
Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable
.toFlowable(BackpressureStrategy.MISSING)
.observeOn(Schedulers.computation())
.test();
subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class);
}

我们的测试中,我们正在为ERROR和MISSING策略排除MissingbackpressureException。因为当source的内部缓冲区溢出时,它们都会抛出此类异常。然而它们有不同的目的。

当我们根本不期望出现反压,并且希望源程序在发生反压时抛出异常时,我们应该使用ERROR策略。

如果我们不想在创建Flowable时指定默认行为,可以使用MISSING策略。我们之后会用反压运算符来给它的行为赋值。