源码剖析-Consumer消费者流程
# 4.5 Kafka源码剖析之Consumer消费者流程
# 4.5.1 Consumer示例
KafkaConsumer
消费者的根本目的是从Kafka服务端拉取消息,并交给业务逻辑进行处理。
开发人员不必关心与Kafka服务端之间网络连接的管理、心跳检测、请求超时重试等底层操作
也不必关心订阅Topic的分区数量、分区Leader副本的网络拓扑以及消费组的Rebalance等细节,另外还提供了自动提交offset的功能。
案例:
public static void main(String[] args) throws InterruptedException {
// 是否自动ᨀ交
Boolean autoCommit = false;
// 是否异步ᨀ交
Boolean isSync = true;
Properties props = new Properties();
// kafka地址,列表格式为host1:port1,host2:port2,…,无需添加所有的集群地址,kafka会根据ᨀ供的地址发现其他的地址(建议多ᨀ供几个,以防ᨀ供的服务器关闭)
props.put("bootstrap.servers", "localhost:9092");
// 消费组
props.put("group.id", "test");
// 开启自动ᨀ交offset
props.put("enable.auto.commit", autoCommit.toString());
// 1s自动ᨀ交
props.put("auto.commit.interval.ms", "1000");
// 消费者和群组协调器的最大心跳时间,如果超过该时间则认为该消费者已经死亡或者故障,需要踢出消费者组
props.put("session.timeout.ms", "60000");
// 一次poll间隔最大时间
props.put("max.poll.interval.ms", "1000");
// 当消费者读取偏移量无效的情况下,需要重置消费起始位置,默认为latest(从消费者启动后生成的记录),另外一个选项值是 earliest,将从有效的最小位移位置开始消费
props.put("auto.offset.reset", "latest");
// consumer端一次拉取数据的最大字节数
props.put("fetch.max.bytes", "1024000");
// key序列化方式
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// value序列化方式
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "lagou_edu";
// 订阅topic列表
consumer.subscribe(Arrays.asList(topic));
while (true) {
// 消息拉取
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());
}
if (!autoCommit) {
if (isSync) {
// 处理完成单次消息以后,ᨀ交当前的offset,如果失败会一直重试直至成功
consumer.commitSync();
} else {
// 异步ᨀ交
consumer.commitAsync((offsets, exception) -> {
exception.printStackTrace();
System.out.println(offsets.size());
}
);
}
}
TimeUnit.SECONDS.sleep(3);
}
}
Kafka服务端并不会记录消费者的消费位置,而是由消费者自己决定如何保存如何记录其消费的offset。在Kafka服务端中添加了一个名为“__consumer_offsets"的内部topic来保存消费者提交的offset,当出现消费者上、下线时会触发Consumer Group进行Rebalance操作,对分区进行重新分配,待Rebalance操作完成后。消费者就可以读取该topic中记录的offset,并从此offset位置继续消费。当然,使用该topic记录消费者的offset只是默认选项,开发人员可以根据业务需求将offset记录在别的存储中。
在消费者消费消息的过程中,提交offset的时机非常重要,因为它决定了消费者故障重启后的消费位置。在上面的示例中,我们通过将 enable.auto.commit 选项设置为true可以起到自动提交offset的功能, auto.commit.interval.ms 选项则设置了自动提交的时间间隔。每次在调用KafkaConsumer.poll() 方法时都会检测是否需要自动提交,并提交上次 poll() 方法返回的最后一个消息的offset。为了避免消息丢失,建议poll()方法之前要处理完上次poll()方法拉取的全部消息。
KafkaConsumer中还提供了两个手动提交offset的方法,分别是 commitSync() 和 commitAsync() ,它们都可以指定提交的offset值,区别在于前者是同步提交,后者是异步提交。
# 4.5.2 KafkaConsumer实例化
了解了 KafkaConsumer 的基本使用,开始深入了解 KafkaConsumer 原理和实现,先看一下构造方法核心逻辑
private KafkaConsumer(ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
try {
// 获取client.id,如果为空则默认生成一个,默认:consumer-1
String clientId =config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (clientId.isEmpty())
clientId = "consumer-" +CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
// 获取消费组名
String groupId =config.getString(ConsumerConfig.GROUP_ID_CONFIG);
LogContext logContext = new LogContext("[Consumer clientId=" +clientId + ", groupId=" + groupId + "] ");
this.log = logContext.logger(getClass());
log.debug("Initializing the Kafka consumer");
this.requestTimeoutMs =config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
int sessionTimeOutMs =config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
int fetchMaxWaitMs =config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
if (this.requestTimeoutMs <= sessionTimeOutMs ||this.requestTimeoutMs <= fetchMaxWaitMs)
throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
this.time = Time.SYSTEM;
// 与生产者逻辑相同
Map<String, String> metricsTags =Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricsTags);
List<MetricsReporter> reporters =config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG
,
MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs =config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
// 消费者拦截器
// load interceptors and make sure they get clientId
Map<String, Object> userProvidedConfigs = config.originals();
userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG,clientId);
List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs,false)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
// key反序列化
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
this.keyDeserializer.configure(config.originals(), true);
} else {
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
this.keyDeserializer = keyDeserializer;
}
// value反序列化
if (valueDeserializer == null) {
this.valueDeserializer =config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
,
Deserializer.class);
this.valueDeserializer.configure(config.originals(),false);
} else {
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
this.valueDeserializer = valueDeserializer;
}
ClusterResourceListeners clusterResourceListeners =configureClusterResourceListeners(keyDeserializer, valueDeserializer,reporters, interceptorList);
this.metadata = new Metadata(retryBackoffMs,config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
true, false, clusterResourceListeners);
List<InetSocketAddress> addresses =ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
String metricGrpPrefix = "consumer";
ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
ChannelBuilder channelBuilder =ClientUtils.createChannelBuilder(config);
// 事务隔离级别
IsolationLevel isolationLevel = IsolationLevel.valueOf(
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics,metricsRegistry.fetcherMetrics);
// 网络组件
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),metrics, time, metricGrpPrefix, channelBuilder, logContext),
this.metadata,
clientId,
100, // a fixed large enough value will suffice for max in-flight requests
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
time,
true,
new ApiVersions(),
throttleTimeSensor,
logContext);
// 客户端
this.client = new ConsumerNetworkClient(
logContext,
netClient,
metadata,
time,
retryBackoffMs,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
// offset重置策略,默认是自动ᨀ交
OffsetResetStrategy offsetResetStrategy =OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
this.subscriptions = new SubscriptionState(offsetResetStrategy);
this.assignors = config.getConfiguredInstances(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
PartitionAssignor.class);
// offset协调者
this.coordinator = new ConsumerCoordinator(logContext,
this.client,
groupId,
config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
assignors,
this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
this.time,
retryBackoffMs,
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
// 拉取器
this.fetcher = new Fetcher<>(
logContext,
this.client,
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
this.keyDeserializer,
this.valueDeserializer,
this.metadata,
this.subscriptions,
metrics,
metricsRegistry.fetcherMetrics,
this.time,
this.retryBackoffMs,
isolationLevel);
// 打印用户设置,但是没有使用的配置项
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka consumer initialized");
}
catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
close(0, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka consumer",t);
}
}
初始化参数配置
- client.id、group.id、消费者拦截器、key/value序列化、事务隔离级别
初始化网络客户端 NetworkClient
初始化消费者网络客户端 ConsumerNetworkClient
初始化offset提交策略,默认自动提交
初始化消费者协调器 ConsumerCoordinator
初始化拉取器 Fetcher
# 4.5.3 订阅Topic
下面我们先来看一下subscribe方法都有哪些逻辑:
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
// 轻量级锁
acquireAndEnsureOpen();
try {
if (topics == null) {
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
} else if (topics.isEmpty()) {
// topics为空,则开始取消订阅的逻辑
this.unsubscribe();
} else {
// topic合法性判断,包含null或者空字符串直接抛异常
for (String topic : topics) {
if (topic == null || topic.trim().isEmpty())
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}
// 如果没有消费协调者直接抛异常
throwIfNoAssignorsConfigured();
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
// 开始订阅
this.subscriptions.subscribe(new HashSet<>(topics), listener);
// 更新元数据,如果metadata当前不包括所有的topics则标记强制更新
metadata.setTopics(subscriptions.groupSubscription());
}
}
finally {
release();
}
}
public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
if (listener == null)
throw new IllegalArgumentException("RebalanceListener cannot be null");
// 按照指定的Topic名字进行订阅,自动分配分区
setSubscriptionType(SubscriptionType.AUTO_TOPICS);
// 监听
this.listener = listener;
// 修改订阅信息
changeSubscription(topics);
}
private void changeSubscription(Set<String> topicsToSubscribe) {
if (!this.subscription.equals(topicsToSubscribe)) {
// 如果使用AUTO_TOPICS或AUTO_PARTITION模式,则使用此集合记录所有订阅的Topic
this.subscription = topicsToSubscribe;
// Consumer Group中会选一个Leader,Leader会使用这个集合记录Consumer Group中所有消费者订阅的Topic,而其他的Follower的这个集合只会保存自身订阅的Topic
this.groupSubscription.addAll(topicsToSubscribe);
}
}
- KafkaConsumer不是线程安全类,开启轻量级锁,topics为空抛异常,topics是空集合开始取消订阅,再次判断topics集合中是否有非法数据,判断消费者协调者是否为空。开始订阅对应topic。listener默认为 NoOpConsumerRebalanceListener ,一个空操作
轻量级锁:分别记录了当前使用KafkaConsumer的线程id和重入次数,KafkaConsumer的acquire()和release()方法实现了一个”轻量级锁“,它并非真正的锁,仅是检测是否有多线程并发操作KafkaConsumer而已
每一个KafkaConsumer实例内部都拥有一个SubscriptionState对象,subscribe内部调用了subscribe方法,subscribe方法订阅信息记录到 SubscriptionState ,多次订阅会覆盖旧数据。
更新metadata,判断如果metadata中不包含当前groupSubscription,开始标记更新(后面会有更新的逻辑),并且消费者侧的topic不会过期
# 4.5.4 消息消费过程
下面KafkaConsumer的核心方法poll是如何拉取消息的,先来看一下下面的代码:
# 4.5.4.1 poll
public ConsumerRecords<K, V> poll(long timeout) {
// 使用轻量级锁检测kafkaConsumer是否被其他线程使用
acquireAndEnsureOpen();
try {
// 超时时间小于0抛异常
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
// 订阅类型为NONE抛异常,表示当前消费者没有订阅任何topic或者没有分配分区
if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
// poll for new data until the timeout expires
long start = time.milliseconds();
long remaining = timeout;
do {
// 核心方法,拉取消息
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
// 如果拉取到了消息,发送一次消息拉取的请求,不会阻塞不会被中断
// 在返回数据之前,发送下次的 fetch 请求,避免用户在下次获取数据时线程 block
if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
client.pollNoWakeup();
// 经过拦截器处理后返回
if (this.interceptors == null)
return new ConsumerRecords<>(records); else
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
long elapsed = time.milliseconds() - start;
// 拉取超时就结束
remaining = timeout - elapsed;
}
while (remaining > 0);
return ConsumerRecords.empty();
}
finally {
release();
}
}
使用轻量级锁检测kafkaConsumer是否被其他线程使用
检查超时时间是否小于0,小于0抛出异常,停止消费
检查这个 consumer 是否订阅的相应的 topic-partition
调用 pollOnce() 方法获取相应的 records
在返回获取的 records 前,发送下一次的 fetch 请求,避免用户在下次请求时线程 block 在pollOnce() 方法中
如果在给定的时间(timeout)内获取不到可用的 records,返回空数据
这里可以看出,poll 方法的真正实现是在 pollOnce 方法中,poll 方法通过 pollOnce 方法获取可用的数据
# 4.5.4.2 pollOnce
// 除了获取新数据外,还会做一些必要的 offset-commit和reset-offset的操作
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
client.maybeTriggerWakeup();
// 1. 获取 GroupCoordinator 地址并连接、加入 Group、sync Group、自动commit, join 及 sync 期间 group 会进行 rebalance
coordinator.poll(time.milliseconds(), timeout);
// 2. 更新订阅的 topic-partition 的 offset(如果订阅的 topic-partition list 没有有效的 offset 的情况下)
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
// 3. 获取 fetcher 已经拉取到的数据
Map<TopicPartition, List<ConsumerRecord<K, V>>> records =fetcher.fetchedRecords();
if (!records.isEmpty())
return records;
// 4. 发送 fetch 请求,会从多个 topic-partition 拉取数据(只要对应的 topic-partition 没有未完成的请求)
fetcher.sendFetches();
long now = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(now),timeout);
// 5. 调用 poll 方法发送请求(底层发送请求的接口)
client.poll(pollTimeout, now, new PollCondition() {
@Override
public boolean shouldBlock() {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
}
}
);
// 6. 如果 group 需要 rebalance,直接返回空数据,这样更快地让 group 进行稳定状态
if (coordinator.needRejoin())
return Collections.emptyMap();
// 获取到请求的结果
return fetcher.fetchedRecords();
}
pollOnce 可以简单分为6步来看,其作用分别如下:
# 4.5.4.2.1 coordinator.poll()
获取 GroupCoordinator 的地址,并建立相应 tcp 连接,发送 join-group、sync-group,之后才真正加入到了一个 group 中,这时会获取其要消费的 topic-partition 列表,如果设置了自动 commit,也会在这一步进行 commit。总之,对于一个新建的 group,group 状态将会从 Empty –>PreparingRebalance –> AwaiSync –> Stable
;
获取 GroupCoordinator 的地址,并建立相应 tcp 连接;
发送 join-group 请求,然后 group 将会进行 rebalance;
发送 sync-group 请求,之后才正在加入到了一个 group 中,这时会通过请求获取其要消费的topic-partition 列表;
如果设置了自动 commit,也会在这一步进行 commit offset
# 4.5.4.2.2 updateFetchPositions()
这个方法主要是用来更新这个 consumer 实例订阅的 topic-partition 列表的 fetch-offset 信息。目的就是为了获取其订阅的每个 topic-partition 对应的 position,这样 Fetcher 才知道从哪个 offset 开始去拉取这个 topic-partition 的数据
private void updateFetchPositions(Set<TopicPartition> partitions) {
// 先重置那些调用 seekToBegin 和 seekToEnd 的 offset 的 tp,设置其 the fetch position 的 offset
fetcher.resetOffsetsIfNeeded(partitions);
if (!subscriptions.hasAllFetchPositions(partitions)) {
// 获取所有分配 tp 的 offset, 即 committed offset, 更新到TopicPartitionState 中的 committed offset 中
coordinator.refreshCommittedOffsetsIfNeeded();
// 如果 the fetch position 值无效,则将上步获取的 committed offset 设置为 the fetch position
fetcher.updateFetchPositions(partitions);
}
}
在 Fetcher 中,这个 consumer 实例订阅的每个 topic-partition 都会有一个对应的TopicPartitionState 对象,在这个对象中会记录以下这些内容:
private static class TopicPartitionState {
// Fetcher 下次去拉取时的 offset,Fecher 在拉取时需要知道这个值
private Long position;
// last consumed position
// 最后一次获取的高水位标记
private Long highWatermark;
// the high watermark from last fetch
private Long lastStableOffset;
// consumer 已经处理完的最新一条消息的 offset,consumer 主动调用 offset-commit 时会更新这个值;
private OffsetAndMetadata committed;
// last committed position
// 是否暂停
private boolean paused;
// whether this partition has been paused by the user
// 这 topic-partition offset 重置的策略,重置之后,这个策略就会改为 null,防止再次操作
private OffsetResetStrategy resetStrategy;
// the strategy to use if the offset needs resetting
}
# 4.5.4.2.3 fetcher.fetchedRecords()
返回其 fetched records,并更新其 fetch-position offset,只有在 offset-commit 时(自动commit 时,是在第一步实现的),才会更新其 committed offset;
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
// 在 max.poll.records 中设置单词最大的拉取条数
int recordsRemaining = maxPollRecords;
try {
while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isFetched) {
// 从队列中获取但不移除此队列的头;如果此队列为空,返回null
CompletedFetch completedFetch = completedFetches.peek();
if (completedFetch == null) break;
// 获取下一个要处理的 nextInLineRecords
nextInLineRecords = parseCompletedFetch(completedFetch);
completedFetches.poll();
} else {
// 拉取records,更新 position
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
TopicPartition partition = nextInLineRecords.partition;
if (!records.isEmpty()) {
List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
if (currentRecords == null) {
fetched.put(partition, records);
} else {
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
fetched.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
}
}
catch (KafkaException e) {
if (fetched.isEmpty())
throw e;
}
return fetched;
}
private List<ConsumerRecord<K, V>> fetchRecords(PartitionRecords partitionRecords, int maxRecords) {
if (!subscriptions.isAssigned(partitionRecords.partition)) {
log.debug("Not returning fetched records for partition {} since it is no longer assigned",
partitionRecords.partition);
} else {
long position = subscriptions.position(partitionRecords.partition);
// 这个 tp 不能来消费了,比如调用 pause方法暂停消费
if (!subscriptions.isFetchable(partitionRecords.partition)) {
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
partitionRecords.partition);
} else if (partitionRecords.nextFetchOffset == position) {
// 获取该 tp 对应的records,并更新 partitionRecords 的fetchOffset(用于判断是否顺序)
List<ConsumerRecord<K, V>> partRecords = partitionRecords.fetchRecords(maxRecords);
long nextOffset = partitionRecords.nextFetchOffset;
log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
"position to {}", position,partitionRecords.partition, nextOffset);
// 更新消费的到 offset( the fetch position)
subscriptions.position(partitionRecords.partition,nextOffset);
// 获取 Lag(即 position与 hw 之间差值),hw 为 null 时,才返回null
Long partitionLag = subscriptions.partitionLag(partitionRecords.partition, isolationLevel);
if (partitionLag != null)
this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);
return partRecords;
} else {
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
partitionRecords.partition,partitionRecords.nextFetchOffset, position);
}
}
partitionRecords.drain();
return emptyList();
}
# 4.5.4.2.4 fetcher.sendFetches()
只要订阅的 topic-partition list 没有未处理的 fetch 请求,就发送对这个 topic-partition 的 fetch 请求,在真正发送时,还是会按 node 级别去发送,leader 是同一个 node 的 topic-partition 会合成一个请求去发送;
// 向订阅的所有 partition (只要该 leader 暂时没有拉取请求)所在 leader 发送 fetch请求
public int sendFetches() {
// 1. 创建 Fetch Request
Map<Node, FetchRequest.Builder> fetchRequestMap =createFetchRequests();
for (Map.Entry<Node, FetchRequest.Builder> fetchEntry :fetchRequestMap.entrySet()) {
final FetchRequest.Builder request = fetchEntry.getValue();
final Node fetchTarget = fetchEntry.getKey();
log.debug("Sending {} fetch for partitions {} to broker {}",isolationLevel, request.fetchData().keySet(),
fetchTarget);
// 2 发送 Fetch Request
client.send(fetchTarget, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
FetchResponse response = (FetchResponse) resp.responseBody();
if (!matchesRequestedPartitions(request,response)) {
log.warn("Ignoring fetch response containing partitions {} since it does not match " +
"the requested partitions {}",response.responseData().keySet(),
request.fetchData().keySet());
return;
}
Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
for (Map.Entry<TopicPartition,FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
long fetchOffset =request.fetchData().get(partition).fetchOffset;
FetchResponse.PartitionData fetchData =entry.getValue();
log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
isolationLevel, fetchOffset,partition, fetchData);
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
resp.requestHeader().apiVersion()));
}
sensors.fetchLatency.record(resp.requestLatencyMs());
}
@Override
public void onFailure(RuntimeException e) {
log.debug("Fetch request {} to {} failed",request.fetchData(), fetchTarget, e);
}
}
);
}
return fetchRequestMap.size();
}
createFetchRequests():为订阅的所有 topic-partition list 创建 fetch 请求(只要该topic-partition 没有还在处理的请求),创建的 fetch 请求依然是按照 node 级别创建的;
client.send():发送 fetch 请求,并设置相应的 Listener,请求处理成功的话,就加入到completedFetches 中,在加入这个 completedFetches 集合时,是按照 topic-partition 级别去加入,这样也就方便了后续的处理。
从这里可以看出,在每次发送 fetch 请求时,都会向所有可发送的 topic-partition 发送 fetch 请求,调用一次 fetcher.sendFetches,拉取到的数据,可需要多次 pollOnce 循环才能处理完,因为Fetcher 线程是在后台运行,这也保证了尽可能少地阻塞用户的处理线程,因为如果 Fetcher 中没有可处理的数据,用户的线程是会阻塞在 poll 方法中的
# 4.5.4.2.5 client.poll()
调用底层 NetworkClient 提供的接口去发送相应的请求;
# 4.5.4.2.6 coordinator.needRejoin()
如果当前实例分配的 topic-partition 列表发送了变化,那么这个 consumer group 就需要进行rebalance
# 4.5.5 自动提交
最简单的提交方式是让悄费者自动提交偏移量。如果enable.auto.commit被设为 true,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由auto.commit.interval.ms
控制,默认值是 5s。与消费者里的其他东西 一样,自动提交也是在轮询(poll() )里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那 么就会提交从上一次轮询返回的偏移量。
不过,这种简便的方式也会带来一些问题,来看一下下面的例子:
假设我们仍然使用默认的 5s提交时间间隔,在最近一次提交之后的 3s发生了再均衡,再 均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无也完全避免的
# 4.5.6 手动提交
# 4.5.6.1 同步提交
取消自动提交,把 auto.commit.offset
设为 false,让应用程序决定何时提交 偏 移量。使用commitSync()
提交偏移量最简单也最可靠。这个 API会提交由 poll() 方法返回 的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常
while (true) {
// 消息拉取
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),record.key(), record.value());
}
// 处理完成单次消息以后,ᨀ交当前的offset,如果ᨀ交失败就抛出异常
consumer.commitSync();
}
# 4.5.6.2 异步提交
同步提交有一个不足之处,在 broker对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发生了再均衡, 会增加重复消息的数量。这个时候可以使用异步提交 API。我们只管发送提交请求,无需等待 broker的响应。
while (true) {
// 消息拉取
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(), record.key(), record.value());
}
// 异步ᨀ交
consumer.commitAsync((offsets, exception) -> {
exception.printStackTrace();
System.out.println(offsets.size());
}
);
}