Rxjava2 可谓是日常开发中的利器,特别是在异步任务中更能发挥作用。响应式编程以及流式 api 的良好支持,给予了更好的编码体验。越来越多开发者渐渐用起来了。学习 rxjava2 最好的地方无外乎官方文档,详细且完整。以下结合官方文档和我自己的理解以及例子,解释各个操作符的用法,给各位以及我自己作一篇参考。

怎么用 Rxjava2

要使用 RxJava,需要先创建 Observables(发出数据项),以各种方式转换这些 Observable 以获取所需要的精确数据项(通过使用 Observable 运算符),然后观察并响应这些需要的项目序列(通过实现观察者) 或者订阅者,然后将它们订阅到最终的变换后的 Observables)。

Creating Observables 创建操作符

just

通过获取预先存在的对象并在订阅时将该特定对象发布给下游使用者来构造反应类型。为方便起见,存在 2 到 9 个参数的重载,这些对象(具有相同的常见类型)将按指定的顺序发出。就像 From 类似,但请注意 From 将传入一个数组或一个 iterable 或类似的东西来取出要发出的项目,而 Just 只是简单地发出数组或者迭代器。

请注意,如果将 null 传递给 Just,它将返回一个 Observable,它将 null 作为项发出。不要错误地假设这将返回一个空的 Observable(一个根本不发出任何项目)。为此,需要使用 Empty 运算符。

just

    fun testOpJust() {
        val arr = arrayOf("mary", "tom", "ben", "lisa", "ken")
        Observable.fromArray(arr).filter { it.size > 3 }.map { it + "s" }.subscribe(System.out::println)
    val list = arrayListOf(<span class="hljs-string">"mary"</span>, <span class="hljs-string">"tom"</span>, <span class="hljs-string">"ben"</span>, <span class="hljs-string">"lisa"</span>, <span class="hljs-string">"ken"</span>)
    Observable.just(list).forEach { it -&gt; System.out.println(it + <span class="hljs-string">"s"</span>) }

    list.stream().filter { it -&gt; it.length &gt; 3 }.map { <span class="hljs-string">"<span class="hljs-variable">$it</span> s"</span> }.forEach(System.out::println)
}
复制代码

from

根据预先存在的源或生成器类型构造序列。当使用 Observable 时,如果使用的所有数据都可以表示为 Observables,而不是 Observables 和其他类型的混合,则可以更方便。这允许使用一组运算符来控制数据流的整个生命周期。例如,Iterables 可以被认为是一种的 Observable; 作为一种始终只发出单一项目的 Observable。通过将这些对象显式转换为 Observable,可以将它们作为对等体与其他 Observable 进行交互。因此,大多数 ReactiveX 实现都具有允许将特定于语言的对象和数据结构转换为 Observable 的方法。

注意:这些静态方法使用后缀命名约定(即,在方法名称中重复参数类型)以避免重载解析模糊。

from

fromIterable

从 java.lang.Iterable 源(例如 Lists,Sets 或 Collections 或 custom Iterables)发出信号,然后完成序列。

可用于 Flowable ,Observable

fromArray

发信号通知给定数组的元素,然后完成序列。 可用于 Flowable,Observable

注意:RxJava 不支持原始数组,只支持(通用)引用数组。

fun testOpFrom(){
        val list = arrayListOf<Int>(1,2,3,4,5,6)
        Observable.fromIterable(list).subscribe(System.out::println)
    Observable.fromArray(1,2,3,4,5,6).subscribe(System.out::println)

}
复制代码
fromCallable

当消费者订阅时,调用给定的 java.util.concurrent.Callable 并将其返回值(或抛出的异常)转发给该使用者。

可用于:Observable,Flowable,Maybe,Single,Completable

备注:在 Completable 中,忽略实际返回值,并且 Completable 完成。

       Observable.fromCallable<String> {
            "hello"
        }.subscribe(System.out::println)
    Completable.fromCallable{
        <span class="hljs-string">"complatable from callable"</span>
    }.subscribe {
        System.out.println(<span class="hljs-string">"complete"</span>)
    }
复制代码

fromAction

当消费者订阅时,调用给定的 io.reactivex.function.Action 并且消费者完成或接收 Action 抛出的异常。

可用于: Maybe,Completable

   Maybe.fromAction<String>{
            System.out.println("maybe from action")
        }.subscribe(System.out::println)
复制代码

以下标星先不多做解释,用得不多

*fromRunnable

*fromFuture

*from{reactive type}

将另一种反应类型包裹或转换为目标反应类型。具有以下签名模式的各种反应类型中提供以下组合:targetType.from {sourceType}()

* 注意:并非所有可能的转换都是通过 from {reactive type} 方法系列实现的。查看 to {reactive type} 方法系列以获得进一步的转换可能性。

注意:fromAction 和 fromRunnable 之间的区别在于 Action 接口允许抛出已受检的异常,而 java.lang.Runnable 则不然。

error

可用于 Observable,Flowable,Maybe,Single,Completable

通过 java.util.concurrent.Callable 向消费者发出预先存在或生成的错误信号。

  fun testOpError(){
        Observable.error<Throwable>(IOException(""))
                .subscribe({
                    System.out.print("不会打印吧")
                },{
                    it.printStackTrace()
                },{
                    System.out.println("也不会打印")
                })
    }
复制代码

一个典型的用例是使用 onErrorResumeNext 有条件地映射或抑制链中的异常:

   /**
     * 抑制链上发生的异常
     */
    @Test
    fun testOpOnErrorResumeNext() {
        val observable = Observable.fromCallable {
            if (Math.random() < 0.5f) {
                throw IllegalArgumentException()
            }
            throw IOException()
        }
    observable.onErrorResumeNext(Function {
        <span class="hljs-keyword">if</span> (it is IllegalArgumentException) {
            Observable.empty()
        } <span class="hljs-keyword">else</span> {
            Observable.error(it)
        }
    }).subscribe({
        System.out.println(<span class="hljs-string">"nothing"</span>)
    },{
        it.printStackTrace()
    },{
        System.out.println(<span class="hljs-string">"empty"</span>)
    })
}
复制代码

这个 onErrorResumeNext 厉害了,可以说之前一直不太明白怎么很好的处理。通过此操作符可以抑制错误的传递,本来如果 subscribe 发生了错误会触发 onError 回调。事实上可能发生了错误,需要不处理或者抑制产生。在 onErrorResumeNext 的 function 参数中,可以根据错误类型返回处理流程。

  • empty 这种类型的源在订阅后立即表示完成。 可用于 Observable,Flowable,Maybe,Single,Completable

示例可见 onErrorResumeNext 的例子

empty

empty 发送直接表示完成,就是订阅者直接调用 onComplete 回调。onNext 不会执行

  • never 这种类型的源不会发出任何 onNext,onSuccess,onError 或 onComplete 的信号。这种类型的反应源可用于测试或“禁用”组合子操作符中的某些源。

可用于 Observable,Flowable,Maybe,Single,Completable

不会对订阅者的任何回调进行调用。禁用也可理解,比如发送了错误,都不往下执行

  • interval 定期生成无限的,不断增加的数字(Long 类型)。intervalRange 变体生成有限数量的此类数字。

可用于 Observable,Flowable

interval

    fun testOpInterval(){
        Observable.interval(1,TimeUnit.SECONDS)
                .onErrorResumeNext(Function { 
                    Observable.error(it)
                })
                .subscribe({
                    if (it.rem(5) == 0L) {
                        System.out.println("tick")
                    } else {
                        System.out.println("tock")
                    }
                },{
                    it.printStackTrace()
                },{
                    System.out.println("interval complete")
                })
    }
复制代码
  • Timer 运算符创建一个 Observable,在指定的一段时间后发出一个特定项。 Timer

也就是说在给定的时间之后发送事件

  • range 为每个消费者生成一系列值。range()方法生成 Integers,rangeLong()生成 Longs。Range 运算符按顺序发出一系列顺序整数,您可以在其中选择范围的起点及其长度。

可用于 Observable,Flowable

range

    fun testOpRange(){
        val s = "test range operation now"
        Observable.range(0,s.length- 3)
                .map { "${s[it]} in range"}
                .subscribe {
                    System.out.println(it)
                }
    }
复制代码

发出一系列值,参数为起点,和长度。

  • generate 创建一个冷,同步和有状态的值生成器。

可用于 Observable,Flowable

create

   @Test
    fun testOpGenerate(){
        val start = 1
        val increaseValue = 2
        Observable.generate<Int,Int>(Callable<Int> {
            start
        }, BiFunction<Int, Emitter<Int>,Int> {
            t1, t2 ->
            t2.onNext(t1 + increaseValue)
            t1 + increaseValue
        }).subscribe {
            System.out.println("generate value : $it")
        }
    }
复制代码

不太明白干啥的,具体应用场景。只是一直不间断的产生值

Filtering Observables 过滤 Observable

过滤操作是非常常用且重要的,而且相关的操作符也很多

Debounce

可用于 Observable,Flowable

删除响应源发出的项目,在给定的超时值到期之前,这些项目后面跟着更新的项目。计时器重置每次发射。此运算符会跟踪最近发出的项目,并且仅在有足够的时间过去而没有源发出任何其他项目时才会发出此项目。

按照我得理解就是 debounde 传入了超时值,在该时间之内如果多次发射,取离超时值最近得值。既然又超时那么也应该又开始时间,开始时间就是一组发射最开始值得时间,这一组发射得值的时的差是在 debounce 超时时间之内。

// Diagram:
// -A--------------B----C-D-------------------E-|---->
//  a---------1s
//                 b---------1s
//                      c---------1s
//                        d---------1s
//                                            e-|---->
// -----------A---------------------D-----------E-|-->

fun testOpDebounce(){
Observable.create<String>{
it.onNext(“A”)
Thread.sleep(1_500)
it.onNext(“B”)
Thread.sleep(500)
it.onNext(“C”)
Thread.sleep(250)
it.onNext(“D”)
Thread.sleep(2_000)
it.onNext(“E”)
}.debounce(1,TimeUnit.SECONDS)
.subscribe(System.out::println)
}

复制代码

distinct

可用于 Observable Flowable 通过仅发出与先前项目相比不同的项目来过滤反应源。可以指定 io.reactivex.functions.Function,将源发出的每个项目映射到一个新值中,该值将用于与先前的映射值进行比较。Distinct 运算符通过仅允许尚未发出的项目来过滤 Observable。在一些实现中,存在允许调整两个项被视为“不同”的标准的变体。在一些实施例中,存在操作符的变体,其仅将项目与其前一个项目进行比较以获得更精确的比较,从而仅过滤连续的重复项目,序列中的项目。

    fun testOpDistinct(){
        Observable.fromArray(1,2,3,3,4,5)
                .distinct()
                .subscribe(System.out::println)
    // 用来过滤序列中一组值前后是否相同得值
    Observable.fromArray(1,1,2,3,2)
            .distinct { <span class="hljs-string">"呵呵"</span> }
            .subscribe(System.out::println)
}
复制代码

重载的方法,传入 keySelectro , 作用是对每个元素应用方法得到得新得值,再决定怎么去重

distinctUntilChanged

可用于 Observable Flowable
通过仅发出与其前一个元素相比较不同的项目来过滤反应源。可以指定 io.reactivex.functions.Function,将源发出的每个项目映射到一个新值中,该值将用于与先前的映射值进行比较。或者,可以指定 io.reactivex.functions.BiPredicate 作为比较器函数来比较前一个。

        Observable.fromArray(1,2,3,3,4,5)
//                .distinctUntilChanged()
                .distinctUntilChanged { t1, t2 ->
                    t1 == t2
                }
                .subscribe(System.out::println)
复制代码

可以说是 distinct 的加强版,多了一个可以传入比较器的重载方法

elementAt

课用于 Flowable,Observable 在来自反应源的一系列发射的数据项中,以指定的从零开始的索引发出单个项目。如果指定的索引不在序列中,则可以指定将发出的默认项。

简单说就是按照发出项的次序获取指定的位置的元素

     Observable.fromArray(1,2,3,3,4,5)
                .elementAt(2)
                .subscribe(System.out::println)
复制代码

elementAtOrError

filter

可用于 Observable,Flowable,Maybe,Single 通过仅发出满足指定函数的项来过滤由反应源发出的项。

过滤偶数
 Observable.fromArray(1,2,3,3,4,5)
                .filter {
                    it.rem(2) == 0
                }
                .subscribe(System.out::println)}
复制代码

first

可用于 Flowable,Observable 仅发出反应源发出的第一个项目,或者如果源完成而不发出项目则发出给定的默认项目。这与 firstElement 的不同之处在于此运算符返回 Single,而 firstElement 返回 Maybe。

   Observable.fromArray(1,2,3,3,4,5)
                .first(-1)
                .subscribe(Consumer<Int> {
                    System.out.println("onNext :$it")
                })
                  Observable.fromArray(1,2,3,3,4,5)
            .firstElement()
            .subscribe {
                System.out.println(<span class="hljs-string">"onNext :<span class="hljs-variable">$it</span>"</span>)
            }
复制代码

firstOrError

仅发出响应源发出的第一个项目,或者如果源完成而不发出项目则发出 java.util.NoSuchElementException 信号。

ignoreElement

可用于 Maybe Single 忽略 Single 或 Maybe 源发出的单个项目,并返回一个 Completable,它仅从源中发出错误或完成事件的信号。

ignoreElement

 Maybe.timer(1L,TimeUnit.SECONDS)
                .ignoreElement()
                .doOnComplete {
                    System.out.println("done")
                }
                .blockingAwait()
复制代码

ignoreElements

忽略 Single 或 Maybe 源发出的单个项目,并返回一个 Completable,它仅从源中发出错误或完成事件的信号。

 Observable.timer(1L,TimeUnit.SECONDS)
                .ignoreElements()
                .doOnComplete {
                    System.out.println("completed")
                }
                .blockingAwait()
复制代码

last

可用于 Observable,Flowable

仅发出反应源发出的最后一个项目,或者如果源完成而不发出项目则发出给定的默认项目。这与 lastElement 的不同之处在于此运算符返回 Single,而 lastElement 返回 Maybe。

   Observable.fromArray(1,2,3,3,4,5)
                .last(-1)
                .subscribe(Consumer<Int>{
                    System.out.println("last $it")
                })
复制代码

lastElement

  Observable.fromArray(1,2,3,3,4,5)
                .lastElement()
                .subscribe(Consumer<Int>{
                    System.out.println("last $it")
                })
复制代码

lastOnError

仅发出响应源发出的最后一项,或者如果源完成而不发出项,则发出 java.util.NoSuchElementException 信号。

ofType

可用于 Flowable,Observable,Maybe 通过仅发出指定类型的项目来过滤反应源发出的项目。

 Observable.fromArray(1,2.1f,3,3,4,5)
                .ofType(Int::class.java)
                .subscribe(Consumer<Int>{
                    System.out.println("last $it")
                })
复制代码

sample

可用于 Observable Flowable 通过仅在周期性时间间隔内发出最近发出的项目来过滤反应源发出的项目。


 Observable.create<String> {
            it.onNext("A")
            Thread.sleep(1_000)
        it.onNext(<span class="hljs-string">"B"</span>)
        Thread.sleep(300)

        it.onNext(<span class="hljs-string">"C"</span>)
        Thread.sleep(700)

        it.onNext(<span class="hljs-string">"D"</span>)
        it.onComplete()
    }.sample(1,TimeUnit.SECONDS)
            .blockingSubscribe(System.out::println)
复制代码

skip

删除响应源发出的前 n 个项目,并发出剩余项目。您可以通过使用 Skip 运算符修改 Observable 来忽略 Observable 发出的前 n 个项目,并仅参加之后的项目。

 Observable.fromArray("hehe",2.1f,3,3,4,5)
//                .ofType(String::class.java)
                .skip(3)
                .subscribe {
                    System.out.println(it)
                }
复制代码

skipLast

丢弃反应源发出的最后 n 个项目,并发出剩余的项目。

take

可用于 Flowable Observable 仅发出反应源发出的前 n 项。

     Observable.fromArray("hehe",2.1f,3,3,4,5)
                .take(2)
                .subscribe(System.out::println)
复制代码

takeLast

可用于 Flowable Observable 仅发出反应源发出的最后 n 个项目。

throttleFirst

可用于 Flowable Observable

跟 debounce 有些相似,是取时间范围内第一个,在点击事件过滤很常用

在指定持续时间的连续时间窗口期间仅发出由反应源发出的第一个项目。

 Observable.create<String> {
            it.onNext("A")
            Thread.sleep(300)
        it.onNext(<span class="hljs-string">"B"</span>)
        Thread.sleep(400)
    }.throttleFirst(1,TimeUnit.SECONDS)
            .subscribe(System.out::println)
复制代码

throttleLast

可用于 Observable,Flowable 在指定持续时间的连续时间期间仅发出由反应源发出的最后一个项目。跟 throttleFirst 相反,取最后一个值

throttleWithTimeout

跟 debounce 的别名

    public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
        return debounce(timeout, unit);
    }
复制代码

timeout

从 Observable 或 Flowable 源发出项目,但如果在从上一项开始的指定超时持续时间内未发出下一项,则以 java.util.concurrent.TimeoutException 终止。对于 Maybe,Single 和 Completable,指定的超时持续时间指定等待成功或完成事件到达的最长时间。如果 Maybe,Single 或 Completable 在给定时间内没有完成,将发出 java.util.concurrent.TimeoutException。

   Observable.create<String>{
            it.onNext("A")
            Thread.sleep(600)
        it.onNext(<span class="hljs-string">"B"</span>)
        Thread.sleep(1_500)

        it.onNext(<span class="hljs-string">"C"</span>)
        Thread.sleep(500)
    }.subscribeOn(Schedulers.io())
            .subscribe({
                System.out.println(it)
            },{
                it.printStackTrace()
            })
复制代码

捕获处理

一下为 Kotlin 编写的代码,可以看到在发生错误的情况下,通过 onError() 抛出了错误,并且需要在订阅者,第二个参数传入,处理错误的回调。

    fun testErrorHandle() {
        Observable.create<String> {
            it.onNext("start")
            Thread {
                try {
                    System.out.println("start open ...")
                    it.onNext("start open ...")
                    val stream = URL("https://www.baidu.com").openStream()
                    System.out.println("after url ...")
                    it.onNext("after url")
                    val br = stream.bufferedReader()
                    if (!it.isDisposed) {
                        var text = br.readText()
                        it.onNext(text)
                    }
                    stream.close()
                    br.close()
                    it.onNext("after open ...")
                    if (!it.isDisposed) {
                        it.onComplete()
                    }
                }catch (e : java.lang.Exception) {
                    System.out.println(e)
                    e.printStackTrace()
                    it.onError(e)
                }
            }.start()
        }.subscribe(System.out::println) {
            it.printStackTrace()
            System.out.println("what the fuck")
        }
    }
复制代码

Observable 通常不会抛出异常。相反,它会通过使用 onError 通知终止 Observable 序列来通知任何观察者发生了不可恢复的错误。

这有一些例外。例如,如果 onError()调用本身失败,Observable 将不会尝试通过再次调用 onError 来通知观察者,但会抛出 RuntimeException,OnErrorFailedException 或 OnErrorNotImplementedException。

从 onError 通知中恢复的技术

因此,不是捕获异常,而是观察者或操作者应该更通常地响应异常的 onError 通知。还有各种 Observable 运算符可用于对来自 Observable 的 onError 通知作出反应或从中恢复。例如,可以使用运算符:

  1. 吞下错误并切换到备份 Observable 以继续序列
  2. 吞下错误并发出默认项
  3. 吞下错误并立即尝试重启失败的 Observable
  4. 吞下错误并尝试在一些退避间隔后重新启动失败的 Observable

可以使用错误处理运算符中描述的运算符来实现这些策略。

吞下的意思,应该是不处理异常

RxJava 特定的异常以及如何处理它们

CompositeException 这表明发生了多个异常。可以使用异常的 getExceptions() 方法来检索构成组合的各个异常。

MissingBackpressureException 这表示试图将过多发出数据项应用于它的 Observable。有关背压 (github.com/ReactiveX/R…) 的 Observable 的解决方法,请参阅 Backpressure。

OnErrorFailedException 这表明 Observable 试图调用其观察者的 onError()方法,但该方法本身引发了异常。

OnErrorNotImplementedException 这表明 Observable 试图调用其观察者的 onError()方法,但是没有这样的方法存在。可以通过修复 Observable 以使其不再达到错误条件,通过在观察者中实现 onError 处理程序,或通过使用本页其他地方描述的其中一个运算符到达观察者之前截获 onError 通知来消除此问题。。

OnErrorThrowable 观察者将这种类型的 throwable 传递给他们的观察者的 onError()处理程序。此变量的 Throwable 包含有关错误的更多信息以及错误发生时系统的 Observable 特定状态,而不是标准 Throwable。

参考资料

官网文档

  • Android

    开放手机联盟(一个由 30 多家科技公司和手机公司组成的团体)已开发出 Android,Android 是第一个完整、开放、免费的手机平台。

    293 引用
感谢    赞同    分享    收藏    关注    反对    举报    ...