Deep Dive into Kafka: How the Controller Communicates with Brokers in a Cluster - ControllerChannelManager

All posts are first published on my personal blog at http://blog.mozhu.org. You are welcome to visit!

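In a Kafka cluster, one broker is first elected as the controller. The controller then coordinates with the other brokers on tasks such as topic creation, partition leader election, and topic deletion.
Below we analyze how the controller communicates with the other brokers.
The controller sends three kinds of requests to the other brokers: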

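  • LeaderAndIsrRequest - For a topic, the KafkaController elects a leader for each partition and produces the latest ISR, then sends a LeaderAndIsrRequest to the brokers hosting the topic's replicas.
  • StopReplicaRequest - When a broker goes down or a user deletes a replica, the controller sends a StopReplicaRequest to the other brokers.
  • UpdateMetadataRequest - Metadata must be updated whenever brokers change (go down or restart), a topic is created, partitions are added, a partition leader is elected, and so on. This lets KafkaClient learn the latest partition information for each topic, including which broker is the leader of each partition and therefore which broker to read from and write to.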
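To make the mapping from cluster events to requests concrete, here is a small illustrative model. All names (LeaderAndIsrReq, ClusterEvent, RequestPlanner, and so on) are hypothetical and much simpler than Kafka's real request classes; the event-to-request mapping follows only the triggers listed above, so for example topic creation is modelled as needing just a metadata refresh.

```scala
// Illustrative model only: names and fields are simplified stand-ins, not Kafka's classes.
sealed trait ControllerRequest
final case class LeaderAndIsrReq(topic: String, partition: Int, leader: Int, isr: List[Int]) extends ControllerRequest
final case class StopReplicaReq(topic: String, partition: Int, deletePartition: Boolean) extends ControllerRequest
final case class UpdateMetadataReq(topics: Set[String]) extends ControllerRequest

// Hypothetical cluster events, named after some of the triggers listed above.
sealed trait ClusterEvent
final case class LeaderElected(topic: String, partition: Int, leader: Int, isr: List[Int]) extends ClusterEvent
final case class ReplicaDeleted(topic: String, partition: Int) extends ClusterEvent
final case class TopicCreated(topic: String) extends ClusterEvent

object RequestPlanner {
  // Map each event to the requests the text says it triggers.
  def requestsFor(event: ClusterEvent): List[ControllerRequest] = event match {
    // a new leader/ISR goes to the replicas, and clients need fresh metadata
    case LeaderElected(t, p, leader, isr) =>
      List(LeaderAndIsrReq(t, p, leader, isr), UpdateMetadataReq(Set(t)))
    // a deleted replica must stop (and, in this model, drop its data)
    case ReplicaDeleted(t, p) =>
      List(StopReplicaReq(t, p, deletePartition = true))
    // a new topic only needs a metadata refresh in this simplified model
    case TopicCreated(t) =>
      List(UpdateMetadataReq(Set(t)))
  }
}
```

Because both traits are sealed, the compiler warns if a new event type is added without a matching case in requestsFor.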

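To make communication between the KafkaController leader and the other brokers in the cluster more efficient, ControllerBrokerRequestBatch implements batched sending of requests.
ControllerBrokerRequestBatch first collects a batch of requests destined for other brokers. The requests are then handed to ControllerChannelManager, which puts each one on the target broker's queue, and a per-broker send thread takes requests off that queue and sends them. The batch keeps one map per request type:
  • leaderAndIsrRequestMap - holds the LeaderAndIsrRequest information to be sent to each broker
  • stopReplicaRequestMap - holds the StopReplicaRequest information to be sent to each broker
  • updateMetadataRequestMap - holds the UpdateMetadataRequest information to be sent to each broker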
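The shape of these per-broker maps can be pictured with a small sketch. It models only leaderAndIsrRequestMap, uses a hypothetical PartitionState case class in place of the real per-partition data, and relies on ordinary map semantics, so a later entry for the same broker and partition replaces an earlier one. The other two maps follow the same brokerId-keyed shape.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the per-partition state a LeaderAndIsr entry carries.
final case class PartitionState(leader: Int, isr: List[Int], leaderEpoch: Int)

// Sketch of the leaderAndIsrRequestMap idea: brokerId -> ((topic, partition) -> latest state).
class LeaderAndIsrBatch {
  private val leaderAndIsrRequestMap =
    mutable.Map.empty[Int, mutable.Map[(String, Int), PartitionState]]

  // Record that every broker in brokerIds must learn the new state of (topic, partition).
  // A later add for the same broker and partition overwrites the earlier one.
  def add(brokerIds: Seq[Int], topic: String, partition: Int, state: PartitionState): Unit =
    brokerIds.foreach { id =>
      val perBroker = leaderAndIsrRequestMap.getOrElseUpdate(id, mutable.Map.empty[(String, Int), PartitionState])
      perBroker.update((topic, partition), state)
    }

  // One pending request per broker, each covering every partition collected for it.
  def pendingRequests: Map[Int, Map[(String, Int), PartitionState]] =
    leaderAndIsrRequestMap.map { case (id, parts) => id -> parts.toMap }.toMap
}
```

Grouping by broker id is what turns many partition-level state changes into a single request per broker.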

ControllerChannelManager

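When a ControllerChannelManager object is constructed, it initializes brokerStateInfo by calling addNewBroker for every live broker, keeping one ControllerBrokerStateInfo object per broker. Each ControllerBrokerStateInfo holds networkClient, brokerNode, messageQueue, and requestThread, where:

  • networkClient - the client that handles the underlying network communication
  • brokerNode - the broker's id, host, and port (a Node can also carry rack information)
  • messageQueue - the BlockingQueue of messages waiting to be sent
  • requestThread - the RequestSendThread that sends the requests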

Note that addNewBroker only creates the RequestSendThread; the thread does not run until the startup method is called.
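The deferred start can be illustrated with a simplified sketch. ChannelManagerSketch and BrokerState are hypothetical names; the network client and broker node are left out, and each thread does nothing. What the sketch keeps is the pattern: threads are created in addNewBroker, started in startup(), started immediately by addBroker() for brokers that join later, and guarded by a Thread.State.NEW check because a Java thread can be started only once.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Simplified analogue of ControllerBrokerStateInfo: only the queue and the sender thread are kept.
final case class BrokerState(queue: LinkedBlockingQueue[String], sender: Thread)

class ChannelManagerSketch(initialBrokers: Seq[Int]) {
  private val brokerLock = new Object
  // Public here only so the behaviour can be inspected.
  val brokerStateInfo = mutable.HashMap.empty[Int, BrokerState]

  initialBrokers.foreach(addNewBroker)

  // Create the queue and thread but do NOT start the thread (mirrors addNewBroker).
  private def addNewBroker(id: Int): Unit = {
    val queue = new LinkedBlockingQueue[String]
    val sender = new Thread(new Runnable { def run(): Unit = () }, s"sketch-send-thread-$id")
    sender.setDaemon(true)
    brokerStateInfo.put(id, BrokerState(queue, sender))
  }

  // Start every sender thread that has not been started yet.
  def startup(): Unit = brokerLock.synchronized {
    brokerStateInfo.keys.foreach(startRequestSendThread)
  }

  // A broker that joins after startup gets its thread started right away.
  def addBroker(id: Int): Unit = brokerLock.synchronized {
    if (!brokerStateInfo.contains(id)) {
      addNewBroker(id)
      startRequestSendThread(id)
    }
  }

  // Thread.start() may only be called once, hence the NEW-state guard.
  private def startRequestSendThread(id: Int): Unit = {
    val t = brokerStateInfo(id).sender
    if (t.getState == Thread.State.NEW) t.start()
  }
}
```

The NEW-state guard is what makes startup() safe to call even if some threads were already started through addBroker().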

private def addNewBroker(broker: Broker) {
  // Initialize the blocking queue that holds the messages to be sent to this broker
  val messageQueue = new LinkedBlockingQueue[QueueItem]
  debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
  val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
  val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)
  // Initialize the client used for the underlying network communication
  val networkClient = {
    val channelBuilder = ChannelBuilders.create(
      config.interBrokerSecurityProtocol,
      Mode.CLIENT,
      LoginType.SERVER,
      config.values,
      config.saslMechanismInterBrokerProtocol,
      config.saslInterBrokerHandshakeRequestEnable
    )
    val selector = new Selector(
      NetworkReceive.UNLIMITED,
      config.connectionsMaxIdleMs,
      metrics,
      time,
      "controller-channel",
      Map("broker-id" -> broker.id.toString).asJava,
      false, // metricsPerConnection
      channelBuilder
    )
    new NetworkClient(
      selector,
      new ManualMetadataUpdater(Seq(brokerNode).asJava),
      config.brokerId.toString,
      1, // maxInFlightRequestsPerConnection
      0, // reconnectBackoffMs
      Selectable.USE_DEFAULT_BUFFER_SIZE,
      Selectable.USE_DEFAULT_BUFFER_SIZE,
      config.requestTimeoutMs,
      time
    )
  }
  val threadName = threadNamePrefix match {
    case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
    case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)
  }

  // Create the RequestSendThread, but do not start it yet
  val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
    brokerNode, config, time, threadName)
  requestThread.setDaemon(false)
  brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
}

class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging {
  protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
  private val brokerLock = new Object
  this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "

  controllerContext.liveBrokers.foreach(addNewBroker(_))

  def startup() = {
    brokerLock synchronized {
      brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
    }
  }

  def shutdown() = {
    brokerLock synchronized {
      brokerStateInfo.values.foreach(removeExistingBroker)
    }
  }

  def addBroker(broker: Broker) {
    // be careful here. Maybe the startup() API has already started the request send thread
    brokerLock synchronized {
      if(!brokerStateInfo.contains(broker.id)) {
        addNewBroker(broker)
        startRequestSendThread(broker.id)
      }
    }
  }

  def removeBroker(brokerId: Int) {
    brokerLock synchronized {
      removeExistingBroker(brokerStateInfo(brokerId))
    }
  }

  // Remove an existing broker: close its client, clear its queue and stop its send thread
  private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
    try {
      brokerState.networkClient.close()
      brokerState.messageQueue.clear()
      brokerState.requestSendThread.shutdown()
      brokerStateInfo.remove(brokerState.brokerNode.id)
    } catch {
      case e: Throwable => error("Error while removing broker by the controller", e)
    }
  }

  // Start the broker's request send thread if it has not been started yet
  protected def startRequestSendThread(brokerId: Int) {
    val requestThread = brokerStateInfo(brokerId).requestSendThread
    if(requestThread.getState == Thread.State.NEW)
      requestThread.start()
  }
}

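The sendRequest method merely puts the request on the target broker's queue; if the broker is not known (offline), the request is dropped with a warning.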
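A minimal sketch of this enqueue-only design follows, assuming a hypothetical Item type in place of QueueItem and a fixed set of broker ids. Unlike the real method, it returns a Boolean in addition to logging, so the offline case is easy to observe.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// Hypothetical queue item; the real QueueItem holds apiKey, apiVersion, request and callback.
final case class Item(payload: String, callback: String => Unit)

class RequestRouter(brokerIds: Seq[Int]) {
  // Mirrors brokerLock; it matters in the real class, where brokers are added and removed concurrently.
  private val brokerLock = new Object
  private val queues: Map[Int, BlockingQueue[Item]] =
    brokerIds.map(id => id -> new LinkedBlockingQueue[Item]).toMap

  // Returns true if the item was queued, false if the broker is unknown (treated as offline).
  def sendRequest(brokerId: Int, payload: String, callback: String => Unit = _ => ()): Boolean =
    brokerLock.synchronized {
      queues.get(brokerId) match {
        case Some(q) =>
          q.put(Item(payload, callback)) // unbounded queue, so put() does not block here
          true
        case None =>
          System.err.println(s"Not sending request $payload to broker $brokerId, since it is offline.")
          false
      }
    }

  // Number of items still waiting for the send thread of this broker.
  def pending(brokerId: Int): Int = queues.get(brokerId).map(_.size).getOrElse(0)
}
```

Because the caller only enqueues, it never waits on the network; all blocking I/O happens on the per-broker send thread.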

def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) {
  brokerLock synchronized {
    val stateInfoOpt = brokerStateInfo.get(brokerId)
    stateInfoOpt match {
      case Some(stateInfo) =>
        stateInfo.messageQueue.put(QueueItem(apiKey, apiVersion, request, callback))
      case None =>
        warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
    }
  }
}

RequestSendThread

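When a broker is elected controller, KafkaController's onControllerFailover method is called, and it in turn calls ControllerChannelManager's startup method.
startup starts the request send thread of each broker in turn. In RequestSendThread's doWork method, the thread takes the next item from the queue and sends it to the corresponding broker. If the broker is not ready or the send fails, the thread waits 300 ms and retries, until it receives a response (or the thread is shut down); it then invokes the callback, if one was supplied.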
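The retry logic in doWork can be reduced to the following sketch. brokerReady and send are hypothetical function parameters standing in for brokerReady() and networkClient.blockingSendAndReceive, and the backoff is a parameter (defaulting to the 300 ms used above) so that it can be shortened in tests.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Keep trying until a send succeeds or isRunning is cleared, sleeping a fixed backoff
  // between attempts. Returns None if the thread was stopped before any response arrived.
  def sendWithRetry[R](isRunning: AtomicBoolean,
                       brokerReady: () => Boolean,
                       send: () => R,
                       backoffMs: Long = 300L): Option[R] = {
    var response: Option[R] = None
    while (isRunning.get() && response.isEmpty) {
      if (!brokerReady()) {
        Thread.sleep(backoffMs) // broker not ready yet: wait, then try again
      } else {
        Try(send()) match {
          case Success(r) => response = Some(r)
          case Failure(e) =>
            // the real code also closes the connection here so that the next attempt reconnects
            System.err.println(s"send failed, retrying: ${e.getMessage}")
            Thread.sleep(backoffMs)
        }
      }
    }
    response
  }
}
```

Returning an Option lets the caller invoke the callback only when a response actually arrived, mirroring the `clientResponse != null` check in doWork.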

class RequestSendThread(val controllerId: Int,
                        val controllerContext: ControllerContext,
                        val queue: BlockingQueue[QueueItem],
                        val networkClient: NetworkClient,
                        val brokerNode: Node,
                        val config: KafkaConfig,
                        val time: Time,
                        name: String)
  extends ShutdownableThread(name = name) {

  private val lock = new Object()
  private val stateChangeLogger = KafkaController.stateChangeLogger
  private val socketTimeoutMs = config.controllerSocketTimeoutMs

  override def doWork(): Unit = {

    def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(300))

    // Take the next QueueItem from the queue (blocks until one is available)
    val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
    import NetworkClientBlockingOps._
    var clientResponse: ClientResponse = null
    try {
      lock synchronized {
        var isSendSuccessful = false
        while (isRunning.get() && !isSendSuccessful) {
          // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
          // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
          try {
            if (!brokerReady()) {
              // The broker is not ready yet: wait 300 ms, then try again
              isSendSuccessful = false
              backoff()
            }
            else {
              val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
              val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)
              val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
              // Send the request and block until the response arrives
              clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
              isSendSuccessful = true
            }
          } catch {
            case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
              warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                  request.toString, brokerNode.toString()), e)
              networkClient.close(brokerNode.idString)
              // The send failed: wait 300 ms, then try again
              isSendSuccessful = false
              backoff()
          }
        }
        if (clientResponse != null) {
          val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
            case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
            case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)
            case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)
            case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
          }
          stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
            .format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString))

          if (callback != null) {
            callback(response)
          }
        }
      }
    } catch {
      case e: Throwable =>
        error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString()), e)
        // If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated.
        networkClient.close(brokerNode.idString)
    }
  }

}

ControllerBrokerRequestBatch

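KafkaController, PartitionStateMachine, ReplicaStateMachine, and similar classes use ControllerBrokerRequestBatch to send requests in batches whenever a change in broker or partition state has to be propagated to other brokers.
Take the following example: newBatch is called first, then methods such as addStopReplicaRequestForBrokers add the requests to be sent to the batch (here they are called from handleStateChange for each replica), and finally sendRequestsToBrokers sends them.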

def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
                       callbacks: Callbacks = (new CallbackBuilder).build) {
  if(replicas.size > 0) {
    info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
    try {
      brokerRequestBatch.newBatch()
      replicas.foreach(r => handleStateChange(r, targetState, callbacks))
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    } catch {
      case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
    }
  }
}

For the three request types (LeaderAndIsrRequest, StopReplicaRequest, UpdateMetadataRequest), ControllerBrokerRequestBatch maintains a separate map each, and provides addLeaderAndIsrRequestForBrokers, addStopReplicaRequestForBrokers, and addUpdateMetadataRequestForBrokers to add the corresponding requests to those maps. Finally, sendRequestsToBrokers pushes the requests onto the queues of the respective brokers' RequestSendThreads.
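The map-then-flush behaviour can be sketched as follows. RequestBatchSketch is hypothetical: it borrows the three map names and the add method names from the text but uses simplified signatures, represents each request as a string, and writes to a plain Map of LinkedBlockingQueue instead of calling ControllerChannelManager.sendRequest. The newBatch guard, which refuses to start while unsent entries remain, is an assumption of this sketch, and the flush order (LeaderAndIsr, UpdateMetadata, StopReplica) is simply the order chosen here.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

class RequestBatchSketch(queues: Map[Int, LinkedBlockingQueue[String]]) {
  private val leaderAndIsrRequestMap   = mutable.Map.empty[Int, mutable.Set[String]]
  private val stopReplicaRequestMap    = mutable.Map.empty[Int, mutable.Set[String]]
  private val updateMetadataRequestMap = mutable.Map.empty[Int, mutable.Set[String]]
  private val allMaps = Seq(
    "LeaderAndIsr"   -> leaderAndIsrRequestMap,
    "UpdateMetadata" -> updateMetadataRequestMap,
    "StopReplica"    -> stopReplicaRequestMap)

  // Assumption of this sketch: a new batch may only start once the previous one was flushed.
  def newBatch(): Unit =
    if (allMaps.exists(_._2.nonEmpty))
      throw new IllegalStateException("previous batch still has unsent requests")

  private def add(target: mutable.Map[Int, mutable.Set[String]], brokers: Seq[Int], partition: String): Unit =
    brokers.foreach(b => target.getOrElseUpdate(b, mutable.Set.empty[String]) += partition)

  def addLeaderAndIsrRequestForBrokers(brokers: Seq[Int], partition: String): Unit =
    add(leaderAndIsrRequestMap, brokers, partition)
  def addStopReplicaRequestForBrokers(brokers: Seq[Int], partition: String): Unit =
    add(stopReplicaRequestMap, brokers, partition)
  def addUpdateMetadataRequestForBrokers(brokers: Seq[Int], partition: String): Unit =
    add(updateMetadataRequestMap, brokers, partition)

  // One request per (request type, broker); brokers without a queue are skipped, like offline brokers.
  def sendRequestsToBrokers(controllerEpoch: Int): Unit = {
    for ((kind, requests) <- allMaps; (broker, partitions) <- requests; q <- queues.get(broker))
      q.put(s"$kind(epoch=$controllerEpoch, ${partitions.toSeq.sorted.mkString(",")})")
    allMaps.foreach(_._2.clear()) // empty the batch so the next newBatch() succeeds
  }
}
```

A caller follows the same newBatch, add, sendRequestsToBrokers sequence as handleStateChanges above; each broker then receives at most one request of each type per batch, however many partitions changed.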
