聊聊Flink:Flink的分區機制
一、前言
flink任務在執行過程中,一個流(stream)包含一個或多個分區(Stream partition)。TaskManager中的一個slot的subtask就是一個stream partition(流分區),一個Job的流(stream)分布在多個不同的Slot上執行。每一個算子可以包含一個或多個子任務(subtask),這些subtask執行在不同的分區中,本質是在不同的線程、不同的物理機或不同的容器中彼此互不依賴地執行。
1.1 Flink數據傳輸
- 組件之間的通信消息傳輸,即Client、JobManager、TaskManager之間的信息傳遞,采用Akka框架(主要用作組件間的協同,如心跳檢測、狀態上報、指標統計、作業提交和部署等)。
- 算子之間的流數據傳輸
本地線程內的流數據傳輸(同一個SubTask中):同一個SubTask內的兩個Operator(屬于同一個OperatorChain)之間的數據傳輸是方法調用,即上游算子處理完數據后,直接調用下游算子的processElement方法。
本地線程間的流數據傳輸(同一個TaskManager的不同SubTask中):即同一個TaskManager(JVM進程)中的不同Task(線程,本質上是SubTask)的算子之間的數據傳輸,通過本地內存進行數據傳遞,存在數據序列化和反序列過程。
跨網絡的流數據傳輸(不同TaskManager的SubTask中):采用Netty框架,通過Socket傳遞,也存在數據序列化和反序列過程。
flink中的重分區算子定義上下游subtask之間數據傳遞的方式,SubTask之間進行數據傳遞模式有兩種,一種是one-to-one(forwarding)模式,另一種是redistributing的模式。
1.2 重分區算子數據傳遞的兩種方式
- One-to-one:數據不需要重新分布,上游SubTask生產的數據與下游SubTask受到的數據完全一致,數據不需要重分區,也就是數據不需要經過IO,比如下圖中source->map的數據傳遞形式就是One-to-One方式。常見的map、fliter、flatMap等算子的SubTask的數據傳遞都是one-to-one的對應關系。類似于spark中的窄依賴。
- Redistributing:數據需要通過shuffle過程重新分區,需要經過IO,比如上圖中的map->keyBy。創建的keyBy、broadcast、rebalance、shuffle等算子的SubTask的數據傳遞都是Redistributing方式,但它們具體數據傳遞方式是不同的。類似于spark中的寬依賴。
圖片
flink中的重分區算子除了keyBy以外,還有broadcast、rebalance、shuffle、rescale、global、partitionCustom等多種算子,它們的分區方式各不相同。需要注意的是,這些算子中除了keyBy能將DataStream轉化為KeyedStream外,其它重分區算子均不會改變Stream的類型。
二、分區策略
數據在算子之間流動需要依靠分區策略(分區器),Flink目前內置了以下幾種分區策略和自定義分區策略。已實現的分區策略對應的API為:
圖片
自定義分區策略的API為CustomPartitionerWrapper。
各個API的繼承關系如下圖所示:
圖片
ChannelSelector是分區策略的頂層接口,其決定了記錄應該寫入哪個邏輯通道,通道可理解為下游算子的某個實例,或下游并行算子的某個子任務。該接口的定義源碼如下:
圖片
抽象類StreamPartitioner實現了ChannelSelector接口,是一個用于流程序的特殊的ChannelSelector,其中定義了一些通用的分區策略方法。Flink中的所有分區策略(分區器)都繼承了StreamPartitioner類,并且實現了各自獨有的分區規則。
三、內置分區策略
3.1 BinaryHashPartitioner
該分區策略位于Blink的Table API的org.apache.flink.table.runtime.partitioner包中,是一種針對BinaryRowData的哈希分區器。BinaryRowData是RowData的實現,可以顯著減少Java對象的序列化/反序列化。RowData用于表示結構化數據類型,運行時通過Table API或SQL管道傳遞的所有頂級記錄都是RowData的實例。關于BinaryHashPartitioner,我們這里不做過多講解。
3.2 BroadcastPartitioner
廣播分區策略將上游數據記錄輸出到下游算子的每個并行實例中,即下游每個分區都會有上游的所有數據。使用DataStream的broadcast()方法即可設置該DataStream向下游發送數據時使用廣播分區策略。
來一段代碼演示下:
/**
* 微信公眾號:老周聊架構
*/
public class PartitionerTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6);
//1.分區策略前的操作
//輸出dataStream每個元素及所屬的子任務編號
dataStream.map(new RichMapFunction<Integer, Object>() {
@Override
public Object map(Integer value) throws Exception {
System.out.println(String.format("元素值: %s, 分區策略前,子任務編號: %s", value,
getRuntimeContext().getIndexOfThisSubtask()));
return value;
}
});
//2.設置分區策略
//設置DataStream向下游發送數據時使用的策略
DataStream<Integer> dataStreamAfter = dataStream.broadcast();
//3.分區策略后的操作
dataStreamAfter.map(new RichMapFunction<Integer, Object>() {
@Override
public Object map(Integer value) throws Exception {
System.out.println(String.format("元素值: %s, 分區策略后,子任務編號: %s", value,
getRuntimeContext().getIndexOfThisSubtask()));
return value;
}
}).print();
env.execute("PartitionerTest Job");
}
}
直接IDEA控制臺輸出:
圖片
從輸出結果可以看出,數據共分為3個分區(編號為0、1、2)。執行分區策略前,每個元素所屬的分區:
圖片
執行分區策略后,每個元素所屬的分區如下:
圖片
對比表發現,廣播分區策略將上游每個元素分別發送到了下游算子的所有分區,這種策略會把數據復制多份,向下游算子的每個分區發送一份。
圖片
我們把上面的任務提交到Flink,同樣也可以看出前面分區前每個子任務兩條數據,分區后每個子任務六條數據。
圖片
圖片
3.3 ForwardPartitioner
轉發分區策略只將元素轉發給本地運行的下游算子的實例,即將元素發送到與當前算子實例在同一個TaskManager的下游算子實例,而不需要進行網絡傳輸。要求上下游算子并行度一樣,這樣上下游算子可以同屬一個子任務。
這里把上面的代碼調整下:
dataStream.forward()
IDEA控制臺輸出:
圖片
從輸出結果可以看出,數據共分為3個分區(編號為0、1、2)。執行分區策略前,每個元素所屬的分區:
圖片
執行分區策略后,每個元素所屬的分區如下:
圖片
對比發現,轉發分區策略將上游同一個分區的元素發送到了下游同一個分區中。使用數據流圖表示如下圖:
圖片
在上下游的算子沒有指定分區策略的情況下,如果上下游的算子并行度一致,則默認使用ForwardPartitioner,否則使用RebalancePartitioner。在StreamGraph類的源碼中可以看到該規則:
圖片
對于ForwardPartitioner,必須保證上下游算子并行度一致,否則會拋出異常。
圖片
3.4 GlobalPartitioner
全局分區策略將上游所有元素發送到下游子任務編號等于0的分區算子實例上(下游第一個實例)。
這里把上面的代碼調整下:
dataStream.global()
IDEA控制臺輸出:
圖片
分區前:
圖片
分區后:
圖片
全局分區策略將上游所有分區中的所有元素發送到了下游編號為0的分區中:
圖片
3.5 .KeyGroupStreamPartitioner
Key分區策略根據元素Key的Hash值輸出到下游算子指定的實例。keyBy()算子底層正是使用的該分區策略,底層最終會調用KeyGroupStreamPartitioner的selectChannel()方法,計算每個Key對應的通道索引(通道編號,可理解為分區編號),根據通道索引將Key發送到下游相應的分區中。selectChannel()方法源碼如下:
圖片
圖片
總的來說,Flink底層計算通道索引(分區編號)的流程如下:
- 計算Key的HashCode值。
- 將Key的HashCode值進行特殊的Hash處理,即MathUtils.murmurHash(keyHash),返回一個非負哈希碼。
- 將非負哈希碼除以最大并行度取余數,得到keyGroupId,即Key組索引。
- 使用公式keyGroupId×parallelism/maxParallelism得到分區編號。parallelism為當前算子的并行度,即通道數量;maxParallelism為系統默認支持的最大并行度,即128。
3.6 RebalancePartitioner
平衡分區策略使用循環遍歷下游分區的方式,將上游元素均勻分配給下游算子的每個實例。每個下游算子的實例都具有相等的負載。當數據流中的元素存在數據傾斜時,使用該策略對性能有很大的提升。
這里把上面的代碼調整下:
dataStream.setParallelism(2);
dataStreamAfter.setParallelism(3);
dataStream.rebalance()
IDEA控制臺輸出:
圖片
分區前:
圖片
分區后:
圖片
平衡分區策略將上游所有元素均勻發送到了下游算子的所有分區:
圖片
3.7 RescalePartitioner
重新調節分區策略基于上下游算子的并行度,將元素以循環的方式輸出到下游算子的每個實例。類似于平衡分區策略,但又與平衡分區策略不同。
上游算子將元素發送到下游哪一個算子實例,取決于上游和下游算子的并行度。例如,如果上游算子的并行度為2,而下游算子的并行度為4,那么一個上游算子實例將把元素均勻分配給兩個下游算子實例,而另一個上游算子實例將把元素均勻分配給另外兩個下游算子實例。相反,如果下游算子的并行度為2,而上游算子的并行度為4,那么兩個上游算子實例將分配給一個下游算子實例,而另外兩個上游算子實例將分配給另一個下游算子實例。
假設上游算子并行度為2,分區編號為A和B,下游算子并行度為4,分區編號為1、2、3、4,那么A將把數據循環發送給1和2,B則把數據循環發送給3和4。假設上游算子并行度為4,編號為A、B、C、D,下游算子并行度為2,編號為1、2,那么A和B把數據發送給1,C和D則把數據發送給2。
這里把上面的代碼調整下:
dataStream.rescale()
同時將第一個map算子的并行度設置為2,第二個map算子的并行度設置為4。
IDEA控制臺輸出:
圖片
分區前:
圖片
分區后:
圖片
圖片
接下來改變map算子的并行度,將第一個map算子的并行度設置為4,第二個map算子的并行度設置為2。
圖片
如果想將元素均勻地輸出到下游算子的每個實例,以實現負載均衡,同時又不希望使用平衡分區策略的全局負載均衡,則可以使用重新調節分區策略。該策略會盡可能避免數據在網絡間傳輸,而能否避免還取決于TaskManager的Task Slot數量、上下游算子的并行度等。
3.8 ShufflePartitioner
隨機分區策略將上游算子元素輸出到下游算子的隨機實例中。元素會被均勻分配到下游算子的每個實例。這種策略可以實現計算任務的負載均衡。
這里把上面的代碼調整下:
dataStream.shuffle()
這里就不做過多演示了。我們下面來看下自定義分區策略。
四、自定義分區策略
自定義分區策略的API為CustomPartitionerWrapper。該策略允許開發者自定義規則將上游算子元素發送到下游指定的算子實例中。
4.1 新建自定義分區器
新建分區器類MyCustomPartitioner并實現接口Partitioner(Object表示分區Key的數據類型),實現其中未實現的方法partition(),在該方法中添加相應的分區邏輯。
/**
* 自定義分區策略
* 微信公眾號:老周聊架構
*/
public class MyCustomPartitioner implements Partitioner {
@Override
public int partition(Object key, int numPartitions) {
if (key.equals("chinese")) {
return 0;
} else if (key.equals("math")) {
return 1;
} else {
return 2;
}
}
}
上述代碼通過partition()方法取得分區編號,將Key值等于chinese的元素分配到編號為0的分區,將Key值等于math的元素分配到編號為1的分區,其余元素分配到編號為2的分區。
4.2 使用自定義分區器
調用DataStream的partitionCustom()方法傳入自定義分區器類MyCustomPartitioner的實例,可以對DataStream按照自定義規則進行重新分區,代碼如下:
/**
* 自定義分區策略
* 微信公眾號:老周聊架構
*/
public class CustomPartitionerTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> dataStream = env.fromElements("chinese,98", "math,88", "english,96");
//1.分區策略前的操作
//輸出dataStream每個元素及所屬的子任務編號
SingleOutputStreamOperator<Map<String, Integer>> dataStreamBefore =
dataStream.map(new RichMapFunction<String, Map<String, Integer>>() {
@Override
public Map<String, Integer> map(String value) throws Exception {
System.out.println(String.format("元素值: %s, 分區策略前,子任務編號: %s", value,
getRuntimeContext().getIndexOfThisSubtask()));
Map<String, Integer> map = new HashMap<>();
map.put(value.split(",")[0], Integer.parseInt(value.split(",")[1]));
return map;
}
}).setParallelism(2);
//2.設置分區策略
//設置DataStream向下游發送數據時使用的策略
DataStream<Map<String, Integer>> dataStreamAfter = dataStreamBefore.partitionCustom(new MyCustomPartitioner(), value -> value);
//3.分區策略后的操作
dataStreamAfter.map(new RichMapFunction<Map<String, Integer>, Map<String, Integer>>() {
@Override
public Map<String, Integer> map(Map<String, Integer> value) throws Exception {
System.out.println(String.format("元素值: %s, 分區策略后,子任務編號: %s", value,
getRuntimeContext().getIndexOfThisSubtask()));
return value;
}
}).setParallelism(3).print();
env.execute("CustomPartitionerTest Job");
}
}
分區前:
圖片
分區后:
圖片
自定義分區策略將上游所有元素按照自定義的規則發送到了下游的3個分區中。
把任務給到Flink上去跑,發現:
圖片
這是因為泛型擦除,下面的DataStream泛型需要指定類型,不能
圖片
小知識:
在編譯之后程序會采取去泛型化的措施。也就是說Java中的泛型,只在編譯階段有效。在編譯過程中,正確檢驗泛型結果后,在運行時會將泛型的相關信息擦除,編譯器只會在對象進入JVM和離開JVM的邊界處添加類型檢查和轉換的方法,泛型的信息不會進入到運行時階段,這就是所謂的Java類型擦除。
類型加好以后,再跑一下任務,會出現任務成功。
圖片
圖片