如何从 Stream 的角度思考 RxJava
这篇文章是对 RxJava 工程使用方面的一些总结和思考,不会涉及具体操作符解释、实现细节,更多是从 idea 出发来思考 Stream。当然因为思想是相同的,对于 RxPython 及其它语言的使用,几乎没有差别。
1. Stream 存在的问题
在谈论 Kotlin Coroutine 时,一般都会谈到一个问题:
有了 RxJava,为什么需要 Coroutine?
这个问题也可以换一个角度,RxJava 有什么缺陷?或者在哪些场合下不适合用 RxJava?
事实上,RxJava 在几乎任何地方都很合适很方便,上手之后开发效率也非常高,但是它不适合与 NonStream 打交道,面对一个不是 RxJava 的 Library,总需要特意转换一次。
有些人也因此抨击 RxJava,说它过于有侵略性。比如 Android 中基本上都用 RxJava + Retrofit 了,既然网路请求都是 RxJava,那么 UI 点击要不要用?页面生命周期要不要用?RxJava 是一个很强大很好使的工具,可以解决大部分问题,但是让人有一种手里有了锤子,看什么都是钉子的感觉,想要把一切都变成 Stream,不然总是会感觉不流畅。
起初,是没有 Stream 的,订阅之后,便有了 Stream。subscribe()
与 dispose()
作为与 NonStream 环境交互的方式,因为是不同的时机触发的,也就需要在不同的地方分别调用。同一个概念,在 2 个地方分别调,就会有遗忘有不一致,所以经常需要引入 Library 保证这种一致性,或者自己创造新轮子。
2. Stream VS NonStream
Stream 和 NonStream 的很多操作是等效的:
-
执行普通的命令
1foo.bar() 2 3Completable.fromAction { foo.bar() }
-
计算一个值
1val v = foo.bar() 2 3Single.fromCallable { foo.bar() }
-
遍历
1val foo = (0..10).toList() 2 3foo.forEach { } 4Observable.fromIterable(foo)
一般情况下,除非特殊的原因,应该优先使用 NonStream 的方式,因为适用性更强,不过具体还是要看场景。
-
blockingFirst() / blockingGet()
在 UT 里面随便用,但是正式代码里是不应该出现的,尤其是以 blocking 的方式使用 Stream 很容易造成互相 block。 -
在 Stream 里是不应该用 for-loop 的,因为一旦 for-loop 执行起来,Stream 没办法终止它执行,即使 Stream 已经被 dispose 了。当然其实 App 里数据都不会太大,不会执行太久,主要是这种方式不符合 Stream 的思想。
倾向于用操作符
fromIterable / fromArray
这些。重要的是,我们想避免 for-loop 的这种思想,即使用
Iterable<T>.any { }
这种写法本质还是一样的 for-loop。 -
但并不是永远倾向于 Stream 的遍历,在一个没有 Stream 的场景下,没必要硬生生地
Observable.fromIterable().map {}.toList()
,看起来很奇怪,但是我还真的见过这样写的。重要的是,我们希望避免在 Stream 里 for-loop,如果没有 Stream 就无所谓了,更何况 Kotlin 提供的语法糖不也挺香的?
-
Chain Stream
一个 Stream 的 subscribe 方法,已经不是 Stream 了,不要在 subscribe 里订阅另一个 Stream,用
flatMapXxx / switchMapXxx
chain 起来。
2. 是否需要所有的值
大部分情况下,我们需要的是最新的值,相比 flatMap
优先使用 switchMap
,另一点是 switchMap
会 dispose 之前过期的 emitter,而 flatMap
没有处理,这是很危险的:
1Observable.just(0, 1, 2)
2 .flatMapSingle {
3 Single.just(it)
4 .delay(1, TimeUnit.SECONDS)
5 }
6 .subscribe { println(it) }
上面的代码多运行几次,你能保证顺序永远是 0, 1, 2 吗?
这带来的问题是,上游先发射的数据,在 flatMap 之后,有可能更晚发射到下游,UI 可能会显示旧数据,大部分情况下我们只关心最新的值,过期的值不值得再处理。
3. 是否一定会有值
-
选择正确的类型来表示数据
- 不在乎值使用
Completable
- 有可能有值使用
Maybe
- 只有单个值使用
Single
- 可能有多个值使用
Observable
HotObservable
根据是否需要默认值,选择BehaviorSubject / PublishSubject
- 不在乎值使用
-
除了声明时的类型,也应该时刻关注类型的转换
-
Maybe.toSingle()
应该考虑默认值的问题,优先使用
defaultIfEmpty
-
Maybe.flatMapSingle
和上面同理,使用
flatMapSingleElement
-
4. 是否关注值的顺序
上面已经提到了,flatMap
是不关心 emitter 处理之后的顺序的,为了保证顺序,需要使用 concatMap
1Observable.just(0, 1, 2)
2 .concatMapSingle {
3 Single.just(it)
4 .delay(1, TimeUnit.SECONDS)
5 }
6 .subscribe { println(it) }
这能保证每个 childStream 的处理顺序,但需要注意的是,每个 childStream 都应该是有限流,否则会一直 block 在这里。
5. 什么时候需要值
我们需要关心上游产生的值是用来做什么的,以及上游产生的值是否会随着时间而改变。
大部分情况下,Observable.just(foo)
是可以的,发送给下游的值就是创建 Stream 时刻的值。但是也有一些场合,我们在创建 Stream 的时候还没有合适的值,或者我们需要的是订阅之后的最新值,这时我们应该用有 lazy 效果的 Observable.fromCallable
或者使用 defer
。
6. 如何安排不同的 Source
-
每一次的变换,都会触发下游,所以我们应该把不变的值尽可能放在上游
1Observable.fromIterable((0..10).toList()) 2 .concatMapSingle { item -> 3 Single.just(42) 4 .map { item + it } 5 } 6 .subscribeBy() 7 8 9Single.just(42) 10 .flatMapObservable { base -> 11 Observable.fromIterable((0..10).toList()) 12 .map { it + base } 13 } 14 .subscribeBy()
对于不变的 Stream,尽量执行一次就可以了。
-
我们常常需要在代码的效率和可读性做选择,比如下面的 Stream
1Single.zip(Single.just(1), Single.just(2)) { t1, t2 -> t1 to t2 } 2 .flatMapObservable { (t1, t2) -> observeX(t1, t2) } 3 4Single.just(1) 5 .flatMapObservable { t1 -> 6 Single.just(2) 7 .flatMapObservable { observeX(t1, it) } 8 }
同样适用于
Observable.combineLatest
, 我们应该尽量减少对象的创建,比如避免了上面的 Pair。但是这也不是绝对的,因为这种写法其实也是一种嵌套,想象一下如果有 4 层,代码会完全变得很丑。所以一般如果是 3 个或者 3 个以上了,出于可读性的角度,倾向于把 Stream 写得平一点,用
zip
或者combineLatest
。
7. 避免副作用
函数式编程的一个主要思想是,给定相同的变换规则,只要初始条件一样,就能保证一样的输出。
这里面潜在的条件是,函数变换中,不应该操作外部变量,大部分情况下是全局变量。如果一个 Stream 访问或修改了全局变量,一般称之为 Side Effect。
为了更符合这一条思想:
- 访问环境变量也应该作为 Stream 的一部分,为了在值变更的时候,能通知到 Stream 的下游
- 在 Stream 获取到数据需要有对应操作时,不要随便找
doOnNext
之类的地方触发,放在subscribe
里。doOnNext / doOnSuccess / doOnError
这些可以随便插入 Stream 里,并不是 Stream 结束的地方,而subscribe
只会有一个,在这里对 Stream 结果进行操作。