跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
        • Kafka高级特性-消费者
        • Kafka高级特性-主题
          • 2.3.1 管理
            • 2.3.1.1 创建主题
            • 2.3.1.2 查看主题
            • 2.3.1.3 修改主题
            • 2.3.1.4 删除主题
          • 2.3.2 增加分区
          • 2.3.3 分区副本的分配-了解
          • 2.3.4 必要参数配置
          • 2.3.5 KafkaAdminClient应用
          • 2.3.6 偏移量管理
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
        • 源码剖析-Topic创建流程
        • 源码剖析-Producer生产者流程
        • 源码剖析-Consumer消费者流程
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
        • 源码剖析-ReplicaManager
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-31
目录

Kafka高级特性-主题

# 2.3 主题

# 2.3.1 管理

使用kafka-topics.sh脚本:

image-20230731124848608

image-20230731124908994

主题中可以使用的参数定义:

image-20230731124936563

image-20230731124950346

# 2.3.1.1 创建主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x --partitions 1 --replication-factor 1

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_02 --partitions 3 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760
1
2
3

# 2.3.1.2 查看主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --list
kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_x
kafka-topics.sh --zookeeper localhost:2181/myKafka --topics-with-overrides --describe
1
2
3

# 2.3.1.3 修改主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_01 --partitions 2 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config max.message.bytes=1048576
kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test_01
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config segment.bytes=10485760
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config max.message.bytes --topic topic_test_01
1
2
3
4
5

# 2.3.1.4 删除主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x
1

Kafka_Page100_001

给主题添加删除的标记:

Kafka_Page100_002

要过一段时间删除。

# 2.3.2 增加分区

通过命令行工具操作,主题的分区只能增加,不能减少。否则报错:

ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic myTop1 currently has 2 partitions, 1 would not be an increase.
1

通过--alter修改主题的分区数,增加分区。

kafka-topics.sh --zookeeper localhost/myKafka --alter --topic myTop1 --partitions 2
1

# 2.3.3 分区副本的分配-了解

副本分配的三个目标:

  1. 均衡地将副本分散于各个broker上
  2. 对于某个broker上分配的分区,它的其他副本在其他broker上
  3. 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。

在不考虑机架信息的情况下:

  1. 第一个副本分区通过轮询的方式挑选一个broker,进行分配。该轮询从broker列表的随机位置进行轮询。

  2. 其余副本通过增加偏移进行分配。

分配案例:

image-20230731125502971

Kafka_Page101_001

考虑到机架信息,首先为每个机架创建一个broker列表。如:

三个机架(rack1,rack2,rack3),六个broker(0,1,2,3,4,5)

brokerID -> rack

  • 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"

rack1:0,5

rack2:3,4

rack3:1,2

这broker列表为rack1的0,rack2的3,rack3的1,rack1的5,rack2的4,rack3的2 即:0, 3, 1, 5, 4, 2

通过简单的轮询将分区分配给不同机架上的broker:

Kafka_Page102_001

每个分区副本在分配的时候在上一个分区第一个副本开始分配的位置右移一位。

六个broker,六个分区,正好最后一个分区的第一个副本分配的位置是该broker列表的最后一个。

如果有更多的分区需要分配,则算法开始对follower副本进行移位分配。

这主要是为了避免每次都得到相同的分配序列。

此时,如果有一个分区等待分配(分区6),这按照如下方式分配:

6 -> 0,4,2 (而不是像分区0那样重复0,3,1)

跟机架相关的副本分配中,永远在机架相关的broker列表中轮询地分配第一个副本。

其余的副本,倾向于机架上没有副本的broker进行副本分配,除非每个机架有一个副本。

然后其他的副本又通过轮询的方式分配给broker。

结果是,如果副本的个数大于等于机架数,保证每个机架最少有一个副本。

否则每个机架最多保有一个副本。

如果副本的个数和机架的个数相同,并且每个机架包含相同个数的broker,可以保证副本在机架和broker之间均匀分布。

Kafka_Page102_002

上图,tp_eagle_01主题的分区0分配信息:leader分区在broker1上,同步副本分区是1和2,也就是在broker1和broker2上的两个副本分区是同步副本分区,其中一个是leader分区。

# 2.3.4 必要参数配置

kafka-topics.sh --config xx=xx --config yy=yy
1

image-20230731125709518

# 2.3.5 KafkaAdminClient应用

说明

除了使用Kafka的bin目录下的脚本工具来管理Kafka,还可以使用管理Kafka的API将某些管理查看的功能集成到系统中。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端,采用Scala编写)中的AdminClient和AdminUtils来实现部分的集群管理操作。Kafka0.11.0.0之后,又多了一个AdminClient,在kafka-client包下,一个抽象类,具体的实现是org.apache.kafka.clients.admin.KafkaAdminClient。

功能与原理介绍

Kafka官网:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects。

KafkaAdminClient包含了一下几种功能(以Kafka1.0.2版本为准):

  1. 创建主题:

    createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options)
    
    1
  2. 删除主题:

    deleteTopics(final Collection<String> topicNames, DeleteTopicsOptions options)
    
    1
  3. 列出所有主题:

    listTopics(final ListTopicsOptions options)
    
    1
  4. 查询主题:

    1. describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options)
    
    1
  5. 查询集群信息:

    describeCluster(DescribeClusterOptions options)
    
    1
  6. 查询配置信息:

    describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options)
    
    1
  7. 修改配置信息:

    alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options)
    
    1
  8. 修改副本的日志目录:

    alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options)
    
    1
  9. 查询节点的日志目录信息:

    describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options)
    
    1
  10. 查询副本的日志目录信息:

    describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas,DescribeReplicaLogDirsOptions options)
    
    1
  11. 增加分区:

    createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options)
    
    1

其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。

用到的参数:

image-20230731130027881

image-20230731130038758

主要操作步骤:

客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。

客户端发送请求至Kafka Broker。

Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。

客户端接收相应的回执并进行解析处理。

和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和响应类的两个父类。

综上,如果要自定义实现一个功能,只需要三个步骤:

  1. 自定义XXXOptions;
  2. 自定义XXXResult返回值;
  3. 自定义Call,然后挑选合适的XXXRequest和XXXResponse来实现Call类中的3个抽象方法。
package com.lagou.kafka.demo;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class MyAdminClient
{
    private KafkaAdminClient client;
    @Before
    public void before()
    {
        Map < String, Object > conf = new HashMap < > ();
        conf.put("bootstrap.servers", "node1:9092");
        conf.put("client.id", "adminclient-1");
        client = (KafkaAdminClient) KafkaAdminClient.create(conf);
    }
    @After
    public void after()
    {
        client.close();
    }
    @Test
    public void testListTopics1() throws ExecutionException, InterruptedException
    {
        ListTopicsResult listTopicsResult = client.listTopics();
        // KafkaFuture<Collection<TopicListing>> listings =listTopicsResult.listings();
        // Collection<TopicListing> topicListings = listings.get();
        //
        // topicListings.forEach(new Consumer<TopicListing>() {
        // @Override
        // public void accept(TopicListing topicListing) {
        // boolean internal = topicListing.isInternal();
        // String name = topicListing.name();
        // String s = topicListing.toString();
        // System.out.println(s + "\t" + name + "\t" + internal);
        // }
        // });
        // KafkaFuture<Set<String>> names = listTopicsResult.names();
        // Set<String> strings = names.get();
        //
        // strings.forEach(name -> {
        // System.out.println(name);
        // });
        // KafkaFuture<Map<String, TopicListing>> mapKafkaFuture =listTopicsResult.namesToListings();
        // Map<String, TopicListing> stringTopicListingMap =mapKafkaFuture.get();
        //
        // stringTopicListingMap.forEach((k, v) -> {
        // System.out.println(k + "\t" + v);
        // });
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(false);
        options.timeoutMs(500);
        ListTopicsResult listTopicsResult1 = client.listTopics(options);
        Map < String, TopicListing > stringTopicListingMap = listTopicsResult1.namesToListings().get();
        stringTopicListingMap.forEach((k, v) - >
        {
            System.out.println(k + "\t" + v);
        });
        // 关闭管理客户端
        client.close();
    }
    @Test
    public void testCreateTopic() throws ExecutionException, InterruptedException
    {
        Map < String, String > configs = new HashMap < > ();
        configs.put("max.message.bytes", "1048576");
        configs.put("segment.bytes", "1048576000");
        NewTopic newTopic = new NewTopic("adm_tp_01", 2, (short) 1);
        newTopic.configs(configs);
        CreateTopicsResult topics = client.createTopics(Collections.singleton(newTopic));
        KafkaFuture < Void > all = topics.all();
        Void aVoid = all.get();
        System.out.println(aVoid);
    }
    @Test
    public void testDeleteTopic() throws ExecutionException, InterruptedException
    {
        DeleteTopicsOptions options = new DeleteTopicsOptions();
        options.timeoutMs(500);
        DeleteTopicsResult deleteResult = client.deleteTopics(Collections.singleton("adm_tp_01"), options);
        deleteResult.all().get();
    }
    @Test
    public void testAlterTopic() throws ExecutionException, InterruptedException
    {
        NewPartitions newPartitions = NewPartitions.increaseTo(5);
        Map < String, NewPartitions > newPartitionsMap = new HashMap < > ();
        newPartitionsMap.put("adm_tp_01", newPartitions);
        CreatePartitionsOptions option = new CreatePartitionsOptions();
        // Set to true if the request should be validated without creating new partitions.
        // 如果只是验证,而不创建分区,则设置为true
        // option.validateOnly(true);
        CreatePartitionsResult partitionsResult = client.createPartitions(newPartitionsMap, option);
        Void aVoid = partitionsResult.all().get();
    }
    @Test
    public void testDescribeTopics() throws ExecutionException, InterruptedException
    {
        DescribeTopicsOptions options = new DescribeTopicsOptions();
        options.timeoutMs(3000);
        DescribeTopicsResult topicsResult = client.describeTopics(Collections.singleton("adm_tp_01"), options);
        Map < String, TopicDescription > stringTopicDescriptionMap = topicsResult.all().get();
        stringTopicDescriptionMap.forEach((k, v) - >
        {
            System.out.println(k + "\t" + v);
            System.out.println("=======================================");
            System.out.println(k);
            boolean internal = v.isInternal();
            String name = v.name();
            List < TopicPartitionInfo > partitions = v.partitions();
            String partitionStr = Arrays.toString(partitions.toArray());
            System.out.println("内部的?" + internal);
            System.out.println("topic name = " + name);
            System.out.println("分区:" + partitionStr);
            partitions.forEach(partition - >
            {
                System.out.println(partition);
            });
        });
    }
    @Test
    public void testDescribeCluster() throws ExecutionException, InterruptedException
    {
        DescribeClusterResult describeClusterResult = client.describeCluster();
        KafkaFuture < String > stringKafkaFuture = describeClusterResult.clusterId();
        String s = stringKafkaFuture.get();
        System.out.println("cluster name = " + s);
        KafkaFuture < Node > controller = describeClusterResult.controller();
        Node node = controller.get();
        System.out.println("集群控制器:" + node);
        Collection < Node > nodes = describeClusterResult.nodes().get();
        nodes.forEach(node1 - >
        {
            System.out.println(node1);
        });
    }
    @Test
    public void testDescribeConfigs() throws ExecutionException, InterruptedException, TimeoutException
    {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
        DescribeConfigsResult describeConfigsResult = client.describeConfigs(Collections.singleton(configResource));
        Map < ConfigResource, Config > configMap = describeConfigsResult.all().get(15, TimeUnit.SECONDS);
        configMap.forEach(new BiConsumer < ConfigResource, Config > ()
        {
            @Override
            public void accept(ConfigResource configResource, Config config)
            {
                ConfigResource.Type type = configResource.type();
                String name = configResource.name();
                System.out.println("资源名称:" + name);
                Collection < ConfigEntry > entries = config.entries();
                entries.forEach(new Consumer < ConfigEntry > ()
                {
                    @Override
                    public void accept(ConfigEntry configEntry)
                    {
                        boolean aDefault = configEntry.isDefault();
                        boolean readOnly = configEntry.isReadOnly();
                        boolean sensitive = configEntry.isSensitive();
                        String name1 = configEntry.name();
                        String value = configEntry.value();
                        System.out.println("是否默认:" + aDefault + "\t是否只读?" + readOnly + "\t是否敏感?" + sensitive + "\t" + name1 + " --> " + value);
                    }
                });
                ConfigEntry retries = config.get("retries");
                if(retries != null)
                {
                    System.out.println(retries.name() + " -->" + retries.value());
                }
                else
                {
                    System.out.println("没有这个属性");
                }
            }
        });
    }
    @Test
    public void testAlterConfig() throws ExecutionException, InterruptedException
    {
        // 这里设置后,原来资源中不冲突的属性也会丢失,直接按照这里的配置设置
        Map < ConfigResource, Config > configMap = new HashMap < > ();
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "adm_tp_01");
        Config config = new Config(Collections.singleton(new ConfigEntry("segment.bytes", "1048576000")));
        configMap.put(resource, config);
        AlterConfigsResult alterConfigsResult = client.alterConfigs(configMap);
        Void aVoid = alterConfigsResult.all().get();
    }
    @Test
    public void testDescribeLogDirs() throws ExecutionException, InterruptedException
    {
        DescribeLogDirsOptions option = new DescribeLogDirsOptions();
        option.timeoutMs(1000);
        DescribeLogDirsResult describeLogDirsResult = client.describeLogDirs(Collections.singleton(0), option);
        Map < Integer, Map < String, DescribeLogDirsResponse.LogDirInfo >> integerMapMap = describeLogDirsResult.all().get();
        integerMapMap.forEach(new BiConsumer < Integer, Map < String, DescribeLogDirsResponse.LogDirInfo >> ()
        {
            @Override
            public void accept(Integer integer, Map < String, DescribeLogDirsResponse.LogDirInfo > stringLogDirInfoMap)
            {
                System.out.println("broker.id = " + integer);
                stringLogDirInfoMap.forEach(new BiConsumer < String, DescribeLogDirsResponse.LogDirInfo > ()
                {
                    @Override
                    public void accept(String s, DescribeLogDirsResponse.LogDirInfo logDirInfo)
                    {
                        System.out.println("log.dirs:" + s);
                        // 查看该broker上的主题/分区/偏移量等信息
                        // logDirInfo.replicaInfos.forEach(new BiConsumer<TopicPartition, DescribeLogDirsResponse.ReplicaInfo>() {
                        // @Override
                        // public void accept(TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo) {
                        // int partition =topicPartition.partition();
                        // String topic = topicPartition.topic();
                        // boolean isFuture = replicaInfo.isFuture;
                        // long offsetLag = replicaInfo.offsetLag;
                        // long size = replicaInfo.size;
                        // System.out.println("partition:" +partition + "\ttopic:" + topic
                        // + "\tisFuture:" + isFuture
                        // + "\toffsetLag:" + offsetLag
                        // + "\tsize:" + size);
                        // }
                        // });
                    }
                });
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238

# 2.3.6 偏移量管理

Kafka 1.0.2,__consumer_offsets主题中保存各个消费组的偏移量。

早期由zookeeper管理消费组的偏移量。

查询方法:

通过原生 kafka 提供的工具脚本进行查询。

工具脚本的位置与名称为 bin/kafka-consumer-groups.sh

首先运行脚本,查看帮助:

image-20230731135950833

image-20230731140010857

这里我们先编写一个生产者,消费者的例子:

我们先启动消费者,再启动生产者, 再通过 bin/kafka-consumer-groups.sh 进行消费偏移量查询,

由于kafka 消费者记录group的消费偏移量有两种方式 :

1)kafka 自维护 (新)

2)zookpeer 维护 (旧) ,已经逐渐被废弃

所以 ,脚本只查看由broker维护的,由zookeeper维护的可以将--bootstrap-server换成 --zookeeper 即可。

1**.查看有那些group ID正在进行消费:**

[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
Note: This will not show information about old Zookeeper-based consumers.
group
1
2
3

Kafka_Page116_001

注意:

  1. 这里面是没有指定topic,查看的是所有topic消费者的group.id的列表。
  2. 注意:重名的group.id只会显示一次

2.查看指定group.id的消费者消费情况

[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tp_demo_02 0 923 923 0 consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49 /192.168.100.1 consumer-1
tp_demo_02 1 872 872 0 consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49 /192.168.100.1 consumer-1
tp_demo_02 2 935 935 0 consumer-1-6d88cc72-1bf1-4ad7-8c6c-060d26dc1c49 /192.168.100.1 consumer-1
[root@node11 ~]#
1
2
3
4
5
6
7
8

如果消费者停止,查看偏移量信息:

Kafka_Page117_001

将偏移量设置为最早的:

Kafka_Page117_002

将偏移量设置为最新的:

Kafka_Page117_003

分别将指定主题的指定分区的偏移量向前移动10个消息:

Kafka_Page117_004

代码:

KafkaProducerSingleton.java

package com.lagou.kafka.demo.producer;
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
public class KafkaProducerSingleton
{
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerSingleton.class);
    private static KafkaProducer < String, String > kafkaProducer;
    private Random random = new Random();
    private String topic;
    private int retry;
    private KafkaProducerSingleton()
    {}
    /**
     * 静态内部类
     *
     * @author tanjie
     */
    private static class LazyHandler
    {
        private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();
    }
    /**
     * 单例模式,kafkaProducer是线程安全的,可以多线程共享一个实例
     * @return
     */
    public static final KafkaProducerSingleton getInstance()
    {
        return LazyHandler.instance;
    }
    /**
     * kafka生产者进行初始化
     *
     * @return KafkaProducer
     */
    public void init(String topic, int retry)
    {
        this.topic = topic;
        this.retry = retry;
        if(null == kafkaProducer)
        {
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
            props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
            kafkaProducer = new KafkaProducer < String, String > (props);
        }
    }
    /**
     * 通过kafkaProducer发送消息
     * @param message
     */
    public void sendKafkaMessage(final String message)
    {
        ProducerRecord < String, String > record = new ProducerRecord < String, String > (topic, random.nextInt(3), "", message);
        kafkaProducer.send(record, new Callback()
        {
            public void onCompletion(RecordMetadata recordMetadata, Exception exception)
            {
                if(null != exception)
                {
                    LOGGER.error("kafka发送消息失败:" + exception.getMessage(), exception);
                    retryKakfaMessage(message);
                }
            }
        });
    }
    /**
     * 当kafka消息发送失败后,重试
     *
     * @param retryMessage
     */
    private void retryKakfaMessage(final String retryMessage)
    {
        ProducerRecord < String, String > record = new ProducerRecord < String, String > (topic, random.nextInt(3), "", retryMessage);
        for(int i = 1; i <= retry; i++)
        {
            try
            {
                kafkaProducer.send(record);
                return;
            }
            catch (Exception e)
            {
                LOGGER.error("kafka发送消息失败:" + e.getMessage(), e);
                retryKakfaMessage(retryMessage);
            }
        }
    }
    /**
     * kafka实例销毁
     */
    public void close()
    {
        if(null != kafkaProducer)
        {
            kafkaProducer.close();
        }
    }
    public String getTopic()
    {
        return topic;
    }
    public void setTopic(String topic)
    {
        this.topic = topic;
    }
    public int getRetry()
    {
        return retry;
    }
    public void setRetry(int retry)
    {
        this.retry = retry;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121

ProducerHandler.java

package com.lagou.kafka.demo.producer;
public class ProducerHandler implements Runnable
{
    private String message;
    public ProducerHandler(String message)
    {
        this.message = message;
    }
    @Override
    public void run()
    {
        KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton.getInstance();
        kafkaProducerSingleton.init("tp_demo_02", 3);
        int i = 0;
        while(true)
        {
            try
            {
                System.out.println("当前线程:" + Thread.currentThread().getName() + "\t获取的kafka实例:" + kafkaProducerSingleton);
                kafkaProducerSingleton.sendKafkaMessage("发送消息: " + message + " " + (++i));
                Thread.sleep(100);
            }
            catch (Exception e)
            {}
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

MyProducer.java

package com.lagou.kafka.demo.producer;
public class MyProducer
{
    public static void main(String[] args)
    {
        Thread thread = new Thread(new ProducerHandler("hello lagou "));
        thread.start();
    }
}
1
2
3
4
5
6
7
8
9

KafkaConsumerAuto.java

package com.lagou.kafka.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class KafkaConsumerAuto
{
    /**
     * kafka消费者不是线程安全的
     */
    private final KafkaConsumer < String, String > consumer;
    private ExecutorService executorService;
    public KafkaConsumerAuto()
    {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        // 关闭自动ᨀ交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put("auto.commit.interval.ms", "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer < String, String > (props);
        // 订阅主题
        consumer.subscribe(Collections.singleton("tp_demo_02"));
    }
    public void execute() throws InterruptedException
    {
        executorService = Executors.newFixedThreadPool(2);
        while(true)
        {
            ConsumerRecords < String, String > records = consumer.poll(2_000);
            if(null != records)
            {
                executorService.submit(new ConsumerThreadAuto(records, consumer));
            }
            Thread.sleep(1000);
        }
    }
    public void shutdown()
    {
        try
        {
            if(consumer != null)
            {
                consumer.close();
            }
            if(executorService != null)
            {
                executorService.shutdown();
            }
            if(!executorService.awaitTermination(10, TimeUnit.SECONDS))
            {
                System.out.println("关闭线程池超时。。。");
            }
        }
        catch (InterruptedException ex)
        {
            Thread.currentThread().interrupt();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

ConsumerThreadAuto.java

package com.lagou.kafka.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerThreadAuto implements Runnable
{
    private ConsumerRecords < String, String > records;
    private KafkaConsumer < String, String > consumer;
    public ConsumerThreadAuto(ConsumerRecords < String, String > records, KafkaConsumer < String, String > consumer)
    {
        this.records = records;
        this.consumer = consumer;
    }
    @Override
    public void run()
    {
        for(ConsumerRecord < String, String > record: records)
        {
            System.out.println("当前线程:" + Thread.currentThread() + "\t主题:" + record.topic() + "\t偏移量:" + record.offset() + "\t分区:" + record.partition() + "\t获取的消息:" + record.value());
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

ConsumerAutoMain.java

package com.lagou.kafka.demo.consumer;
public class ConsumerAutoMain
{
    public static void main(String[] args)
    {
        KafkaConsumerAuto kafka_consumerAuto = new KafkaConsumerAuto();
        try
        {
            kafka_consumerAuto.execute();
            Thread.sleep(20000);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        finally
        {
            kafka_consumerAuto.shutdown();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
上次更新: 2025/04/03, 11:07:08
Kafka高级特性-消费者
Kafka高级特性-分区

← Kafka高级特性-消费者 Kafka高级特性-分区→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式