跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
        • Kafka高级特性-消费者
        • Kafka高级特性-主题
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
        • 源码剖析-Topic创建流程
        • 源码剖析-Producer生产者流程
        • 源码剖析-Consumer消费者流程
          • 4.5.1 Consumer示例
          • 4.5.2 KafkaConsumer实例化
          • 4.5.3 订阅Topic
          • 4.5.4 消息消费过程
            • 4.5.4.1 poll
            • 4.5.4.2 pollOnce
            • 4.5.4.2.1 coordinator.poll()
            • 4.5.4.2.2 updateFetchPositions()
            • 4.5.4.2.3 fetcher.fetchedRecords()
            • 4.5.4.2.4 fetcher.sendFetches()
            • 4.5.4.2.5 client.poll()
            • 4.5.4.2.6 coordinator.needRejoin()
          • 4.5.5 自动提交
          • 4.5.6 手动提交
            • 4.5.6.1 同步提交
            • 4.5.6.2 异步提交
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
        • 源码剖析-ReplicaManager
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-31
目录

源码剖析-Consumer消费者流程

# 4.5 Kafka源码剖析之Consumer消费者流程

# 4.5.1 Consumer示例

KafkaConsumer

消费者的根本目的是从Kafka服务端拉取消息,并交给业务逻辑进行处理。

开发人员不必关心与Kafka服务端之间网络连接的管理、心跳检测、请求超时重试等底层操作

也不必关心订阅Topic的分区数量、分区Leader副本的网络拓扑以及消费组的Rebalance等细节,另外还提供了自动提交offset的功能。

案例:

public static void main(String[] args) throws InterruptedException {
   // 是否自动ᨀ交
   Boolean autoCommit = false;
   // 是否异步ᨀ交
   Boolean isSync = true;
   Properties props = new Properties();
   // kafka地址,列表格式为host1:port1,host2:port2,…,无需添加所有的集群地址,kafka会根据ᨀ供的地址发现其他的地址(建议多ᨀ供几个,以防ᨀ供的服务器关闭)
   props.put("bootstrap.servers", "localhost:9092");
   // 消费组
   props.put("group.id", "test");
   // 开启自动ᨀ交offset
   props.put("enable.auto.commit", autoCommit.toString());
   // 1s自动ᨀ交
   props.put("auto.commit.interval.ms", "1000");
   // 消费者和群组协调器的最大心跳时间,如果超过该时间则认为该消费者已经死亡或者故障,需要踢出消费者组
   props.put("session.timeout.ms", "60000");
   // 一次poll间隔最大时间
   props.put("max.poll.interval.ms", "1000");
   // 当消费者读取偏移量无效的情况下,需要重置消费起始位置,默认为latest(从消费者启动后生成的记录),另外一个选项值是 earliest,将从有效的最小位移位置开始消费
   props.put("auto.offset.reset", "latest");
   // consumer端一次拉取数据的最大字节数
   props.put("fetch.max.bytes", "1024000");
   // key序列化方式
   props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
   // value序列化方式
   props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
   KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
   String topic = "lagou_edu";
   // 订阅topic列表
   consumer.subscribe(Arrays.asList(topic));
   while (true) {
   	// 消息拉取
   	ConsumerRecords<String, String> records = consumer.poll(100);
   	for (ConsumerRecord<String, String> record : records) {
   		System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());
   	}
   	if (!autoCommit) {
   		if (isSync) {
   			// 处理完成单次消息以后,ᨀ交当前的offset,如果失败会一直重试直至成功
   			consumer.commitSync();
   		} else {
   			// 异步ᨀ交
   			consumer.commitAsync((offsets, exception) -> {
   				exception.printStackTrace();
   				System.out.println(offsets.size());
   			}
   			);
   		}
   	}
   	TimeUnit.SECONDS.sleep(3);
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

Kafka服务端并不会记录消费者的消费位置,而是由消费者自己决定如何保存如何记录其消费的offset。在Kafka服务端中添加了一个名为“__consumer_offsets"的内部topic来保存消费者提交的offset,当出现消费者上、下线时会触发Consumer Group进行Rebalance操作,对分区进行重新分配,待Rebalance操作完成后。消费者就可以读取该topic中记录的offset,并从此offset位置继续消费。当然,使用该topic记录消费者的offset只是默认选项,开发人员可以根据业务需求将offset记录在别的存储中。

在消费者消费消息的过程中,提交offset的时机非常重要,因为它决定了消费者故障重启后的消费位置。在上面的示例中,我们通过将 enable.auto.commit 选项设置为true可以起到自动提交offset的功能, auto.commit.interval.ms 选项则设置了自动提交的时间间隔。每次在调用KafkaConsumer.poll() 方法时都会检测是否需要自动提交,并提交上次 poll() 方法返回的最后一个消息的offset。为了避免消息丢失,建议poll()方法之前要处理完上次poll()方法拉取的全部消息。

KafkaConsumer中还提供了两个手动提交offset的方法,分别是 commitSync() 和 commitAsync() ,它们都可以指定提交的offset值,区别在于前者是同步提交,后者是异步提交。

# 4.5.2 KafkaConsumer实例化

了解了 KafkaConsumer 的基本使用,开始深入了解 KafkaConsumer 原理和实现,先看一下构造方法核心逻辑

private KafkaConsumer(ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
   try {
   	// 获取client.id,如果为空则默认生成一个,默认:consumer-1
   	String clientId =config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
   	if (clientId.isEmpty())
   	clientId = "consumer-" +CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
   	this.clientId = clientId;
   	// 获取消费组名
   	String groupId =config.getString(ConsumerConfig.GROUP_ID_CONFIG);
   	LogContext logContext = new LogContext("[Consumer clientId=" +clientId + ", groupId=" + groupId + "] ");
   	this.log = logContext.logger(getClass());
   	log.debug("Initializing the Kafka consumer");
   	this.requestTimeoutMs =config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
   	int sessionTimeOutMs =config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
   	int fetchMaxWaitMs =config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
   	if (this.requestTimeoutMs <= sessionTimeOutMs ||this.requestTimeoutMs <= fetchMaxWaitMs)
   	throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
   	this.time = Time.SYSTEM;
   	// 与生产者逻辑相同
   	Map<String, String> metricsTags =Collections.singletonMap("client-id", clientId);
   	MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
   	.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),TimeUnit.MILLISECONDS)
   	.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
   	.tags(metricsTags);
   	List<MetricsReporter> reporters =config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG
   	,
   	MetricsReporter.class);
   	reporters.add(new JmxReporter(JMX_PREFIX));
   	this.metrics = new Metrics(metricConfig, reporters, time);
   	this.retryBackoffMs =config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
   	// 消费者拦截器
   	// load interceptors and make sure they get clientId
   	Map<String, Object> userProvidedConfigs = config.originals();
   	userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG,clientId);
   	List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs,false)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
   	ConsumerInterceptor.class);
   	this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
   	// key反序列化
   	if (keyDeserializer == null) {
   		this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
   		Deserializer.class);
   		this.keyDeserializer.configure(config.originals(), true);
   	} else {
   		config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
   		this.keyDeserializer = keyDeserializer;
   	}
   	// value反序列化
   	if (valueDeserializer == null) {
   		this.valueDeserializer =config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
   		,
   		Deserializer.class);
   		this.valueDeserializer.configure(config.originals(),false);
   	} else {
   		config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
   		this.valueDeserializer = valueDeserializer;
   	}
   	ClusterResourceListeners clusterResourceListeners =configureClusterResourceListeners(keyDeserializer, valueDeserializer,reporters, interceptorList);
   	this.metadata = new Metadata(retryBackoffMs,config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
   	true, false, clusterResourceListeners);
   	List<InetSocketAddress> addresses =ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
   	this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
   	String metricGrpPrefix = "consumer";
   	ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
   	ChannelBuilder channelBuilder =ClientUtils.createChannelBuilder(config);
   	// 事务隔离级别
   	IsolationLevel isolationLevel = IsolationLevel.valueOf(
   	config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
   	Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics,metricsRegistry.fetcherMetrics);
   	// 网络组件
   	NetworkClient netClient = new NetworkClient(
   	new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),metrics, time, metricGrpPrefix, channelBuilder, logContext),
   	this.metadata,
   	clientId,
   	100, // a fixed large enough value will suffice for max in-flight requests
   	config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
   	config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
   	config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
   	config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
   	config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
   	time,
   	true,
   	new ApiVersions(),
   	throttleTimeSensor,
   	logContext);
   	// 客户端
   	this.client = new ConsumerNetworkClient(
   	logContext,
   	netClient,
   	metadata,
   	time,
   	retryBackoffMs,
   	config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
   	// offset重置策略,默认是自动ᨀ交
   	OffsetResetStrategy offsetResetStrategy =OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
   	this.subscriptions = new SubscriptionState(offsetResetStrategy);
   	this.assignors = config.getConfiguredInstances(
   	ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
   	PartitionAssignor.class);
   	// offset协调者
   	this.coordinator = new ConsumerCoordinator(logContext,
   	this.client,
   	groupId,
   	config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
   	config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
   	config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
   	assignors,
   	this.metadata,
   	this.subscriptions,
   	metrics,
   	metricGrpPrefix,
   	this.time,
   	retryBackoffMs,
   	config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
   	config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
   	this.interceptors,
   	config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
   	config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
   	// 拉取器
   	this.fetcher = new Fetcher<>(
   	logContext,
   	this.client,
   	config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
   	config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
   	config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
   	config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
   	config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
   	config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
   	this.keyDeserializer,
   	this.valueDeserializer,
   	this.metadata,
   	this.subscriptions,
   	metrics,
   	metricsRegistry.fetcherMetrics,
   	this.time,
   	this.retryBackoffMs,
   	isolationLevel);
   	// 打印用户设置,但是没有使用的配置项
   	config.logUnused();
   	AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
   	log.debug("Kafka consumer initialized");
   }
   catch (Throwable t) {
   	// call close methods if internal objects are already constructed
   	// this is to prevent resource leak. see KAFKA-2121
   	close(0, true);
   	// now propagate the exception
   	throw new KafkaException("Failed to construct kafka consumer",t);
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
  1. 初始化参数配置

    • client.id、group.id、消费者拦截器、key/value序列化、事务隔离级别
  2. 初始化网络客户端 NetworkClient

  3. 初始化消费者网络客户端 ConsumerNetworkClient

  4. 初始化offset提交策略,默认自动提交

  5. 初始化消费者协调器 ConsumerCoordinator

  6. 初始化拉取器 Fetcher

# 4.5.3 订阅Topic

下面我们先来看一下subscribe方法都有哪些逻辑:

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
   // 轻量级锁
   acquireAndEnsureOpen();
   try {
   	if (topics == null) {
   		throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
   	} else if (topics.isEmpty()) {
   		// topics为空,则开始取消订阅的逻辑
   		this.unsubscribe();
   	} else {
   		// topic合法性判断,包含null或者空字符串直接抛异常
   		for (String topic : topics) {
   			if (topic == null || topic.trim().isEmpty())
   			throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
   		}
   		// 如果没有消费协调者直接抛异常
   		throwIfNoAssignorsConfigured();
   		log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
   		// 开始订阅
   		this.subscriptions.subscribe(new HashSet<>(topics), listener);
   		// 更新元数据,如果metadata当前不包括所有的topics则标记强制更新
   		metadata.setTopics(subscriptions.groupSubscription());
   	}
   }
   finally {
   	release();
   }
}
public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
   if (listener == null)
   throw new IllegalArgumentException("RebalanceListener cannot be null");
   // 按照指定的Topic名字进行订阅,自动分配分区
   setSubscriptionType(SubscriptionType.AUTO_TOPICS);
   // 监听
   this.listener = listener;
   // 修改订阅信息
   changeSubscription(topics);
}
private void changeSubscription(Set<String> topicsToSubscribe) {
   if (!this.subscription.equals(topicsToSubscribe)) {
   	// 如果使用AUTO_TOPICS或AUTO_PARTITION模式,则使用此集合记录所有订阅的Topic
   	this.subscription = topicsToSubscribe;
   	// Consumer Group中会选一个Leader,Leader会使用这个集合记录Consumer Group中所有消费者订阅的Topic,而其他的Follower的这个集合只会保存自身订阅的Topic
   	this.groupSubscription.addAll(topicsToSubscribe);
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
  1. KafkaConsumer不是线程安全类,开启轻量级锁,topics为空抛异常,topics是空集合开始取消订阅,再次判断topics集合中是否有非法数据,判断消费者协调者是否为空。开始订阅对应topic。listener默认为 NoOpConsumerRebalanceListener ,一个空操作

轻量级锁:分别记录了当前使用KafkaConsumer的线程id和重入次数,KafkaConsumer的acquire()和release()方法实现了一个”轻量级锁“,它并非真正的锁,仅是检测是否有多线程并发操作KafkaConsumer而已

  1. 每一个KafkaConsumer实例内部都拥有一个SubscriptionState对象,subscribe内部调用了subscribe方法,subscribe方法订阅信息记录到 SubscriptionState ,多次订阅会覆盖旧数据。

  2. 更新metadata,判断如果metadata中不包含当前groupSubscription,开始标记更新(后面会有更新的逻辑),并且消费者侧的topic不会过期

# 4.5.4 消息消费过程

下面KafkaConsumer的核心方法poll是如何拉取消息的,先来看一下下面的代码:

# 4.5.4.1 poll

public ConsumerRecords<K, V> poll(long timeout) {
   // 使用轻量级锁检测kafkaConsumer是否被其他线程使用
   acquireAndEnsureOpen();
   try {
   	// 超时时间小于0抛异常
   	if (timeout < 0)
   	throw new IllegalArgumentException("Timeout must not be negative");
   	// 订阅类型为NONE抛异常,表示当前消费者没有订阅任何topic或者没有分配分区
   	if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
   	throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
   	// poll for new data until the timeout expires
   	long start = time.milliseconds();
   	long remaining = timeout;
   	do {
   		// 核心方法,拉取消息
   		Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
   		if (!records.isEmpty()) {
   			// before returning the fetched records, we can send off the next round of fetches
   			// and avoid block waiting for their responses to enable pipelining while the user
   			// is handling the fetched records.
   			//
   			// NOTE: since the consumed position has already been updated, we must not allow
   			// wakeups or any other errors to be triggered prior to returning the fetched records.
   			// 如果拉取到了消息,发送一次消息拉取的请求,不会阻塞不会被中断
   			// 在返回数据之前,发送下次的 fetch 请求,避免用户在下次获取数据时线程 block
   			if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
   			client.pollNoWakeup();
   			// 经过拦截器处理后返回
   			if (this.interceptors == null)
   			return new ConsumerRecords<>(records); else
   			return this.interceptors.onConsume(new ConsumerRecords<>(records));
   		}
   		long elapsed = time.milliseconds() - start;
   		// 拉取超时就结束
   		remaining = timeout - elapsed;
   	}
   	while (remaining > 0);
   	return ConsumerRecords.empty();
   }
   finally {
   	release();
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  1. 使用轻量级锁检测kafkaConsumer是否被其他线程使用

  2. 检查超时时间是否小于0,小于0抛出异常,停止消费

  3. 检查这个 consumer 是否订阅的相应的 topic-partition

  4. 调用 pollOnce() 方法获取相应的 records

  5. 在返回获取的 records 前,发送下一次的 fetch 请求,避免用户在下次请求时线程 block 在pollOnce() 方法中

  6. 如果在给定的时间(timeout)内获取不到可用的 records,返回空数据

这里可以看出,poll 方法的真正实现是在 pollOnce 方法中,poll 方法通过 pollOnce 方法获取可用的数据

# 4.5.4.2 pollOnce

// 除了获取新数据外,还会做一些必要的 offset-commit和reset-offset的操作
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
   client.maybeTriggerWakeup();
   // 1. 获取 GroupCoordinator 地址并连接、加入 Group、sync Group、自动commit, join 及 sync 期间 group 会进行 rebalance
   coordinator.poll(time.milliseconds(), timeout);
   // 2. 更新订阅的 topic-partition 的 offset(如果订阅的 topic-partition list 没有有效的 offset 的情况下)
   if (!subscriptions.hasAllFetchPositions())
   updateFetchPositions(this.subscriptions.missingFetchPositions());
   // 3. 获取 fetcher 已经拉取到的数据
   Map<TopicPartition, List<ConsumerRecord<K, V>>> records =fetcher.fetchedRecords();
   if (!records.isEmpty())
   return records;
   // 4. 发送 fetch 请求,会从多个 topic-partition 拉取数据(只要对应的 topic-partition 没有未完成的请求)
   fetcher.sendFetches();
   long now = time.milliseconds();
   long pollTimeout = Math.min(coordinator.timeToNextPoll(now),timeout);
   // 5. 调用 poll 方法发送请求(底层发送请求的接口)
   client.poll(pollTimeout, now, new PollCondition() {
   	@Override
   	public boolean shouldBlock() {
   		// since a fetch might be completed by the background thread, we need this poll condition
   		// to ensure that we do not block unnecessarily in poll()
   		return !fetcher.hasCompletedFetches();
   	}
   }
   );
   // 6. 如果 group 需要 rebalance,直接返回空数据,这样更快地让 group 进行稳定状态
   if (coordinator.needRejoin())
   return Collections.emptyMap();
   // 获取到请求的结果
   return fetcher.fetchedRecords();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

pollOnce 可以简单分为6步来看,其作用分别如下:

# 4.5.4.2.1 coordinator.poll()

获取 GroupCoordinator 的地址,并建立相应 tcp 连接,发送 join-group、sync-group,之后才真正加入到了一个 group 中,这时会获取其要消费的 topic-partition 列表,如果设置了自动 commit,也会在这一步进行 commit。总之,对于一个新建的 group,group 状态将会从 Empty –>PreparingRebalance –> AwaiSync –> Stable;

  1. 获取 GroupCoordinator 的地址,并建立相应 tcp 连接;

  2. 发送 join-group 请求,然后 group 将会进行 rebalance;

  3. 发送 sync-group 请求,之后才正在加入到了一个 group 中,这时会通过请求获取其要消费的topic-partition 列表;

  4. 如果设置了自动 commit,也会在这一步进行 commit offset

# 4.5.4.2.2 updateFetchPositions()

这个方法主要是用来更新这个 consumer 实例订阅的 topic-partition 列表的 fetch-offset 信息。目的就是为了获取其订阅的每个 topic-partition 对应的 position,这样 Fetcher 才知道从哪个 offset 开始去拉取这个 topic-partition 的数据

private void updateFetchPositions(Set<TopicPartition> partitions) {
   // 先重置那些调用 seekToBegin 和 seekToEnd 的 offset 的 tp,设置其 the fetch position 的 offset
   fetcher.resetOffsetsIfNeeded(partitions);
   if (!subscriptions.hasAllFetchPositions(partitions)) {
   	// 获取所有分配 tp 的 offset, 即 committed offset, 更新到TopicPartitionState 中的 committed offset 中
   	coordinator.refreshCommittedOffsetsIfNeeded();
   	// 如果 the fetch position 值无效,则将上步获取的 committed offset 设置为 the fetch position
   	fetcher.updateFetchPositions(partitions);
   }
}
1
2
3
4
5
6
7
8
9
10

在 Fetcher 中,这个 consumer 实例订阅的每个 topic-partition 都会有一个对应的TopicPartitionState 对象,在这个对象中会记录以下这些内容:

private static class TopicPartitionState {
   // Fetcher 下次去拉取时的 offset,Fecher 在拉取时需要知道这个值
   private Long position;
   // last consumed position
   // 最后一次获取的高水位标记
   private Long highWatermark;
   // the high watermark from last fetch
   private Long lastStableOffset;
   // consumer 已经处理完的最新一条消息的 offset,consumer 主动调用 offset-commit 时会更新这个值;
   private OffsetAndMetadata committed;
   // last committed position
   // 是否暂停
   private boolean paused;
   // whether this partition has been paused by the user
   // 这 topic-partition offset 重置的策略,重置之后,这个策略就会改为 null,防止再次操作
   private OffsetResetStrategy resetStrategy;
   // the strategy to use if the offset needs resetting
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 4.5.4.2.3 fetcher.fetchedRecords()

返回其 fetched records,并更新其 fetch-position offset,只有在 offset-commit 时(自动commit 时,是在第一步实现的),才会更新其 committed offset;

public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
   Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
   // 在 max.poll.records 中设置单词最大的拉取条数
   int recordsRemaining = maxPollRecords;
   try {
   	while (recordsRemaining > 0) {
   		if (nextInLineRecords == null || nextInLineRecords.isFetched) {
   			// 从队列中获取但不移除此队列的头;如果此队列为空,返回null
   			CompletedFetch completedFetch = completedFetches.peek();
   			if (completedFetch == null) break;
   			// 获取下一个要处理的 nextInLineRecords
   			nextInLineRecords = parseCompletedFetch(completedFetch);
   			completedFetches.poll();
   		} else {
   			// 拉取records,更新 position
   			List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
   			TopicPartition partition = nextInLineRecords.partition;
   			if (!records.isEmpty()) {
   				List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
   				if (currentRecords == null) {
   					fetched.put(partition, records);
   				} else {
   					List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
   					newRecords.addAll(currentRecords);
   					newRecords.addAll(records);
   					fetched.put(partition, newRecords);
   				}
   				recordsRemaining -= records.size();
   			}
   		}
   	}
   }
   catch (KafkaException e) {
   	if (fetched.isEmpty())
   	throw e;
   }
   return fetched;
}
private List<ConsumerRecord<K, V>> fetchRecords(PartitionRecords partitionRecords, int maxRecords) {
   if (!subscriptions.isAssigned(partitionRecords.partition)) {
   	log.debug("Not returning fetched records for partition {} since it is no longer assigned",
   	partitionRecords.partition);
   } else {
   	long position = subscriptions.position(partitionRecords.partition);
   	// 这个 tp 不能来消费了,比如调用 pause方法暂停消费
   	if (!subscriptions.isFetchable(partitionRecords.partition)) {
   		log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
   		partitionRecords.partition);
   	} else if (partitionRecords.nextFetchOffset == position) {
   		// 获取该 tp 对应的records,并更新 partitionRecords 的fetchOffset(用于判断是否顺序)
   		List<ConsumerRecord<K, V>> partRecords = partitionRecords.fetchRecords(maxRecords);
   		long nextOffset = partitionRecords.nextFetchOffset;
   		log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
   		"position to {}", position,partitionRecords.partition, nextOffset);
   		// 更新消费的到 offset( the fetch position)
   		subscriptions.position(partitionRecords.partition,nextOffset);
   		// 获取 Lag(即 position与 hw 之间差值),hw 为 null 时,才返回null
   		Long partitionLag = subscriptions.partitionLag(partitionRecords.partition, isolationLevel);
   		if (partitionLag != null)
   		this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);
   		return partRecords;
   	} else {
   		log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
   		partitionRecords.partition,partitionRecords.nextFetchOffset, position);
   	}
   }
   partitionRecords.drain();
   return emptyList();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

# 4.5.4.2.4 fetcher.sendFetches()

只要订阅的 topic-partition list 没有未处理的 fetch 请求,就发送对这个 topic-partition 的 fetch 请求,在真正发送时,还是会按 node 级别去发送,leader 是同一个 node 的 topic-partition 会合成一个请求去发送;

// 向订阅的所有 partition (只要该 leader 暂时没有拉取请求)所在 leader 发送 fetch请求
public int sendFetches() {
   // 1. 创建 Fetch Request
   Map<Node, FetchRequest.Builder> fetchRequestMap =createFetchRequests();
   for (Map.Entry<Node, FetchRequest.Builder> fetchEntry :fetchRequestMap.entrySet()) {
   	final FetchRequest.Builder request = fetchEntry.getValue();
   	final Node fetchTarget = fetchEntry.getKey();
   	log.debug("Sending {} fetch for partitions {} to broker {}",isolationLevel, request.fetchData().keySet(),
   	fetchTarget);
   	// 2 发送 Fetch Request
   	client.send(fetchTarget, request)
   	.addListener(new RequestFutureListener<ClientResponse>() {
   		@Override
   		public void onSuccess(ClientResponse resp) {
   			FetchResponse response = (FetchResponse) resp.responseBody();
   			if (!matchesRequestedPartitions(request,response)) {
   				log.warn("Ignoring fetch response containing partitions {} since it does not match " +
   				"the requested partitions {}",response.responseData().keySet(),
   				request.fetchData().keySet());
   				return;
   			}
   			Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
   			FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
   			for (Map.Entry<TopicPartition,FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
   				TopicPartition partition = entry.getKey();
   				long fetchOffset =request.fetchData().get(partition).fetchOffset;
   				FetchResponse.PartitionData fetchData =entry.getValue();
   				log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
   				isolationLevel, fetchOffset,partition, fetchData);
   				completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
   				resp.requestHeader().apiVersion()));
   			}
   			sensors.fetchLatency.record(resp.requestLatencyMs());
   		}
   		@Override
   		public void onFailure(RuntimeException e) {
   			log.debug("Fetch request {} to {} failed",request.fetchData(), fetchTarget, e);
   		}
   	}
   	);
   }
   return fetchRequestMap.size();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  1. createFetchRequests():为订阅的所有 topic-partition list 创建 fetch 请求(只要该topic-partition 没有还在处理的请求),创建的 fetch 请求依然是按照 node 级别创建的;

  2. client.send():发送 fetch 请求,并设置相应的 Listener,请求处理成功的话,就加入到completedFetches 中,在加入这个 completedFetches 集合时,是按照 topic-partition 级别去加入,这样也就方便了后续的处理。

从这里可以看出,在每次发送 fetch 请求时,都会向所有可发送的 topic-partition 发送 fetch 请求,调用一次 fetcher.sendFetches,拉取到的数据,可需要多次 pollOnce 循环才能处理完,因为Fetcher 线程是在后台运行,这也保证了尽可能少地阻塞用户的处理线程,因为如果 Fetcher 中没有可处理的数据,用户的线程是会阻塞在 poll 方法中的

# 4.5.4.2.5 client.poll()

调用底层 NetworkClient 提供的接口去发送相应的请求;

# 4.5.4.2.6 coordinator.needRejoin()

如果当前实例分配的 topic-partition 列表发送了变化,那么这个 consumer group 就需要进行rebalance

# 4.5.5 自动提交

最简单的提交方式是让悄费者自动提交偏移量。如果enable.auto.commit被设为 true,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由auto.commit.interval.ms 控制,默认值是 5s。与消费者里的其他东西 一样,自动提交也是在轮询(poll() )里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那 么就会提交从上一次轮询返回的偏移量。

不过,这种简便的方式也会带来一些问题,来看一下下面的例子:

假设我们仍然使用默认的 5s提交时间间隔,在最近一次提交之后的 3s发生了再均衡,再 均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无也完全避免的

# 4.5.6 手动提交

# 4.5.6.1 同步提交

取消自动提交,把 auto.commit.offset设为 false,让应用程序决定何时提交 偏 移量。使用commitSync()提交偏移量最简单也最可靠。这个 API会提交由 poll() 方法返回 的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常

while (true) {
   // 消息拉取
   ConsumerRecords<String, String> records = consumer.poll(100);
   for (ConsumerRecord<String, String> record : records) {
   	System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),record.key(), record.value());
   }
   // 处理完成单次消息以后,ᨀ交当前的offset,如果ᨀ交失败就抛出异常
   consumer.commitSync();
}
1
2
3
4
5
6
7
8
9

# 4.5.6.2 异步提交

同步提交有一个不足之处,在 broker对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发生了再均衡, 会增加重复消息的数量。这个时候可以使用异步提交 API。我们只管发送提交请求,无需等待 broker的响应。

while (true) {
   // 消息拉取
   ConsumerRecords<String, String> records = consumer.poll(100);
   for (ConsumerRecord<String, String> record : records) {
   	System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());
   }
   // 异步ᨀ交
   consumer.commitAsync((offsets, exception) -> {
   	exception.printStackTrace();
   	System.out.println(offsets.size());
   }
   );
}
1
2
3
4
5
6
7
8
9
10
11
12
13
上次更新: 2025/04/03, 11:07:08
源码剖析-Producer生产者流程
源码剖析-消息存储机制

← 源码剖析-Producer生产者流程 源码剖析-消息存储机制→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式