跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
        • Kafka高级特性-消费者
          • 2.2.1 概念入门
            • 2.2.1.1 消费者、消费组
            • 2.2.1.2 心跳机制
          • 2.2.2 消息接收
            • 2.2.2.1 必要参数配置
            • 2.2.2.2 订阅
            • 2.2.2.2.1 主题和分区
            • 2.2.2.3 反序列化
            • 2.2.2.3.1 自定义反序列化
            • 2.2.2.4 位移提交
            • 2.2.2.4.1 自动提交
            • 2.2.2.4.2 异步提交
            • 2.2.2.5 消费者位移管理
            • 2.2.2.6 再均衡
            • 2.2.2.7 消费者拦截器
            • 2.2.2.8 消费者参数配置补充
          • 2.2.3 消费组管理
        • Kafka高级特性-主题
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
        • 源码剖析-Topic创建流程
        • 源码剖析-Producer生产者流程
        • 源码剖析-Consumer消费者流程
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
        • 源码剖析-ReplicaManager
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-31
目录

Kafka高级特性-消费者

# 2.2 消费者

# 2.2.1 概念入门

# 2.2.1.1 消费者、消费组

消费者从订阅的主题消费消息,消费消息的偏移量保存在Kafka的名字是 __consumer_offsets 的主题中。

消费者还可以将自己的偏移量存储到Zookeeper,需要设置offset.storage=zookeeper。

推荐使用Kafka存储消费者的偏移量。因为Zookeeper不适合高并发。

多个从同一个主题消费的消费者可以加入到一个消费组中。

消费组中的消费者共享group_id。

configs.put("group.id", "xxx");
1

group_id一般设置为应用的逻辑名称。比如多个订单处理程序组成一个消费组,可以设置group_id为"order_process"。

group_id通过消费者的配置指定:group.id=xxxxx

消费组均衡地给消费者分配分区,每个分区只由消费组中一个消费者消费。

Kafka_Page55_001

一个拥有四个分区的主题,包含一个消费者的消费组。

此时,消费组中的消费者消费主题中的所有分区。并且没有重复的可能。

如果在消费组中添加一个消费者2,则每个消费者分别从两个分区接收消息。

Kafka_Page56_001

如果消费组有四个消费者,则每个消费者可以分配到一个分区。

Kafka_Page56_002

如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息。

Kafka_Page57_001

向消费组添加消费者是横向扩展消费能力的主要方式。

必要时,需要为主题创建大量分区,在负载增长时可以加入更多的消费者。但是不要让消费者的数量超过主题分区的数量。

Kafka_Page58_001

除了通过增加消费者来横向扩展单个应用的消费能力之外,经常出现多个应用程序从同一个主题消费的情况。

此时,每个应用都可以获取到所有的消息。只要保证每个应用都有自己的消费组,就可以让它们获取到主题所有的消息。

横向扩展消费者和消费组不会对性能造成负面影响。

为每个需要获取一个或多个主题全部消息的应用创建一个消费组,然后向消费组添加消费者来横向扩展消费能力和应用的处理能力,则每个消费者只处理一部分消息。

# 2.2.1.2 心跳机制

Kafka_Page59_001

消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区。

Kafka_Page59_002

由于broker宕机,主题X的分区3宕机,此时分区3没有Leader副本,触发再平衡,消费者4没有对应的主题分区,则消费者4闲置。

Kafka_Page60_001

Kafka 的心跳是 Kafka Consumer 和 Broker 之间的健康检查,只有当 Broker Coordinator 正常时,Consumer 才会发送心跳。

Consumer 和 Rebalance 相关的 2 个配置参数:

image-20230731112206088

broker 端,sessionTimeoutMs 参数

broker 处理心跳的逻辑在 GroupCoordinator 类中:如果心跳超期, broker coordinator 会把消费者从 group 中移除,并触发 rebalance。

private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata)
{
    // complete current heartbeat expectation
    member.latestHeartbeat = time.milliseconds()
    val memberKey = MemberKey(member.groupId, member.memberId)
    heartbeatPurgatory.checkAndComplete(memberKey)
    // reschedule the next heartbeat expiration deadline
    // 计算心跳截止时刻
    val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
    heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}
// 心跳过期
def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long)
{
    group.inLock
    {
        if(!shouldKeepMemberAlive(member, heartbeatDeadline))
        {
            info(s "Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
            removeMemberAndUpdateGroup(group, member)
        }
    }
}
private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) = member.awaitingJoinCallback != null || member.awaitingSyncCallback != null || member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

consumer 端:sessionTimeoutMs,rebalanceTimeoutMs 参数

如果客户端发现心跳超期,客户端会标记 coordinator 为不可用,并阻塞心跳线程;如果超过了poll 消息的间隔超过了 rebalanceTimeoutMs,则 consumer 告知 broker 主动离开消费组,也会触发rebalance

org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread

if(coordinatorUnknown())
{
    if(findCoordinatorFuture != null || lookupCoordinator().failed())
        // the immediate future check ensures that we backoff properly in the case that no
        // brokers are available to connect to.
        AbstractCoordinator.this.wait(retryBackoffMs);
}
else if(heartbeat.sessionTimeoutExpired(now))
{
    // the session timeout has expired without seeing a successful heartbeat, so we should
    // probably make sure the coordinator is still healthy.
    markCoordinatorUnknown();
}
else if(heartbeat.pollTimeoutExpired(now))
{
    // the poll timeout has expired, which means that the foreground thread has stalled
    // in between calls to poll(), so we explicitly leave the group.
    maybeLeaveGroup();
}
else if(!heartbeat.shouldHeartbeat(now))
{
    // poll again after waiting for the retry backoff in case the heartbeat failed or the
    // coordinator disconnected
    AbstractCoordinator.this.wait(retryBackoffMs);
}
else
{
    heartbeat.sentHeartbeat(now);
    sendHeartbeatRequest().addListener(new RequestFutureListener < Void > ()
    {
        @Override
        public void onSuccess(Void value)
        {
            synchronized(AbstractCoordinator.this)
            {
                heartbeat.receiveHeartbeat(time.milliseconds());
            }
        }
        @Override
        public void onFailure(RuntimeException e)
        {
            synchronized(AbstractCoordinator.this)
            {
                if(e instanceof RebalanceInProgressException)
                {
                    // it is valid to continue heartbeating while the group is rebalancing. This
                    // ensures that the coordinator keeps the member in the group for as long
                    // as the duration of the rebalance timeout. If we stop sending heartbeats,
                    // however, then the session timeout may expire before we can rejoin.
                    heartbeat.receiveHeartbeat(time.milliseconds());
                }
                else
                {
                    heartbeat.failHeartbeat();
                    // wake up the thread if it's sleeping to reschedule the heartbeat
                    AbstractCoordinator.this.notify();
                }
            }
        }
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

# 2.2.2 消息接收

# 2.2.2.1 必要参数配置

image-20230731112555539

# 2.2.2.2 订阅

# 2.2.2.2.1 主题和分区

  • Topic,Kafka用于分类管理消息的逻辑单元,类似与MySQL的数据库。

  • Partition,是Kafka下数据存储的基本单元,这个是物理上的概念。同一个topic的数据,会被分散的存储到多个partition中,这些partition可以在同一台机器上,也可以是在多台机器上。优势在于:有利于水平扩展,避免单台机器在磁盘空间和性能上的限制,同时可以通过复制来增加数据冗余性,提高容灾能力。为了做到均匀分布,通常partition的数量通常是Broker Server数量的整数倍。

  • Consumer Group,同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。保证一个消费组获取到特定主题的全部的消息。在消费组内部,若干个消费者消费主题分区的消息,消费组可以保证一个主题的每个分区只被消费组中的一个消费者消费。

Kafka_Page64_001

consumer 采用 pull 模式从 broker 中读取数据。

采用 pull 模式,consumer 可自主控制消费消息的速率, 可以自己控制消费方式(批量消费/逐条消费),还可以选择不同的提交方式从而实现不同的传输语义。

consumer.subscribe("tp_demo_01,tp_demo_02")
1

# 2.2.2.3 反序列化

Kafka的broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进行反序列化处理,然后才能交给用户程序消费处理。

消费者的反序列化器包括key的和value的反序列化器。

key.deserializer
value.deserializer
IntegerDeserializer
StringDeserializer
1
2
3
4

需要实现org.apache.kafka.common.serialization.Deserializer<T> 接口。

消费者从订阅的主题拉取消息:

consumer.poll(3_000);
1

在Fetcher类中,对拉取到的消息首先进行反序列化处理。

Kafka_Page65_001

Kafka默认提供了几个反序列化的实现:

org.apache.kafka.common.serialization包下包含了这几个实现:

Kafka_Page65_002

Kafka_Page66_001

Kafka_Page66_002

Kafka_Page67_001

Kafka_Page67_002

Kafka_Page68_001

Kafka_Page68_002

Kafka_Page69_001

Kafka_Page69_002

# 2.2.2.3.1 自定义反序列化

自定义反序列化类,需要实现org.apache.kafka.common.serialization.Deserializer<T> 接口。

com.lagou.kafka.demo.deserializer.UserDeserializer

package com.lagou.kafka.demo.deserializer;
import com.lagou.kafka.demo.entity.User;
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.ByteBuffer;
import java.util.Map;
public class UserDeserializer implements Deserializer < User >
{
    @Override
    public void configure(Map < String, ? > configs, boolean isKey)
    {}
    @Override
    public User deserialize(String topic, byte[] data)
    {
        ByteBuffer allocate = ByteBuffer.allocate(data.length);
        allocate.put(data);
        allocate.flip();
        int userId = allocate.getInt();
        int length = allocate.getInt();
        System.out.println(length);
        String username = new String(data, 8, length);
        return new User(userId, username);
    }
    @Override
    public void close()
    {}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

com.lagou.kafka.demo.consumer.MyConsumer

package com.lagou.kafka.demo.consumer;
import com.lagou.kafka.demo.deserializer.UserDeserializer;
import com.lagou.kafka.demo.entity.User;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
public class MyConsumer
{
    public static void main(String[] args)
    {
        Map < String, Object > configs = new HashMap < > ();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");
        KafkaConsumer < String, User > consumer = new KafkaConsumer < String, User > (configs);
        consumer.subscribe(Collections.singleton("tp_user_01"));
        ConsumerRecords < String, User > records = consumer.poll(Long.MAX_VALUE);
        records.forEach(new Consumer < ConsumerRecord < String, User >> ()
        {
            @Override
            public void accept(ConsumerRecord < String, User > record)
            {
                System.out.println(record.value());
            }
        });
        // 关闭消费者
        consumer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# 2.2.2.4 位移提交

  1. Consumer需要向Kafka记录自己的位移数据,这个汇报过程称为 ᨀ交位移(Committing Offsets)

  2. Consumer 需要为分配给它的每个分区提交各自的位移数据

  3. 位移提交的由Consumer端负责的,Kafka只负责保管。__consumer_offsets

  4. 位移提交分为自动提交和手动提交

  5. 位移提交分为同步提交和异步提交

# 2.2.2.4.1 自动提交

Kafka Consumer 后台提交

  • 开启自动提交: enable.auto.commit=true

  • 配置自动提交间隔:Consumer端: auto.commit.interval.ms ,默认 5s

Map < String, Object > configs = new HashMap < > ();
configs.put("bootstrap.servers", "node1:9092");
configs.put("group.id", "mygrp");
// 设置偏移量自动ᨀ交。自动ᨀ交是默认值。这里做示例。
configs.put("enable.auto.commit", "true");
// 偏移量自动ᨀ交的时间间隔
configs.put("auto.commit.interval.ms", "3000");
configs.put("key.deserializer", StringDeserializer.class);
configs.put("value.deserializer", StringDeserializer.class);
KafkaConsumer < String, String > consumer = new KafkaConsumer < String, String > (configs);
consumer.subscribe(Collections.singleton("tp_demo_01"));
while(true)
{
    ConsumerRecords < String, String > records = consumer.poll(100);
    for(ConsumerRecord < String, String > record: records)
    {
        System.out.println(record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  • 自动提交位移的顺序

    • 配置 enable.auto.commit = true

    • Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息

    • 因此自动提交不会出现消息丢失,但会 重复消费

  • 重复消费举例

    • Consumer 每 5s 提交 offset

    • 假设提交 offset 后的 3s 发生了 Rebalance

    • Rebalance 之后的所有 Consumer 从上一次提交的 offset 处继续消费

    • 因此 Rebalance 发生前 3s 的消息会被重复消费

# 2.2.2.4.2 异步提交

  • 使用 KafkaConsumer#commitSync():会提交 KafkaConsumer#poll() 返回的最新 offset

  • 该方法为同步操作,等待直到 offset 被成功提交才返回

while(true)
{
    ConsumerRecords < String, String > records = consumer.poll(Duration.ofSeconds(1));
    process(records); // 处理消息
    try
    {
        consumer.commitSync();
    }
    catch (CommitFailedException e)
    {
        handle(e); // 处理ᨀ交失败异常
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
  • commitSync 在处理完所有消息之后

  • 手动同步提交可以控制offset提交的时机和频率

  • 手动同步提交会:

    • 调用 commitSync 时,Consumer 处于阻塞状态,直到 Broker 返回结果

    • 会影响 TPS

    • 可以选择拉长提交间隔,但有以下问题

      • 会导致 Consumer 的提交频率下降
      • Consumer 重启后,会有更多的消息被消费

异步提交

KafkaConsumer#commitAsync()

while(true)
{
    ConsumerRecords < String, String > records = consumer.poll(3_000);
    process(records); // 处理消息
    consumer.commitAsync((offsets, exception) - >
    {
        if(exception != null)
        {
            handle(exception);
        }
    });
}
1
2
3
4
5
6
7
8
9
10
11
12

commitAsync出现问题不会自动重试

处理方式:

try
{
    while(true)
    {
        ConsumerRecords < String, String > records = consumer.poll(Duration.ofSeconds(1));
        process(records); // 处理消息
        commitAysnc(); // 使用异步ᨀ交规避阻塞
    }
}
catch (Exception e)
{
    handle(e); // 处理异常
}
finally
{
    try
    {
        consumer.commitSync(); // 最后一次ᨀ交使用同步阻塞式ᨀ交
    }
    finally
    {
        consumer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 2.2.2.5 消费者位移管理

Kafka中,消费者根据消息的位移顺序消费消息。

消费者的位移由消费者管理,可以存储于zookeeper中,也可以存储于Kafka主题__consumer_offsets中。

Kafka提供了消费者API,让消费者可以管理自己的位移。

API如下:KafkaConsumer<K, V>

image-20230731113814356

image-20230731113825428

  1. 准备数据
# 生成消息文件
[root@node1 ~]# for i in `seq 60`; do echo "hello lagou $i" >> nm.txt; done
# 创建主题,三个分区,每个分区一个副本
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_demo_01 --partitions 3 --replication-factor 1
# 将消息生产到主题中
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_demo_01 < nm.txt
1
2
3
4
5
6
  1. API实战
package com.lagou.kafka.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
/**
 * # 生成消息文件
 * [root@node1 ~]# for i in `seq 60`; do echo "hello lagou $i" >> nm.txt;done
 * # 创建主题,三个分区,每个分区一个副本
 * [root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_demo_01 --partitions 3 --replication-factor 1
 * # 将消息生产到主题中
 * [root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_demo_01 < nm.txt
 *
 * 消费者位移管理
 */
public class MyConsumerMgr1
{
    public static void main(String[] args)
    {
        Map < String, Object > configs = new HashMap < > ();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        KafkaConsumer < String, String > consumer = new KafkaConsumer < String, String > (configs);
        /**
         * 给当前消费者手动分配一系列主题分区。
         * 手动分配分区不支持增量分配,如果先前有分配分区,则该操作会覆盖之前的分配。
         * 如果给出的主题分区是空的,则等价于调用unsubscribe方法。
         * 手动分配主题分区的方法不使用消费组管理功能。当消费组成员变了,或者集群或主题的元数据改变了,不会触发分区分配的再平衡。
         *
         * 手动分区分配assign(Collection)不能和自动分区分配subscribe(Collection,ConsumerRebalanceListener)一起使用。
         * 如果启用了自动ᨀ交偏移量,则在新的分区分配替换旧的分区分配之前,会对旧的分区分配中的消费偏移量进行异步ᨀ交。
         *
         */
        // consumer.assign(Arrays.asList(new TopicPartition("tp_demo_01",0)));
        //
        // Set<TopicPartition> assignment = consumer.assignment();
        // for (TopicPartition topicPartition : assignment) {
        // System.out.println(topicPartition);
        // }
        // 获取对用户授权的所有主题分区元数据。该方法会对服务器发起远程调用。
        // Map<String, List<PartitionInfo>> stringListMap =consumer.listTopics();
        //
        // stringListMap.forEach((k, v) -> {
        // System.out.println("主题:" + k);
        // v.forEach(info -> {
        // System.out.println(info);
        // });
        // });
        // Set<String> strings = consumer.listTopics().keySet();
        //
        // strings.forEach(topicName -> {
        // System.out.println(topicName);
        // });
        // List<PartitionInfo> partitionInfos =consumer.partitionsFor("tp_demo_01");
        // for (PartitionInfo partitionInfo : partitionInfos) {
        // Node leader = partitionInfo.leader();
        // System.out.println(leader);
        // System.out.println(partitionInfo);
        // // 当前分区在线副本
        // Node[] nodes = partitionInfo.inSyncReplicas();
        // // 当前分区下线副本
        // Node[] nodes1 = partitionInfo.offlineReplicas();
        // }
        // 手动分配主题分区给当前消费者
        consumer.assign(Arrays.asList(new TopicPartition("tp_demo_01", 0), new TopicPartition("tp_demo_01", 1), new TopicPartition("tp_demo_01", 2)));
        // 列出当前主题分配的所有主题分区
        // Set<TopicPartition> assignment = consumer.assignment();
        // assignment.forEach(k -> {
        // System.out.println(k);
        // });
        // 对于给定的主题分区,列出它们第一个消息的偏移量。
        // 注意,如果指定的分区不存在,该方法可能会永远阻塞。
        // 该方法不改变分区的当前消费者偏移量。
        // Map<TopicPartition, Long> topicPartitionLongMap =consumer.beginningOffsets(consumer.assignment());
        //
        // topicPartitionLongMap.forEach((k, v) -> {
        // System.out.println("主题:" + k.topic() + "\t分区:" +k.partition() + "偏移量\t" + v);
        // });
        // 将偏移量移动到每个给定分区的最后一个。
        // 该方法延迟执行,只有当调用过poll方法或position方法之后才可以使用。
        // 如果没有指定分区,则将当前消费者分配的所有分区的消费者偏移量移动到最后。
        // 如果设置了隔离级别为:isolation.level=read_committed,则会将分区的消费偏移量移动到
        // 最后一个稳定的偏移量,即下一个要消费的消息现在还是未ᨀ交状态的事务消息。
        // consumer.seekToEnd(consumer.assignment());
        // 将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下一条要消费的消息偏移量。
        // 若该方法多次调用,则最后一次的覆盖前面的。
        // 如果在消费中间随意使用,可能会丢失数据。
        // consumer.seek(new TopicPartition("tp_demo_01", 1), 10);
        //
        // // 检查指定主题分区的消费偏移量
        // long position = consumer.position(new TopicPartition("tp_demo_01", 1));
        // System.out.println(position);
        consumer.seekToEnd(Arrays.asList(new TopicPartition("tp_demo_01", 1)));
        // 检查指定主题分区的消费偏移量
        long position = consumer.position(new TopicPartition("tp_demo_01", 1));
        System.out.println(position);
        // 关闭生产者
        consumer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104

# 2.2.2.6 再均衡

重平衡可以说是kafka为人诟病最多的一个点了。

重平衡其实就是一个协议,它规定了如何让消费者组下的所有消费者来分配topic中的每一个分区。

比如一个topic有100个分区,一个消费者组内有20个消费者,在协调者的控制下让组内每一个消费者分配到5个分区,这个分配的过程就是重平衡。

重平衡的触发条件主要有三个:

  1. 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组。
  2. 主题的分区数发生变更,kafka目前只支持增加分区,当增加的时候就会触发重平衡
  3. 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡

Kafka_Page78_001

消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区。

Kafka_Page79_001

由于broker宕机,主题X的分区3宕机,此时分区3没有Leader副本,触发再平衡,消费者4没有对应的主题分区,则消费者4闲置。

Kafka_Page79_002

主题增加分区,需要主题分区和消费组进行再均衡。

Kafka_Page80_001

由于使用正则表达式订阅主题,当增加的主题匹配正则表达式的时候,也要进行再均衡。

Kafka_Page81_001

为什么说重平衡为人诟病呢?**因为重平衡过程中,消费者无法从kafka消费消息,这对kafka的TPS 影响极大,而如果kafka集内节点较多,比如数百个,那重平衡可能会耗时极多。数分钟到数小时都有 可能,而这段时间kafka基本处于不可用状态。**所以在实际环境中,应该尽量避免重平衡发生。

避免重平衡

要说完全避免重平衡,是不可能,因为你无法完全保证消费者不会故障。而消费者故障其实也是最常见的引发重平衡的地方,所以我们需要保证尽力避免消费者故障。

而其他几种触发重平衡的方式,增加分区,或是增加订阅的主题,抑或是增加消费者,更多的是主动控制。

如果消费者真正挂掉了,就没办法了,但实际中,会有一些情况,kafka错误地认为一个正常的消费者已经挂掉了,我们要的就是避免这样的情况出现。

首先要知道哪些情况会出现错误判断挂掉的情况。

在分布式系统中,通常是通过心跳来维持分布式系统的,kafka也不例外。

在分布式系统中,由于网络问题你不清楚没接收到心跳,是因为对方真正挂了还是只是因为负载过

重没来得及发生心跳或是网络堵塞。所以一般会约定一个时间,超时即判定对方挂了。而在kafka消费者场景中,session.timout.ms参数就是规定这个超时时间是多少。

还有一个参数,heartbeat.interval.ms**,这个参数控制发送心跳的频率**,频率越高越不容易被误判,但也会消耗更多资源。

此外,还有最后一个参数,max.poll.interval.ms,消费者poll数据后,需要一些处理,再进行拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。也就是说,拉取,然后处理,这个处理的时间不能超过 max.poll.interval.ms 这个参数的值。这个参数的默认值是5分钟,而如果消费者接收到数据后会执行耗时的操作,则应该将其设置得大一些。

三个参数,

session.timout.ms控制心跳超时时间,

heartbeat.interval.ms控制心跳发送频率,

max.poll.interval.ms控制poll的间隔。

这里给出一个相对较为合理的配置,如下:

  • session.timout.ms:设置为6s

  • heartbeat.interval.ms:设置2s

  • max.poll.interval.ms:推荐为消费者处理消息最长耗时再加1分钟

# 2.2.2.7 消费者拦截器

消费者在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理。

处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程序进行处理。

Kafka_Page82_001

消费端定义消息拦截器,需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor<K, V>接口。

  1. 一个可插拔接口,允许拦截甚至更改消费者接收到的消息。首要的用例在于将第三方组件引入消费者应用程序,用于定制的监控、日志处理等。

  2. 该接口的实现类通过configre方法获取消费者配置的属性,如果消费者配置中没有指定clientID,还可以获取KafkaConsumer生成的clientId。获取的这个配置是跟其他拦截器共享的,需要保证不会在各个拦截器之间产生冲突。

  3. ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。

  4. ConsumerInterceptor回调发生在org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)方法同一个线程。

该接口中有如下方法:

package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
public interface ConsumerInterceptor < K, V > extends Configurable
{
    /**
     *
     * 该方法在poll方法返回之前调用。调用结束后poll方法就返回消息了。
     *
     * 该方法可以修改消费者消息,返回新的消息。拦截器可以过滤收到的消息或生成新的消息。
     * 如果有多个拦截器,则该方法按照KafkaConsumer的configs中配置的顺序调用。
     *
     * @param records 由上个拦截器返回的由客户端消费的消息。
     */
    public ConsumerRecords < K, V > onConsume(ConsumerRecords < K, V > records);
    /**
     * 当消费者ᨀ交偏移量时,调用该方法。
     * 该方法抛出的任何异常调用者都会忽略。
     */
    public void onCommit(Map < TopicPartition, OffsetAndMetadata > offsets);
    public void close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

代码实现:

package com.lagou.kafka.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class MyConsumer
{
    public static void main(String[] args)
    {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "mygrp");
        // props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "myclient");
        // 如果在kafka中找不到当前消费者的偏移量,则设置为最旧的
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 配置拦截器
        // One -> Two -> Three,接收消息和发送偏移量确认都是这个顺序
        props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.lagou.kafka.demo.interceptor.OneInterceptor" + ",com.lagou.kafka.demo.interceptor.TwoInterceptor" + ",com.lagou.kafka.demo.interceptor.ThreeInterceptor");
        KafkaConsumer < String, String > consumer = new KafkaConsumer < String,
            String > (props);
        // 订阅主题
        consumer.subscribe(Collections.singleton("tp_demo_01"));
        while(true)
        {
            final ConsumerRecords < String, String > records = consumer.poll(3_000);
            records.forEach(record - >
            {
                System.out.println(record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value());
            });
            // consumer.commitAsync();
            // consumer.commitSync();
        }
        // consumer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.lagou.kafka.demo.interceptor;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
public class OneInterceptor implements ConsumerInterceptor < String, String >
{
    @Override
    public ConsumerRecords < String,
    String > onConsume(ConsumerRecords < String, String > records)
    {
        // poll方法返回结果之前最后要调用的方法
        System.out.println("One -- 开始");
        // 消息不做处理,直接返回
        return records;
    }
    @Override
    public void onCommit(Map < TopicPartition, OffsetAndMetadata > offsets)
    {
        // 消费者ᨀ交偏移量的时候,经过该方法
        System.out.println("One -- 结束");
    }
    @Override
    public void close()
    {
        // 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等
    }
    @Override
    public void configure(Map < String, ? > configs)
    {
        // 用于获取消费者的设置参数
        configs.forEach((k, v) - >
        {
            System.out.println(k + "\t" + v);
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.lagou.kafka.demo.interceptor;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
public class TwoInterceptor implements ConsumerInterceptor < String, String >
{
    @Override
    public ConsumerRecords < String,
    String > onConsume(ConsumerRecords < String, String > records)
    {
        // poll方法返回结果之前最后要调用的方法
        System.out.println("Two -- 开始");
        // 消息不做处理,直接返回
        return records;
    }
    @Override
    public void onCommit(Map < TopicPartition, OffsetAndMetadata > offsets)
    {
        // 消费者ᨀ交偏移量的时候,经过该方法
        System.out.println("Two -- 结束");
    }
    @Override
    public void close()
    {
        // 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等
    }
    @Override
    public void configure(Map < String, ? > configs)
    {
        // 用于获取消费者的设置参数
        configs.forEach((k, v) - >
        {
            System.out.println(k + "\t" + v);
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.lagou.kafka.demo.interceptor;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
public class ThreeInterceptor implements ConsumerInterceptor < String, String >
{
    @Override
    public ConsumerRecords < String,
    String > onConsume(ConsumerRecords < String, String > records)
    {
        // poll方法返回结果之前最后要调用的方法
        System.out.println("Three -- 开始");
        // 消息不做处理,直接返回
        return records;
    }
    @Override
    public void onCommit(Map < TopicPartition, OffsetAndMetadata > offsets)
    {
        // 消费者ᨀ交偏移量的时候,经过该方法
        System.out.println("Three -- 结束");
    }
    @Override
    public void close()
    {
        // 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等
    }
    @Override
    public void configure(Map < String, ? > configs)
    {
        // 用于获取消费者的设置参数
        configs.forEach((k, v) - >
        {
            System.out.println(k + "\t" + v);
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# 2.2.2.8 消费者参数配置补充

image-20230731115859644

image-20230731123918818

image-20230731123931408

# 2.2.3 消费组管理

一、消费者组(Consumer Group)

1什么是消费者组

consumer group是kafka提供的可扩展且具有容错性的消费者机制。

三个特性:

  1. 消费组有一个或多个消费者,消费者可以是一个进程,也可以是一个线程
  2. group.id是一个字符串,唯一标识一个消费组
  3. 消费组订阅的主题每个分区只能分配给消费组一个消费者。

2消费者位移(consumer position)

消费者在消费的过程中记录已消费的数据,即消费位移(offset)信息。

每个消费组保存自己的位移信息,那么只需要简单的一个整数表示位置就够了;同时可以引入checkpoint机制定期持久化。

3位移管理(offset management)

3.1自动VS手动

Kafka默认定期自动提交位移( enable.auto.commit = true ),也手动提交位移。另外kafka会定期把group消费情况保存起来,做成一个offset map,如下图所示:

Kafka_Page91_001

3.2位移提交

位移是提交到Kafka中的 __consumer_offsets 主题。 __consumer_offsets 中的消息保存了每个消费组某一时刻提交的offset信息。

[root@node1 __consumer_offsets-0]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config /opt/kafka_2.12-1.0.2/config/consumer.properties --from-beginning | head
1

Kafka_Page91_002

上图中,标出来的,表示消费组为 test-consumer-group ,消费的主题为 __consumer_offsets ,消费的分区是4,偏移量为5。

__consumers_offsets 主题配置了compact策略,使得它总是能够保存最新的位移信息,既控制了该topic总体的日志容量,也能实现保存最新offset的目的。

4再谈再均衡

4.1什么是再均衡?

再均衡(Rebalance)本质上是一种协议,规定了一个消费组中所有消费者如何达成一致来分配订阅主题的每个分区。

比如某个消费组有20个消费组,订阅了一个具有100个分区的主题。正常情况下,Kafka平均会为每个消费者分配5个分区。这个分配的过程就叫再均衡。

4.2什么时候再均衡?

再均衡的触发条件:

  1. 组成员发生变更(新消费者加入消费组组、已有消费者主动离开或崩溃了)
  2. 订阅主题数发生变更。如果正则表达式进行订阅,则新建匹配正则表达式的主题触发再均衡。
  3. 订阅主题的分区数发生变更

4.3如何进行组内分区分配?

三种分配策略:RangeAssignor和RoundRobinAssignor以及StickyAssignor。后面讲。

4.4谁来执行再均衡和消费组管理?

Kafka提供了一个角色:Group Coordinator来执行对于消费组的管理。

Group Coordinator——每个消费组分配一个消费组协调器用于组管理和位移管理。当消费组的第一个消费者启动的时候,它会去和Kafka Broker确定谁是它们组的组协调器。之后该消费组内所有消费者和该组协调器协调通信。

4.5如何确定coordinator?

两步:

  1. 确定消费组位移信息写入 __consumers_offsets 的哪个分区。具体计算公式:
__consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) 
1

注意:groupMetadataTopicPartitionCount由 offsets.topic.num.partitions 指定,默认是50个分区。

  1. 该分区leader所在的broker就是组协调器。

4.6 Rebalance Generation

它表示Rebalance之后主题分区到消费组中消费者映射关系的一个版本,主要是用于保护消费组,隔离无效偏移量提交的。如上一个版本的消费者无法提交位移到新版本的消费组中,因为映射关系变了,你消费的或许已经不是原来的那个分区了。每次group进行Rebalance之后,Generation号都会加1,表示消费组和分区的映射关系到了一个新版本,如下图所示: Generation 1时group有3个成员,随后成员2退出组,消费组协调器触发Rebalance,消费组进入Generation 2,之后成员4加入,再次触发Rebalance,消费组进入Generation 3.

Kafka_Page93_001

4.7协议(protocol)

kafka提供了5个协议来处理与消费组协调相关的问题:

  • Heartbeat请求:consumer需要定期给组协调器发送心跳来表明自己还活着

  • LeaveGroup请求:主动告诉组协调器我要离开消费组

  • SyncGroup请求:消费组Leader把分配方案告诉组内所有成员

  • JoinGroup请求:成员请求加入组

  • DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用

组协调器在再均衡的时候主要用到了前面4种请求。

4.8 liveness

消费者如何向消费组协调器证明自己还活着? 通过定时向消费组协调器发送Heartbeat请求。如果超过了设定的超时时间,那么协调器认为该消费者已经挂了。一旦协调器认为某个消费者挂了,那么它就会开启新一轮再均衡,并且在当前其他消费者的心跳响应中添加“REBALANCE_IN_PROGRESS”,告诉其他消费者:重新分配分区。

4.9再均衡过程

再均衡分为2步:Join和Sync

  1. Join, 加入组。所有成员都向消费组协调器发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,协调i器从中选择一个消费者担任Leader的角色,并把组成员信息以及订阅信息发给Leader。

  2. Sync,Leader开始分配消费方案,即哪个消费者负责消费哪些主题的哪些分区。一旦完成分配,Leader会将这个方案封装进SyncGroup请求中发给消费组协调器,非Leader也会发SyncGroup请求,只是内容为空。消费组协调器接收到分配方案之后会把方案塞进SyncGroup的response中发给各个消费者。

Kafka_Page94_001

注意:在协调器收集到所有成员请求前,它会把已收到请求放入一个叫purgatory(炼狱)的地方。然后是分发分配方案的过程,即SyncGroup请求:

Kafka_Page94_002

注意:消费组的分区分配方案在客户端执行。Kafka交给客户端可以有更好的灵活性。Kafka默认提供三种分配策略:range和round-robin和sticky。可以通过消费者的参数:partition.assignment.strategy来实现自己分配策略。

4.10消费组状态机

消费组组协调器根据状态机对消费组做不同的处理:

Kafka_Page95_001

说明:

  1. Dead:组内已经没有任何成员的最终状态,组的元数据也已经被组协调器移除了。这种状态响应各种请求都是一个response: UNKNOWN_MEMBER_ID

  2. Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求

  3. PreparingRebalance:组准备开启新的rebalance,等待成员加入

  4. AwaitingSync:正在等待leader consumer将分配方案传给各个成员

  5. Stable:再均衡完成,可以开始消费。

上次更新: 2025/04/03, 11:07:08
Kafka高级特性解析
Kafka高级特性-主题

← Kafka高级特性解析 Kafka高级特性-主题→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式