我們學習WebFlux 前置知識
1.Backpressure
Backpressure 在國內被翻譯成背壓,這個翻譯在網上被很多人吐槽,我覺得大家的吐槽是有道理的,背壓單純從字面上確實看不出來有什么意思。所以松哥這里直接用英文 Backpressure 吧。
Backpressure 是一種現象:當數據流從上游生產者向下游消費者傳輸的過程中,上游生產速度大于下游消費速度,導致下游的 Buffer 溢出,這種現象就叫做 Backpressure。
換句話說,上游生產數據,生產完成后通過管道將數據傳到下游,下游消費數據,當下游消費速度小于上游數據生產速度時,數據在管道中積壓會對上游形成一個壓力,這就是 Backpressure,從這個角度來說,Backpressure 翻譯成反壓、回壓似乎更合理一些。
Backpressure 會出現在有 Buffer 上限的系統中,當出現 Buffer 溢出的時候,就會有 Backpressure,對于 Backpressure,它的應對措施只有一個:丟棄新事件。那么什么是 Buffer 溢出呢?例如我的服務器可以同時處理 2000 個用戶請求,那么我就把請求上限設置為 2000,這個 2000 就是我的 Buffer,當超出 2000 的時候,就產生了 Backpressure。
2.Flow API
JDK9 中推出了 Flow API,用以支持 Reactive Programming,即響應式編程。
在響應式編程中,會有一個數據發布者 Publisher 和數據訂閱者 Subscriber,Subscriber 接收 Publisher 發布的數據并進行消費,在 Subscriber 和 Publisher 之間還存在一個 Processor,類似于一個過濾器,可以對數據進行中間處理。
JDK9 中提供了 Flow API 用以支持響應式編程,另外 RxJava 和 Reactor 等框架也提供了相關的實現。
我們來看看 JDK9 中的 Flow 類:
非常簡潔,基本上就是按照 Reactive Programming 的設計來的:
Publisher
Publisher 為數據發布者,這是一個函數式接口,里邊只有一個方法,通過這個方法將數據發布出去,Publisher 的定義如下:
- @FunctionalInterface
- public static interface Publisher<T> {
- public void subscribe(Subscriber<? super T> subscriber);
- }
Subscriber
Subscriber 為數據訂閱者,這個里邊有四個方法,如下:
- public static interface Subscriber<T> {
- public void onSubscribe(Subscription subscription);
- public void onNext(T item);
- public void onError(Throwable throwable);
- public void onComplete();
- }
- onSubscribe:這個是訂閱成功的回調方法,用于初始化 Subscription,并且表明可以開始接收訂閱數據了。
- onNext:接收下一項訂閱數據的回調方法。
- onError:在 Publisher 或 Subcriber 遇到不可恢復的錯誤時調用此方法,之后 Subscription 不會再調用 Subscriber 其他的方法。
- onComplete:當接收完所有訂閱數據,并且發布者已經關閉后會回調這個方法。
Subscription
Subscription 為發布者和訂閱者之間的訂閱關系,用來控制消息的消費,這個里邊有兩個方法:
- public static interface Subscription {
- public void request(long n);
- public void cancel();
- }
- request:這個方法用來向數據發布者請求 n 個數據。
- cancel:取消消息訂閱,訂閱者將不再接收數據。
Processor
Processor 是一個空接口,不過它同時繼承了 Publisher 和 Subscriber,所以它既能發布數據也能訂閱數據,因此我們可以通過 Processor 來完成一些數據轉換的功能,先接收數據進行處理,處理完成后再將數據發布出去,這個也有點類似于我們 JavaEE 中的過濾器。
- public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
- }
2.1 消息訂閱初體驗
我們通過如下一段代碼體驗一下消息的訂閱與發布:
- public class FlowDemo {
- public static void main(String[] args) {
- SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
- Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
- private Flow.Subscription subscription;
- @Override
- public void onSubscribe(Flow.Subscription subscription) {
- this.subscription = subscription;
- //向數據發布者請求一個數據
- this.subscription.request(1);
- }
- @Override
- public void onNext(String item) {
- System.out.println("接收到 publisher 發來的消息了:" + item);
- //接收完成后,可以繼續接收或者不接收
- //this.subscription.cancel();
- this.subscription.request(1);
- }
- @Override
- public void onError(Throwable throwable) {
- //出現異常,就會來到這個方法,此時直接取消訂閱即可
- this.subscription.cancel();
- }
- @Override
- public void onComplete() {
- //發布者的所有數據都被接收,并且發布者已經關閉
- System.out.println("數據接收完畢");
- }
- };
- //配置發布者和訂閱者
- publisher.subscribe(subscriber);
- for (int i = 0; i < 5; i++) {
- //發送數據
- publisher.submit("hello:" + i);
- }
- //關閉發布者
- publisher.close();
- new Scanner(System.in).next();
- }
- }
松哥稍微解釋一下上面這段代碼:
- 首先創建一個 SubmissionPublisher 對象作為消息發布者。
- 接下來創建 Flow.Subscriber 對象作為消息訂閱者,實現消息訂閱者里邊的四個方法,分別進行處理。
- 為 publisher 配置上 subscriber。
- 發送消息。
- 消息發送完成后關閉 publisher。
- 最后是讓程序不要停止,觀察消息訂閱者打印情況。
2.2 模擬 Backpressure
Backpressure 問題在 Flow API 中得到了很好的解決。Subscriber 會將 Publisher 發布的數據緩存在 Subscription 中,其長度默認為256,相關源碼如下:
- public final class Flow {
- static final int DEFAULT_BUFFER_SIZE = 256;
- public static int defaultBufferSize() {
- return DEFAULT_BUFFER_SIZE;
- }
- ...
- }
一旦超出這個數據量,publisher 就會降低數據發送速度。
我們對上面的案例進行修改,如下:
- public class FlowDemo {
- public static void main(String[] args) {
- SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
- Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
- private Flow.Subscription subscription;
- @Override
- public void onSubscribe(Flow.Subscription subscription) {
- this.subscription = subscription;
- //向數據發布者請求一個數據
- this.subscription.request(1);
- }
- @Override
- public void onNext(String item) {
- System.out.println("接收到 publisher 發來的消息了:" + item);
- //接收完成后,可以繼續接收或者不接收
- //this.subscription.cancel();
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- this.subscription.request(1);
- }
- @Override
- public void onError(Throwable throwable) {
- //出現異常,就會來到這個方法,此時直接取消訂閱即可
- this.subscription.cancel();
- }
- @Override
- public void onComplete() {
- //發布者的所有數據都被接收,并且發布者已經關閉
- System.out.println("數據接收完畢");
- }
- };
- publisher.subscribe(subscriber);
- for (int i = 0; i < 500; i++) {
- System.out.println("i--------->" + i);
- publisher.submit("hello:" + i);
- }
- //關閉發布者
- publisher.close();
- new Scanner(System.in).next();
- }
- }
一共修改了三個地方:
- Subscriber#onNext 方法中,每次休息兩秒再處理下一條數據。
- 發布數據時,一共發布 500 條數據。
- 打印數據發布的日志。
修改完成后,我們再次啟動項目,觀察控制臺輸出:
可以看到,生產者先是一股腦生產了 257 條數據(hello0 在一開始就被消費了,所以緩存中實際上是 256 條),消息則是一條一條的來,由于消費的速度比較慢,所以當緩存中的數據超過 256 條之后,接下來都是消費一條,再發送一條。
2.3 數據處理
Flow.Processor 可以像過濾器一樣,對數據進行預處理,數據從 publisher 出來之后,先進入 Flow.Processor 中進行預處理,然后再進入 Subscriber。
修改后的代碼如下:
- public class FlowDemo {
- public static void main(String[] args) {
- class DataFilter extends SubmissionPublisher<String> implements Flow.Processor<String,String>{
- private Flow.Subscription subscription;
- @Override
- public void onSubscribe(Flow.Subscription subscription) {
- this.subscription = subscription;
- this.subscription.request(1);
- }
- @Override
- public void onNext(String item) {
- this.submit("【這是一條被處理過的數據】" + item);
- this.subscription.request(1);
- }
- @Override
- public void onError(Throwable throwable) {
- this.subscription.cancel();
- }
- @Override
- public void onComplete() {
- this.close();
- }
- }
- SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
- DataFilter dataFilter = new DataFilter();
- publisher.subscribe(dataFilter);
- Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
- private Flow.Subscription subscription;
- @Override
- public void onSubscribe(Flow.Subscription subscription) {
- this.subscription = subscription;
- //向數據發布者請求一個數據
- this.subscription.request(1);
- }
- @Override
- public void onNext(String item) {
- System.out.println("接收到 publisher 發來的消息了:" + item);
- //接收完成后,可以繼續接收或者不接收
- //this.subscription.cancel();
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- this.subscription.request(1);
- }
- @Override
- public void onError(Throwable throwable) {
- //出現異常,就會來到這個方法,此時直接取消訂閱即可
- this.subscription.cancel();
- }
- @Override
- public void onComplete() {
- //發布者的所有數據都被接收,并且發布者已經關閉
- System.out.println("數據接收完畢");
- }
- };
- dataFilter.subscribe(subscriber);
- for (int i = 0; i < 500; i++) {
- System.out.println("發送消息 i--------->" + i);
- publisher.submit("hello:" + i);
- }
- //關閉發布者
- publisher.close();
- new Scanner(System.in).next();
- }
- }
簡單起見,我這里創建了一個局部內部類 DataFilter,DataFilter 繼承自 SubmissionPublisher 并實現了 Flow.Processor 接口,由于 DataFilter 繼承自 SubmissionPublisher,所以它也兼具 SubmissionPublisher 的功能。
在 DataFilter 中完成消息的處理并重新發送出去。接下來定義 publisher,讓 dataFilter 作為其訂閱者,再定義新的訂閱者,作為 dataFilter 的訂閱者。
最終運行效果如下:
3.小結
好啦,這就是今天和大家介紹的 Java9 中的 Reactive Stream,那么至此,我們的 WebFlux 前置知識差不多告一段落了,下篇文章開始,正式開整 WebFlux。
本文轉載自微信公眾號「江南一點雨」,可以通過以下二維碼關注。轉載本文請聯系江南一點雨公眾號。