AbstractFetcherThread:拉取消息分幾步?
今天我們來(lái)深入探討Kafka中的延遲處理機(jī)制,即通過(guò)DelayedOperation來(lái)實(shí)現(xiàn)的延時(shí)處理請(qǐng)求。具體來(lái)說(shuō),Kafka使用了一種名為“分層時(shí)間輪”的數(shù)據(jù)結(jié)構(gòu)來(lái)管理延時(shí)任務(wù),并通過(guò)它實(shí)現(xiàn)了對(duì)延遲請(qǐng)求的高效處理。這種延時(shí)機(jī)制廣泛應(yīng)用于Kafka的各個(gè)模塊,比如控制器、分區(qū)管理、副本同步等。
本節(jié)課我們將通過(guò)分析Kafka的相關(guān)源碼,詳細(xì)講解DelayedOperation是如何在Broker中延時(shí)處理請(qǐng)求的。同時(shí),我們還會(huì)講解兩個(gè)關(guān)鍵類:Timer和SystemTimer,看看它們是如何與Kafka的整體框架結(jié)合的。
一、Kafka延時(shí)處理機(jī)制概述
Kafka中延遲請(qǐng)求的處理場(chǎng)景非常多,比如:
- 消費(fèi)者組協(xié)調(diào)器:處理消費(fèi)者組中的成員加入和離開(kāi)時(shí)的超時(shí)。
- 控制器:在處理集群元數(shù)據(jù)的變化時(shí)需要對(duì)副本分配、Leader選舉進(jìn)行延時(shí)操作。
- 副本管理:當(dāng)副本與Leader失聯(lián)時(shí),需要延遲一段時(shí)間再?zèng)Q定是否剔除該副本。
Kafka為了應(yīng)對(duì)這些場(chǎng)景,使用了一種高效的延時(shí)處理機(jī)制:分層時(shí)間輪(Hierarchical Timing Wheels)。這個(gè)數(shù)據(jù)結(jié)構(gòu)通過(guò)將延時(shí)任務(wù)按照超時(shí)時(shí)間分層存儲(chǔ),極大地提高了處理大量延時(shí)任務(wù)的性能。
1.1 什么是分層時(shí)間輪?
分層時(shí)間輪是一種常用于處理延遲任務(wù)的數(shù)據(jù)結(jié)構(gòu),它的核心思想是將時(shí)間分為一系列固定大小的時(shí)間槽(Bucket),每個(gè)槽對(duì)應(yīng)一個(gè)時(shí)間段。延時(shí)任務(wù)會(huì)根據(jù)它的超時(shí)時(shí)間被放入相應(yīng)的時(shí)間槽中,時(shí)間輪會(huì)隨著時(shí)間推移不斷向前轉(zhuǎn)動(dòng),每當(dāng)轉(zhuǎn)到某個(gè)時(shí)間槽時(shí),執(zhí)行其中的所有任務(wù)。
Kafka實(shí)現(xiàn)的分層時(shí)間輪有多個(gè)層次,每一層的時(shí)間槽覆蓋不同的時(shí)間范圍。隨著層次的增加,每個(gè)時(shí)間槽覆蓋的時(shí)間也逐漸變大。這樣設(shè)計(jì)的好處是,可以通過(guò)較少的層次和時(shí)間槽來(lái)管理大范圍的延時(shí)任務(wù)。
二、核心類:Timer 和 SystemTimer
在Kafka中,延時(shí)任務(wù)的管理由兩個(gè)關(guān)鍵類負(fù)責(zé):
- Timer:這是時(shí)間輪的抽象接口,定義了延時(shí)任務(wù)的調(diào)度方法。
- SystemTimer:這是Timer的具體實(shí)現(xiàn),使用分層時(shí)間輪來(lái)管理任務(wù)。
接下來(lái),我們通過(guò)源碼詳細(xì)了解這兩個(gè)類的實(shí)現(xiàn)。
2.1 Timer接口
首先來(lái)看Timer接口,這是Kafka中用于管理延時(shí)任務(wù)的通用接口。它的主要方法包括:
public interface Timer {
/**
* 添加一個(gè)延時(shí)操作到定時(shí)器中。
*/
void add(DelayedOperation operation);
/**
* 觸發(fā)到期的延時(shí)操作。
*/
boolean advanceClock(long timeoutMs) throws InterruptedException;
/**
* 檢查定時(shí)器中是否有待執(zhí)行的操作。
*/
int size();
/**
* 關(guān)閉定時(shí)器。
*/
void shutdown();
}
- add(DelayedOperation operation):將一個(gè)延時(shí)任務(wù)添加到時(shí)間輪中。
- advanceClock(long timeoutMs):推進(jìn)時(shí)間輪的時(shí)鐘,觸發(fā)已經(jīng)到期的延時(shí)任務(wù)。
- size():返回當(dāng)前定時(shí)器中未執(zhí)行的任務(wù)數(shù)。
- shutdown():關(guān)閉定時(shí)器,停止任務(wù)調(diào)度。
Timer接口為Kafka中所有延時(shí)任務(wù)的管理提供了統(tǒng)一的抽象,各個(gè)模塊的延時(shí)任務(wù)都通過(guò)這個(gè)接口進(jìn)行調(diào)度。
2.2 SystemTimer類
SystemTimer是Timer接口的具體實(shí)現(xiàn),它使用了分層時(shí)間輪來(lái)管理延時(shí)任務(wù)。我們來(lái)看一下它的主要實(shí)現(xiàn):
public class SystemTimer implements Timer {
private final String executorName;
private final TimerTaskList[] timeWheel;
private final long tickMs;
private final int wheelSize;
private final long startMs;
// 構(gòu)造函數(shù),初始化時(shí)間輪
public SystemTimer(String executorName, long tickMs, int wheelSize) {
this.executorName = executorName;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.timeWheel = new TimerTaskList[wheelSize];
this.startMs = System.currentTimeMillis();
// 初始化時(shí)間輪的每個(gè)Bucket
for (int i = 0; i < wheelSize; i++) {
timeWheel[i] = new TimerTaskList();
}
}
@Override
public void add(DelayedOperation operation) {
long expiration = operation.expirationMs();
long delayMs = expiration - System.currentTimeMillis();
int bucketIndex = (int) ((delayMs / tickMs) % wheelSize);
timeWheel[bucketIndex].add(operation);
}
@Override
public boolean advanceClock(long timeoutMs) {
long currentTimeMs = System.currentTimeMillis();
int currentBucket = (int) ((currentTimeMs - startMs) / tickMs % wheelSize);
// 處理當(dāng)前 Bucket 中的到期任務(wù)
timeWheel[currentBucket].advance();
return true;
}
@Override
public int size() {
int size = 0;
for (TimerTaskList taskList : timeWheel) {
size += taskList.size();
}
return size;
}
@Override
public void shutdown() {
// 清理所有未完成的任務(wù)
}
}
SystemTimer的核心成員變量包括:
- tickMs:時(shí)間輪的最小時(shí)間間隔,也就是時(shí)間輪每次轉(zhuǎn)動(dòng)的步長(zhǎng)。
- wheelSize:時(shí)間輪中時(shí)間槽的數(shù)量。
- timeWheel[]:時(shí)間輪的數(shù)組,每個(gè)元素對(duì)應(yīng)一個(gè)時(shí)間槽(Bucket),用來(lái)存儲(chǔ)延時(shí)任務(wù)。
2.2.1 add()方法
add()方法用于將延時(shí)任務(wù)添加到時(shí)間輪中。它通過(guò)計(jì)算任務(wù)的超時(shí)時(shí)間,確定該任務(wù)應(yīng)該存放在哪個(gè)時(shí)間槽中。計(jì)算方式是根據(jù)當(dāng)前時(shí)間和任務(wù)的超時(shí)時(shí)間,確定需要經(jīng)過(guò)多少個(gè)tick,然后取模得到對(duì)應(yīng)的時(shí)間槽。
long expiration = operation.expirationMs();
long delayMs = expiration - System.currentTimeMillis();
int bucketIndex = (int) ((delayMs / tickMs) % wheelSize);
timeWheel[bucketIndex].add(operation);
這樣,Kafka可以將延時(shí)任務(wù)按超時(shí)時(shí)間分布到不同的時(shí)間槽中,隨著時(shí)間輪的轉(zhuǎn)動(dòng)逐漸觸發(fā)這些任務(wù)。
2.2.2 advanceClock()方法
advanceClock()方法用于推進(jìn)時(shí)間輪的時(shí)鐘。當(dāng)時(shí)間輪的時(shí)鐘前進(jìn)時(shí),會(huì)檢查當(dāng)前時(shí)間槽中的任務(wù),觸發(fā)已經(jīng)到期的任務(wù)。
long currentTimeMs = System.currentTimeMillis();
int currentBucket = (int) ((currentTimeMs - startMs) / tickMs % wheelSize);
timeWheel[currentBucket].advance();
這個(gè)方法會(huì)計(jì)算當(dāng)前的時(shí)間槽索引,并處理當(dāng)前槽中的任務(wù)。Kafka通過(guò)不斷推進(jìn)時(shí)間輪的時(shí)鐘,逐步觸發(fā)延時(shí)任務(wù)的執(zhí)行。
2.2.3 TimerTaskList類
時(shí)間輪中的每個(gè)時(shí)間槽是一個(gè)TimerTaskList對(duì)象,它存儲(chǔ)了當(dāng)前槽中的所有延時(shí)任務(wù)。TimerTaskList類的實(shí)現(xiàn)如下:
public class TimerTaskList {
private final List<DelayedOperation> tasks = new LinkedList<>();
// 添加任務(wù)
public void add(DelayedOperation operation) {
tasks.add(operation);
}
// 觸發(fā)到期任務(wù)
public void advance() {
Iterator<DelayedOperation> iterator = tasks.iterator();
while (iterator.hasNext()) {
DelayedOperation task = iterator.next();
if (task.isExpired()) {
task.run();
iterator.remove();
}
}
}
public int size() {
return tasks.size();
}
}
TimerTaskList通過(guò)鏈表存儲(chǔ)延時(shí)任務(wù),并在時(shí)鐘推進(jìn)時(shí)檢查任務(wù)是否到期,執(zhí)行到期任務(wù)并將其從列表中移除。
三、Kafka中的延遲處理示例
接下來(lái)我們結(jié)合Kafka的具體場(chǎng)景,來(lái)看一下DelayedOperation是如何被應(yīng)用的。一個(gè)典型的例子就是消費(fèi)者組協(xié)調(diào)器(GroupCoordinator)中的延遲處理。
3.1 消費(fèi)者組協(xié)調(diào)器中的延遲請(qǐng)求
在Kafka的消費(fèi)者組管理中,延遲請(qǐng)求被廣泛應(yīng)用。比如,當(dāng)一個(gè)消費(fèi)者加入或離開(kāi)消費(fèi)者組時(shí),協(xié)調(diào)器需要等待一段時(shí)間,直到確定沒(méi)有其他消費(fèi)者的變更請(qǐng)求,這時(shí)就需要使用延遲操作來(lái)處理請(qǐng)求。
在GroupCoordinator中,有一個(gè)completeJoinGroupRequest()方法,它通過(guò)延遲操作來(lái)管理消費(fèi)者加入組的請(qǐng)求:
public void completeJoinGroupRequest(String groupId, int memberId, long timeoutMs) {
DelayedJoinGroup delayedJoin = new DelayedJoinGroup(groupId, memberId, timeoutMs);
this.timer.add(delayedJoin);
}
這里DelayedJoinGroup是`
DelayedOperation的一個(gè)子類,用來(lái)處理消費(fèi)者加入組的邏輯。它會(huì)被添加到timer`中,并在超時(shí)后觸發(fā)執(zhí)行。
3.2 DelayedOperation類
DelayedOperation是Kafka中所有延遲任務(wù)的基類,定義了延遲任務(wù)的基本行為。它的核心方法如下:
public abstract class DelayedOperation {
private final long deadlineMs;
public DelayedOperation(long timeoutMs) {
this.deadlineMs = System.currentTimeMillis() + timeoutMs;
}
// 檢查任務(wù)是否超時(shí)
public boolean isExpired() {
return System.currentTimeMillis() >= deadlineMs;
}
// 執(zhí)行任務(wù)
public abstract void run();
}
DelayedOperation通過(guò)isExpired()方法判斷任務(wù)是否超時(shí),并通過(guò)run()方法執(zhí)行任務(wù)。Kafka中很多延時(shí)任務(wù)都是基于這個(gè)類實(shí)現(xiàn)的。
四、總結(jié)
通過(guò)分析Kafka中的Timer和SystemTimer類,我們深入了解了Kafka如何通過(guò)分層時(shí)間輪實(shí)現(xiàn)高效的延時(shí)任務(wù)調(diào)度機(jī)制。Kafka的延時(shí)處理不僅應(yīng)用于消費(fèi)者組協(xié)調(diào)器,還廣泛用于副本管理、控制器等模塊。
延時(shí)處理機(jī)制通過(guò)將任務(wù)分層存儲(chǔ),極大地提高了Kafka處理大量延時(shí)任務(wù)的性能。這種機(jī)制的設(shè)計(jì)既簡(jiǎn)潔又高效,適用于大規(guī)模分布式系統(tǒng)的延時(shí)任務(wù)處理需求。