源码剖析-KafkaRequestHandlerPool
# 4.8 Kafka源码剖析之KafkaRequestHandlerPool
KafkaRequestHandlerPool的作用是创建numThreads个KafkaRequestHandler实例,使用numThreads个线程启动KafkaRequestHandler。
每个KafkaRequestHandler包含了id,brokerId,线程数,请求的channel,处理请求的api等信息。
只要该类进行实例化,就执行创建KafkaRequestHandler实例并启动的逻辑。
/**
* @param brokerId
* @param requestChannel
* @param apis 处理具体请求和响应的api
* @param time
* @param numThreads 运行KafkaRequestHandler的线程数
*/
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
time: Time,
numThreads: Int) extends Logging with KafkaMetricsGroup {
// 创建包含numThreads个元素的数组
val runnables = new Array[KafkaRequestHandler](numThreads)
// 循环numThreads次,初始化KafkaRequestHandler实例numThreads个
for (i <- 0 until numThreads) {
// 赋值:每个KafkaRequestHandler中包含了KafkaRequestHandler的id,brokerId,线程数,请求的channel,处理请求的api等。
runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter,numThreads, requestChannel, apis, time)
// 启动这些KafkaRequestHandler线程用于请求的处理
KafkaThread.daemon("kafka-request-handler-" + i, runnables(i)).start()
}
}
KafkaThread的start方法即是调用Thread的start方法,而start执行run方法,即此处执行的是KafkaThread的run方法:
def run() {
while(true) {
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded idle
// time should be discounted by # threads.
val startSelectTime = time.nanoseconds
// 获取请求
val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
val idleTime = endTime - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
req match {
case RequestChannel.ShutdownRequest =>
debug(s"Kafka request handler $id on broker $brokerId received shut down command")
latch.countDown()
return
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
trace(s"Kafka request handler $id on broker $brokerId handling request $request")
// 对于其他请求,直接交给apis来负责处理。
apis.handle(request)
}
catch {
case e: FatalExitError =>
latch.countDown()
Exit.exit(e.statusCode)
case e: Throwable => error("Exception when handling request", e)
}
finally {
request.releaseBuffer()
}
case null => // continue
}
}
}
该类包含了关闭KafkaRequestHandler的方法:
具体的方法:
首先发送停止的请求,等待用户请求处理的结束 latch.await() 。
优雅停机。
将请求直接放到requestQueue中。
其中处理ShutdownRequest的处理逻辑:
上次更新: 2023/08/12, 20:54:07