如何从 Stream 的角度思考 RxJava

这篇文章是对 RxJava 工程使用方面的一些总结和思考,不会涉及具体操作符解释、实现细节,更多是从 idea 出发来思考 Stream。当然因为思想是相同的,对于 RxPython 及其它语言的使用,几乎没有差别。

1. Stream 存在的问题

在谈论 Kotlin Coroutine 时,一般都会谈到一个问题:

有了 RxJava,为什么需要 Coroutine?

这个问题也可以换一个角度,RxJava 有什么缺陷?或者在哪些场合下不适合用 RxJava?

事实上,RxJava 在几乎任何地方都很合适很方便,上手之后开发效率也非常高,但是它不适合与 NonStream 打交道,面对一个不是 RxJava 的 Library,总需要特意转换一次。

有些人也因此抨击 RxJava,说它过于有侵略性。比如 Android 中基本上都用 RxJava + Retrofit 了,既然网路请求都是 RxJava,那么 UI 点击要不要用?页面生命周期要不要用?RxJava 是一个很强大很好使的工具,可以解决大部分问题,但是让人有一种手里有了锤子,看什么都是钉子的感觉,想要把一切都变成 Stream,不然总是会感觉不流畅。

起初,是没有 Stream 的,订阅之后,便有了 Stream。subscribe()dispose() 作为与 NonStream 环境交互的方式,因为是不同的时机触发的,也就需要在不同的地方分别调用。同一个概念,在 2 个地方分别调,就会有遗忘有不一致,所以经常需要引入 Library 保证这种一致性,或者自己创造新轮子。

2. Stream VS NonStream

Stream 和 NonStream 的很多操作是等效的:

  1. 执行普通的命令

    1foo.bar()
    2
    3Completable.fromAction { foo.bar() }
    
  2. 计算一个值

    1val v = foo.bar()
    2
    3Single.fromCallable { foo.bar() }
    
  3. 遍历

    1val foo = (0..10).toList()
    2
    3foo.forEach {  }
    4Observable.fromIterable(foo)
    

一般情况下,除非特殊的原因,应该优先使用 NonStream 的方式,因为适用性更强,不过具体还是要看场景。

2. 是否需要所有的值

大部分情况下,我们需要的是最新的值,相比 flatMap 优先使用 switchMap,另一点是 switchMap 会 dispose 之前过期的 emitter,而 flatMap 没有处理,这是很危险的:

1Observable.just(0, 1, 2)
2    .flatMapSingle {
3        Single.just(it)
4            .delay(1, TimeUnit.SECONDS)
5    }
6    .subscribe { println(it) }

上面的代码多运行几次,你能保证顺序永远是 0, 1, 2 吗?

这带来的问题是,上游先发射的数据,在 flatMap 之后,有可能更晚发射到下游,UI 可能会显示旧数据,大部分情况下我们只关心最新的值,过期的值不值得再处理。

3. 是否一定会有值

4. 是否关注值的顺序

上面已经提到了,flatMap 是不关心 emitter 处理之后的顺序的,为了保证顺序,需要使用 concatMap

1Observable.just(0, 1, 2)
2    .concatMapSingle {
3        Single.just(it)
4            .delay(1, TimeUnit.SECONDS)
5    }
6    .subscribe { println(it) }

这能保证每个 childStream 的处理顺序,但需要注意的是,每个 childStream 都应该是有限流,否则会一直 block 在这里。

5. 什么时候需要值

我们需要关心上游产生的值是用来做什么的,以及上游产生的值是否会随着时间而改变。

大部分情况下,Observable.just(foo) 是可以的,发送给下游的值就是创建 Stream 时刻的值。但是也有一些场合,我们在创建 Stream 的时候还没有合适的值,或者我们需要的是订阅之后的最新值,这时我们应该用有 lazy 效果的 Observable.fromCallable 或者使用 defer

6. 如何安排不同的 Source

7. 避免副作用

函数式编程的一个主要思想是,给定相同的变换规则,只要初始条件一样,就能保证一样的输出。

这里面潜在的条件是,函数变换中,不应该操作外部变量,大部分情况下是全局变量。如果一个 Stream 访问或修改了全局变量,一般称之为 Side Effect。

为了更符合这一条思想:

  1. 访问环境变量也应该作为 Stream 的一部分,为了在值变更的时候,能通知到 Stream 的下游
  2. 在 Stream 获取到数据需要有对应操作时,不要随便找 doOnNext 之类的地方触发,放在 subscribe 里。 doOnNext / doOnSuccess / doOnError 这些可以随便插入 Stream 里,并不是 Stream 结束的地方,而 subscribe 只会有一个,在这里对 Stream 结果进行操作。