面試官:Kafka中的key有什么用?
我們在使用 Kafka 時,最簡單、最常用的方式是只設置 topic(主題)和 value(消息體),如下所示:這樣的話獲取消息的代碼也很簡單,如下所示:
@KafkaListener(topics = "mytopic", groupId = "my-group")
public void listen(String data) {
System.out.println("監聽到消息:" + data);
}
但是,除了我們可以設置和傳遞 topic 和 value 之外,我們還可以傳遞 key,如下圖所示:那問題來了,發送消息時設置這個 key 有什么用呢?
key的作用
發送消息時,設置 key 的作用如下:
1.決定分區
當生產者發送消息時,如果指定了 key,Kafka 會根據 key 的 hash 值來決定這條消息應該發送到哪個分區。
“
如果沒有指定 key,Kafka 會采用輪詢(早期版本)或隨機(最新版本)的方式將消息分配到其他分區中。
分區的具體實現源碼在 DefaultPartitioner 中 partition 方法中體現,核心源碼如下:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
}
指定 key 之后的分區實現代碼如下:
public static int partitionForKey(byte[] serializedKey, int numPartitions) {
return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}
以上源碼的大概含義是:使用 MurmurHash2 算法對字節數組 serializedKey 進行哈希運算,并將其結果轉換為正數,然后對 numPartitions 取模,以確定鍵在分區中的位置,返回值表示鍵所在的分區編號。
“
所以,從上述源碼可以看出,發送消息如果設置了 key 之后,會將相同 key 放到同一個分區中。
2.保證消息順序
在 Kafka 中,同一個分區中的消息是有序的。而相同的 key,根據上面的分區算法可知,它們會存放到同一個分區,這樣就能保證消息的有序性了。
3.消息過濾
對于某些應用場景,消費者可以根據消息的鍵來進行過濾或聚合操作。例如,在實時數據分析場景中,可能需要對具有相同鍵的消息進行分組處理。
Kafka 設置了 key 之后,可以通過以下方式實現消息過濾,如下代碼所示:
@KafkaListener(topics = "topicName", groupId = "groupId")
public void listen(String message, ConsumerRecord<?,?> record) {
Object key = record.key();
if (key instanceof String && ((String) key).matches("regexPattern")) {
// 處理滿足正則表達式條件的消息
}
}
也就是,我們在接收到消息之后,通過對 key 的正則匹配實現消息的過濾和聚合等操作。