源码剖析-SocketServer
# 4.7 Kafka源码剖析之SocketServer
线程模型:
- 当前broker上配置了多少个listener,就有多少个Acceptor,用于新建连接。
- 每个Acceptor对应N个线程的处理器(Processor),用于接收客户端请求。
- 处理器们对应M个线程的处理程序(Handler),处理用户请求,并将响应发送给等待给客户写响应的处理器线程。
在启动KakfaServer的startup方法中启动SocketServer:
每个listener就是一个端点,每个端点创建多个处理程序。
究竟启动多少个处理程序?
processor个数为numProcessorThreads个。上图中for循环为从processorBeginIndex到prodessorEndIndex(不包括)。
numProcessorThread为:
acceptor的启动过程:
KafkaThread:
调用Thread的构造器:
KafkaThread的start方法即是Thread的start方法,此时调用的是acceptor的run方法:
/**
* 使用Java的NIO
* 循环检查是否有新的连接尝试
* 轮询的方式将请求交给各个processor来处理。
*/
def run() {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()
try {
var currentProcessor = 0
while (isRunning) {
try {
val ready = nioSelector.select(500)
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
if (key.isAcceptable)
// 指定一个processor处理请求
accept(key, processors(currentProcessor)) else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
// round robin to the next processor thread
// 通过轮询的方式找到下一个processor线程
currentProcessor = (currentProcessor + 1) % processors.length
}
catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
catch {
// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
// to a select operation on a specific channel or a bad request.We don't want
// the broker to stop responding to requests from other clients in these scenarios.
case e: ControlThrowable => throw e
case e: Throwable => error("Error occurred", e)
}
}
}
finally {
debug("Closing server socket and selector.")
swallowError(serverChannel.close())
swallowError(nioSelector.close())
shutdownComplete()
}
}
Acceptor建立连接,处理请求:
/*
* Accept a new connection
* 建立一个新连接
*/
def accept(key: SelectionKey, processor: Processor) {
// 服务端
val serverSocketChannel =key.channel().asInstanceOf[ServerSocketChannel]
// 客户端
val socketChannel = serverSocketChannel.accept()
try {
connectionQuotas.inc(socketChannel.socket().getInetAddress)
// 非阻塞
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
// 设置发送缓冲大小
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socketChannel.socket().setSendBufferSize(sendBufferSize)
debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: %d|%d"
.format(socketChannel.socket.getRemoteSocketAddress,socketChannel.socket.getLocalSocketAddress, processor.id,
socketChannel.socket.getSendBufferSize, sendBufferSize,
socketChannel.socket.getReceiveBufferSize,recvBufferSize))
// 调用Processor的accept方法,由processor处理请求
processor.accept(socketChannel)
}
catch {
case e: TooManyConnectionsException =>
info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
close(socketChannel)
}
}
Processor将连接加入缓冲队列,同时唤醒处理线程:
Processor的run方法从newConnections中取出请求的channel,解析封装请求,交给handler处理:
将请求信息放到请求队列中:
在KafkaServer的startup方法中实例化KafkaRequestHandlerPool,该类会立即初始化numIoThreads个线程用于执行KafkaRequestHandler处理请求的逻辑。
KafkaRequestHandlerPool以多线程的方式启动多个KafkaRequestHandler:
KafkaRequestHandler的run方法中,receiveRequest方法从请求队列获取请求:
具体实现:
KafkaRequestHandler的run方法中使用模式匹配:
上图中,apis的handle方法处理请求:
/**
* 处理所有请求的顶级方法,使用模式匹配,交给具体的api来处理
*/
def handle(request: RequestChannel.Request) {
try {
trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" + s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
case ApiKeys.CONTROLLED_SHUTDOWN =>handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.FIND_COORDINATOR =>handleFindCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
case ApiKeys.INIT_PRODUCER_ID =>handleInitProducerIdRequest(request)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>handleOffsetForLeaderEpochRequest(request)
case ApiKeys.ADD_PARTITIONS_TO_TXN =>handleAddPartitionToTxnRequest(request)
case ApiKeys.ADD_OFFSETS_TO_TXN =>handleAddOffsetsToTxnRequest(request)
case ApiKeys.END_TXN => handleEndTxnRequest(request)
case ApiKeys.WRITE_TXN_MARKERS =>handleWriteTxnMarkersRequest(request)
case ApiKeys.TXN_OFFSET_COMMIT =>handleTxnOffsetCommitRequest(request)
case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
case ApiKeys.DESCRIBE_CONFIGS =>handleDescribeConfigsRequest(request)
case ApiKeys.ALTER_REPLICA_LOG_DIRS =>handleAlterReplicaLogDirsRequest(request)
case ApiKeys.DESCRIBE_LOG_DIRS =>handleDescribeLogDirsRequest(request)
case ApiKeys.SASL_AUTHENTICATE =>handleSaslAuthenticateRequest(request)
case ApiKeys.CREATE_PARTITIONS =>handleCreatePartitionsRequest(request)
}
}
catch {
case e: FatalExitError => throw e
case e: Throwable => handleError(request, e)
}
finally {
request.apiLocalCompleteTimeNanos = time.nanoseconds
}
}
上次更新: 2023/08/12, 20:54:07