阿里二面:Java8的Stream api是迭代一次還是迭代多次
面試官:java8新增的stream api用過嗎?
我:這個必須用過啊。
面試官:給你下面一個字符串數組,如果用stream api來實現,找出以字符'a'開頭長度最大的字符串,使用stream api該怎么實現呢?
- {"abb","abcd","fegc","efe","adfes"}
我:用下面這個方法來實現:
- public static void maxLength(List<String> list){
- System.out.println(list.stream().filter(s -> s.startsWith("a")).mapToInt(r -> length(r)).max().orElse(0));;
- }
面試官:這個操作是迭代一次還是迭代兩次呢?也就是說是先迭代一遍,過濾出以字符'a'開頭的字符串數組,然后再迭代一次,找出最大長度,還是一次迭代完成呢?
我:這個是迭代一次完成,如果要是迭代多次,stream后面的操作函數很多的情況下效率會非常低。我們加個打印可以來驗證結果,代碼如下:
- public static void main(String[] args) {
- List<String> list = Arrays.asList("abb", "abcd", "fegc", "efe", "adfes");
- int maxLength = list.stream().
- filter(s -> isStartWitha(s)).
- mapToInt(StreamTest1::length).
- max().orElse(0);
- System.out.println("以字符a開頭的字符串最大長度:" + maxLength);
- }
- private static boolean isStartWitha(String a){
- System.out.println(a + " is start with a:" + a.startsWith("a"));
- return a.startsWith("a");
- }
- private static int length(String a){
- System.out.println("the length of" + a + ":" + a.length());
- return a.length();
- }
打印結果如下:
- abb is start with a:true
- the length of abb:3
- abcd is start with a:true
- the length of abcd:4
- fegc is start with a:false
- efe is start with a:false
- adfes is start with a:true
- the length of adfes:5
- 以字符a開頭的字符串最大長度:5
面試官:你確定只是迭代一次嗎?有其他情況嗎?
我:有。filter是一個無狀態的中間操作,對于這個中間操作來說,stream處理只需要迭代一次。但是對于有狀態的中間操作,就需要迭代多次。
面試官:你剛剛提到有狀態的操作和無狀態的操作,這個是怎么區分呢?
我:在stream api中,無狀態的操作是指當前元素的操作不受前面元素的影響,主要包括如下方法:
- filter(),flatMap(),flatMapToInt(),flatMapToLong(),flatMapToDouble(),map(),mapToInt(),mapToDouble(),mapToLong(),peek(),unordered()
而有狀態的操作是指需要等所有元素處理完之后才能執行當前操作,主要包括下面方法:
- distinct(),limit(),skip(),sorted(),sorted()
面試官:有狀態的操作,能舉個例子嗎?
我:比如下面這段代碼:
- public static void main(String[] args) {
- List<Integer> list = Arrays.asList(5, 2, 3, 1, 4);
- List<Integer> newArray = list.stream()
- .map(StreamTest2::map1)
- .sorted((o1, o2) -> o1 - o2)
- .map(StreamTest2::map2)
- .collect(Collectors.toList());
- System.out.println("新的有序數組:" + newArray);
- }
- private static Integer map1(Integer i) {
- int result = i * 10;
- System.out.println("線程:" + Thread.currentThread().getName() + " 方法map1入參:" + i + ",輸出:" + result);
- return result;
- }
- private static Integer map2(Integer i) {
- int result = i * 10;
- System.out.println("線程:" + Thread.currentThread().getName() + " 方法map2入參:" + i + ",輸出:" + result);
- return result;
- }
上面代碼中,對原始數組進行了兩次迭代,第一次迭代對所有數組元素都調用了map1方法乘以10,然后對新數組進行排序,第二次迭代對排序后的數組元素調用map2方法,即對排序后的數組元素乘以10。方法輸出如下:
- 線程:main 方法map1入參:5,輸出:50
- 線程:main 方法map1入參:2,輸出:20
- 線程:main 方法map1入參:3,輸出:30
- 線程:main 方法map1入參:1,輸出:10
- 線程:main 方法map1入參:4,輸出:40
- 線程:main 方法map2入參:10,輸出:100
- 線程:main 方法map2入參:20,輸出:200
- 線程:main 方法map2入參:30,輸出:300
- 線程:main 方法map2入參:40,輸出:400
- 線程:main 方法map2入參:50,輸出:500
- 新的有序數組:[100, 200, 300, 400, 500]
面試官:了解過底層原理嗎?
我:我來先畫一下Stream的UML類圖:
這個類圖說明以下幾點:
- AbstractPipeline有基本類型的子類,如LongPipeline和DoublePipeline,還有一個引用類型的子類ReferencePipeline。
- 無論是ReferencePipeline,還是LongPipeline和DoublePipeline等基本類型的Pipeline,都有3個內部類來繼承自己。
- StatelessOp對應無狀態的操作,StatefulOp對應有狀態的操作,Head對應Collection.stream()方法返回結果。
- 無論是StatelessOp、StatefulOp還是Head,都是一個Pipeline,這些Pipeline用雙向鏈表串聯起來,每個Pipeline節點被看作一個Stage,Head是鏈表的頭結點。上面UML類圖中AbstractPipeline類中previousStage和nextStage就代表雙向鏈表當前節點指向前后節點的引用。如下圖:
面試官:上面用雙向鏈表把所有操作都串聯起來了,這樣可以實現從Head節點開始依次執行所有的操作。但是這些操作怎么疊加在一起呢?比如下面這段代碼有三個map方法,后面的方法要依賴前面的計算結果:
- List<Integer> list = Arrays.asList(5, 2, 3, 1, 4);
- List<Integer> newArray = list.stream().map(StreamTest2::map1).map(StreamTest2::map2).map(StreamTest2::map3).collect(Collectors.toList());
我:Stream提供了Sink接口來處理操作的疊加。上面代碼的map方法把操作封裝到了Sink,每個節點執行操作時,調用Sink的accept方法就可以把操作結果傳給下一個節點的Sink。比如map方法源代碼如下:
- public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
- Objects.requireNonNull(mapper);
- return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
- StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
- @Override
- //返回包裝成的Sink
- Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
- return new Sink.ChainedReference<P_OUT, R>(sink) {
- @Override
- public void accept(P_OUT u) {
- //downstream是下游節點的Sink,把當前節點的執行結果傳給下游節點
- downstream.accept(mapper.apply(u));
- }
- };
- }
- };
- }
面試官:能詳細講一下Sink嗎?
我:Sink主要提供了下面4個方法
- //執行操作之前調用這個方法
- void begin(long size)
- //執行操作之后調用這個方法
- void end()
- //是否可以結束操作
- boolean cancellationRequested()
- //操作執行函數
- void accept()
對于有狀態的操作,必須實現begin和end兩個方法,因為begin方法會創建一個存放中間結果的容器,accept方法將元素放入該容器,end方法負責對容器中元素處理,比如排序。
面試官:那cancellationRequested方法什么時候用呢?
我:這個方法用于短路操作,比如stream.findAny。
面試官:你剛剛提到短路操作,怎么區分短路操作和非短路操作呢?
我:短路操作和非短路操作都是Stream的結束操作,結束操作是針對中間操作來說的。短路操作是指不用處理全部元素就可以結束,包括下面的方法:
- anyMatch(),allMatch(),noneMatch(),findFirst(),findAny()
非短路操作是指需要處理所有元素才能結束,包括下面的方法:
- forEach(),forEachOrdered(),toArray(),reduce(),collect(),max(),min(),count()
總結一下Stream操作,如下圖:
在遇到結束操作時,所有Pipeline節點封裝的Sink會串成一個鏈表,如下圖:
把Sink串成鏈表的過程可以參考下面這段源代碼:
- final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
- Objects.requireNonNull(sink);
- for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
- sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
- }
- return (Sink<P_IN>) sink;
- }
這樣從Head節點開始依次調用每個節點封裝的Sink中的begin,accept,cancellationRequested,end 四個方法就可以完成Steam流水線的執行。
面試官:上面提到了Sink會串成一個鏈,那對于有返回結果的操作,返回的結果是保存在什么地方呢?
我:這里分三種情況:
- 如果返回結果是boolean(比如 anyMatch、allMatch、noneMatch)和Optional(比如 findFirst、findAny),返回結果存放在對應的Sink。
- collect, reduce等規約操作,返回結果存放在用戶指定的容器中,比如如下代碼返回結果放在Optional容器中:
- Optional accResult = Stream.of(1, 2, 3, 4, 5).reduce((sum, item) -> {
- sum += item;
- return sum;
- });
max 和 min也是規約操作,因為底層是通過調用 reduce 方法實現的。
- 對于返回是數組的情況,返回數組之前,數據會存放在一種多叉樹數據結構中,這種多叉樹結構元素存儲在樹的葉子當中,一個葉子節點可以存放多個元素。
面試官:上面你提到返回數組的時候用到了多叉樹的結構,這樣做對于Stream處理有什么好處呢?
我:按照官方的說法,這樣做是為了避免在并行操作期間不必要地復制數據。
面試官:能簡單介紹一下Stream的并行處理嗎?
我:Stream的并行處理用到了Fork/Join框架,如下圖:
計算過程中,先把任務拆解成子任務,并行計算。計算完成后再把子任務計算結果合并成結果集。
面試官:Fork/Join框架跟普通線程池相比,有什么優勢嗎?
我:fork/join框架的優勢是, 如果某個子任務需要等待另外一個子任務完成才能繼續工作,那么處理線程會主動尋找其他未完成的子任務進行執行。跟普通線程池相比,減少了等待時間。
面試官:使用Stream并行流,一定會比串行快嗎?
我:這個不一定,使用的時候要考慮以下幾個因素:
- 要處理的元素數量,數據越多,性能提升越明顯。
- 數據結構的可分割性,數組、ArrayList支持隨機讀取,可分割性好,HashSet、TreeSet雖然可以分割,但不太容易分割均勻,LinkedList、Streams.iterate、BufferedReader.lines因為長度未知,可分解性差。
- 盡量使用基本類型,避免裝箱拆箱。
- 單個子任務花費時間越長,帶來的性能提升就會越大。
面試官:據說Stream api跟普通迭代相比有性能損耗,你怎么看?
我:對于簡單的處理操作,Stream api性能確實不如普通迭代。但是如果CPU性能好的話,使用Stream并行處理性能會明細提高。對于復雜處理操作,無論并行還是串行,Stream api有明顯的優勢。
對于并行處理,要考慮CPU的核數。