跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
        • Kafka高级特性-消费者
        • Kafka高级特性-主题
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
        • 源码剖析-Topic创建流程
        • 源码剖析-Producer生产者流程
        • 源码剖析-Consumer消费者流程
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
        • 源码剖析-ReplicaManager
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-31

源码剖析-SocketServer

# 4.7 Kafka源码剖析之SocketServer

线程模型:

  1. 当前broker上配置了多少个listener,就有多少个Acceptor,用于新建连接。
  2. 每个Acceptor对应N个线程的处理器(Processor),用于接收客户端请求。
  3. 处理器们对应M个线程的处理程序(Handler),处理用户请求,并将响应发送给等待给客户写响应的处理器线程。

image-20230801091357460

在启动KakfaServer的startup方法中启动SocketServer:

Kafka_Page283_001

每个listener就是一个端点,每个端点创建多个处理程序。

Kafka_Page283_002

究竟启动多少个处理程序?

processor个数为numProcessorThreads个。上图中for循环为从processorBeginIndex到prodessorEndIndex(不包括)。

numProcessorThread为:

Kafka_Page283_003

Kafka_Page284_001

Kafka_Page284_002

Kafka_Page284_003

acceptor的启动过程:

Kafka_Page284_004

KafkaThread:

Kafka_Page284_005

Kafka_Page284_006

调用Thread的构造器:

Kafka_Page284_007

Kafka_Page285_001

KafkaThread的start方法即是Thread的start方法,此时调用的是acceptor的run方法:

/**
* 使用Java的NIO
* 循环检查是否有新的连接尝试
* 轮询的方式将请求交给各个processor来处理。
*/
def run() {
	serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
	startupComplete()
	try {
		var currentProcessor = 0
		while (isRunning) {
			try {
				val ready = nioSelector.select(500)
				if (ready > 0) {
					val keys = nioSelector.selectedKeys()
					val iter = keys.iterator()
					while (iter.hasNext && isRunning) {
						try {
							val key = iter.next
							iter.remove()
							if (key.isAcceptable)
							// 指定一个processor处理请求
							accept(key, processors(currentProcessor)) else
							throw new IllegalStateException("Unrecognized key state for acceptor thread.")
							// round robin to the next processor thread
							// 通过轮询的方式找到下一个processor线程
							currentProcessor = (currentProcessor + 1) % processors.length
						}
						catch {
							case e: Throwable => error("Error while accepting connection", e)
						}
					}
				}
			}
			catch {
				// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
				// to a select operation on a specific channel or a bad request.We don't want
				// the broker to stop responding to requests from other clients in these scenarios.
				case e: ControlThrowable => throw e
				case e: Throwable => error("Error occurred", e)
			}
		}
	}
	finally {
		debug("Closing server socket and selector.")
		swallowError(serverChannel.close())
		swallowError(nioSelector.close())
		shutdownComplete()
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

Acceptor建立连接,处理请求:

/*
* Accept a new connection
* 建立一个新连接
*/
def accept(key: SelectionKey, processor: Processor) {
	// 服务端
	val serverSocketChannel =key.channel().asInstanceOf[ServerSocketChannel]
	// 客户端
	val socketChannel = serverSocketChannel.accept()
	try {
		connectionQuotas.inc(socketChannel.socket().getInetAddress)
		// 非阻塞
		socketChannel.configureBlocking(false)
		socketChannel.socket().setTcpNoDelay(true)
		socketChannel.socket().setKeepAlive(true)
		// 设置发送缓冲大小
		if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
		socketChannel.socket().setSendBufferSize(sendBufferSize)
		debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: %d|%d"
		.format(socketChannel.socket.getRemoteSocketAddress,socketChannel.socket.getLocalSocketAddress, processor.id,
		socketChannel.socket.getSendBufferSize, sendBufferSize,
		socketChannel.socket.getReceiveBufferSize,recvBufferSize))
		// 调用Processor的accept方法,由processor处理请求
		processor.accept(socketChannel)
	}
	catch {
		case e: TooManyConnectionsException =>
		info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
		close(socketChannel)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

Processor将连接加入缓冲队列,同时唤醒处理线程:

Kafka_Page287_001

Processor的run方法从newConnections中取出请求的channel,解析封装请求,交给handler处理:

Kafka_Page287_002

Kafka_Page288_001

将请求信息放到请求队列中:

Kafka_Page288_002

在KafkaServer的startup方法中实例化KafkaRequestHandlerPool,该类会立即初始化numIoThreads个线程用于执行KafkaRequestHandler处理请求的逻辑。

Kafka_Page288_003

KafkaRequestHandlerPool以多线程的方式启动多个KafkaRequestHandler:

Kafka_Page288_004

KafkaRequestHandler的run方法中,receiveRequest方法从请求队列获取请求:

Kafka_Page289_001

具体实现:

Kafka_Page289_002

KafkaRequestHandler的run方法中使用模式匹配:

Kafka_Page289_003

上图中,apis的handle方法处理请求:

/**
* 处理所有请求的顶级方法,使用模式匹配,交给具体的api来处理
*/
def handle(request: RequestChannel.Request) {
   try {
   	trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" + s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
   	request.header.apiKey match {
   		case ApiKeys.PRODUCE => handleProduceRequest(request)
   		case ApiKeys.FETCH => handleFetchRequest(request)
   		case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
   		case ApiKeys.METADATA => handleTopicMetadataRequest(request)
   		case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
   		case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
   		case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
   		case ApiKeys.CONTROLLED_SHUTDOWN =>handleControlledShutdownRequest(request)
   		case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
   		case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
   		case ApiKeys.FIND_COORDINATOR =>handleFindCoordinatorRequest(request)
   		case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
   		case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
   		case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
   		case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
   		case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
   		case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
   		case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
   		case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
   		case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
   		case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
   		case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
   		case ApiKeys.INIT_PRODUCER_ID =>handleInitProducerIdRequest(request)
   		case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>handleOffsetForLeaderEpochRequest(request)
   		case ApiKeys.ADD_PARTITIONS_TO_TXN =>handleAddPartitionToTxnRequest(request)
   		case ApiKeys.ADD_OFFSETS_TO_TXN =>handleAddOffsetsToTxnRequest(request)
   		case ApiKeys.END_TXN => handleEndTxnRequest(request)
   		case ApiKeys.WRITE_TXN_MARKERS =>handleWriteTxnMarkersRequest(request)
   		case ApiKeys.TXN_OFFSET_COMMIT =>handleTxnOffsetCommitRequest(request)
   		case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
   		case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
   		case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
   		case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
   		case ApiKeys.DESCRIBE_CONFIGS =>handleDescribeConfigsRequest(request)
   		case ApiKeys.ALTER_REPLICA_LOG_DIRS =>handleAlterReplicaLogDirsRequest(request)
   		case ApiKeys.DESCRIBE_LOG_DIRS =>handleDescribeLogDirsRequest(request)
   		case ApiKeys.SASL_AUTHENTICATE =>handleSaslAuthenticateRequest(request)
   		case ApiKeys.CREATE_PARTITIONS =>handleCreatePartitionsRequest(request)
   	}
   }
   catch {
   	case e: FatalExitError => throw e
   	case e: Throwable => handleError(request, e)
   }
   finally {
   	request.apiLocalCompleteTimeNanos = time.nanoseconds
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
上次更新: 2025/04/03, 11:07:08
源码剖析-消息存储机制
源码剖析-KafkaRequestHandlerPool

← 源码剖析-消息存储机制 源码剖析-KafkaRequestHandlerPool→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式