成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

圖解 Kafka 源碼之服務端啟動流程

云計算 Kafka
從今天開始,我們來深度剖析 Kafka「Controller」的底層源碼實現,這是 Controller 系列第一篇,我們先回過頭來繼續來深度聊聊「Kafka 服務端啟動的流程」,看看 Kafka 服務端是如何啟動的。

前面「八篇」文章通過「場景驅動方式」帶你深度剖析了 Kafka「日志系統」源碼架構設計的方方面面,從今天開始,我們來深度剖析 Kafka「Controller」的底層源碼實現,這是 Controller 系列第一篇,我們先回過頭來繼續來深度聊聊「Kafka  服務端啟動的流程」,看看 Kafka 服務端是如何啟動的。

一、總體概述

在深入剖析Kafka「Controller」之前,我想你可能或多或少會有這樣的疑問:

Kafka  服務端都有哪些組件,這些組件又是通過哪個類來啟動的呢?

這里我們通過啟動 Kafka 來了解,大家都知道,啟動 Kafka 可以執行以下命令來啟動

# 1、啟動 kafka 服務命令:
bin/kafka-server-start.sh config/server.properties &

那么今天就來看看通過這個腳本 KafkaServer 初始化了哪些組件。

二、kafka-server-start.sh

我們來看下里面的 shell 內容,如下:

#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 1、注釋說明該腳本的版權信息和使用許可。
if [ $# -lt 1 ];
then
        echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
        exit 1
fi
# 2、檢查命令行參數的個數,若小于 1 則輸出腳本的使用方法并退出。
base_dir=$(dirname $0)
# 3、獲取當前腳本所在目錄的路徑,并將其賦值給 base_dir 變量。
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.cnotallow=file:$base_dir/../config/log4j.properties"
fi
# 4、檢查 KAFKA_LOG4J_OPTS 環境變量是否設置,若未設置則設置該變量的值。
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
    export JMX_RMI_PORT="10000"
fi
# 5、檢查 KAFKA_HEAP_OPTS 環境變量是否設置,若未設置則設置該變量的值,并設置 JMX_PORT 和 JMX_RMI_PORT 環境變量的值,將 EXTRA_ARGS 變量的值設置為字符串 -name kafkaServer -loggc。
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
# 6、檢查命令行參數中 COMMAND 變量的值是否為 -daemon,若是則將 EXTRA_ARGS 變量的值添加 -daemon 選項。同時將命令行參數向左移一位,即從 $2 開始計算參數。
COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac
# 7、調用 $base_dir/kafka-run-class.sh 腳本并傳遞相應的參數。其中 "@ 代表傳遞的為命令行參數。具體執行的封裝在 Kafka 客戶端庫中的 kafka.Kafka 類。整個腳本的作用是啟動 Kafka 服務。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
esac
# 7、調用 $base_dir/kafka-run-class.sh 腳本并傳遞相應的參數。其中 "@ 代表傳遞的為命令行參數。具體執行的封裝在 Kafka 客戶端庫中的 kafka.Kafka 類。整個腳本的作用是啟動 Kafka 服務。
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

這里我們重點來看 「第 7 步」,它底層執行的是封裝在 Kafka 客戶端庫中的 kafka.Kafka 類。接下來我們來看下該類都做了什么。

三、kafka.Kafka 類

「Kafka.scala」類源碼在 Kafka 源碼包的 core 包下,具體的 github 源碼位置如下:

https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/Kafka.scala。

從整體上來看,該類就 3 個方法,相對比較簡單,我能來看下里面的重點。

這里我們通過「2.8.x」版本來講解,「2.7.x」還未增加 KafkaRaftServer 類。

1、getPropsFromArgs

def getPropsFromArgs(args: Array[String]): Properties = {
  // 創建一個命令行參數解析器
  val optionParser = new OptionParser(false)
  // 定義 --override 選項,用于覆蓋 server.properties 文件中的屬性
  val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
    .withRequiredArg()
    .ofType(classOf[String])
   
  // 定義 --version 選項,用于打印版本信息并退出
  optionParser.accepts("version", "Print version information and exit.")
  // 若沒有提供參數或者參數包含 --help 選項,則打印用法并退出
  if (args.length == 0 || args.contains("--help")) {
    CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName()))
  }
  // 若參數中包含 --version 選項,則打印版本信息并退出
  if (args.contains("--version")) {
    CommandLineUtils.printVersionAndDie()
  }
  // 加載 server.properties 文件中的屬性到 Properties 對象中
  val props = Utils.loadProps(args(0))
  // 若提供了其他參數,則解析這些參數
  if (args.length > 1) {
    // 解析參數中的選項和參數值
    val options = optionParser.parse(args.slice(1, args.length): _*)
    // 檢查是否有非選項參數
    if (options.nonOptionArguments().size() > 0) {
      CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
    }
    // 將解析得到的選項和參數值添加到 props 對象中
    props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
  }
  // 返回解析得到的屬性集合
  props
}

該函數的作用是從命令行參數中解析出屬性集合。它內部使用了 OptionParser 類庫來解析命令行選項,并從 server.properties 文件中加載屬性。

如果提供了 override 選項,則它將覆蓋 server.properties 文件中的相應屬性。函數返回一個 Properties 對象,其中包含了解析得到的屬性。

如果沒有提供正確的命令行參數或者提供了 --help 或 --version 選項,函數會打印幫助信息或版本信息并退出。

2、buildServer

private def buildServer(props: Properties): Server = {
    val config = KafkaConfig.fromProps(props, false)
    // 直接啟動定時任務、網絡層、請求處理層
    if (config.requiresZookeeper) {
      new KafkaServer(
        config,
        Time.SYSTEM,
        threadNamePrefix = None,
        enableForwarding = false
      )
    } else {
      // 調用 BrokerServer 等來啟動網絡層和請求處理層
      new KafkaRaftServer(
        config,
        Time.SYSTEM,
        threadNamePrefix = None
      )
    }
}

在 kafka 2.8.x 版本中 新增了 raft 協議之后將 BrokerServer、ControllServer 使用了單獨的文件來啟動最終調用網絡層和請求處理層,如果還是使用 zk 的方式啟動則是 KafkaServer 啟動網絡層和請求處理層。

3、main

# 2.7.x 版本源碼
def main(args: Array[String]): Unit = {
  try {
    // 1、解析命令行參數,獲得屬性集合
    val serverProps = getPropsFromArgs(args)
    // 2、從屬性集合創建 KafkaServerStartable 對象
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
    try {
      // 如果不是 Windows 操作系統,并且不是 IBM JDK,則注冊 LoggingSignalHandler
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      // 如果注冊 LoggingSignalHandler 失敗,則在日志中打印警告信息
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }
    // 3、添加 shutdown hook,用于在程序結束時執行 KafkaServerStartable 的 shutdown 方法
    Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown())
    // 4、啟動 KafkaServerStartable 實例
    kafkaServerStartable.startup()
    // 5、等待 KafkaServerStartable 實例終止
    kafkaServerStartable.awaitShutdown()
  }
  catch {
    // 如果有異常發生,則記錄日志并退出程序
    case e: Throwable =>
      fatal("Exiting Kafka due to fatal exception", e)
      Exit.exit(1)
  }
  // 6、正常終止程序
  Exit.exit(0)
}

該函數是 Kafka 服務進程的入口,它是整個 Kafka 運行過程的驅動程序。該函數首先通過調用 getPropsFromArgs 函數解析命令行參數并獲得屬性集合,然后使用這些屬性創建 KafkaServerStartable 實例。接著,它注冊一個 shutdown hook,用于在程序終止時執行 KafkaServerStartable 的 shutdown 方法。然后它啟動 KafkaServerStartable 實例,并等待該實例終止。如果發生異常,則記錄日志并退出程序。函數最后調用 Exit.exit 方法退出程序,返回 0 表示正常終止。

# 2.8.x 版本
def main(args: Array[String]): Unit = {
    // 獲取Kafka服務的配置信息
    val serverProps = getPropsFromArgs(args)
    // 根據配置信息構建Kafka服務
    val server = buildServer(serverProps)
    try {
      // 注冊用于記錄日志的信號處理器(若實現失敗則退出)
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }
    // 掛載關閉處理器,用于捕獲終止信號和常規終止請求
    Exit.addShutdownHook("kafka-shutdown-hook", {
      try server.shutdown() // 關閉Kafka服務
      catch {
        case _: Throwable =>
          fatal("Halting Kafka.") // 日志記錄致命錯誤信息
          // 調用Exit.halt()強制退出,避免重復調用Exit.exit()引發死鎖
          Exit.halt(1)
      }
    })
    try server.startup() // 啟動Kafka服務
    catch {
      case _: Throwable =>
        // 調用Exit.exit()設置退出狀態碼,KafkaServer.startup()會在拋出異常時調用shutdown()
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
    server.awaitShutdown() // 等待Kafka服務關閉
    Exit.exit(0) // 調用Exit.exit()設置退出狀態碼
}

這里最重要的是 「第 4 步」,調用 kafkaServerStartable.startup() 或者 server.startup() 來啟動 kafka。

這里我們還是以「ZK 模式」的方式來啟動,后面抽空再進行對 「Raft 模式」啟動進行補充。

四、KafkaServerStartable

「KafkaServerStartable.scala」類源碼在 Kafka 源碼包的 core 包下,具體的 github 源碼位置如下:

https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServerStartable.scala。

在 Scala 語言里,在一個源代碼文件中同時定義相同名字的 class 和 object 的用法被稱為伴生(Companion)。Class 對象被稱為伴生類,它和 Java 中的類是一樣的;而 Object 對象是一個單例對象,用于保存一些靜態變量或靜態方法。

這里我們主要來看下 Class 類代碼。

class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging {
  // 創建 KafkaServer 實例
  // 構造函數有兩個參數 —— staticServerConfig 表示靜態服務器配置,reporters 表示 Kafka 指標報告器。如果 threadNamePrefix 參數未用于構造函數,則默認值為 None。threadNamePrefix 參數表示線程名稱前綴,用于調試和維護目的。
  private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix)

  def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
  // 啟動 KafkaServer
  // startup 方法嘗試啟動 Kafka 服務器。如果啟動 Kafka 服務器時發生異常,則記錄一條 fatal 錯誤日志并退出程序。對于成功啟動的 Kafka 服務器,它將開始監聽客戶端連接,并在收到消息時執行所需的操作。
  def startup(): Unit = {
    try server.startup()
    catch {
      // 如果出現異常,則記錄日志并退出程序
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
  }
  // 關閉 KafkaServer
  // shutdown 方法嘗試停止 Kafka 服務器。如果在停止服務器時出現異常,則記錄一條 fatal 錯誤日志并強制退出程序。調用 shutdown 方法后,服務器將不再接受新的請求,并開始等待當前進行中的請求完成。當所有處理中的請求都完成后,服務器將徹底停止。
  def shutdown(): Unit = {
    try server.shutdown()
    catch {
      // 如果出現異常,則記錄日志并強制退出程序
      case _: Throwable =>
        fatal("Halting Kafka.")
        // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
        Exit.halt(1)
    }
  }
  // setServerState 方法允許從 KafkaServerStartable 對象中設置 broker 狀態。如果自定義 KafkaServerStartable 對象想要引入新的狀態,則此方法很有用。
  def setServerState(newState: Byte): Unit = {
    server.brokerState.newState(newState)
  }
  // 等待 KafkaServer 退出
  // awaitShutdown 方法等待 Kafka 服務器完全退出。在 Kafka 服務器執行 shutdown 方法后,它將不再接受新的請求。但是,服務器可能仍在處理一些已經接收的請求。awaitShutdown 方法將阻塞當前線程,直到服務器徹底停止。
  def awaitShutdown(): Unit = server.awaitShutdown()
}

KafkaServerStartable 類是一個可啟動和停止的 Kafka 服務器。類中的 server 成員變量是 KafkaServer 類的實例,它將在 KafkaServerStartable 類對象啟動時創建。該類提供了啟動和停止 Kafka 服務器的方法,以及設置 broker 狀態和等待 Kafka 服務器退出的方法。

跟本文有關系的是 「啟動」方法,它調用了 KafkaServer#startup 方法進行啟動。

五、KafkaServer 類

Kafka 集群由多個 Broker 節點構成,每個節點上都運行著一個 Kafka 實例,這些實例之間基于 ZK 來發現彼此,并由集群控制器 KafkaController 統籌協調運行,彼此之間基于 socket 連接進行通信。

「KafkaServer.scala」類源碼在 Kafka 源碼包的 core 包下,具體的 github 源碼位置如下:

https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServer.scala。

KafkaServer 為 Kafka 的啟動類,其中包含了 Kafka 的所有組件,如 KafkaController、groupCoordinator、replicaManager 等。

class KafkaServer(val config: KafkaConfig, //配置信息
time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None,
                  kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() //監控上報
                  ) extends Logging with KafkaMetricsGroup {
  //標識節點已經啟動完成
  private val startupComplete = new AtomicBoolean(false)
  //標識節點正在執行關閉操作
  private val isShuttingDown = new AtomicBoolean(false)
  //標識節點正在執行啟動操作
  private val isStartingUp = new AtomicBoolean(false)
  //阻塞主線程等待 KafkaServer 的關閉
  private var shutdownLatch = new CountDownLatch(1)
  //日志上下文
  private var logContext: LogContext = null
  var metrics: Metrics = null
  //記錄節點的當前狀態
  val brokerState: BrokerState = new BrokerState
  //API接口類,用于處理數據類請求
  var dataPlaneRequestProcessor: KafkaApis = null
  //API接口,用于處理控制類請求
  var controlPlaneRequestProcessor: KafkaApis = null
  //權限管理
  var authorizer: Option[Authorizer] = None
  //啟動socket,監聽9092端口,等待接收客戶端請求 
  var socketServer: SocketServer = null
  //數據類請求處理線程池
  var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
  //命令類處理線程池
  var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
  //日志管理器    
  var logDirFailureChannel: LogDirFailureChannel = null
  var logManager: LogManager = null
  //副本管理器
  var replicaManager: ReplicaManager = null
  //topic增刪管理器
  var adminManager: AdminManager = null
  //token管理器
  var tokenManager: DelegationTokenManager = null
  //動態配置管理器
  var dynamicConfigHandlers: Map[String, ConfigHandler] = null
  var dynamicConfigManager: DynamicConfigManager = null
  var credentialProvider: CredentialProvider = null
  var tokenCache: DelegationTokenCache = null
  //分組協調器
  var groupCoordinator: GroupCoordinator = null
  //事務協調器
  var transactionCoordinator: TransactionCoordinator = null
  //集群控制器
  var kafkaController: KafkaController = null
  //定時任務調度器
  var kafkaScheduler: KafkaScheduler = null
  //集群分區狀態信息緩存
  var metadataCache: MetadataCache = null
  //配額管理器
  var quotaManagers: QuotaFactory.QuotaManagers = null
  //zk客戶端配置
  val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
  private var _zkClient: KafkaZkClient = null
  val correlationId: AtomicInteger = new AtomicInteger(0)
  val brokerMetaPropsFile = "meta.properties"
  val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
  private var _clusterId: String = null
  private var _brokerTopicStats: BrokerTopicStats = null
  def clusterId: String = _clusterId
  // Visible for testing
  private[kafka] def zkClient = _zkClient
  private[kafka] def brokerTopicStats = _brokerTopicStats
  ....
}

1、startup

該類方法很多,我們這里只看 startup 啟動方法,來看看其內部都啟動了哪些組件,來解決本文開頭提出的問題。

/**
   * Start up API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   */
  def startup(): Unit = {
    try {
      info("starting")
      // 是否已關閉
      if (isShuttingDown.get)
        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
      // 是否已啟動
      if (startupComplete.get)
        return
      // 是否可以啟動
      val canStartup = isStartingUp.compareAndSet(false, true)
      if (canStartup) { // 設置broker狀態為Starting
        brokerState.newState(Starting)
        /* setup zookeeper */
        // 連接ZK,并創建根節點
        initZkClient(time)
        /* initialize features */
        _featureChangeListener = new FinalizedFeatureChangeListener(featureCache, _zkClient)
        if (config.isFeatureVersioningSupported) {
          _featureChangeListener.initOrThrow(config.zkConnectionTimeoutMs)
        }
        /* Get or create cluster_id */
        // 從ZK獲取或創建集群id,規則:UUID的mostSigBits、leastSigBits組合轉base64
        _clusterId = getOrGenerateClusterId(zkClient)
        info(s"Cluster ID = $clusterId")
        /* load metadata */
        // 獲取brokerId及log存儲路徑,brokerId通過zk生成或者server.properties配置broker.id
        // 規則:/brokers/seqid的version值 + maxReservedBrokerId(默認1000),保證唯一性
        val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = getBrokerMetadataAndOfflineDirs
        /* check cluster id */
        if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId)
          throw new InconsistentClusterIdException(
            s"The Cluster ID ${clusterId} doesn't match stored clusterId ${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
            s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.")
        /* generate brokerId */
        config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)
        logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
        // 配置logger
        this.logIdent = logContext.logPrefix
        // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
        // applied after DynamicConfigManager starts.
        // 初始化AdminZkClient,支持動態修改配置 
        config.dynamicConfig.initialize(zkClient)
        /* start scheduler */
        // 初始化定時任務調度器
        kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
        kafkaScheduler.startup()
        /* create and configure metrics */
        // 創建及配置監控,默認使用JMX及Yammer Metrics
        kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
        kafkaYammerMetrics.configure(config.originals)
        val jmxReporter = new JmxReporter()
        jmxReporter.configure(config.originals)
        val reporters = new util.ArrayList[MetricsReporter]
        reporters.add(jmxReporter)
        val metricConfig = KafkaServer.metricConfig(config)
        val metricsContext = createKafkaMetricsContext()
        metrics = new Metrics(metricConfig, reporters, time, true, metricsContext)
        /* register broker metrics */
        _brokerTopicStats = new BrokerTopicStats
        // 初始化配額管理器
        quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
        notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
        // 用于保證kafka-log數據目錄的存在
        logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
        /* start log manager */
        // 啟動日志管理器,kafka的消息以日志形式存儲
        logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
        // 啟動日志清理、刷新、校驗、恢復等的定時線程
        logManager.startup()
        metadataCache = new MetadataCache(config.brokerId)
        // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
        // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
        // SCRAM認證方式的token緩存
        tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
        credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
        // Create and start the socket server acceptor threads so that the bound port is known.
        // Delay starting processors until the end of the initialization sequence to ensure
        // that credentials have been loaded before processing authentications.
        // 啟動socket,監聽9092端口,等待接收客戶端請求 
        socketServer = new SocketServer(config, metrics, time, credentialProvider)
        socketServer.startup(startProcessingRequests = false)
        /* start replica manager */
        brokerToControllerChannelManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, threadNamePrefix)
        // 啟動副本管理器,高可用相關
        replicaManager = createReplicaManager(isShuttingDown)
        replicaManager.startup()
        brokerToControllerChannelManager.start()
        // 將broker信息注冊到ZK上
        val brokerInfo = createBrokerInfo
        val brokerEpoch = zkClient.registerBroker(brokerInfo)
        // Now that the broker is successfully registered, checkpoint its metadata
        // 校驗 broker 信息
        checkpointBrokerMetadata(BrokerMetadata(config.brokerId, Some(clusterId)))
        /* start token manager */
        // 啟動 token 管理器
        tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
        tokenManager.startup()
        /* start kafka controller */
        // 啟動Kafka控制器,只有 Leader 會與ZK建連
        kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
        kafkaController.startup()
        // admin管理器
        adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
        /* start group coordinator */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        // 啟動集群群組協調器
        groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)
        groupCoordinator.startup()
        /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        // 啟動事務協調器
        transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
        transactionCoordinator.startup()
        /* Get the authorizer and initialize it if one is specified.*/
        // ACL
        authorizer = config.authorizer
        authorizer.foreach(_.configure(config.originals))
        val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
          case Some(authZ) =>
            authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
              ep -> cs.toCompletableFuture
            }
          case None =>
            brokerInfo.broker.endPoints.map { ep =>
              ep.toJava -> CompletableFuture.completedFuture[Void](null)
            }.toMap
        }
        // 創建拉取管理器
        val fetchManager = new FetchManager(Time.SYSTEM,
          new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
            KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
        /* start processing requests */
        // 初始化數據類請求的KafkaApis,負責數據類請求邏輯處理
        dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
        // 初始化數據類請求處理的線程池  
        dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
          config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
        socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
          // 初始化控制類請求的 KafkaApis
          controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
            kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
            fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
          // 初始化控制類請求的線程池
          controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
            1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
        }
        Mx4jLoader.maybeLoad()
        /* Add all reconfigurables for config change notification before starting config handlers */
        config.dynamicConfig.addReconfigurables(this)
        /* start dynamic config manager */
        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
                                                           ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
                                                           ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
                                                           ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

        // Create the config manager. start listening to notifications
        // 啟動動態配置處理器
        dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
        dynamicConfigManager.startup()
        // 啟動請求處理線程
        socketServer.startProcessingRequests(authorizerFutures)
        // 更新broker狀態
        brokerState.newState(RunningAsBroker)
        shutdownLatch = new CountDownLatch(1)
        startupComplete.set(true)
        isStartingUp.set(false)
        AppInfoParser.registerAppInfo(metricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
        info("started")
      }
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        isStartingUp.set(false)
        shutdown()
        throw e
    }
 }

這里總結下該方法都啟動了哪些組件:

  • initZkClient(time) 初始化 Zk。
  • kafkaScheduler  定時器。
  • logManager 日志模塊。
  • MetadataCache  元數據緩存。
  • socketServer 網絡服務器。
  • replicaManager 副本模塊。
  • kafkaController 控制器。
  • groupCoordinator 協調器用于和ConsumerCoordinator 交互
  • transactionCoordinator 事務相關
  • fetchManager  副本拉取管理器。
  • dynamicConfigManager 動態配置管理器。

2、Broker 狀態

這個是在 2.7.x 版本之前的狀態,在 2.8.x 之后版本進行了重構。

sealed trait BrokerStates { def state: Byte }
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
  • NotRunning:初始狀態,標識當前 broker 節點未運行。
  • Starting:標識當前 broker 節點正在啟動中。
  • RecoveringFromUncleanShutdown:標識當前 broker 節點正在從上次非正常關閉中恢復。
  • RuningAsBroker:標識當前 broker 節點啟動成功,可以對外提供服務。
  • PendingControlledShutdown:標識當前 broker 節點正在等待 controlled shutdown 操作完成。
  • BrokerShuttingDown:標識當前 broker 節點正在執行 shutdown 操作。

這些就是 KafkaServer 中主要模塊的入口,接下來的文章會通過這些入口一一進行分析。

六、總結

這里,我們一起來總結一下這篇文章的重點。

  • 文章開頭通過對「kafka-server-start.sh」內容進行剖析,引出了 「kafka.Kafka」類。
  • 在「kafka.Kafka」的 main 方法中調用了「KafkaServerStartable」嘗試啟動 Kafka 服務器。
  • 接著在 「KafkaServerStartable」的 startup 方法中調用了 「KafkaServer」的 startup 方法啟動服務器需要的各種組件類。

下篇我們來深度剖析「Broker 啟動集群如何感知」,大家期待,我們下期見。

責任編輯:姜華 來源: 華仔聊技術
相關推薦

2019-09-20 08:54:38

KafkaBroker消息

2022-09-23 08:02:42

Kafka消息緩存

2023-12-26 08:16:56

Kafka緩存架構客戶端

2023-02-22 08:12:30

KafkaSender 線程

2022-05-08 17:53:38

Nacos服務端客戶端

2021-09-06 09:46:26

Dubbo 服務端開發

2021-06-11 06:54:34

Dubbo客戶端服務端

2017-03-03 09:10:41

2016-03-18 09:04:42

swift服務端

2023-03-15 08:17:27

Kafka網絡通信組件

2015-10-12 08:33:06

TCP網絡協議服務端

2021-04-16 08:54:03

CMS系統redisnode服務器

2021-08-10 20:41:33

AndroidApp流程

2022-03-06 12:15:38

NettyReactor線程

2012-03-02 10:38:33

MySQL

2013-03-25 10:08:44

PHPWeb

2019-09-23 10:47:52

Kafka架構微服務

2022-08-22 08:45:57

Kafka網絡層源碼實現

2021-06-30 06:59:47

Zabbix Server服務端MySQL

2010-08-03 09:59:30

NFS服務
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲视频一区二区三区 | 国产精品成人久久久久a级 久久蜜桃av一区二区天堂 | 九九色综合 | 亚洲欧美在线免费观看 | 午夜性视频 | 狠狠躁天天躁夜夜躁婷婷老牛影视 | 国产乱码精品一区二区三区中文 | 日韩伦理电影免费在线观看 | 成年人网站免费 | 久久一区精品 | 国产免费拔擦拔擦8x高清 | 日韩中文字幕在线 | 国产在线精品一区二区 | 狠狠视频 | 91av入口 | 国产精品久久精品 | 亚洲性视频网站 | 免费性视频 | 国产欧美性成人精品午夜 | 成人毛片网 | 久久久久久一区 | 黄色毛片网站在线观看 | 久久99视频这里只有精品 | 特黄色毛片 | 黄网站免费在线观看 | 国产精品一区二区三区免费观看 | 亚洲精品久久久9婷婷中文字幕 | 国产成人区 | 久久精品国产一区老色匹 | 久久国产免费看 | 色婷婷在线视频 | 在线播放精品视频 | 2019中文字幕视频 | 亚洲一区在线播放 | 久久久久成人精品免费播放动漫 | 岛国av在线免费观看 | 一区二区三区小视频 | 色女人天堂 | 天天av网 | 1204国产成人精品视频 | 国产高清视频 |