成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

我們學習WebFlux 前置知識

開發 架構
Backpressure 在國內被翻譯成背壓,這個翻譯在網上被很多人吐槽,我覺得大家的吐槽是有道理的,背壓單純從字面上確實看不出來有什么意思。所以松哥這里直接用英文 Backpressure 吧。

[[402929]]

 1.Backpressure

Backpressure 在國內被翻譯成背壓,這個翻譯在網上被很多人吐槽,我覺得大家的吐槽是有道理的,背壓單純從字面上確實看不出來有什么意思。所以松哥這里直接用英文 Backpressure 吧。

Backpressure 是一種現象:當數據流從上游生產者向下游消費者傳輸的過程中,上游生產速度大于下游消費速度,導致下游的 Buffer 溢出,這種現象就叫做 Backpressure。

換句話說,上游生產數據,生產完成后通過管道將數據傳到下游,下游消費數據,當下游消費速度小于上游數據生產速度時,數據在管道中積壓會對上游形成一個壓力,這就是 Backpressure,從這個角度來說,Backpressure 翻譯成反壓、回壓似乎更合理一些。

Backpressure 會出現在有 Buffer 上限的系統中,當出現 Buffer 溢出的時候,就會有 Backpressure,對于 Backpressure,它的應對措施只有一個:丟棄新事件。那么什么是 Buffer 溢出呢?例如我的服務器可以同時處理 2000 個用戶請求,那么我就把請求上限設置為 2000,這個 2000 就是我的 Buffer,當超出 2000 的時候,就產生了 Backpressure。

2.Flow API

JDK9 中推出了 Flow API,用以支持 Reactive Programming,即響應式編程。

在響應式編程中,會有一個數據發布者 Publisher 和數據訂閱者 Subscriber,Subscriber 接收 Publisher 發布的數據并進行消費,在 Subscriber 和 Publisher 之間還存在一個 Processor,類似于一個過濾器,可以對數據進行中間處理。

JDK9 中提供了 Flow API 用以支持響應式編程,另外 RxJava 和 Reactor 等框架也提供了相關的實現。

我們來看看 JDK9 中的 Flow 類:

非常簡潔,基本上就是按照 Reactive Programming 的設計來的:

Publisher

Publisher 為數據發布者,這是一個函數式接口,里邊只有一個方法,通過這個方法將數據發布出去,Publisher 的定義如下:

  1. @FunctionalInterface 
  2. public static interface Publisher<T> { 
  3.     public void subscribe(Subscriber<? super T> subscriber); 

Subscriber

Subscriber 為數據訂閱者,這個里邊有四個方法,如下:

  1. public static interface Subscriber<T> { 
  2.     public void onSubscribe(Subscription subscription); 
  3.     public void onNext(T item); 
  4.     public void onError(Throwable throwable); 
  5.     public void onComplete(); 
  • onSubscribe:這個是訂閱成功的回調方法,用于初始化 Subscription,并且表明可以開始接收訂閱數據了。
  • onNext:接收下一項訂閱數據的回調方法。
  • onError:在 Publisher 或 Subcriber 遇到不可恢復的錯誤時調用此方法,之后 Subscription 不會再調用 Subscriber 其他的方法。
  • onComplete:當接收完所有訂閱數據,并且發布者已經關閉后會回調這個方法。

Subscription

Subscription 為發布者和訂閱者之間的訂閱關系,用來控制消息的消費,這個里邊有兩個方法:

  1. public static interface Subscription { 
  2.     public void request(long n); 
  3.     public void cancel(); 
  • request:這個方法用來向數據發布者請求 n 個數據。
  • cancel:取消消息訂閱,訂閱者將不再接收數據。

Processor

Processor 是一個空接口,不過它同時繼承了 Publisher 和 Subscriber,所以它既能發布數據也能訂閱數據,因此我們可以通過 Processor 來完成一些數據轉換的功能,先接收數據進行處理,處理完成后再將數據發布出去,這個也有點類似于我們 JavaEE 中的過濾器。

  1. public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { 

2.1 消息訂閱初體驗

我們通過如下一段代碼體驗一下消息的訂閱與發布:

  1. public class FlowDemo { 
  2.     public static void main(String[] args) { 
  3.         SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
  4.         Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() { 
  5.             private Flow.Subscription subscription; 
  6.             @Override 
  7.             public void onSubscribe(Flow.Subscription subscription) { 
  8.                 this.subscription = subscription; 
  9.                 //向數據發布者請求一個數據 
  10.                 this.subscription.request(1); 
  11.             } 
  12.             @Override 
  13.             public void onNext(String item) { 
  14.                 System.out.println("接收到 publisher 發來的消息了:" + item); 
  15.                 //接收完成后,可以繼續接收或者不接收 
  16.                 //this.subscription.cancel(); 
  17.                 this.subscription.request(1); 
  18.             } 
  19.             @Override 
  20.             public void onError(Throwable throwable) { 
  21.                 //出現異常,就會來到這個方法,此時直接取消訂閱即可 
  22.                 this.subscription.cancel(); 
  23.             } 
  24.             @Override 
  25.             public void onComplete() { 
  26.                 //發布者的所有數據都被接收,并且發布者已經關閉 
  27.                 System.out.println("數據接收完畢"); 
  28.             } 
  29.         }; 
  30.         //配置發布者和訂閱者 
  31.         publisher.subscribe(subscriber); 
  32.         for (int i = 0; i < 5; i++) { 
  33.             //發送數據 
  34.             publisher.submit("hello:" + i); 
  35.         } 
  36.         //關閉發布者 
  37.         publisher.close(); 
  38.         new Scanner(System.in).next(); 
  39.     } 

松哥稍微解釋一下上面這段代碼:

  1. 首先創建一個 SubmissionPublisher 對象作為消息發布者。
  2. 接下來創建 Flow.Subscriber 對象作為消息訂閱者,實現消息訂閱者里邊的四個方法,分別進行處理。
  3. 為 publisher 配置上 subscriber。
  4. 發送消息。
  5. 消息發送完成后關閉 publisher。
  6. 最后是讓程序不要停止,觀察消息訂閱者打印情況。

2.2 模擬 Backpressure

Backpressure 問題在 Flow API 中得到了很好的解決。Subscriber 會將 Publisher 發布的數據緩存在 Subscription 中,其長度默認為256,相關源碼如下:

  1. public final class Flow { 
  2.     static final int DEFAULT_BUFFER_SIZE = 256; 
  3.     public static int defaultBufferSize() { 
  4.         return DEFAULT_BUFFER_SIZE; 
  5.     } 
  6.     ... 

一旦超出這個數據量,publisher 就會降低數據發送速度。

我們對上面的案例進行修改,如下:

  1. public class FlowDemo { 
  2.     public static void main(String[] args) { 
  3.         SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
  4.  
  5.         Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() { 
  6.             private Flow.Subscription subscription; 
  7.  
  8.             @Override 
  9.             public void onSubscribe(Flow.Subscription subscription) { 
  10.                 this.subscription = subscription; 
  11.                 //向數據發布者請求一個數據 
  12.                 this.subscription.request(1); 
  13.             } 
  14.  
  15.             @Override 
  16.             public void onNext(String item) { 
  17.                 System.out.println("接收到 publisher 發來的消息了:" + item); 
  18.                 //接收完成后,可以繼續接收或者不接收 
  19.                 //this.subscription.cancel(); 
  20.                 try { 
  21.                     Thread.sleep(2000); 
  22.                 } catch (InterruptedException e) { 
  23.                     e.printStackTrace(); 
  24.                 } 
  25.                 this.subscription.request(1); 
  26.             } 
  27.  
  28.             @Override 
  29.             public void onError(Throwable throwable) { 
  30.                 //出現異常,就會來到這個方法,此時直接取消訂閱即可 
  31.                 this.subscription.cancel(); 
  32.             } 
  33.  
  34.             @Override 
  35.             public void onComplete() { 
  36.                 //發布者的所有數據都被接收,并且發布者已經關閉 
  37.                 System.out.println("數據接收完畢"); 
  38.             } 
  39.         }; 
  40.         publisher.subscribe(subscriber); 
  41.         for (int i = 0; i < 500; i++) { 
  42.             System.out.println("i--------->" + i); 
  43.             publisher.submit("hello:" + i); 
  44.         } 
  45.         //關閉發布者 
  46.         publisher.close(); 
  47.         new Scanner(System.in).next(); 
  48.     } 

一共修改了三個地方:

  1. Subscriber#onNext 方法中,每次休息兩秒再處理下一條數據。
  2. 發布數據時,一共發布 500 條數據。
  3. 打印數據發布的日志。

修改完成后,我們再次啟動項目,觀察控制臺輸出:

可以看到,生產者先是一股腦生產了 257 條數據(hello0 在一開始就被消費了,所以緩存中實際上是 256 條),消息則是一條一條的來,由于消費的速度比較慢,所以當緩存中的數據超過 256 條之后,接下來都是消費一條,再發送一條。

2.3 數據處理

Flow.Processor 可以像過濾器一樣,對數據進行預處理,數據從 publisher 出來之后,先進入 Flow.Processor 中進行預處理,然后再進入 Subscriber。

修改后的代碼如下:

  1. public class FlowDemo { 
  2.     public static void main(String[] args) { 
  3.  
  4.         class DataFilter extends SubmissionPublisher<String> implements Flow.Processor<String,String>{ 
  5.  
  6.             private Flow.Subscription subscription; 
  7.  
  8.             @Override 
  9.             public void onSubscribe(Flow.Subscription subscription) { 
  10.                 this.subscription = subscription; 
  11.                 this.subscription.request(1); 
  12.             } 
  13.  
  14.             @Override 
  15.             public void onNext(String item) { 
  16.                 this.submit("【這是一條被處理過的數據】" + item); 
  17.                 this.subscription.request(1); 
  18.             } 
  19.  
  20.             @Override 
  21.             public void onError(Throwable throwable) { 
  22.                 this.subscription.cancel(); 
  23.             } 
  24.  
  25.             @Override 
  26.             public void onComplete() { 
  27.                 this.close(); 
  28.             } 
  29.         } 
  30.  
  31.         SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
  32.         DataFilter dataFilter = new DataFilter(); 
  33.         publisher.subscribe(dataFilter); 
  34.  
  35.         Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() { 
  36.             private Flow.Subscription subscription; 
  37.  
  38.             @Override 
  39.             public void onSubscribe(Flow.Subscription subscription) { 
  40.                 this.subscription = subscription; 
  41.                 //向數據發布者請求一個數據 
  42.                 this.subscription.request(1); 
  43.             } 
  44.  
  45.             @Override 
  46.             public void onNext(String item) { 
  47.                 System.out.println("接收到 publisher 發來的消息了:" + item); 
  48.                 //接收完成后,可以繼續接收或者不接收 
  49.                 //this.subscription.cancel(); 
  50.                 try { 
  51.                     Thread.sleep(2000); 
  52.                 } catch (InterruptedException e) { 
  53.                     e.printStackTrace(); 
  54.                 } 
  55.                 this.subscription.request(1); 
  56.             } 
  57.  
  58.             @Override 
  59.             public void onError(Throwable throwable) { 
  60.                 //出現異常,就會來到這個方法,此時直接取消訂閱即可 
  61.                 this.subscription.cancel(); 
  62.             } 
  63.  
  64.             @Override 
  65.             public void onComplete() { 
  66.                 //發布者的所有數據都被接收,并且發布者已經關閉 
  67.                 System.out.println("數據接收完畢"); 
  68.             } 
  69.         }; 
  70.         dataFilter.subscribe(subscriber); 
  71.         for (int i = 0; i < 500; i++) { 
  72.             System.out.println("發送消息 i--------->" + i); 
  73.             publisher.submit("hello:" + i); 
  74.         } 
  75.         //關閉發布者 
  76.         publisher.close(); 
  77.         new Scanner(System.in).next(); 
  78.     } 

簡單起見,我這里創建了一個局部內部類 DataFilter,DataFilter 繼承自 SubmissionPublisher 并實現了 Flow.Processor 接口,由于 DataFilter 繼承自 SubmissionPublisher,所以它也兼具 SubmissionPublisher 的功能。

在 DataFilter 中完成消息的處理并重新發送出去。接下來定義 publisher,讓 dataFilter 作為其訂閱者,再定義新的訂閱者,作為 dataFilter 的訂閱者。

最終運行效果如下:

3.小結

好啦,這就是今天和大家介紹的 Java9 中的 Reactive Stream,那么至此,我們的 WebFlux 前置知識差不多告一段落了,下篇文章開始,正式開整 WebFlux。

本文轉載自微信公眾號「江南一點雨」,可以通過以下二維碼關注。轉載本文請聯系江南一點雨公眾號。

 

責任編輯:武曉燕 來源: 江南一點雨
相關推薦

2021-05-19 10:37:16

WebFlux 前置工具

2019-08-02 11:53:50

Android開發學習

2022-12-09 07:57:15

2021-12-13 22:47:31

人工智能機器人學習

2017-03-07 15:43:28

編程語言函數數據結構

2018-09-10 05:14:38

物聯網工程物聯網IOT

2024-05-22 08:03:39

2022-09-22 08:19:26

WebFlux函數式編程

2023-02-09 08:01:12

核心組件非阻塞

2022-07-04 09:15:10

Spring請求處理流程

2021-06-02 10:39:59

ServletWebFluxSpringMVC

2010-03-09 18:34:29

Python日志

2022-05-17 11:05:16

機器學習人工智能

2011-06-16 20:05:41

SEO

2022-11-04 08:39:46

SpringWebFlux

2018-04-12 13:53:19

2018-08-05 06:48:34

2023-10-31 08:21:18

WebFlux基本用法JPA

2023-09-04 11:52:53

SpringMVC性能

2010-04-30 00:38:34

UNIX引號
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 91精品国产高清久久久久久久久 | 国产成人aⅴ | 日韩国产一区 | 久久国产精彩视频 | 91成人免费看片 | 国产丝袜av | 欧美精品一 | 免费的av| 最近中文字幕免费 | 久久久久久亚洲精品 | 综合激情av| 久久国产精品无码网站 | 亚洲综合色婷婷 | 亚洲区视频 | 精品日韩一区二区 | 人人干免费 | 日韩视频在线免费观看 | 欧美日韩综合 | 中文字幕一区二区三区四区五区 | 亚洲久草| 国产原创在线观看 | 久久精品亚洲精品国产欧美 | 91豆花视频 | 精品久久国产 | 欧美国产91 | 国产97久久 | 国产精品欧美一区二区三区不卡 | 91麻豆精品国产91久久久更新资源速度超快 | 国产丝袜一区二区三区免费视频 | 九九亚洲精品 | 国产999精品久久久久久 | 九九久久免费视频 | 精品麻豆剧传媒av国产九九九 | 国产精品99一区二区 | 欧美日韩国产一区二区三区 | 二区在线观看 | 在线国产一区二区三区 | 国产一级淫片a直接免费看 免费a网站 | 亚洲精品一二区 | 自拍偷拍第1页 | 欧美日日|