圖解 Kafka 源碼之服務端啟動流程
前面「八篇」文章通過「場景驅動方式」帶你深度剖析了 Kafka「日志系統」源碼架構設計的方方面面,從今天開始,我們來深度剖析 Kafka「Controller」的底層源碼實現,這是 Controller 系列第一篇,我們先回過頭來繼續來深度聊聊「Kafka 服務端啟動的流程」,看看 Kafka 服務端是如何啟動的。
一、總體概述
在深入剖析Kafka「Controller」之前,我想你可能或多或少會有這樣的疑問:
Kafka 服務端都有哪些組件,這些組件又是通過哪個類來啟動的呢?
這里我們通過啟動 Kafka 來了解,大家都知道,啟動 Kafka 可以執行以下命令來啟動:
# 1、啟動 kafka 服務命令:
bin/kafka-server-start.sh config/server.properties &
那么今天就來看看通過這個腳本 KafkaServer 初始化了哪些組件。
二、kafka-server-start.sh
我們來看下里面的 shell 內容,如下:
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 1、注釋說明該腳本的版權信息和使用許可。
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
exit 1
fi
# 2、檢查命令行參數的個數,若小于 1 則輸出腳本的使用方法并退出。
base_dir=$(dirname $0)
# 3、獲取當前腳本所在目錄的路徑,并將其賦值給 base_dir 變量。
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.cnotallow=file:$base_dir/../config/log4j.properties"
fi
# 4、檢查 KAFKA_LOG4J_OPTS 環境變量是否設置,若未設置則設置該變量的值。
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT="9999"
export JMX_RMI_PORT="10000"
fi
# 5、檢查 KAFKA_HEAP_OPTS 環境變量是否設置,若未設置則設置該變量的值,并設置 JMX_PORT 和 JMX_RMI_PORT 環境變量的值,將 EXTRA_ARGS 變量的值設置為字符串 -name kafkaServer -loggc。
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
# 6、檢查命令行參數中 COMMAND 變量的值是否為 -daemon,若是則將 EXTRA_ARGS 變量的值添加 -daemon 選項。同時將命令行參數向左移一位,即從 $2 開始計算參數。
COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac
# 7、調用 $base_dir/kafka-run-class.sh 腳本并傳遞相應的參數。其中 "@ 代表傳遞的為命令行參數。具體執行的封裝在 Kafka 客戶端庫中的 kafka.Kafka 類。整個腳本的作用是啟動 Kafka 服務。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
esac
# 7、調用 $base_dir/kafka-run-class.sh 腳本并傳遞相應的參數。其中 "@ 代表傳遞的為命令行參數。具體執行的封裝在 Kafka 客戶端庫中的 kafka.Kafka 類。整個腳本的作用是啟動 Kafka 服務。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
這里我們重點來看 「第 7 步」,它底層執行的是封裝在 Kafka 客戶端庫中的 kafka.Kafka 類。接下來我們來看下該類都做了什么。
三、kafka.Kafka 類
「Kafka.scala」類源碼在 Kafka 源碼包的 core 包下,具體的 github 源碼位置如下:
https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/Kafka.scala。
從整體上來看,該類就 3 個方法,相對比較簡單,我能來看下里面的重點。
這里我們通過「2.8.x」版本來講解,「2.7.x」還未增加 KafkaRaftServer 類。
1、getPropsFromArgs
def getPropsFromArgs(args: Array[String]): Properties = {
// 創建一個命令行參數解析器
val optionParser = new OptionParser(false)
// 定義 --override 選項,用于覆蓋 server.properties 文件中的屬性
val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
.withRequiredArg()
.ofType(classOf[String])
// 定義 --version 選項,用于打印版本信息并退出
optionParser.accepts("version", "Print version information and exit.")
// 若沒有提供參數或者參數包含 --help 選項,則打印用法并退出
if (args.length == 0 || args.contains("--help")) {
CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName()))
}
// 若參數中包含 --version 選項,則打印版本信息并退出
if (args.contains("--version")) {
CommandLineUtils.printVersionAndDie()
}
// 加載 server.properties 文件中的屬性到 Properties 對象中
val props = Utils.loadProps(args(0))
// 若提供了其他參數,則解析這些參數
if (args.length > 1) {
// 解析參數中的選項和參數值
val options = optionParser.parse(args.slice(1, args.length): _*)
// 檢查是否有非選項參數
if (options.nonOptionArguments().size() > 0) {
CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
}
// 將解析得到的選項和參數值添加到 props 對象中
props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
}
// 返回解析得到的屬性集合
props
}
該函數的作用是從命令行參數中解析出屬性集合。它內部使用了 OptionParser 類庫來解析命令行選項,并從 server.properties 文件中加載屬性。
如果提供了 override 選項,則它將覆蓋 server.properties 文件中的相應屬性。函數返回一個 Properties 對象,其中包含了解析得到的屬性。
如果沒有提供正確的命令行參數或者提供了 --help 或 --version 選項,函數會打印幫助信息或版本信息并退出。
2、buildServer
private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, false)
// 直接啟動定時任務、網絡層、請求處理層
if (config.requiresZookeeper) {
new KafkaServer(
config,
Time.SYSTEM,
threadNamePrefix = None,
enableForwarding = false
)
} else {
// 調用 BrokerServer 等來啟動網絡層和請求處理層
new KafkaRaftServer(
config,
Time.SYSTEM,
threadNamePrefix = None
)
}
}
在 kafka 2.8.x 版本中 新增了 raft 協議之后將 BrokerServer、ControllServer 使用了單獨的文件來啟動最終調用網絡層和請求處理層,如果還是使用 zk 的方式啟動則是 KafkaServer 啟動網絡層和請求處理層。
3、main
# 2.7.x 版本源碼
def main(args: Array[String]): Unit = {
try {
// 1、解析命令行參數,獲得屬性集合
val serverProps = getPropsFromArgs(args)
// 2、從屬性集合創建 KafkaServerStartable 對象
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
try {
// 如果不是 Windows 操作系統,并且不是 IBM JDK,則注冊 LoggingSignalHandler
if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
new LoggingSignalHandler().register()
} catch {
// 如果注冊 LoggingSignalHandler 失敗,則在日志中打印警告信息
case e: ReflectiveOperationException =>
warn("Failed to register optional signal handler that logs a message when the process is terminated " +
s"by a signal. Reason for registration failure is: $e", e)
}
// 3、添加 shutdown hook,用于在程序結束時執行 KafkaServerStartable 的 shutdown 方法
Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown())
// 4、啟動 KafkaServerStartable 實例
kafkaServerStartable.startup()
// 5、等待 KafkaServerStartable 實例終止
kafkaServerStartable.awaitShutdown()
}
catch {
// 如果有異常發生,則記錄日志并退出程序
case e: Throwable =>
fatal("Exiting Kafka due to fatal exception", e)
Exit.exit(1)
}
// 6、正常終止程序
Exit.exit(0)
}
該函數是 Kafka 服務進程的入口,它是整個 Kafka 運行過程的驅動程序。該函數首先通過調用 getPropsFromArgs 函數解析命令行參數并獲得屬性集合,然后使用這些屬性創建 KafkaServerStartable 實例。接著,它注冊一個 shutdown hook,用于在程序終止時執行 KafkaServerStartable 的 shutdown 方法。然后它啟動 KafkaServerStartable 實例,并等待該實例終止。如果發生異常,則記錄日志并退出程序。函數最后調用 Exit.exit 方法退出程序,返回 0 表示正常終止。
# 2.8.x 版本
def main(args: Array[String]): Unit = {
// 獲取Kafka服務的配置信息
val serverProps = getPropsFromArgs(args)
// 根據配置信息構建Kafka服務
val server = buildServer(serverProps)
try {
// 注冊用于記錄日志的信號處理器(若實現失敗則退出)
if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
new LoggingSignalHandler().register()
} catch {
case e: ReflectiveOperationException =>
warn("Failed to register optional signal handler that logs a message when the process is terminated " +
s"by a signal. Reason for registration failure is: $e", e)
}
// 掛載關閉處理器,用于捕獲終止信號和常規終止請求
Exit.addShutdownHook("kafka-shutdown-hook", {
try server.shutdown() // 關閉Kafka服務
catch {
case _: Throwable =>
fatal("Halting Kafka.") // 日志記錄致命錯誤信息
// 調用Exit.halt()強制退出,避免重復調用Exit.exit()引發死鎖
Exit.halt(1)
}
})
try server.startup() // 啟動Kafka服務
catch {
case _: Throwable =>
// 調用Exit.exit()設置退出狀態碼,KafkaServer.startup()會在拋出異常時調用shutdown()
fatal("Exiting Kafka.")
Exit.exit(1)
}
server.awaitShutdown() // 等待Kafka服務關閉
Exit.exit(0) // 調用Exit.exit()設置退出狀態碼
}
這里最重要的是 「第 4 步」,調用 kafkaServerStartable.startup() 或者 server.startup() 來啟動 kafka。
這里我們還是以「ZK 模式」的方式來啟動,后面抽空再進行對 「Raft 模式」啟動進行補充。
四、KafkaServerStartable
「KafkaServerStartable.scala」類源碼在 Kafka 源碼包的 core 包下,具體的 github 源碼位置如下:
https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServerStartable.scala。
在 Scala 語言里,在一個源代碼文件中同時定義相同名字的 class 和 object 的用法被稱為伴生(Companion)。Class 對象被稱為伴生類,它和 Java 中的類是一樣的;而 Object 對象是一個單例對象,用于保存一些靜態變量或靜態方法。
這里我們主要來看下 Class 類代碼。
class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging {
// 創建 KafkaServer 實例
// 構造函數有兩個參數 —— staticServerConfig 表示靜態服務器配置,reporters 表示 Kafka 指標報告器。如果 threadNamePrefix 參數未用于構造函數,則默認值為 None。threadNamePrefix 參數表示線程名稱前綴,用于調試和維護目的。
private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix)
def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
// 啟動 KafkaServer
// startup 方法嘗試啟動 Kafka 服務器。如果啟動 Kafka 服務器時發生異常,則記錄一條 fatal 錯誤日志并退出程序。對于成功啟動的 Kafka 服務器,它將開始監聽客戶端連接,并在收到消息時執行所需的操作。
def startup(): Unit = {
try server.startup()
catch {
// 如果出現異常,則記錄日志并退出程序
case _: Throwable =>
// KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
fatal("Exiting Kafka.")
Exit.exit(1)
}
}
// 關閉 KafkaServer
// shutdown 方法嘗試停止 Kafka 服務器。如果在停止服務器時出現異常,則記錄一條 fatal 錯誤日志并強制退出程序。調用 shutdown 方法后,服務器將不再接受新的請求,并開始等待當前進行中的請求完成。當所有處理中的請求都完成后,服務器將徹底停止。
def shutdown(): Unit = {
try server.shutdown()
catch {
// 如果出現異常,則記錄日志并強制退出程序
case _: Throwable =>
fatal("Halting Kafka.")
// Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
Exit.halt(1)
}
}
// setServerState 方法允許從 KafkaServerStartable 對象中設置 broker 狀態。如果自定義 KafkaServerStartable 對象想要引入新的狀態,則此方法很有用。
def setServerState(newState: Byte): Unit = {
server.brokerState.newState(newState)
}
// 等待 KafkaServer 退出
// awaitShutdown 方法等待 Kafka 服務器完全退出。在 Kafka 服務器執行 shutdown 方法后,它將不再接受新的請求。但是,服務器可能仍在處理一些已經接收的請求。awaitShutdown 方法將阻塞當前線程,直到服務器徹底停止。
def awaitShutdown(): Unit = server.awaitShutdown()
}
KafkaServerStartable 類是一個可啟動和停止的 Kafka 服務器。類中的 server 成員變量是 KafkaServer 類的實例,它將在 KafkaServerStartable 類對象啟動時創建。該類提供了啟動和停止 Kafka 服務器的方法,以及設置 broker 狀態和等待 Kafka 服務器退出的方法。
跟本文有關系的是 「啟動」方法,它調用了 KafkaServer#startup 方法進行啟動。
五、KafkaServer 類
Kafka 集群由多個 Broker 節點構成,每個節點上都運行著一個 Kafka 實例,這些實例之間基于 ZK 來發現彼此,并由集群控制器 KafkaController 統籌協調運行,彼此之間基于 socket 連接進行通信。
「KafkaServer.scala」類源碼在 Kafka 源碼包的 core 包下,具體的 github 源碼位置如下:
https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServer.scala。
KafkaServer 為 Kafka 的啟動類,其中包含了 Kafka 的所有組件,如 KafkaController、groupCoordinator、replicaManager 等。
class KafkaServer(val config: KafkaConfig, //配置信息
time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None,
kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() //監控上報
) extends Logging with KafkaMetricsGroup {
//標識節點已經啟動完成
private val startupComplete = new AtomicBoolean(false)
//標識節點正在執行關閉操作
private val isShuttingDown = new AtomicBoolean(false)
//標識節點正在執行啟動操作
private val isStartingUp = new AtomicBoolean(false)
//阻塞主線程等待 KafkaServer 的關閉
private var shutdownLatch = new CountDownLatch(1)
//日志上下文
private var logContext: LogContext = null
var metrics: Metrics = null
//記錄節點的當前狀態
val brokerState: BrokerState = new BrokerState
//API接口類,用于處理數據類請求
var dataPlaneRequestProcessor: KafkaApis = null
//API接口,用于處理控制類請求
var controlPlaneRequestProcessor: KafkaApis = null
//權限管理
var authorizer: Option[Authorizer] = None
//啟動socket,監聽9092端口,等待接收客戶端請求
var socketServer: SocketServer = null
//數據類請求處理線程池
var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
//命令類處理線程池
var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
//日志管理器
var logDirFailureChannel: LogDirFailureChannel = null
var logManager: LogManager = null
//副本管理器
var replicaManager: ReplicaManager = null
//topic增刪管理器
var adminManager: AdminManager = null
//token管理器
var tokenManager: DelegationTokenManager = null
//動態配置管理器
var dynamicConfigHandlers: Map[String, ConfigHandler] = null
var dynamicConfigManager: DynamicConfigManager = null
var credentialProvider: CredentialProvider = null
var tokenCache: DelegationTokenCache = null
//分組協調器
var groupCoordinator: GroupCoordinator = null
//事務協調器
var transactionCoordinator: TransactionCoordinator = null
//集群控制器
var kafkaController: KafkaController = null
//定時任務調度器
var kafkaScheduler: KafkaScheduler = null
//集群分區狀態信息緩存
var metadataCache: MetadataCache = null
//配額管理器
var quotaManagers: QuotaFactory.QuotaManagers = null
//zk客戶端配置
val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
private var _zkClient: KafkaZkClient = null
val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
private var _clusterId: String = null
private var _brokerTopicStats: BrokerTopicStats = null
def clusterId: String = _clusterId
// Visible for testing
private[kafka] def zkClient = _zkClient
private[kafka] def brokerTopicStats = _brokerTopicStats
....
}
1、startup
該類方法很多,我們這里只看 startup 啟動方法,來看看其內部都啟動了哪些組件,來解決本文開頭提出的問題。
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
*/
def startup(): Unit = {
try {
info("starting")
// 是否已關閉
if (isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
// 是否已啟動
if (startupComplete.get)
return
// 是否可以啟動
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) { // 設置broker狀態為Starting
brokerState.newState(Starting)
/* setup zookeeper */
// 連接ZK,并創建根節點
initZkClient(time)
/* initialize features */
_featureChangeListener = new FinalizedFeatureChangeListener(featureCache, _zkClient)
if (config.isFeatureVersioningSupported) {
_featureChangeListener.initOrThrow(config.zkConnectionTimeoutMs)
}
/* Get or create cluster_id */
// 從ZK獲取或創建集群id,規則:UUID的mostSigBits、leastSigBits組合轉base64
_clusterId = getOrGenerateClusterId(zkClient)
info(s"Cluster ID = $clusterId")
/* load metadata */
// 獲取brokerId及log存儲路徑,brokerId通過zk生成或者server.properties配置broker.id
// 規則:/brokers/seqid的version值 + maxReservedBrokerId(默認1000),保證唯一性
val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = getBrokerMetadataAndOfflineDirs
/* check cluster id */
if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId)
throw new InconsistentClusterIdException(
s"The Cluster ID ${clusterId} doesn't match stored clusterId ${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.")
/* generate brokerId */
config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)
logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
// 配置logger
this.logIdent = logContext.logPrefix
// initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
// applied after DynamicConfigManager starts.
// 初始化AdminZkClient,支持動態修改配置
config.dynamicConfig.initialize(zkClient)
/* start scheduler */
// 初始化定時任務調度器
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
kafkaScheduler.startup()
/* create and configure metrics */
// 創建及配置監控,默認使用JMX及Yammer Metrics
kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
kafkaYammerMetrics.configure(config.originals)
val jmxReporter = new JmxReporter()
jmxReporter.configure(config.originals)
val reporters = new util.ArrayList[MetricsReporter]
reporters.add(jmxReporter)
val metricConfig = KafkaServer.metricConfig(config)
val metricsContext = createKafkaMetricsContext()
metrics = new Metrics(metricConfig, reporters, time, true, metricsContext)
/* register broker metrics */
_brokerTopicStats = new BrokerTopicStats
// 初始化配額管理器
quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
// 用于保證kafka-log數據目錄的存在
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
/* start log manager */
// 啟動日志管理器,kafka的消息以日志形式存儲
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
// 啟動日志清理、刷新、校驗、恢復等的定時線程
logManager.startup()
metadataCache = new MetadataCache(config.brokerId)
// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
// SCRAM認證方式的token緩存
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
// 啟動socket,監聽9092端口,等待接收客戶端請求
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startProcessingRequests = false)
/* start replica manager */
brokerToControllerChannelManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, threadNamePrefix)
// 啟動副本管理器,高可用相關
replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()
brokerToControllerChannelManager.start()
// 將broker信息注冊到ZK上
val brokerInfo = createBrokerInfo
val brokerEpoch = zkClient.registerBroker(brokerInfo)
// Now that the broker is successfully registered, checkpoint its metadata
// 校驗 broker 信息
checkpointBrokerMetadata(BrokerMetadata(config.brokerId, Some(clusterId)))
/* start token manager */
// 啟動 token 管理器
tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
tokenManager.startup()
/* start kafka controller */
// 啟動Kafka控制器,只有 Leader 會與ZK建連
kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
kafkaController.startup()
// admin管理器
adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
// 啟動集群群組協調器
groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)
groupCoordinator.startup()
/* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
// 啟動事務協調器
transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
transactionCoordinator.startup()
/* Get the authorizer and initialize it if one is specified.*/
// ACL
authorizer = config.authorizer
authorizer.foreach(_.configure(config.originals))
val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
case Some(authZ) =>
authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
ep -> cs.toCompletableFuture
}
case None =>
brokerInfo.broker.endPoints.map { ep =>
ep.toJava -> CompletableFuture.completedFuture[Void](null)
}.toMap
}
// 創建拉取管理器
val fetchManager = new FetchManager(Time.SYSTEM,
new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
/* start processing requests */
// 初始化數據類請求的KafkaApis,負責數據類請求邏輯處理
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
// 初始化數據類請求處理的線程池
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
// 初始化控制類請求的 KafkaApis
controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
// 初始化控制類請求的線程池
controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
}
Mx4jLoader.maybeLoad()
/* Add all reconfigurables for config change notification before starting config handlers */
config.dynamicConfig.addReconfigurables(this)
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
// Create the config manager. start listening to notifications
// 啟動動態配置處理器
dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()
// 啟動請求處理線程
socketServer.startProcessingRequests(authorizerFutures)
// 更新broker狀態
brokerState.newState(RunningAsBroker)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
AppInfoParser.registerAppInfo(metricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
info("started")
}
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
isStartingUp.set(false)
shutdown()
throw e
}
}
這里總結下該方法都啟動了哪些組件:
- initZkClient(time) 初始化 Zk。
- kafkaScheduler 定時器。
- logManager 日志模塊。
- MetadataCache 元數據緩存。
- socketServer 網絡服務器。
- replicaManager 副本模塊。
- kafkaController 控制器。
- groupCoordinator 協調器用于和ConsumerCoordinator 交互
- transactionCoordinator 事務相關
- fetchManager 副本拉取管理器。
- dynamicConfigManager 動態配置管理器。
2、Broker 狀態
這個是在 2.7.x 版本之前的狀態,在 2.8.x 之后版本進行了重構。
sealed trait BrokerStates { def state: Byte }
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
- NotRunning:初始狀態,標識當前 broker 節點未運行。
- Starting:標識當前 broker 節點正在啟動中。
- RecoveringFromUncleanShutdown:標識當前 broker 節點正在從上次非正常關閉中恢復。
- RuningAsBroker:標識當前 broker 節點啟動成功,可以對外提供服務。
- PendingControlledShutdown:標識當前 broker 節點正在等待 controlled shutdown 操作完成。
- BrokerShuttingDown:標識當前 broker 節點正在執行 shutdown 操作。
這些就是 KafkaServer 中主要模塊的入口,接下來的文章會通過這些入口一一進行分析。
六、總結
這里,我們一起來總結一下這篇文章的重點。
- 文章開頭通過對「kafka-server-start.sh」內容進行剖析,引出了 「kafka.Kafka」類。
- 在「kafka.Kafka」的 main 方法中調用了「KafkaServerStartable」嘗試啟動 Kafka 服務器。
- 接著在 「KafkaServerStartable」的 startup 方法中調用了 「KafkaServer」的 startup 方法啟動服務器需要的各種組件類。
下篇我們來深度剖析「Broker 啟動集群如何感知」,大家期待,我們下期見。