RxJava操作符系列二(下)
接上文
輸出日志信息
- call:2 ConcatMap RxNewThreadScheduler-5
- onNext: ConcatMap 101 ConcatMap
- call:2 ConcatMap RxNewThreadScheduler-6
- onNext: ConcatMap 102 ConcatMap
- call:2 ConcatMap RxNewThreadScheduler-7
- onNext: ConcatMap 103 ConcatMap
- onCompleted: ConcatMap
通過該操作符和flatMap輸出的日志信息,很容易看出flatMap并沒有保證數據源的順序性,但是ConcatMap操作符保證了數據源的順序性。在應用中,如果你對數據的順序性有要求的話,就需要使用ConcatMap。若沒有要求,二者皆可使用。
SwitchMap
當原始Observable發射一個新的數據(Observable)時,它將取消訂閱并停止監視產生執之前那個數據的Observable,只監視當前這一個.
- Integer[] integers = {1, 2, 3};
- Observable.from(integers).switchMap(new Func1>() {
- @Override
- public Observable call(Integer integer) {
- Log.e(TAG, "call: SwitchMap" + Thread.currentThread().getName());
- //如果不通過subscribeOn(Schedulers.newThread())在在子線程模擬并發操作,所有數據源依然會全部輸出,也就是并發操作此操作符才有作用
- //若在此通過Thread。sleep()設置等待時間,則輸出信息會不一樣。相當于模擬并發程度
- return Observable.just((integer + 100) + "SwitchMap").subscribeOn(Schedulers.newThread());
- }
- }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted: SwitchMap");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError: SwitchMap");
- }
- @Override
- public void onNext(String s) {
- Log.e(TAG, "onNext: SwitchMap "+s);
- }
- });
輸出日志信息
- call: SwitchMapmain
- call: SwitchMapmain
- call: SwitchMapmain
- onNext: SwitchMap 106SwitchMap
- onCompleted: SwitchMap
當數據源較多時,并不一定是只輸出***一項數據,有可能輸出幾項數據,也可能是全部。
GroupBy
看到這個詞你就應該想到了這個操作符的作用,就是你理解的含義,他將數據源按照你的約定進行分組。我們通過groupBy實行將1到10的數據進行就劃分,代碼如下
- Observable.range(1, 10).groupBy(new Func1() {
- @Override
- public Boolean call(Integer integer) {
- return integer % 2 == 0;
- }
- }).subscribe(new Subscriber>() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted:1 ");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError:1 ");
- }
- @Override
- public void onNext(GroupedObservable booleanIntegerGroupedObservable) {
- booleanIntegerGroupedObservable.toList().subscribe(new Subscriber>() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted:2 " );
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError:2 ");
- }
- @Override
- public void onNext(List integers) {
- Log.e(TAG, "onNext:2 "+integers);
- }
- });
- }
- });
輸出日志信息
- onNext:2 [1, 3, 5, 7, 9]
- onCompleted:2
- onNext:2 [2, 4, 6, 8, 10]
- onCompleted:2
- onCompleted:1
在上面代碼中booleanIntegerGroupedObservable變量有一個getKey()方法,該方法返回的是分組的key,他的值就是groupBy方法call回調所用函數的值,在上面也就是integer % 2 == 0的值,及true和false。有幾個分組也是有此值決定的。
Scan
操作符對原始Observable發射的***項數據應用一個函數,然后將那個函數的結果作為自己的***項數據發射。它將函數的結果同第二項數據一起填充給這個函數來產生它自己的第二項數據。它持續進行這個過程來產生剩余的數據序列。
例如計算1+2+3+4的和
- Observable.range(1,4).scan(new Func2() {
- @Override
- public Integer call(Integer integer, Integer integer2) {
- Log.e(TAG, "call: integer:"+integer+" integer2 "+integer2);
- return integer+integer2;
- }
- }).subscribe(new Subscriber() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted: ");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError: " );
- }
- @Override
- public void onNext(Integer integer) {
- Log.e(TAG, "onNext: "+integer );
- }
- });
輸出日志信息
- onNext: 1
- call: integer:1 integer2 2
- onNext: 3
- call: integer:3 integer2 3
- onNext: 6
- call: integer:6 integer2 4
- onNext: 10
- onCompleted:
對于scan有一個重載方法,可以設置一個初始值,如上面代碼,初始值設置為10,只需將scan加個參數scan(10,new Func2)。
Buffer
操作符將一個Observable變換為另一個,原來的Observable正常發射數據,變換產生的Observable發射這些數據的緩存集合,如果原來的Observable發射了一個onError通知,Buffer會立即傳遞這個通知,而不是首先發射緩存的數據,即使在這之前緩存中包含了原始Observable發射的數據。
示例代碼
- Observable.range(10, 6).buffer(2).subscribe(new Subscriber>() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted: ");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError: ");
- }
- @Override
- public void onNext(List integers) {
- Log.e(TAG, "onNext: " + integers);
- }
- });
輸出日志信息
- onNext: [10, 11]
- onNext: [12, 13]
- onNext: [14, 15]
- onCompleted:
上面一次性訂閱兩個數據,如果設置參數為6,就一次性訂閱。buffer的另一重載方法buffer(count, skip)從原始Observable的***項數據開始創建新的緩存(長度count),此后每當收到skip項數據,用count項數據填充緩存:開頭的一項和后續的count-1項,它以列表(List)的形式發射緩存,取決于count和skip的值,這些緩存可能會有重疊部分(比如skip count時)。具體執行結果,你可以設置不同的skip和count觀察輸出日志,查看執行結果及流程。
Window
Window和Buffer類似,但不是發射來自原始Observable的數據包,它發射的是Observables,這些Observables中的每一個都發射原始Observable數據的一個子集,***發射一個onCompleted通知。
- Observable.range(10, 6).window(2).subscribe(new Subscriber>() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted1: ");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError1: ");
- }
- @Override
- public void onNext(Observable integerObservable) {
- Log.e(TAG, "onNext1: ");
- tv1.append("\n");
- integerObservable.subscribe(new Subscriber() {
- @Override
- public void onCompleted() {
- Log.e(TAG, "onCompleted2: ");
- }
- @Override
- public void onError(Throwable e) {
- Log.e(TAG, "onError2: ");
- }
- @Override
- public void onNext(Integer integer) {
- Log.e(TAG, "onNext2: "+integer);
- }
- });
- }
- });
輸出日志信息
- onNext2: 10
- onNext2: 11
- onCompleted2:
- onNext2: 12
- onNext2: 13
- onCompleted2:
- onNext2: 14
- onNext2: 15
- onCompleted2:
- onCompleted1:
window和buffer一樣也有不同的重載方法。這兩個操作符相對其他操作符不太容易理解,可以去RxJava GitHub理解,里面有圖示解析。當然***的理解方式就是通過更改變量的值,去觀察輸出的日志信息。
好了,這篇文章就介紹到這里。若文中有錯誤的地方,歡迎指正。謝謝。