成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Java響應式編程實踐與原理解析

開發 前端
在傳統的命令式編程模式下,程序都是按照人工編寫的指令一條條順序的同步執行,也就是說,只有當前指令運行完畢,下一條指令才開始執行。那么傳統的命令式編程有有些線程處理模型呢?

背景

在傳統的命令式編程模式下,程序都是按照人工編寫的指令一條條順序的同步執行,也就是說,只有當前指令運行完畢,下一條指令才開始執行。那么傳統的命令式編程有有些線程處理模型呢?

首先是同步阻塞式,在這種模型下,只有阻塞操作完成后,程序才能夠繼續執行。而且阻塞會浪費資源,比如等待網絡連接(數據庫請求,其他服務請求),就會導致執行線程處于空閑狀態。

第二種就是異步阻塞式,在這種方式下一般會通過線程池,創建很多線程,然后針對請求,分配空閑的線程來處理。每個處理線程當遇到阻塞操作時,還是會中斷等待操作完成,不過相對于同步阻塞的模式,減少了任務的響應時間。通過增加并行度,提升了資源利用率。

第三種是異步非阻塞,通過回調方法來摒棄阻塞操作帶來的資源浪費。不過回調函數會層層嵌套,導致回調噩夢(callback hell),讓可讀性變得很差。

為了利用第三種模型的優勢,同時又讓代碼維護性更高,spring社區推出了spring flux響應式非阻塞編程。它默認的實現叫projectreactor。projectreactor是JVM的完全非阻塞響應式編程基礎,具有高效的需求管理(以管理“背壓”的形式).它提供了可組合的異步序列API Flux(用于[0…N]元素)和 Mono(用于[0 | 1]元素),廣泛地實現了Reactive Extensions規范。

特點

響應式編程的特點包括以下幾點。待會會通過例子給大家詳細展示下。

  • 可組合性&可讀性
  • 直到訂閱才會發生任何事情
  • 采用背壓或通過其他方式消費者向生產者發出排放率過高的信號的能力
  • 具體豐富的數據流運算符
  • 高水平但高價值的抽象是并發性不可知的

實現

projectreactor引入了可組合的反應類型,它們實現Publisher同時也提供了豐富的操作符,尤其是 Flux 和Mono 。

Flux 表示一個0..N項的反應序列,可以有 完成信號、錯誤信息來結束整個流程。所以傳輸的數據為一個普通值、一個完成信號、一個錯誤信號。對應的方法為onNext()、onComplete()、onError()。

而一個Mono對象表示一個單值或空的(0..1)結果,可以認為是一種特殊的 Flux,最多可以發出一個普通值,同樣包含onComplete()、onError()。

示例

  • 靜態數據創建

直接調用just()方法進行創建,也可以通過一個Stream或者一個Iterable對象(比如List)。還有通過Flux靜態方法來生成,range方法(這個方法生成的是一個 Integer 序列,第一個參數表示起始數字,第二個參數表示,生成的個數,這里生成的數據就為1、2、3),empty() 方法就是生成一個空的序列。

Flux<String> flux1 = Flux.just("one", "two", "three");
Flux<String> flux2 = Flux.fromStream(Stream.of("one", "two", "three"));
List<String> iterable = Arrays.asList("one", "two", "three");
Flux<String> flux3 = Flux.fromIterable(iterable);
Flux<Integer> flux4 = Flux.range(1, 3);
//或者通過 #empty() 生成空數據
Flux<String> fluxEmpty = Flux.empty()

Mono 也有類似的創建方法,只是對于的 just() 方法是對應只是一個參數。而 justOrEmpty() 方法會對空值進行校驗,選擇調用 just() 或者 empty()。

//Mono 也是類型
Mono<String> monoEmpty = Mono.empty();

Mono<String> mono1 = Mono.just("one");

//justOrEmpty 可以保證傳入參數為空時也不會報錯
Mono<String> mono2 = Mono.justOrEmpty(null);

  • 動態數據創建

動態數據創建方法主要有generate與create兩種方法。

對于generate 方法,在Flux中有3個重載方法,不管是哪個方法都是會包含一個循環構造函數。在每個循環中,sink.next()方法最多被調用一次。比如在 flux_generate1 這個實例對應的方法。循環生成1~10的序列,當atomicInteger大于10的時候就調用complete()方法,發出信息通知訂閱者。flux_generate2 實例對應的方法則將atomicInteger作為一個對象,在方法中進行傳遞,并且在最后打印在控制臺上。

// generate 生成,調用 next 即生成數據,complete 則是完成了整個流程
// 一個循環中只允許調用 next 方式一次
AtomicInteger atomicInteger = new AtomicInteger();
Flux<Integer> flux_generate1 = Flux.generate(sink -> {
if(atomicInteger.incrementAndGet() > 10){
sink.complete();
}
sink.next(atomicInteger.get());
});

Flux<Integer> flux_generate2 = Flux.generate(() -> 0, (integer, sink) -> {
if (++integer > 10) {
sink.complete();
}
sink.next(integer);
return integer;
}, integer -> {
System.out.println("last integer value is " + integer);
});

執行過程解析

為了更好的理解flux的底層實現邏輯和編程思想,我們下面會給大家詳細的演示下flux.create方法的執行。尤其是前面提到的直到訂閱才會發生任何事情,這句話的真實含義。

flux.create((t) -> {
t.next("create");
t.next("create1");

}).subscribe(st->{
System.out.println(st);
});

上面是我們要執行的一段代碼。通過debug我們可以看到如下的執行過程。

  • Flux.create方法接受一個函數式接口Consumer作為輸入參數,在我們這個例子中就是。
(t) -> {
t.next("create");
t.next("create1");
}
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {
return create(emitter, OverflowStrategy.BUFFER);
}

  • 我們一路追蹤下去,發現它把我們的函數式接口賦值給了Fluxcreate對象的一個屬性source,然后就返回了。并沒有執行這個函數式接口的邏輯。
FluxCreate(Consumer<? super FluxSink<T>> source, OverflowStrategy backpressure, FluxCreate.CreateMode createMode) {
this.source = (Consumer)Objects.requireNonNull(source, "source");
this.backpressure = (OverflowStrategy)Objects.requireNonNull(backpressure, "backpressure");
this.createMode = createMode;
}

  • 那么什么時候執行我們的代碼邏輯呢,接著向下看。subscribe方法也是接收了一個函數式接口。
(st->{
System.out.println(st);
}
public final Disposable subscribe(Consumer<? super T> consumer) {
Objects.requireNonNull(consumer, "consumer");
return this.subscribe(consumer, (Consumer)null, (Runnable)null);
}

  • 下面我們看看調用subscribe后發生了什么。

調用subscribe函數之1

調用subscribe函數之2

調用subscribe函數之3

沒錯就是通過subscribe出發了Flux.create里面的執行代碼,而這個里面的每次next調用,又觸發了后面的subscriber的執行,最終將結果打印出來。

Connected to the target VM, address: '127.0.0.1:53984', transport: 'socket'
create
create1
Disconnected from the target VM, address: '127.0.0.1:53984', transport: 'socket'
Process finished with exit code 0


責任編輯:華軒 來源: 今日頭條
相關推薦

2025-03-07 10:23:46

2019-07-01 13:34:22

vue系統數據

2022-06-16 13:08:30

Combine響應式編程訂閱

2022-12-28 10:50:34

AI訓練深度學習

2021-07-14 13:12:51

2021-07-28 20:13:04

響應式編程

2025-02-06 08:24:25

AQS開發Java

2024-12-31 08:00:32

2016-10-21 11:04:07

JavaScript異步編程原理解析

2020-08-13 11:24:45

Java技術開發

2023-11-29 09:00:55

ReactuseMemo

2018-08-07 16:17:35

JavaMySQL數據庫

2017-05-04 16:35:45

2020-12-08 08:53:53

編程ThreadPoolE線程池

2020-08-31 07:19:57

MonoFlux Reactor

2022-08-28 09:05:34

分布式存儲Ceph

2023-02-28 09:07:18

ChatGPTAI

2025-05-06 01:14:00

系統編程響應式

2024-05-23 08:02:23

2024-03-20 10:48:09

Java 8內存管理
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 蜜臀网 | 一呦二呦三呦国产精品 | 毛片综合 | 做a的各种视频 | 精品日韩一区二区 | 精品在线99 | www.色午夜.com | 91精品国产综合久久久久久丝袜 | 国产欧美精品 | 欧美久久视频 | 亚洲一区二区视频 | av资源网站 | 国产精品夜色一区二区三区 | 久草网址| 国产一区在线看 | 欧美高清免费 | 久久久久久91香蕉国产 | 亚洲综合色站 | www.色53色.com | 一区二区三区四区视频 | 中文在线观看视频 | 91免费在线视频 | 成人午夜毛片 | 免费三级黄 | 亚洲欧美一区二区三区视频 | 在线国产视频观看 | 亚洲欧美日韩成人在线 | 成人免费三级电影 | 欧美视频 亚洲视频 | 毛片大全 | 午夜视频网站 | 午夜小视频免费观看 | 久久久国产精品视频 | 一区二区三区在线电影 | 最新国产精品视频 | 国产精品视频不卡 | 久久久久久一区 | 国产aⅴ爽av久久久久久久 | 亚洲在线成人 | 久久国产精品72免费观看 | 国产一区二区 |