【Android】RxJava之初始篇
關于RxJava
RxJava是ReactiveX推出在Java VM環境下使用的異步操作庫。除了在Java環境ReactiveX也為其他編程語言推出Rx庫,例如Py、Js、Go等。網上有很多關于對RxJava的介紹和使用,在Android開發中也有很多項目使用RxJava。那為什么還要使用RxJava呢,Android開發也有提供異步操作的方法供開發者使用,我想應該是RxJava比起Handle、AsyncTask簡潔優雅。
- 1 RxJava采用鏈式調用,在程序邏輯上清晰簡潔
- 2 采用擴展式觀察者設計模式
關于觀察者模式以及其他RxJava的介紹這個就不做重復,下面內容主要圍繞RxJava和RxAndroid使用。對于RxJava官方文檔已經有詳細介紹,本節是以學習討論為主,如存在錯誤的地方希望大家可以指出。
被觀察者Observable
使用RxJava需要創建Observable,Observable用于發射數據。如下Observable的create方法需要傳入一個OnSubscribe,其繼承于Action1<Subscriber<? super T>>,Action中的Subscriber就是訂閱者。
- public static <T> Observable<T> create(OnSubscribe<T> f) {
- return new Observable<T>(RxJavaHooks.onCreate(f));
- }
另外create方法中需要實現接口call,返回subscriber對象。call方法實現在observable訂閱后要執行的事件流。subscriber.onNext發射data,subscriber.onCompleted可以表示發射事件結束。接著調用observable的subscribe方法實現被訂閱后執行的事件流。
- Observable<String> observable = Observable
- .create(new Observable.OnSubscribe<String>() {
- @Override
- public void call(Subscriber<? super String> subscriber) {
- subscriber.onNext("1");
- subscriber.onNext("2");
- subscriber.onNext("3");
- subscriber.onNext("4");
- subscriber.onNext("5");
- }
- });
- Subscriber<String> subscriber = new Subscriber<String>() {
- @Override
- public void onCompleted() {
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onNext(String s) {
- System.out.print(s + '\n');
- }
- };
- observable.subscribe(subscriber);
- //輸出結果 print:
- //1
- //2
- //3
- //4
- //5
Observable除了使用create方法創建外還可以使用from或者just快速設置發射的事件流,簡化了create的步驟。
- Observable<String> o = Observable.from("a", "b", "c");
- Observable<String> o = Observable.just("one object");
說好的異步操作
RxJava的線程由Schedulers調度者控制,通過它來控制具體操作在什么線程中進行。
- Schedulers.immediate() 在當前線程中執行
- Schedulers.newThread() 為每一個任務開辟線程執行
- Schedulers.computation() 計算任務運行的線程
- Schedulers.io() IO任務運行的線程....
- AndroidSchedulers.mainThread() Android 主線程運行
對于線程的控制主要由subscribeOn()和observeOn()兩個方法控制:
- subscribeOn 控制Observable.OnSubscribe所處的線程,等同于Observable create、just、from時所處的線程。
- observeOn 控制Subscriber的線程,也可以說是控制事件被執行時所在的線程。
- Observable
- .just(1,2,3)
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Subscriber<Integer>() {
- @Override
- public void onCompleted() {
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onNext(Integer integer) {
- System.out.print(integer + '\n');
- }
- });
- //輸出結果 print:
- //1
- //2
- //3
寫下上面的RxJava鏈式調用的代碼,有沒有覺得比以前使用的異步調用清爽許多,對處女座還說這很治愈!
操作符Operators
ReactiveX提供超級多的操作符,每個操作符都具有不同的功能,但目的都是在Observable和Subscribe之間變換和修改發射出去的事件流。這節介紹幾個比較常見簡單的操作符,之后有機會再寫一節操作符篇詳細說說每個操作符的作用。附上官方操作符文檔看看就知道有多少多了。
- Map()
- public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
- return create(new OnSubscribeMap<T, R>(this, func));
- }
首先先介紹一個操作符map,map實現Func1接口將T類型數據變換為R類型數據,返回R類型數據。例如傳入Integer類型的事件隊列,經過map加工之后以String類型返回。
- Observable
- .just(1,2,3)
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .map(new Func1<Integer, String>() {
- @Override
- public String call(Integer integer) {
- return integer + "";
- }
- })
- .subscribe(new Subscriber<String>() {
- ......
- @Override
- public void onNext(String str) {
- System.out.print(str + '\n');
- }
- });
- //輸出結果 print:
- //1
- //2
- //3
- Filter()
- public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
- return create(new OnSubscribeFilter<T>(this, predicate));
- }
filter和map一樣實現Func1接口不過它變換之后的類型為boolean,對發射的事件流進行篩選,當變換后的boolean值為true,訂閱者才能收到通過篩選的事件,反之該事件不被消費。例如事件流篩選要求當int值可被2整除才能繼續傳遞,所以最后訂閱者可消費的事件為2,4,6,8,10。
- Observable
- .just(1,2,3,4,5,6,7,8,9,10)
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .filter(new Func1<Integer, Boolean>() {
- @Override
- public Boolean call(Integer integer) {
- return integer % 2 == 0;
- }
- })
- .map(new Func1<Integer, String>() {
- @Override
- public String call(Integer integer) {
- return integer + "";
- }
- })
- .subscribe(new Subscriber<String>() {
- ......
- @Override
- public void onNext(String str) {
- System.out.print(str + '\n');
- Log.i("subscribe", str);
- }
- });
- //輸出結果 print:
- //2
- //3
- //4
- //6
- //8
- //10
- Skip()
- public final Observable<T> skip(int count) {
- return lift(new OperatorSkip<T>(count));
- }
skip操作符表示跳過前幾個事件從某一個事件開始發射事件,下標從0開始。
- Observable
- .just(1,2,3,4,5,6,7,8,9,10)
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .skip(3)
- .map(new Func1<Integer, String>() {
- @Override
- public String call(Integer integer) {
- return integer + "";
- }
- })
- .subscribe(new Subscriber<String>() {
- ......
- @Override
- public void onNext(String s) {
- System.out.print(s + '\n');
- Log.i("subscribe", s);
- }
- });
- //輸出結果 print:
- //4
- //5
- //6
- //7
- //8
- //9
- //10
- Range()
- public static Observable<Integer> range(int start, int count) {
- if (count < 0) {
- throw new IllegalArgumentException("Count can not be negative");
- }
- if (count == 0) {
- return Observable.empty();
- }
- if (start > Integer.MAX_VALUE - count + 1) {
- throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE");
- }
- if(count == 1) {
- return Observable.just(start);
- }
- return Observable.create(new OnSubscribeRange(start, start + (count - 1)));
- }

range操作符可以理解為just,from傳遞一個連續的int類型待發射數組,n為起始int值,m為Count。例如n = 1,m = 5 int數組就是{1,2,3,4,5}
結尾
這節先學習到這,算是對RxJava的初步認識和學習。其實運用RxJava主要還是依賴于對操作符的使用,前面所介紹的操作符屬于最最簡單基礎的,還有很多特別有用的操作符沒有介紹。之后再繼續介紹一些操作符。RxJava在Android開發中很受歡迎,就是因為它的強大,同時RxJava可以和Retrofit組合使用,更高效處理網絡請求回值。另外GitHub GoogleSample上的android-architecture也有使用RxJava框架的TODO項目,可以看看理解RxJava在項目中的實踐應用。