当前位置:首页 >娱乐 >图解 Kafka 源码之服务端启动流程 并将其赋值给 base_dir 变量

图解 Kafka 源码之服务端启动流程 并将其赋值给 base_dir 变量

2024-06-29 07:01:47 [百科] 来源:避面尹邢网

图解 Kafka 源码之服务端启动流程

作者:王江华 云计算 Kafka 从今天开始,图解我们来深度剖析 Kafka「Controller」的服务底层源码实现,这是端启动流 Controller 系列第一篇,我们先回过头来继续来深度聊聊「Kafka 服务端启动的图解流程」,看看 Kafka 服务端是服务如何启动的。

前面「八篇」文章通过「场景驱动方式」带你深度剖析了 Kafka「日志系统」源码架构设计的端启动流方方面面,从今天开始,图解我们来深度剖析 Kafka「Controller」的服务底层源码实现,这是端启动流 Controller 系列第一篇,我们先回过头来继续来深度聊聊「Kafka  服务端启动的图解流程」,看看 Kafka 服务端是服务如何启动的。

图解 Kafka 源码之服务端启动流程 并将其赋值给 base_dir 变量

图解 Kafka 源码之服务端启动流程 并将其赋值给 base_dir 变量

一、端启动流总体概述

在深入剖析Kafka「Controller」之前,图解我想你可能或多或少会有这样的服务疑问:

图解 Kafka 源码之服务端启动流程 并将其赋值给 base_dir 变量

Kafka  服务端都有哪些组件,这些组件又是端启动流通过哪个类来启动的呢?

这里我们通过启动 Kafka 来了解,大家都知道,启动 Kafka 可以执行以下命令来启动:

# 1、启动 kafka 服务命令:bin/kafka-server-start.sh config/server.properties &

那么今天就来看看通过这个脚本 KafkaServer 初始化了哪些组件。

二、kafka-server-start.sh

我们来看下里面的 shell 内容,如下:

#!/bin/bash# Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements.  See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License.  You may obtain a copy of the License at##    http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.# 1、注释说明该脚本的版权信息和使用许可。if [ $# -lt 1 ];then        echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"        exit 1fi# 2、检查命令行参数的个数,若小于 1 则输出脚本的使用方法并退出。base_dir=$(dirname $0)# 3、获取当前脚本所在目录的路径,并将其赋值给 base_dir 变量。if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then    export KAFKA_LOG4J_OPTS="-Dlog4j.cnotallow=file:$base_dir/../config/log4j.properties"fi# 4、检查 KAFKA_LOG4J_OPTS 环境变量是否设置,若未设置则设置该变量的值。if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"    export JMX_PORT="9999"    export JMX_RMI_PORT="10000"fi# 5、检查 KAFKA_HEAP_OPTS 环境变量是否设置,若未设置则设置该变量的值,并设置 JMX_PORT 和 JMX_RMI_PORT 环境变量的值,将 EXTRA_ARGS 变量的值设置为字符串 -name kafkaServer -loggc。EXTRA_ARGS=${ EXTRA_ARGS-'-name kafkaServer -loggc'}# 6、检查命令行参数中 COMMAND 变量的值是否为 -daemon,若是则将 EXTRA_ARGS 变量的值添加 -daemon 选项。同时将命令行参数向左移一位,即从 $2 开始计算参数。COMMAND=$1case $COMMAND in  -daemon)    EXTRA_ARGS="-daemon "$EXTRA_ARGS    shift    ;;  *)    ;;esac# 7、调用 $base_dir/kafka-run-class.sh 脚本并传递相应的参数。其中 "@ 代表传递的为命令行参数。具体执行的封装在 Kafka 客户端库中的 kafka.Kafka 类。整个脚本的作用是启动 Kafka 服务。exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"esac# 7、调用 $base_dir/kafka-run-class.sh 脚本并传递相应的参数。其中 "@ 代表传递的为命令行参数。具体执行的封装在 Kafka 客户端库中的 kafka.Kafka 类。整个脚本的作用是启动 Kafka 服务。exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

这里我们重点来看 「第 7 步」,它底层执行的是封装在 Kafka 客户端库中的 kafka.Kafka 类。接下来我们来看下该类都做了什么。

三、kafka.Kafka 类

「Kafka.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:

https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/Kafka.scala。

从整体上来看,该类就 3 个方法,相对比较简单,我能来看下里面的重点。

这里我们通过「2.8.x」版本来讲解,「2.7.x」还未增加 KafkaRaftServer 类。

1、getPropsFromArgs

def getPropsFromArgs(args: Array[String]): Properties = {   // 创建一个命令行参数解析器  val optionParser = new OptionParser(false)  // 定义 --override 选项,用于覆盖 server.properties 文件中的属性  val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")    .withRequiredArg()    .ofType(classOf[String])     // 定义 --version 选项,用于打印版本信息并退出  optionParser.accepts("version", "Print version information and exit.")  // 若没有提供参数或者参数包含 --help 选项,则打印用法并退出  if (args.length == 0 || args.contains("--help")) {     CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName()))  }  // 若参数中包含 --version 选项,则打印版本信息并退出  if (args.contains("--version")) {     CommandLineUtils.printVersionAndDie()  }  // 加载 server.properties 文件中的属性到 Properties 对象中  val props = Utils.loadProps(args(0))  // 若提供了其他参数,则解析这些参数  if (args.length > 1) {     // 解析参数中的选项和参数值    val options = optionParser.parse(args.slice(1, args.length): _*)    // 检查是否有非选项参数    if (options.nonOptionArguments().size() > 0) {       CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))    }    // 将解析得到的选项和参数值添加到 props 对象中    props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)  }  // 返回解析得到的属性集合  props}

该函数的作用是从命令行参数中解析出属性集合。它内部使用了 OptionParser 类库来解析命令行选项,并从 server.properties 文件中加载属性。

如果提供了 override 选项,则它将覆盖 server.properties 文件中的相应属性。函数返回一个 Properties 对象,其中包含了解析得到的属性。

如果没有提供正确的命令行参数或者提供了 --help 或 --version 选项,函数会打印帮助信息或版本信息并退出。

2、buildServer

private def buildServer(props: Properties): Server = {     val config = KafkaConfig.fromProps(props, false)    // 直接启动定时任务、网络层、请求处理层    if (config.requiresZookeeper) {       new KafkaServer(        config,        Time.SYSTEM,        threadNamePrefix = None,        enableForwarding = false      )    } else {       // 调用 BrokerServer 等来启动网络层和请求处理层      new KafkaRaftServer(        config,        Time.SYSTEM,        threadNamePrefix = None      )    }}

在 kafka 2.8.x 版本中 新增了 raft 协议之后将 BrokerServer、ControllServer 使用了单独的文件来启动最终调用网络层和请求处理层,如果还是使用 zk 的方式启动则是 KafkaServer 启动网络层和请求处理层。

3、main

# 2.7.x 版本源码def main(args: Array[String]): Unit = {   try {     // 1、解析命令行参数,获得属性集合    val serverProps = getPropsFromArgs(args)    // 2、从属性集合创建 KafkaServerStartable 对象    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)    try {       // 如果不是 Windows 操作系统,并且不是 IBM JDK,则注册 LoggingSignalHandler      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)        new LoggingSignalHandler().register()    } catch {       // 如果注册 LoggingSignalHandler 失败,则在日志中打印警告信息      case e: ReflectiveOperationException =>        warn("Failed to register optional signal handler that logs a message when the process is terminated " +          s"by a signal. Reason for registration failure is: $e", e)    }    // 3、添加 shutdown hook,用于在程序结束时执行 KafkaServerStartable 的 shutdown 方法    Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown())    // 4、启动 KafkaServerStartable 实例    kafkaServerStartable.startup()    // 5、等待 KafkaServerStartable 实例终止    kafkaServerStartable.awaitShutdown()  }  catch {     // 如果有异常发生,则记录日志并退出程序    case e: Throwable =>      fatal("Exiting Kafka due to fatal exception", e)      Exit.exit(1)  }  // 6、正常终止程序  Exit.exit(0)}

该函数是 Kafka 服务进程的入口,它是整个 Kafka 运行过程的驱动程序。该函数首先通过调用 getPropsFromArgs 函数解析命令行参数并获得属性集合,然后使用这些属性创建 KafkaServerStartable 实例。接着,它注册一个 shutdown hook,用于在程序终止时执行 KafkaServerStartable 的 shutdown 方法。然后它启动 KafkaServerStartable 实例,并等待该实例终止。如果发生异常,则记录日志并退出程序。函数最后调用 Exit.exit 方法退出程序,返回 0 表示正常终止。

# 2.8.x 版本def main(args: Array[String]): Unit = {     // 获取Kafka服务的配置信息    val serverProps = getPropsFromArgs(args)    // 根据配置信息构建Kafka服务    val server = buildServer(serverProps)    try {       // 注册用于记录日志的信号处理器(若实现失败则退出)      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)        new LoggingSignalHandler().register()    } catch {       case e: ReflectiveOperationException =>        warn("Failed to register optional signal handler that logs a message when the process is terminated " +          s"by a signal. Reason for registration failure is: $e", e)    }    // 挂载关闭处理器,用于捕获终止信号和常规终止请求    Exit.addShutdownHook("kafka-shutdown-hook", {       try server.shutdown() // 关闭Kafka服务      catch {         case _: Throwable =>          fatal("Halting Kafka.") // 日志记录致命错误信息          // 调用Exit.halt()强制退出,避免重复调用Exit.exit()引发死锁          Exit.halt(1)      }    })    try server.startup() // 启动Kafka服务    catch {       case _: Throwable =>        // 调用Exit.exit()设置退出状态码,KafkaServer.startup()会在抛出异常时调用shutdown()        fatal("Exiting Kafka.")        Exit.exit(1)    }    server.awaitShutdown() // 等待Kafka服务关闭    Exit.exit(0) // 调用Exit.exit()设置退出状态码}

这里最重要的是 「第 4 步」,调用 kafkaServerStartable.startup() 或者 server.startup() 来启动 kafka。

这里我们还是以「ZK 模式」的方式来启动,后面抽空再进行对 「Raft 模式」启动进行补充。

四、KafkaServerStartable

「KafkaServerStartable.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:

https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServerStartable.scala。

在 Scala 语言里,在一个源代码文件中同时定义相同名字的 class 和 object 的用法被称为伴生(Companion)。Class 对象被称为伴生类,它和 Java 中的类是一样的;而 Object 对象是一个单例对象,用于保存一些静态变量或静态方法。

这里我们主要来看下 Class 类代码。

class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging {   // 创建 KafkaServer 实例  // 构造函数有两个参数 —— staticServerConfig 表示静态服务器配置,reporters 表示 Kafka 指标报告器。如果 threadNamePrefix 参数未用于构造函数,则默认值为 None。threadNamePrefix 参数表示线程名称前缀,用于调试和维护目的。  private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix)  def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)  // 启动 KafkaServer  // startup 方法尝试启动 Kafka 服务器。如果启动 Kafka 服务器时发生异常,则记录一条 fatal 错误日志并退出程序。对于成功启动的 Kafka 服务器,它将开始监听客户端连接,并在收到消息时执行所需的操作。  def startup(): Unit = {     try server.startup()    catch {       // 如果出现异常,则记录日志并退出程序      case _: Throwable =>        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code        fatal("Exiting Kafka.")        Exit.exit(1)    }  }  // 关闭 KafkaServer  // shutdown 方法尝试停止 Kafka 服务器。如果在停止服务器时出现异常,则记录一条 fatal 错误日志并强制退出程序。调用 shutdown 方法后,服务器将不再接受新的请求,并开始等待当前进行中的请求完成。当所有处理中的请求都完成后,服务器将彻底停止。  def shutdown(): Unit = {     try server.shutdown()    catch {       // 如果出现异常,则记录日志并强制退出程序      case _: Throwable =>        fatal("Halting Kafka.")        // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.        Exit.halt(1)    }  }  // setServerState 方法允许从 KafkaServerStartable 对象中设置 broker 状态。如果自定义 KafkaServerStartable 对象想要引入新的状态,则此方法很有用。  def setServerState(newState: Byte): Unit = {     server.brokerState.newState(newState)  }  // 等待 KafkaServer 退出  // awaitShutdown 方法等待 Kafka 服务器完全退出。在 Kafka 服务器执行 shutdown 方法后,它将不再接受新的请求。但是,服务器可能仍在处理一些已经接收的请求。awaitShutdown 方法将阻塞当前线程,直到服务器彻底停止。  def awaitShutdown(): Unit = server.awaitShutdown()}

KafkaServerStartable 类是一个可启动和停止的 Kafka 服务器。类中的 server 成员变量是 KafkaServer 类的实例,它将在 KafkaServerStartable 类对象启动时创建。该类提供了启动和停止 Kafka 服务器的方法,以及设置 broker 状态和等待 Kafka 服务器退出的方法。

跟本文有关系的是 「启动」方法,它调用了 KafkaServer#startup 方法进行启动。

五、KafkaServer 类

Kafka 集群由多个 Broker 节点构成,每个节点上都运行着一个 Kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。

「KafkaServer.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:

https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServer.scala。

KafkaServer 为 Kafka 的启动类,其中包含了 Kafka 的所有组件,如 KafkaController、groupCoordinator、replicaManager 等。

class KafkaServer(val config: KafkaConfig, //配置信息time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None,                  kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() //监控上报                  ) extends Logging with KafkaMetricsGroup {   //标识节点已经启动完成  private val startupComplete = new AtomicBoolean(false)  //标识节点正在执行关闭操作  private val isShuttingDown = new AtomicBoolean(false)  //标识节点正在执行启动操作  private val isStartingUp = new AtomicBoolean(false)  //阻塞主线程等待 KafkaServer 的关闭  private var shutdownLatch = new CountDownLatch(1)  //日志上下文  private var logContext: LogContext = null  var metrics: Metrics = null  //记录节点的当前状态  val brokerState: BrokerState = new BrokerState  //API接口类,用于处理数据类请求  var dataPlaneRequestProcessor: KafkaApis = null  //API接口,用于处理控制类请求  var controlPlaneRequestProcessor: KafkaApis = null  //权限管理  var authorizer: Option[Authorizer] = None  //启动socket,监听9092端口,等待接收客户端请求   var socketServer: SocketServer = null  //数据类请求处理线程池  var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null  //命令类处理线程池  var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null  //日志管理器      var logDirFailureChannel: LogDirFailureChannel = null  var logManager: LogManager = null  //副本管理器  var replicaManager: ReplicaManager = null  //topic增删管理器  var adminManager: AdminManager = null  //token管理器  var tokenManager: DelegationTokenManager = null  //动态配置管理器  var dynamicConfigHandlers: Map[String, ConfigHandler] = null  var dynamicConfigManager: DynamicConfigManager = null  var credentialProvider: CredentialProvider = null  var tokenCache: DelegationTokenCache = null  //分组协调器  var groupCoordinator: GroupCoordinator = null  //事务协调器  var transactionCoordinator: TransactionCoordinator = null  //集群控制器  var kafkaController: KafkaController = null  //定时任务调度器  var kafkaScheduler: KafkaScheduler = null  //集群分区状态信息缓存  var metadataCache: MetadataCache = null  //配额管理器  var quotaManagers: QuotaFactory.QuotaManagers = null  //zk客户端配置  val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())  private var _zkClient: KafkaZkClient = null  val correlationId: AtomicInteger = new AtomicInteger(0)  val brokerMetaPropsFile = "meta.properties"  val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap  private var _clusterId: String = null  private var _brokerTopicStats: BrokerTopicStats = null  def clusterId: String = _clusterId  // Visible for testing  private[kafka] def zkClient = _zkClient  private[kafka] def brokerTopicStats = _brokerTopicStats  ....}

1、startup

该类方法很多,我们这里只看 startup 启动方法,来看看其内部都启动了哪些组件,来解决本文开头提出的问题。

/**   * Start up API for bringing up a single instance of the Kafka server.   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers   */  def startup(): Unit = {     try {       info("starting")      // 是否已关闭      if (isShuttingDown.get)        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")      // 是否已启动      if (startupComplete.get)        return      // 是否可以启动      val canStartup = isStartingUp.compareAndSet(false, true)      if (canStartup) {  // 设置broker状态为Starting        brokerState.newState(Starting)        /* setup zookeeper */        // 连接ZK,并创建根节点        initZkClient(time)        /* initialize features */        _featureChangeListener = new FinalizedFeatureChangeListener(featureCache, _zkClient)        if (config.isFeatureVersioningSupported) {           _featureChangeListener.initOrThrow(config.zkConnectionTimeoutMs)        }        /* Get or create cluster_id */        // 从ZK获取或创建集群id,规则:UUID的mostSigBits、leastSigBits组合转base64        _clusterId = getOrGenerateClusterId(zkClient)        info(s"Cluster ID = $clusterId")        /* load metadata */        // 获取brokerId及log存储路径,brokerId通过zk生成或者server.properties配置broker.id        // 规则:/brokers/seqid的version值 + maxReservedBrokerId(默认1000),保证唯一性        val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = getBrokerMetadataAndOfflineDirs        /* check cluster id */        if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId)          throw new InconsistentClusterIdException(            s"The Cluster ID ${ clusterId} doesn't match stored clusterId ${ preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +            s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.")        /* generate brokerId */        config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)        logContext = new LogContext(s"[KafkaServer id=${ config.brokerId}] ")        // 配置logger        this.logIdent = logContext.logPrefix        // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be        // applied after DynamicConfigManager starts.        // 初始化AdminZkClient,支持动态修改配置         config.dynamicConfig.initialize(zkClient)        /* start scheduler */        // 初始化定时任务调度器        kafkaScheduler = new KafkaScheduler(config.backgroundThreads)        kafkaScheduler.startup()        /* create and configure metrics */        // 创建及配置监控,默认使用JMX及Yammer Metrics        kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE        kafkaYammerMetrics.configure(config.originals)        val jmxReporter = new JmxReporter()        jmxReporter.configure(config.originals)        val reporters = new util.ArrayList[MetricsReporter]        reporters.add(jmxReporter)        val metricConfig = KafkaServer.metricConfig(config)        val metricsContext = createKafkaMetricsContext()        metrics = new Metrics(metricConfig, reporters, time, true, metricsContext)        /* register broker metrics */        _brokerTopicStats = new BrokerTopicStats        // 初始化配额管理器        quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))        notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)        // 用于保证kafka-log数据目录的存在        logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)        /* start log manager */        // 启动日志管理器,kafka的消息以日志形式存储        logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)        // 启动日志清理、刷新、校验、恢复等的定时线程        logManager.startup()        metadataCache = new MetadataCache(config.brokerId)        // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.        // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.        // SCRAM认证方式的token缓存        tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)        credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)        // Create and start the socket server acceptor threads so that the bound port is known.        // Delay starting processors until the end of the initialization sequence to ensure        // that credentials have been loaded before processing authentications.        // 启动socket,监听9092端口,等待接收客户端请求         socketServer = new SocketServer(config, metrics, time, credentialProvider)        socketServer.startup(startProcessingRequests = false)        /* start replica manager */        brokerToControllerChannelManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, threadNamePrefix)        // 启动副本管理器,高可用相关        replicaManager = createReplicaManager(isShuttingDown)        replicaManager.startup()        brokerToControllerChannelManager.start()        // 将broker信息注册到ZK上        val brokerInfo = createBrokerInfo        val brokerEpoch = zkClient.registerBroker(brokerInfo)        // Now that the broker is successfully registered, checkpoint its metadata        // 校验 broker 信息        checkpointBrokerMetadata(BrokerMetadata(config.brokerId, Some(clusterId)))        /* start token manager */        // 启动 token 管理器        tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)        tokenManager.startup()        /* start kafka controller */        // 启动Kafka控制器,只有 Leader 会与ZK建连        kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)        kafkaController.startup()        // admin管理器        adminManager = new AdminManager(config, metrics, metadataCache, zkClient)        /* start group coordinator */        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue        // 启动集群群组协调器        groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)        groupCoordinator.startup()        /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue        // 启动事务协调器        transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)        transactionCoordinator.startup()        /* Get the authorizer and initialize it if one is specified.*/        // ACL        authorizer = config.authorizer        authorizer.foreach(_.configure(config.originals))        val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {           case Some(authZ) =>            authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map {  case (ep, cs) =>              ep -> cs.toCompletableFuture            }          case None =>            brokerInfo.broker.endPoints.map {  ep =>              ep.toJava -> CompletableFuture.completedFuture[Void](null)            }.toMap        }        // 创建拉取管理器        val fetchManager = new FetchManager(Time.SYSTEM,          new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,            KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))        /* start processing requests */        // 初始化数据类请求的KafkaApis,负责数据类请求逻辑处理        dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)        // 初始化数据类请求处理的线程池          dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,          config.numIoThreads, s"${ SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)        socketServer.controlPlaneRequestChannelOpt.foreach {  controlPlaneRequestChannel =>          // 初始化控制类请求的 KafkaApis          controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,            kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,            fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)          // 初始化控制类请求的线程池          controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,            1, s"${ SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)        }        Mx4jLoader.maybeLoad()        /* Add all reconfigurables for config change notification before starting config handlers */        config.dynamicConfig.addReconfigurables(this)        /* start dynamic config manager */        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),                                                           ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),                                                           ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),                                                           ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))        // Create the config manager. start listening to notifications        // 启动动态配置处理器        dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)        dynamicConfigManager.startup()        // 启动请求处理线程        socketServer.startProcessingRequests(authorizerFutures)        // 更新broker状态        brokerState.newState(RunningAsBroker)        shutdownLatch = new CountDownLatch(1)        startupComplete.set(true)        isStartingUp.set(false)        AppInfoParser.registerAppInfo(metricsPrefix, config.brokerId.toString, metrics, time.milliseconds())        info("started")      }    }    catch {       case e: Throwable =>        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)        isStartingUp.set(false)        shutdown()        throw e    } }

这里总结下该方法都启动了哪些组件:

  • initZkClient(time) 初始化 Zk。
  • kafkaScheduler  定时器。
  • logManager 日志模块。
  • MetadataCache  元数据缓存。
  • socketServer 网络服务器。
  • replicaManager 副本模块。
  • kafkaController 控制器。
  • groupCoordinator 协调器用于和ConsumerCoordinator 交互
  • transactionCoordinator 事务相关
  • fetchManager  副本拉取管理器。
  • dynamicConfigManager 动态配置管理器。

2、Broker 状态

这个是在 2.7.x 版本之前的状态,在 2.8.x 之后版本进行了重构。

sealed trait BrokerStates {  def state: Byte }case object NotRunning extends BrokerStates {  val state: Byte = 0 }case object Starting extends BrokerStates {  val state: Byte = 1 }case object RecoveringFromUncleanShutdown extends BrokerStates {  val state: Byte = 2 }case object RunningAsBroker extends BrokerStates {  val state: Byte = 3 }case object PendingControlledShutdown extends BrokerStates {  val state: Byte = 6 }case object BrokerShuttingDown extends BrokerStates {  val state: Byte = 7 }
  • NotRunning:初始状态,标识当前 broker 节点未运行。
  • Starting:标识当前 broker 节点正在启动中。
  • RecoveringFromUncleanShutdown:标识当前 broker 节点正在从上次非正常关闭中恢复。
  • RuningAsBroker:标识当前 broker 节点启动成功,可以对外提供服务。
  • PendingControlledShutdown:标识当前 broker 节点正在等待 controlled shutdown 操作完成。
  • BrokerShuttingDown:标识当前 broker 节点正在执行 shutdown 操作。

这些就是 KafkaServer 中主要模块的入口,接下来的文章会通过这些入口一一进行分析。

六、总结

这里,我们一起来总结一下这篇文章的重点。

  • 文章开头通过对「kafka-server-start.sh」内容进行剖析,引出了 「kafka.Kafka」类。
  • 在「kafka.Kafka」的 main 方法中调用了「KafkaServerStartable」尝试启动 Kafka 服务器。
  • 接着在 「KafkaServerStartable」的 startup 方法中调用了 「KafkaServer」的 startup 方法启动服务器需要的各种组件类。

下篇我们来深度剖析「Broker 启动集群如何感知」,大家期待,我们下期见。

责任编辑:姜华 来源: 华仔聊技术 Kafka服务端

(责任编辑:知识)

    推荐文章
    热点阅读