前面「八篇」文章通过「场景驱动方式」带你深度剖析了 Kafka「日志系统」源码架构设计的端启动流方方面面,从今天开始,图解我们来深度剖析 Kafka「Controller」的服务底层源码实现,这是端启动流 Controller 系列第一篇,我们先回过头来继续来深度聊聊「Kafka 服务端启动的图解流程」,看看 Kafka 服务端是服务如何启动的。
在深入剖析Kafka「Controller」之前,图解我想你可能或多或少会有这样的服务疑问:
Kafka 服务端都有哪些组件,这些组件又是端启动流通过哪个类来启动的呢?
这里我们通过启动 Kafka 来了解,大家都知道,启动 Kafka 可以执行以下命令来启动:
# 1、启动 kafka 服务命令:bin/kafka-server-start.sh config/server.properties &
那么今天就来看看通过这个脚本 KafkaServer 初始化了哪些组件。
我们来看下里面的 shell 内容,如下:
#!/bin/bash# Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.# 1、注释说明该脚本的版权信息和使用许可。if [ $# -lt 1 ];then echo "USAGE: $0 [-daemon] server.properties [--override property=value]*" exit 1fi# 2、检查命令行参数的个数,若小于 1 则输出脚本的使用方法并退出。base_dir=$(dirname $0)# 3、获取当前脚本所在目录的路径,并将其赋值给 base_dir 变量。if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then export KAFKA_LOG4J_OPTS="-Dlog4j.cnotallow=file:$base_dir/../config/log4j.properties"fi# 4、检查 KAFKA_LOG4J_OPTS 环境变量是否设置,若未设置则设置该变量的值。if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" export JMX_PORT="9999" export JMX_RMI_PORT="10000"fi# 5、检查 KAFKA_HEAP_OPTS 环境变量是否设置,若未设置则设置该变量的值,并设置 JMX_PORT 和 JMX_RMI_PORT 环境变量的值,将 EXTRA_ARGS 变量的值设置为字符串 -name kafkaServer -loggc。EXTRA_ARGS=${ EXTRA_ARGS-'-name kafkaServer -loggc'}# 6、检查命令行参数中 COMMAND 变量的值是否为 -daemon,若是则将 EXTRA_ARGS 变量的值添加 -daemon 选项。同时将命令行参数向左移一位,即从 $2 开始计算参数。COMMAND=$1case $COMMAND in -daemon) EXTRA_ARGS="-daemon "$EXTRA_ARGS shift ;; *) ;;esac# 7、调用 $base_dir/kafka-run-class.sh 脚本并传递相应的参数。其中 "@ 代表传递的为命令行参数。具体执行的封装在 Kafka 客户端库中的 kafka.Kafka 类。整个脚本的作用是启动 Kafka 服务。exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"esac# 7、调用 $base_dir/kafka-run-class.sh 脚本并传递相应的参数。其中 "@ 代表传递的为命令行参数。具体执行的封装在 Kafka 客户端库中的 kafka.Kafka 类。整个脚本的作用是启动 Kafka 服务。exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
这里我们重点来看 「第 7 步」,它底层执行的是封装在 Kafka 客户端库中的 kafka.Kafka 类。接下来我们来看下该类都做了什么。
「Kafka.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:
https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/Kafka.scala。
从整体上来看,该类就 3 个方法,相对比较简单,我能来看下里面的重点。
这里我们通过「2.8.x」版本来讲解,「2.7.x」还未增加 KafkaRaftServer 类。
def getPropsFromArgs(args: Array[String]): Properties = { // 创建一个命令行参数解析器 val optionParser = new OptionParser(false) // 定义 --override 选项,用于覆盖 server.properties 文件中的属性 val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file") .withRequiredArg() .ofType(classOf[String]) // 定义 --version 选项,用于打印版本信息并退出 optionParser.accepts("version", "Print version information and exit.") // 若没有提供参数或者参数包含 --help 选项,则打印用法并退出 if (args.length == 0 || args.contains("--help")) { CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName())) } // 若参数中包含 --version 选项,则打印版本信息并退出 if (args.contains("--version")) { CommandLineUtils.printVersionAndDie() } // 加载 server.properties 文件中的属性到 Properties 对象中 val props = Utils.loadProps(args(0)) // 若提供了其他参数,则解析这些参数 if (args.length > 1) { // 解析参数中的选项和参数值 val options = optionParser.parse(args.slice(1, args.length): _*) // 检查是否有非选项参数 if (options.nonOptionArguments().size() > 0) { CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(",")) } // 将解析得到的选项和参数值添加到 props 对象中 props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala) } // 返回解析得到的属性集合 props}
该函数的作用是从命令行参数中解析出属性集合。它内部使用了 OptionParser 类库来解析命令行选项,并从 server.properties 文件中加载属性。
如果提供了 override 选项,则它将覆盖 server.properties 文件中的相应属性。函数返回一个 Properties 对象,其中包含了解析得到的属性。
如果没有提供正确的命令行参数或者提供了 --help 或 --version 选项,函数会打印帮助信息或版本信息并退出。
private def buildServer(props: Properties): Server = { val config = KafkaConfig.fromProps(props, false) // 直接启动定时任务、网络层、请求处理层 if (config.requiresZookeeper) { new KafkaServer( config, Time.SYSTEM, threadNamePrefix = None, enableForwarding = false ) } else { // 调用 BrokerServer 等来启动网络层和请求处理层 new KafkaRaftServer( config, Time.SYSTEM, threadNamePrefix = None ) }}
在 kafka 2.8.x 版本中 新增了 raft 协议之后将 BrokerServer、ControllServer 使用了单独的文件来启动最终调用网络层和请求处理层,如果还是使用 zk 的方式启动则是 KafkaServer 启动网络层和请求处理层。
# 2.7.x 版本源码def main(args: Array[String]): Unit = { try { // 1、解析命令行参数,获得属性集合 val serverProps = getPropsFromArgs(args) // 2、从属性集合创建 KafkaServerStartable 对象 val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps) try { // 如果不是 Windows 操作系统,并且不是 IBM JDK,则注册 LoggingSignalHandler if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk) new LoggingSignalHandler().register() } catch { // 如果注册 LoggingSignalHandler 失败,则在日志中打印警告信息 case e: ReflectiveOperationException => warn("Failed to register optional signal handler that logs a message when the process is terminated " + s"by a signal. Reason for registration failure is: $e", e) } // 3、添加 shutdown hook,用于在程序结束时执行 KafkaServerStartable 的 shutdown 方法 Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown()) // 4、启动 KafkaServerStartable 实例 kafkaServerStartable.startup() // 5、等待 KafkaServerStartable 实例终止 kafkaServerStartable.awaitShutdown() } catch { // 如果有异常发生,则记录日志并退出程序 case e: Throwable => fatal("Exiting Kafka due to fatal exception", e) Exit.exit(1) } // 6、正常终止程序 Exit.exit(0)}
该函数是 Kafka 服务进程的入口,它是整个 Kafka 运行过程的驱动程序。该函数首先通过调用 getPropsFromArgs 函数解析命令行参数并获得属性集合,然后使用这些属性创建 KafkaServerStartable 实例。接着,它注册一个 shutdown hook,用于在程序终止时执行 KafkaServerStartable 的 shutdown 方法。然后它启动 KafkaServerStartable 实例,并等待该实例终止。如果发生异常,则记录日志并退出程序。函数最后调用 Exit.exit 方法退出程序,返回 0 表示正常终止。
# 2.8.x 版本def main(args: Array[String]): Unit = { // 获取Kafka服务的配置信息 val serverProps = getPropsFromArgs(args) // 根据配置信息构建Kafka服务 val server = buildServer(serverProps) try { // 注册用于记录日志的信号处理器(若实现失败则退出) if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk) new LoggingSignalHandler().register() } catch { case e: ReflectiveOperationException => warn("Failed to register optional signal handler that logs a message when the process is terminated " + s"by a signal. Reason for registration failure is: $e", e) } // 挂载关闭处理器,用于捕获终止信号和常规终止请求 Exit.addShutdownHook("kafka-shutdown-hook", { try server.shutdown() // 关闭Kafka服务 catch { case _: Throwable => fatal("Halting Kafka.") // 日志记录致命错误信息 // 调用Exit.halt()强制退出,避免重复调用Exit.exit()引发死锁 Exit.halt(1) } }) try server.startup() // 启动Kafka服务 catch { case _: Throwable => // 调用Exit.exit()设置退出状态码,KafkaServer.startup()会在抛出异常时调用shutdown() fatal("Exiting Kafka.") Exit.exit(1) } server.awaitShutdown() // 等待Kafka服务关闭 Exit.exit(0) // 调用Exit.exit()设置退出状态码}
这里最重要的是 「第 4 步」,调用 kafkaServerStartable.startup() 或者 server.startup() 来启动 kafka。
这里我们还是以「ZK 模式」的方式来启动,后面抽空再进行对 「Raft 模式」启动进行补充。
「KafkaServerStartable.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:
https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServerStartable.scala。
在 Scala 语言里,在一个源代码文件中同时定义相同名字的 class 和 object 的用法被称为伴生(Companion)。Class 对象被称为伴生类,它和 Java 中的类是一样的;而 Object 对象是一个单例对象,用于保存一些静态变量或静态方法。
这里我们主要来看下 Class 类代码。
class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging { // 创建 KafkaServer 实例 // 构造函数有两个参数 —— staticServerConfig 表示静态服务器配置,reporters 表示 Kafka 指标报告器。如果 threadNamePrefix 参数未用于构造函数,则默认值为 None。threadNamePrefix 参数表示线程名称前缀,用于调试和维护目的。 private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix) def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty) // 启动 KafkaServer // startup 方法尝试启动 Kafka 服务器。如果启动 Kafka 服务器时发生异常,则记录一条 fatal 错误日志并退出程序。对于成功启动的 Kafka 服务器,它将开始监听客户端连接,并在收到消息时执行所需的操作。 def startup(): Unit = { try server.startup() catch { // 如果出现异常,则记录日志并退出程序 case _: Throwable => // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code fatal("Exiting Kafka.") Exit.exit(1) } } // 关闭 KafkaServer // shutdown 方法尝试停止 Kafka 服务器。如果在停止服务器时出现异常,则记录一条 fatal 错误日志并强制退出程序。调用 shutdown 方法后,服务器将不再接受新的请求,并开始等待当前进行中的请求完成。当所有处理中的请求都完成后,服务器将彻底停止。 def shutdown(): Unit = { try server.shutdown() catch { // 如果出现异常,则记录日志并强制退出程序 case _: Throwable => fatal("Halting Kafka.") // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit. Exit.halt(1) } } // setServerState 方法允许从 KafkaServerStartable 对象中设置 broker 状态。如果自定义 KafkaServerStartable 对象想要引入新的状态,则此方法很有用。 def setServerState(newState: Byte): Unit = { server.brokerState.newState(newState) } // 等待 KafkaServer 退出 // awaitShutdown 方法等待 Kafka 服务器完全退出。在 Kafka 服务器执行 shutdown 方法后,它将不再接受新的请求。但是,服务器可能仍在处理一些已经接收的请求。awaitShutdown 方法将阻塞当前线程,直到服务器彻底停止。 def awaitShutdown(): Unit = server.awaitShutdown()}
KafkaServerStartable 类是一个可启动和停止的 Kafka 服务器。类中的 server 成员变量是 KafkaServer 类的实例,它将在 KafkaServerStartable 类对象启动时创建。该类提供了启动和停止 Kafka 服务器的方法,以及设置 broker 状态和等待 Kafka 服务器退出的方法。
跟本文有关系的是 「启动」方法,它调用了 KafkaServer#startup 方法进行启动。
Kafka 集群由多个 Broker 节点构成,每个节点上都运行着一个 Kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。
「KafkaServer.scala」类源码在 Kafka 源码包的 core 包下,具体的 github 源码位置如下:
https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServer.scala。
KafkaServer 为 Kafka 的启动类,其中包含了 Kafka 的所有组件,如 KafkaController、groupCoordinator、replicaManager 等。
class KafkaServer(val config: KafkaConfig, //配置信息time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None, kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() //监控上报 ) extends Logging with KafkaMetricsGroup { //标识节点已经启动完成 private val startupComplete = new AtomicBoolean(false) //标识节点正在执行关闭操作 private val isShuttingDown = new AtomicBoolean(false) //标识节点正在执行启动操作 private val isStartingUp = new AtomicBoolean(false) //阻塞主线程等待 KafkaServer 的关闭 private var shutdownLatch = new CountDownLatch(1) //日志上下文 private var logContext: LogContext = null var metrics: Metrics = null //记录节点的当前状态 val brokerState: BrokerState = new BrokerState //API接口类,用于处理数据类请求 var dataPlaneRequestProcessor: KafkaApis = null //API接口,用于处理控制类请求 var controlPlaneRequestProcessor: KafkaApis = null //权限管理 var authorizer: Option[Authorizer] = None //启动socket,监听9092端口,等待接收客户端请求 var socketServer: SocketServer = null //数据类请求处理线程池 var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null //命令类处理线程池 var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null //日志管理器 var logDirFailureChannel: LogDirFailureChannel = null var logManager: LogManager = null //副本管理器 var replicaManager: ReplicaManager = null //topic增删管理器 var adminManager: AdminManager = null //token管理器 var tokenManager: DelegationTokenManager = null //动态配置管理器 var dynamicConfigHandlers: Map[String, ConfigHandler] = null var dynamicConfigManager: DynamicConfigManager = null var credentialProvider: CredentialProvider = null var tokenCache: DelegationTokenCache = null //分组协调器 var groupCoordinator: GroupCoordinator = null //事务协调器 var transactionCoordinator: TransactionCoordinator = null //集群控制器 var kafkaController: KafkaController = null //定时任务调度器 var kafkaScheduler: KafkaScheduler = null //集群分区状态信息缓存 var metadataCache: MetadataCache = null //配额管理器 var quotaManagers: QuotaFactory.QuotaManagers = null //zk客户端配置 val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig()) private var _zkClient: KafkaZkClient = null val correlationId: AtomicInteger = new AtomicInteger(0) val brokerMetaPropsFile = "meta.properties" val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap private var _clusterId: String = null private var _brokerTopicStats: BrokerTopicStats = null def clusterId: String = _clusterId // Visible for testing private[kafka] def zkClient = _zkClient private[kafka] def brokerTopicStats = _brokerTopicStats ....}
该类方法很多,我们这里只看 startup 启动方法,来看看其内部都启动了哪些组件,来解决本文开头提出的问题。
/** * Start up API for bringing up a single instance of the Kafka server. * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers */ def startup(): Unit = { try { info("starting") // 是否已关闭 if (isShuttingDown.get) throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!") // 是否已启动 if (startupComplete.get) return // 是否可以启动 val canStartup = isStartingUp.compareAndSet(false, true) if (canStartup) { // 设置broker状态为Starting brokerState.newState(Starting) /* setup zookeeper */ // 连接ZK,并创建根节点 initZkClient(time) /* initialize features */ _featureChangeListener = new FinalizedFeatureChangeListener(featureCache, _zkClient) if (config.isFeatureVersioningSupported) { _featureChangeListener.initOrThrow(config.zkConnectionTimeoutMs) } /* Get or create cluster_id */ // 从ZK获取或创建集群id,规则:UUID的mostSigBits、leastSigBits组合转base64 _clusterId = getOrGenerateClusterId(zkClient) info(s"Cluster ID = $clusterId") /* load metadata */ // 获取brokerId及log存储路径,brokerId通过zk生成或者server.properties配置broker.id // 规则:/brokers/seqid的version值 + maxReservedBrokerId(默认1000),保证唯一性 val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = getBrokerMetadataAndOfflineDirs /* check cluster id */ if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId) throw new InconsistentClusterIdException( s"The Cluster ID ${ clusterId} doesn't match stored clusterId ${ preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " + s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.") /* generate brokerId */ config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint) logContext = new LogContext(s"[KafkaServer id=${ config.brokerId}] ") // 配置logger this.logIdent = logContext.logPrefix // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be // applied after DynamicConfigManager starts. // 初始化AdminZkClient,支持动态修改配置 config.dynamicConfig.initialize(zkClient) /* start scheduler */ // 初始化定时任务调度器 kafkaScheduler = new KafkaScheduler(config.backgroundThreads) kafkaScheduler.startup() /* create and configure metrics */ // 创建及配置监控,默认使用JMX及Yammer Metrics kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE kafkaYammerMetrics.configure(config.originals) val jmxReporter = new JmxReporter() jmxReporter.configure(config.originals) val reporters = new util.ArrayList[MetricsReporter] reporters.add(jmxReporter) val metricConfig = KafkaServer.metricConfig(config) val metricsContext = createKafkaMetricsContext() metrics = new Metrics(metricConfig, reporters, time, true, metricsContext) /* register broker metrics */ _brokerTopicStats = new BrokerTopicStats // 初始化配额管理器 quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse("")) notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala) // 用于保证kafka-log数据目录的存在 logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) /* start log manager */ // 启动日志管理器,kafka的消息以日志形式存储 logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel) // 启动日志清理、刷新、校验、恢复等的定时线程 logManager.startup() metadataCache = new MetadataCache(config.brokerId) // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update. // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically. // SCRAM认证方式的token缓存 tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) // Create and start the socket server acceptor threads so that the bound port is known. // Delay starting processors until the end of the initialization sequence to ensure // that credentials have been loaded before processing authentications. // 启动socket,监听9092端口,等待接收客户端请求 socketServer = new SocketServer(config, metrics, time, credentialProvider) socketServer.startup(startProcessingRequests = false) /* start replica manager */ brokerToControllerChannelManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, threadNamePrefix) // 启动副本管理器,高可用相关 replicaManager = createReplicaManager(isShuttingDown) replicaManager.startup() brokerToControllerChannelManager.start() // 将broker信息注册到ZK上 val brokerInfo = createBrokerInfo val brokerEpoch = zkClient.registerBroker(brokerInfo) // Now that the broker is successfully registered, checkpoint its metadata // 校验 broker 信息 checkpointBrokerMetadata(BrokerMetadata(config.brokerId, Some(clusterId))) /* start token manager */ // 启动 token 管理器 tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient) tokenManager.startup() /* start kafka controller */ // 启动Kafka控制器,只有 Leader 会与ZK建连 kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix) kafkaController.startup() // admin管理器 adminManager = new AdminManager(config, metrics, metadataCache, zkClient) /* start group coordinator */ // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue // 启动集群群组协调器 groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics) groupCoordinator.startup() /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */ // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue // 启动事务协调器 transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM) transactionCoordinator.startup() /* Get the authorizer and initialize it if one is specified.*/ // ACL authorizer = config.authorizer authorizer.foreach(_.configure(config.originals)) val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match { case Some(authZ) => authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) => ep -> cs.toCompletableFuture } case None => brokerInfo.broker.endPoints.map { ep => ep.toJava -> CompletableFuture.completedFuture[Void](null) }.toMap } // 创建拉取管理器 val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots, KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) /* start processing requests */ // 初始化数据类请求的KafkaApis,负责数据类请求逻辑处理 dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache) // 初始化数据类请求处理的线程池 dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads, s"${ SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix) socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel => // 初始化控制类请求的 KafkaApis controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator, kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache) // 初始化控制类请求的线程池 controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time, 1, s"${ SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix) } Mx4jLoader.maybeLoad() /* Add all reconfigurables for config change notification before starting config handlers */ config.dynamicConfig.addReconfigurables(this) /* start dynamic config manager */ dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController), ConfigType.Client -> new ClientIdConfigHandler(quotaManagers), ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider), ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers)) // Create the config manager. start listening to notifications // 启动动态配置处理器 dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers) dynamicConfigManager.startup() // 启动请求处理线程 socketServer.startProcessingRequests(authorizerFutures) // 更新broker状态 brokerState.newState(RunningAsBroker) shutdownLatch = new CountDownLatch(1) startupComplete.set(true) isStartingUp.set(false) AppInfoParser.registerAppInfo(metricsPrefix, config.brokerId.toString, metrics, time.milliseconds()) info("started") } } catch { case e: Throwable => fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e) isStartingUp.set(false) shutdown() throw e } }
这里总结下该方法都启动了哪些组件:
这个是在 2.7.x 版本之前的状态,在 2.8.x 之后版本进行了重构。
sealed trait BrokerStates { def state: Byte }case object NotRunning extends BrokerStates { val state: Byte = 0 }case object Starting extends BrokerStates { val state: Byte = 1 }case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
这些就是 KafkaServer 中主要模块的入口,接下来的文章会通过这些入口一一进行分析。
这里,我们一起来总结一下这篇文章的重点。
下篇我们来深度剖析「Broker 启动集群如何感知」,大家期待,我们下期见。
责任编辑:姜华 来源: 华仔聊技术 Kafka服务端(责任编辑:知识)
谷歌Android N预览版更新 支持Vulkan和3D渲染API
销量捅破天!华为Mate 50首批备货量曝光:余承东高端硬拼苹果14
九兴控股(01836.HK)发布公告:授出1969.5万份购股权