成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Java9異步編程-反應式流使用

開發 后端
在本文中,主要研究下Java9+中的反應流。簡單地說,就是使用Flow類,Flow類包含了用于構建響應式流處理的主要模塊。

[[438594]]

 在本文中,主要研究下Java9+中的反應流。簡單地說,就是使用Flow類,Flow類包含了用于構建響應式流處理的主要模塊。

這里說的反應流其實是一種非阻塞式背壓的異步流處理標準(有點繞口)。

該規范在響應式宣言中定義,并且有各種各樣的實現,例如RxJava或Akka-Streams。

Reactive API總覽

要構建一個流,主要使用三個抽象,并將它們組合成異步處理邏輯。

每個流都需要處理由Publisher實例發布給它的事件;發布者有一個subscribe()的方法。

如果某個訂閱者希望接收發布者發布的事件,則需要使用subscribe()訂閱發布者。

消息的接收方需要實現訂閱者接口。一般情況下,接受者是每個Flow處理的結束,因為它的實例不會進一步發送消息。

可以將Subscriber看作Sink。有四個方法需要重寫onSubscribe(), onNext(), onError()和onComplete()。

如果希望轉換傳入的消息并將其進一步傳遞給下一個訂閱服務,則需要實現Processor接口。

它既充當訂閱服務(因為它接收消息),又充當發布服務(因為它處理這些消息并將它們發送以進行進一步處理)。

發布和消費消息

假設想要創建一個簡單的流,其中有一個發布者發布消息,一個簡單的訂閱者在消息到達時使用消息。

先創建一個EndSubscriber類。需要實現訂閱服務接口。接下來,重寫所需的方法。

onSubscribe()方法在處理開始之前被調用。

訂閱的實例subscription作為參數傳遞。Subscription是控制訂閱服務和發布服務之間的消息流的類.

  1.  1public class EndSubscriber<T> implements Subscriber<T> { 
  2.  2       // 多少消息需要消費 
  3.  3    private final AtomicInteger howMuchMessagesToConsume; 
  4.  4    private Flow.Subscription subscription; 
  5.  5    // 保存消費過的消息 
  6.  6    public List<T> consumedElements = new LinkedList<>(); 
  7.  7 
  8.  8    public EndSubscriber(Integer howMuchMessagesToConsume) { 
  9.  9        this.howMuchMessagesToConsume = new AtomicInteger(howMuchMessagesToConsume); 
  10. 10    } 
  11. 11 
  12. 12    @Override 
  13. 13    public void onSubscribe(Flow.Subscription subscription) { 
  14. 14        this.subscription = subscription; 
  15. 15        subscription.request(1); 
  16. 16    } 
  17. 17} 

在這里還初始化了一個在測試中使用的消耗元素的空列表。

現在,需要從訂閱者接口實現其余的方法。這里的主要方法是onNext(),它在發布者發布新消息時被調用

  1. 1@Override 
  2. 2public void onNext(T item) { 
  3. 3    System.out.println("Got : " + item); 
  4. 4    consumedElements.add(item); 
  5. 5    subscription.request(1); 
  6. 6} 

這里需要注意的的是,當在onSubscribe()方法中啟動開始訂閱時,以及當處理消息時onNext(),需要調用subscription上的request()方法來通知當前訂閱器準備使用更多消息。

最后,需要實現onError(),它會在處理過程中拋出異常時被調用.

在發布者關閉時調用onComplete().

  1. 1@Override 
  2. 2public void onError(Throwable t) { 
  3. 3    t.printStackTrace(); 
  4. 4} 
  5. 6@Override 
  6. 7public void onComplete() { 
  7. 8    System.out.println("Done"); 
  8. 9} 

接下來為這個處理流編寫一個測試。將使用SubmissionPublisher類,這是java.util.concurrent中的一個類,它實現了Publisher接口。

測試中向發布者提交N個元素,我們的終端訂閱者會接收到這些元素。

  1.  1@Test 
  2.  2public void whenSubscribeToIt_thenShouldConsumeAll()  
  3.  3  throws InterruptedException { 
  4.  4 
  5.  5    // given 
  6.  6    SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
  7.  7    EndSubscriber<String> subscriber = new EndSubscriber<>(); 
  8.  8    publisher.subscribe(subscriber); 
  9.  9    List<String> items = List.of("1""x""2""x""3""x"); 
  10. 10 
  11. 11    // when 
  12. 12    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); 
  13. 13    items.forEach(publisher::submit); 
  14. 14    publisher.close(); 
  15. 15 
  16. 16    // then 
  17. 17     await().atMost(1000, TimeUnit.MILLISECONDS) 
  18. 18       .until( 
  19. 19         () -> assertThat(subscriber.consumedElements) 
  20. 20         .containsExactlyElementsOf(items) 
  21. 21     ); 
  22. 22} 

注意,在publisher實例上調用close()方法。它將在每個訂閱者上調用onComplete()。

程序輸出如下:

  1. 1Got : 1 
  2. 2Got : x 
  3. 3Got : 2 
  4. 4Got : x 
  5. 5Got : 3 
  6. 6Got : x 
  7. 7Done 

消息的轉換

假設還希望在發布者和訂閱者之間做一些數據的轉換。

下面我創建一個TransformProcessor類,它實現了Processor并擴展了SubmissionPublisher,因為它同時包含Publisher和Subscriber。

并且將傳入一個Function將輸入轉換到輸出。

  1.  1import java.util.concurrent.Flow; 
  2.  2import java.util.concurrent.SubmissionPublisher; 
  3.  3import java.util.function.Function
  4.  4 
  5.  5public class TransformProcessor<T,R> extends SubmissionPublisher<R> implements Flow.Processor<T,R> { 
  6.  6    private Function<T,R> function
  7.  7    private Flow.Subscription subscription; 
  8.  8 
  9.  9    public TransformProcessor(Function<T, R> function) { 
  10. 10        super(); 
  11. 11        this.function = function
  12. 12    } 
  13. 13 
  14. 14    @Override 
  15. 15    public void onSubscribe(Flow.Subscription subscription) { 
  16. 16        this.subscription = subscription; 
  17. 17        subscription.request(1); 
  18. 18    } 
  19. 19 
  20. 20    @Override 
  21. 21    public void onNext(T item) { 
  22. 22        submit(function.apply(item)); 
  23. 23        subscription.request(1); 
  24. 24    } 
  25. 25 
  26. 26    @Override 
  27. 27    public void onError(Throwable t) { 
  28. 28        t.printStackTrace(); 
  29. 29    } 
  30. 30 
  31. 31    @Override 
  32. 32    public void onComplete() { 
  33. 33        close(); 
  34. 34    } 
  35. 35} 

這里的TransformProcessor將把String轉換為兩個String,看下面我寫的測試用例。

  1.  1 @Test 
  2.  2    public void whenSubscribeAndTransformElements_thenShouldConsumeAll() { 
  3.  3        // given 
  4.  4        SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
  5.  5        Function<String, String> dup = x -> x.concat(x); 
  6.  6        TransformProcessor<String, String> transformProcessor 
  7.  7                = new TransformProcessor<>(dup); 
  8.  8        EndSubscriber<String> subscriber = new EndSubscriber<>(6); 
  9.  9        List<String> items = List.of("1""2""3"); 
  10. 10        List<String> expectedResult = List.of("11""22""33"); 
  11. 11        // when 
  12. 12        publisher.subscribe(transformProcessor); 
  13. 13        transformProcessor.subscribe(subscriber); 
  14. 14        items.forEach(publisher::submit); 
  15. 15        publisher.close(); 
  16. 16 
  17. 17        await().atMost(1000, TimeUnit.MILLISECONDS) 
  18. 18                .untilAsserted(() -> assertTrue(subscriber.consumedElements.containsAll(expectedResult))); 
  19. 19    } 

使用訂閱控制消息需求

假設只想消費第一個消息,應用一些邏輯并完成處理??梢允褂胷equest()方法來實現這一點。

修改下代碼:

  1.  1public class EndSubscriber<T> implements Flow.Subscriber<T> { 
  2.  2    // 多少消息需要消費 
  3.  3    private final AtomicInteger howMuchMessagesToConsume; 
  4.  4    private Flow.Subscription subscription; 
  5.  5    // 保存消費過的消息 
  6.  6    public List<T> consumedElements = new LinkedList<>(); 
  7.  7 
  8.  8    public EndSubscriber(Integer howMuchMessagesToConsume) { 
  9.  9        this.howMuchMessagesToConsume = new AtomicInteger(howMuchMessagesToConsume); 
  10. 10    } 
  11. 11 
  12. 12    @Override 
  13. 13    public void onSubscribe(Flow.Subscription subscription) { 
  14. 14        this.subscription = subscription; 
  15. 15        subscription.request(1); 
  16. 16    } 
  17. 17 
  18. 18    @Override 
  19. 19    public void onNext(T item) { 
  20. 20        howMuchMessagesToConsume.decrementAndGet(); // 減一 
  21. 21        System.out.println("Got : " + item); 
  22. 22        consumedElements.add(item); 
  23. 23        if (howMuchMessagesToConsume.get() > 0) { 
  24. 24            subscription.request(1); 
  25. 25        } 
  26. 26    } 
  27. 27 
  28. 28    @Override 
  29. 29    public void onError(Throwable t) { 
  30. 30        t.printStackTrace(); 
  31. 31    } 
  32. 32 
  33. 33    @Override 
  34. 34    public void onComplete() { 
  35. 35        System.out.println("Done"); 
  36. 36    } 
  37. 37} 

測試

  1.  1@Test 
  2.  2public void whenRequestForOnlyOneElement_thenShouldConsumeOne(){ 
  3.  3    // given 
  4.  4    SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
  5.  5    EndSubscriber<String> subscriber = new EndSubscriber<>(1); 
  6.  6    publisher.subscribe(subscriber); 
  7.  7    List<String> items = List.of("1""x""2""x""3""x"); 
  8.  8    List<String> expected = List.of("1"); 
  9.  9 
  10. 10    // when 
  11. 11    assertEquals(publisher.getNumberOfSubscribers(),1); 
  12. 12    items.forEach(publisher::submit); 
  13. 13    publisher.close(); 
  14. 14 
  15. 15    // then 
  16. 16    await().atMost(1000, TimeUnit.MILLISECONDS) 
  17. 17            .untilAsserted(() -> 
  18. 18                    assertTrue(subscriber.consumedElements.containsAll(expected)) 
  19. 19            ); 
  20. 20} 

盡管發布者發布了6個元素,但EndSubscriber將只使用一個元素,因為它表示只需要處理這一個元素。

通過在Subscription上使用request()方法,我們可以實現更復雜的回壓機制來控制消息消費的速度。

責任編輯:武曉燕 來源: 碼小菜
相關推薦

2023-08-31 16:47:05

反應式編程數據流

2022-08-15 09:00:00

JavaScript前端架構

2022-03-29 07:32:38

R2DBC數據庫反應式

2023-12-26 08:15:11

反應式遠程接口

2023-09-21 08:01:27

SpringR2DBC實現數據庫

2021-03-22 08:45:30

異步編程Java

2021-05-07 16:19:36

異步編程Java線程

2024-01-31 08:26:44

2015-07-30 10:05:37

Java9JShell

2017-12-06 16:28:59

JDK 9JDK 8開發者

2023-04-10 07:44:04

java9java21java

2013-04-01 15:38:54

異步編程異步編程模型

2014-09-12 10:46:35

Java9

2015-09-16 15:11:58

C#異步編程

2020-02-06 19:12:36

Java函數式編程編程語言

2022-09-22 08:19:26

WebFlux函數式編程

2011-07-27 14:10:43

javascript

2023-01-12 11:23:11

Promise異步編程

2015-07-16 09:52:40

Java9新特性軟件開發

2013-04-01 15:25:41

異步編程異步EMP
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产成人jvid在线播放 | 91精品国产91久久久久久 | 久久国产视频播放 | 伦理一区二区 | 成人免费小视频 | 中文字幕第一页在线 | 日韩在线精品视频 | 精品乱子伦一区二区三区 | 国产无套一区二区三区久久 | 日韩爱爱网站 | 女人夜夜春 | 国产成人一区二区三区 | 久久精品欧美一区二区三区不卡 | 国产美女福利在线观看 | 亚洲精品国产成人 | 国产精品久久久久久吹潮 | 精品视频在线观看 | 国产成人久久精品一区二区三区 | 午夜资源 | 欧美日韩精品一区二区三区蜜桃 | 欧美激情精品久久久久久免费 | 一区二区三区免费看 | 性色网站| 少妇性l交大片免费一 | 国产精品无码久久久久 | 亚洲精品成人在线 | 欧美性video| 欧美精品1区2区3区 精品国产欧美一区二区 | 中文字幕视频在线 | 亚洲人精品午夜 | 一区二区手机在线 | 欧美成人专区 | 精品伊人 | 亚洲精品短视频 | 久久久久久九九九九九九 | 国产精品久久久亚洲 | 国产日韩欧美一区二区 | 久久久蜜臀国产一区二区 | 99久热在线精品视频观看 | 玖玖综合在线 | 91久久视频 |