 源码剖析-KafkaRequestHandlerPool
源码剖析-KafkaRequestHandlerPool
  # 4.8 Kafka源码剖析之KafkaRequestHandlerPool
KafkaRequestHandlerPool的作用是创建numThreads个KafkaRequestHandler实例,使用numThreads个线程启动KafkaRequestHandler。
每个KafkaRequestHandler包含了id,brokerId,线程数,请求的channel,处理请求的api等信息。
只要该类进行实例化,就执行创建KafkaRequestHandler实例并启动的逻辑。
/**
* @param brokerId
* @param requestChannel
* @param apis 处理具体请求和响应的api
* @param time
* @param numThreads 运行KafkaRequestHandler的线程数
*/
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
time: Time,
numThreads: Int) extends Logging with KafkaMetricsGroup {
   // 创建包含numThreads个元素的数组
   val runnables = new Array[KafkaRequestHandler](numThreads)
   // 循环numThreads次,初始化KafkaRequestHandler实例numThreads个
   for (i <- 0 until numThreads) {
   	// 赋值:每个KafkaRequestHandler中包含了KafkaRequestHandler的id,brokerId,线程数,请求的channel,处理请求的api等。
   	runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter,numThreads, requestChannel, apis, time)
   	// 启动这些KafkaRequestHandler线程用于请求的处理
   	KafkaThread.daemon("kafka-request-handler-" + i, runnables(i)).start()
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

KafkaThread的start方法即是调用Thread的start方法,而start执行run方法,即此处执行的是KafkaThread的run方法:
def run() {
   while(true) {
   	// We use a single meter for aggregate idle percentage for the thread pool.
   	// Since meter is calculated as total_recorded_value / time_window and
   	// time_window is independent of the number of threads, each recorded idle
   	// time should be discounted by # threads.
   	val startSelectTime = time.nanoseconds
   	// 获取请求
   	val req = requestChannel.receiveRequest(300)
   	val endTime = time.nanoseconds
   	val idleTime = endTime - startSelectTime
   	aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
   	req match {
   		case RequestChannel.ShutdownRequest =>
   		debug(s"Kafka request handler $id on broker $brokerId received shut down command")
   		latch.countDown()
   		return
   		case request: RequestChannel.Request =>
   		try {
   			request.requestDequeueTimeNanos = endTime
   			trace(s"Kafka request handler $id on broker $brokerId handling request $request")
   			// 对于其他请求,直接交给apis来负责处理。
   			apis.handle(request)
   		}
   		catch {
   			case e: FatalExitError =>
   			latch.countDown()
   			Exit.exit(e.statusCode)
   			case e: Throwable => error("Exception when handling request", e)
   		}
   		finally {
   			request.releaseBuffer()
   		}
   		case null => // continue
   	}
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

该类包含了关闭KafkaRequestHandler的方法:

具体的方法:

首先发送停止的请求,等待用户请求处理的结束 latch.await() 。
优雅停机。

将请求直接放到requestQueue中。
其中处理ShutdownRequest的处理逻辑:

上次更新: 2025/04/03, 11:07:08
