Java9異步編程-反應式流使用
在本文中,主要研究下Java9+中的反應流。簡單地說,就是使用Flow類,Flow類包含了用于構建響應式流處理的主要模塊。
這里說的反應流其實是一種非阻塞式背壓的異步流處理標準(有點繞口)。
該規范在響應式宣言中定義,并且有各種各樣的實現,例如RxJava或Akka-Streams。
Reactive API總覽
要構建一個流,主要使用三個抽象,并將它們組合成異步處理邏輯。
每個流都需要處理由Publisher實例發布給它的事件;發布者有一個subscribe()的方法。
如果某個訂閱者希望接收發布者發布的事件,則需要使用subscribe()訂閱發布者。
消息的接收方需要實現訂閱者接口。一般情況下,接受者是每個Flow處理的結束,因為它的實例不會進一步發送消息。
可以將Subscriber看作Sink。有四個方法需要重寫onSubscribe(), onNext(), onError()和onComplete()。
如果希望轉換傳入的消息并將其進一步傳遞給下一個訂閱服務,則需要實現Processor接口。
它既充當訂閱服務(因為它接收消息),又充當發布服務(因為它處理這些消息并將它們發送以進行進一步處理)。
發布和消費消息
假設想要創建一個簡單的流,其中有一個發布者發布消息,一個簡單的訂閱者在消息到達時使用消息。
先創建一個EndSubscriber類。需要實現訂閱服務接口。接下來,重寫所需的方法。
onSubscribe()方法在處理開始之前被調用。
訂閱的實例subscription作為參數傳遞。Subscription是控制訂閱服務和發布服務之間的消息流的類.
- 1public class EndSubscriber<T> implements Subscriber<T> {
- 2 // 多少消息需要消費
- 3 private final AtomicInteger howMuchMessagesToConsume;
- 4 private Flow.Subscription subscription;
- 5 // 保存消費過的消息
- 6 public List<T> consumedElements = new LinkedList<>();
- 7
- 8 public EndSubscriber(Integer howMuchMessagesToConsume) {
- 9 this.howMuchMessagesToConsume = new AtomicInteger(howMuchMessagesToConsume);
- 10 }
- 11
- 12 @Override
- 13 public void onSubscribe(Flow.Subscription subscription) {
- 14 this.subscription = subscription;
- 15 subscription.request(1);
- 16 }
- 17}
在這里還初始化了一個在測試中使用的消耗元素的空列表。
現在,需要從訂閱者接口實現其余的方法。這里的主要方法是onNext(),它在發布者發布新消息時被調用
- 1@Override
- 2public void onNext(T item) {
- 3 System.out.println("Got : " + item);
- 4 consumedElements.add(item);
- 5 subscription.request(1);
- 6}
這里需要注意的的是,當在onSubscribe()方法中啟動開始訂閱時,以及當處理消息時onNext(),需要調用subscription上的request()方法來通知當前訂閱器準備使用更多消息。
最后,需要實現onError(),它會在處理過程中拋出異常時被調用.
在發布者關閉時調用onComplete().
- 1@Override
- 2public void onError(Throwable t) {
- 3 t.printStackTrace();
- 4}
- 5
- 6@Override
- 7public void onComplete() {
- 8 System.out.println("Done");
- 9}
接下來為這個處理流編寫一個測試。將使用SubmissionPublisher類,這是java.util.concurrent中的一個類,它實現了Publisher接口。
測試中向發布者提交N個元素,我們的終端訂閱者會接收到這些元素。
- 1@Test
- 2public void whenSubscribeToIt_thenShouldConsumeAll()
- 3 throws InterruptedException {
- 4
- 5 // given
- 6 SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
- 7 EndSubscriber<String> subscriber = new EndSubscriber<>();
- 8 publisher.subscribe(subscriber);
- 9 List<String> items = List.of("1", "x", "2", "x", "3", "x");
- 10
- 11 // when
- 12 assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
- 13 items.forEach(publisher::submit);
- 14 publisher.close();
- 15
- 16 // then
- 17 await().atMost(1000, TimeUnit.MILLISECONDS)
- 18 .until(
- 19 () -> assertThat(subscriber.consumedElements)
- 20 .containsExactlyElementsOf(items)
- 21 );
- 22}
注意,在publisher實例上調用close()方法。它將在每個訂閱者上調用onComplete()。
程序輸出如下:
- 1Got : 1
- 2Got : x
- 3Got : 2
- 4Got : x
- 5Got : 3
- 6Got : x
- 7Done
消息的轉換
假設還希望在發布者和訂閱者之間做一些數據的轉換。
下面我創建一個TransformProcessor類,它實現了Processor并擴展了SubmissionPublisher,因為它同時包含Publisher和Subscriber。
并且將傳入一個Function將輸入轉換到輸出。
- 1import java.util.concurrent.Flow;
- 2import java.util.concurrent.SubmissionPublisher;
- 3import java.util.function.Function;
- 4
- 5public class TransformProcessor<T,R> extends SubmissionPublisher<R> implements Flow.Processor<T,R> {
- 6 private Function<T,R> function;
- 7 private Flow.Subscription subscription;
- 8
- 9 public TransformProcessor(Function<T, R> function) {
- 10 super();
- 11 this.function = function;
- 12 }
- 13
- 14 @Override
- 15 public void onSubscribe(Flow.Subscription subscription) {
- 16 this.subscription = subscription;
- 17 subscription.request(1);
- 18 }
- 19
- 20 @Override
- 21 public void onNext(T item) {
- 22 submit(function.apply(item));
- 23 subscription.request(1);
- 24 }
- 25
- 26 @Override
- 27 public void onError(Throwable t) {
- 28 t.printStackTrace();
- 29 }
- 30
- 31 @Override
- 32 public void onComplete() {
- 33 close();
- 34 }
- 35}
這里的TransformProcessor將把String轉換為兩個String,看下面我寫的測試用例。
- 1 @Test
- 2 public void whenSubscribeAndTransformElements_thenShouldConsumeAll() {
- 3 // given
- 4 SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
- 5 Function<String, String> dup = x -> x.concat(x);
- 6 TransformProcessor<String, String> transformProcessor
- 7 = new TransformProcessor<>(dup);
- 8 EndSubscriber<String> subscriber = new EndSubscriber<>(6);
- 9 List<String> items = List.of("1", "2", "3");
- 10 List<String> expectedResult = List.of("11", "22", "33");
- 11 // when
- 12 publisher.subscribe(transformProcessor);
- 13 transformProcessor.subscribe(subscriber);
- 14 items.forEach(publisher::submit);
- 15 publisher.close();
- 16
- 17 await().atMost(1000, TimeUnit.MILLISECONDS)
- 18 .untilAsserted(() -> assertTrue(subscriber.consumedElements.containsAll(expectedResult)));
- 19 }
使用訂閱控制消息需求
假設只想消費第一個消息,應用一些邏輯并完成處理??梢允褂胷equest()方法來實現這一點。
修改下代碼:
- 1public class EndSubscriber<T> implements Flow.Subscriber<T> {
- 2 // 多少消息需要消費
- 3 private final AtomicInteger howMuchMessagesToConsume;
- 4 private Flow.Subscription subscription;
- 5 // 保存消費過的消息
- 6 public List<T> consumedElements = new LinkedList<>();
- 7
- 8 public EndSubscriber(Integer howMuchMessagesToConsume) {
- 9 this.howMuchMessagesToConsume = new AtomicInteger(howMuchMessagesToConsume);
- 10 }
- 11
- 12 @Override
- 13 public void onSubscribe(Flow.Subscription subscription) {
- 14 this.subscription = subscription;
- 15 subscription.request(1);
- 16 }
- 17
- 18 @Override
- 19 public void onNext(T item) {
- 20 howMuchMessagesToConsume.decrementAndGet(); // 減一
- 21 System.out.println("Got : " + item);
- 22 consumedElements.add(item);
- 23 if (howMuchMessagesToConsume.get() > 0) {
- 24 subscription.request(1);
- 25 }
- 26 }
- 27
- 28 @Override
- 29 public void onError(Throwable t) {
- 30 t.printStackTrace();
- 31 }
- 32
- 33 @Override
- 34 public void onComplete() {
- 35 System.out.println("Done");
- 36 }
- 37}
測試
- 1@Test
- 2public void whenRequestForOnlyOneElement_thenShouldConsumeOne(){
- 3 // given
- 4 SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
- 5 EndSubscriber<String> subscriber = new EndSubscriber<>(1);
- 6 publisher.subscribe(subscriber);
- 7 List<String> items = List.of("1", "x", "2", "x", "3", "x");
- 8 List<String> expected = List.of("1");
- 9
- 10 // when
- 11 assertEquals(publisher.getNumberOfSubscribers(),1);
- 12 items.forEach(publisher::submit);
- 13 publisher.close();
- 14
- 15 // then
- 16 await().atMost(1000, TimeUnit.MILLISECONDS)
- 17 .untilAsserted(() ->
- 18 assertTrue(subscriber.consumedElements.containsAll(expected))
- 19 );
- 20}
盡管發布者發布了6個元素,但EndSubscriber將只使用一個元素,因為它表示只需要處理這一個元素。
通過在Subscription上使用request()方法,我們可以實現更復雜的回壓機制來控制消息消費的速度。