跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
          • 2.1.1 消息发送
            • 2.1.1.1 数据生产流程解析
            • 2.1.1.2 必要参数配置
            • 2.1.1.2.1 broker配置
            • 2.1.1.3 序列化器
            • 2.1.1.3.1 自定义序列化器
            • 2.1.1.4 分区器
            • 2.1.1.5 拦截器
          • 2.1.2 原理剖析
          • 2.1.3 生产者参数配置补充
        • Kafka高级特性-消费者
        • Kafka高级特性-主题
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
        • 源码剖析-Topic创建流程
        • 源码剖析-Producer生产者流程
        • 源码剖析-Consumer消费者流程
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
        • 源码剖析-ReplicaManager
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-23
目录

Kafka高级特性解析

# 2.1 生产者

# 2.1.1 消息发送

# 2.1.1.1 数据生产流程解析

Kafka_Page27_001

  1. Producer创建时,会创建一个Sender线程并设置为守护线程。

  2. 生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。

  3. 批次发送的条件为:缓冲区数据大小达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。

  4. 批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且失败原因允许重试,那么客户端内部会对该消息进行重试。

  5. 落盘到broker成功,返回生产元数据给生产者。

  6. 元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回。

# 2.1.1.2 必要参数配置

# 2.1.1.2.1 broker配置

  1. 配置条目的使用方式:

Kafka_Page27_002

  1. 配置参数:

image-20230731095758653

# 2.1.1.3 序列化器

Kafka_Page29_001

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。

序列化器的作用就是用于序列化要发送的消息的。

Kafka使用 org.apache.kafka.common.serialization.Serializer接口用于定义序列化器,将

泛型指定类型的数据转换为字节数组。

package org.apache.kafka.common.serialization;
import java.io.Closeable;
import java.util.Map;
/**
 * 将对象转换为byte数组的接口
 *
 * 该接口的实现类需要ᨀ供无参构造器
 * @param <T> 从哪个类型转换
 */
public interface Serializer < T > extends Closeable
{
    /**
     * 类的配置信息
     * @param configs key/value pairs
     * @param isKey key的序列化还是value的序列化
     */
    void configure(Map < String, ? > configs, boolean isKey);
    /**
     * 将对象转换为字节数组
     *
     * @param topic 主题名称
     * @param data 需要转换的对象
     * @return 序列化的字节数组
     */
    byte[] serialize(String topic, T data);
    /**
     * 关闭序列化器
     * 该方法需要ᨀ供幂等性,因为可能调用多次。
     */
    @Override
    void close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

系统提供了该接口的子接口以及实现类:

org.apache.kafka.common.serialization.ByteArraySerializer
1

Kafka_Page30_001

org.apache.kafka.common.serialization.ByteBufferSerializer
1

Kafka_Page31_001

org.apache.kafka.common.serialization.BytesSerializer
1

Kafka_Page31_002

org.apache.kafka.common.serialization.DoubleSerializer
1

Kafka_Page32_001

org.apache.kafka.common.serialization.FloatSerializer
1

Kafka_Page33_001

org.apache.kafka.common.serialization.IntegerSerializer
1

Kafka_Page33_002

org.apache.kafka.common.serialization.StringSerializer
1

Kafka_Page34_001

org.apache.kafka.common.serialization.LongSerializer
1

Kafka_Page34_002

org.apache.kafka.common.serialization.ShortSerializer
1

Kafka_Page35_001

# 2.1.1.3.1 自定义序列化器

数据的序列化一般生产中使用avro。

自定义序列化器需要实现org.apache.kafka.common.serialization.Serializer<T>接口,并实现其中的 serialize 方法。

案例:

实体类:

package com.lagou.kafka.demo.entity;
public class User
{
    private Integer userId;
    private String username;
    public Integer getUserId()
    {
        return userId;
    }
    public void setUserId(Integer userId)
    {
        this.userId = userId;
    }
    public String getUsername()
    {
        return username;
    }
    public void setUsername(String username)
    {
        this.username = username;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

序列化类:

package com.lagou.kafka.demo.serializer;
import com.lagou.kafka.demo.entity.User;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.io.UnsupportedEncodingException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Map;
public class UserSerializer implements Serializer < User >
{
    @Override
    public void configure(Map < String, ? > configs, boolean isKey)
    {
        // do nothing
    }
    @Override
    public byte[] serialize(String topic, User data)
    {
        try
        {
            // 如果数据是null,则返回null
            if(data == null) return null;
            Integer userId = data.getUserId();
            String username = data.getUsername();
            int length = 0;
            byte[] bytes = null;
            if(null != username)
            {
                bytes = username.getBytes("utf-8");
                length = bytes.length;
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
            buffer.putInt(userId);
            buffer.putInt(length);
            buffer.put(bytes);
            return buffer.array();
        }
        catch (UnsupportedEncodingException e)
        {
            throw new SerializationException("序列化数据异常");
        }
    }
    @Override
    public void close()
    {
        // do nothing
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

生产者:

package com.lagou.kafka.demo.producer;
import com.lagou.kafka.demo.entity.User;
import com.lagou.kafka.demo.serializer.UserSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class MyProducer
{
    public static void main(String[] args)
    {
        Map < String, Object > configs = new HashMap < > ();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 设置自定义的序列化类
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);
        KafkaProducer < String, User > producer = new KafkaProducer < String, User > (configs);
        User user = new User();
        user.setUserId(1001);
        user.setUsername("张三");
        ProducerRecord < String, User > record = new ProducerRecord < > ("tp_user_01", 0, user.getUsername(), user);
        producer.send(record, (metadata, exception) - >
        {
            if(exception == null)
            {
                System.out.println("消息发送成功:" + metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());
            }
            else
            {
                System.out.println("消息发送异常");
            }
        });
        // 关闭生产者
        producer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# 2.1.1.4 分区器

Kafka_Page38_001

默认(DefaultPartitioner)分区计算:

  1. 如果record提供了分区号,则使用record提供的分区号

  2. 如果record没有提供分区号,则使用key的序列化后的值的hash值对分区数量取模

  3. 如果record没有提供分区号,也没有提供key,则使用轮询的方式分配分区号。

    • 会首先在可用的分区中分配分区号

    • 如果没有可用的分区,则在该主题所有分区中分配分区号。

Kafka_Page38_002

Kafka_Page39_001

如果要自定义分区器,则需要

  1. 首先开发Partitioner接口的实现类
  2. 在KafkaProducer中进行设置:configs.put("partitioner.class", "xxx.xx.Xxx.class")

位于 org.apache.kafka.clients.producer 中的分区器接口:

package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Cluster;
import java.io.Closeable;
/**
 * 分区器接口
 */
public interface Partitioner extends Configurable, Closeable
{
    /**
     * 为指定的消息记录计算分区值
     *
     * @param topic 主题名称
     * @param key 根据该key的值进行分区计算,如果没有则为null。
     * @param keyBytes key的序列化字节数组,根据该数组进行分区计算。如果没有key,则为null
     * @param value 根据value值进行分区计算,如果没有,则为null
     * @param valueBytes value的序列化字节数组,根据此值进行分区计算。如果没有,则为null
     * @param cluster 当前集群的元数据
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    /**
     * 关闭分区器的时候调用该方法
     */
    public void close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

包 org.apache.kafka.clients.producer.internals 中分区器的默认实现:

package org.apache.kafka.clients.producer.internals;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
/**
 * 默认的分区策略:
 *
 * 如果在记录中指定了分区,则使用指定的分区
 * 如果没有指定分区,但是有key的值,则使用key值的散列值计算分区
 * 如果没有指定分区也没有key的值,则使用轮询的方式选择一个分区
 */
public class DefaultPartitioner implements Partitioner
{
    private final ConcurrentMap < String, AtomicInteger > topicCounterMap = new ConcurrentHashMap < > ();
    public void configure(Map < String, ? > configs)
    {}
    /**
     * 为指定的消息记录计算分区值
     *
     * @param topic 主题名称
     * @param key 根据该key的值进行分区计算,如果没有则为null。
     * @param keyBytes key的序列化字节数组,根据该数组进行分区计算。如果没有key,则为null
     * @param value 根据value值进行分区计算,如果没有,则为null
     * @param valueBytes value的序列化字节数组,根据此值进行分区计算。如果没有,则为null
     * @param cluster 当前集群的元数据
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
    {
        // 获取指定主题的所有分区信息
        List < PartitionInfo > partitions = cluster.partitionsForTopic(topic);
        // 分区的数量
        int numPartitions = partitions.size();
        // 如果没有ᨀ供key
        if(keyBytes == null)
        {
            int nextValue = nextValue(topic);
            List < PartitionInfo > availablePartitions = cluster.availablePartitionsForTopic(topic);
            if(availablePartitions.size() > 0)
            {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            }
            else
            {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        }
        else
        {
            // hash the keyBytes to choose a partition
            // 如果有,就计算keyBytes的哈希值,然后对当前主题的个数取模
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
    private int nextValue(String topic)
    {
        AtomicInteger counter = topicCounterMap.get(topic);
        if(null == counter)
        {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if(currentCounter != null)
            {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }
    public void close()
    {}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

Kafka_Page41_001

可以实现Partitioner接口自定义分区器:

Kafka_Page42_001

然后在生产者中配置:

Kafka_Page42_002

# 2.1.1.5 拦截器

Kafka_Page42_003

Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要用于实现Client端的定制化控制逻辑。

对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,Producer允许用户指定多个Interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是

org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

  • onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。

  • onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运行在Producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。

  • close:关闭Interceptor,主要用于执行一些资源清理工作。

如前所述,Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。

另外倘若指定了多个Interceptor,则Producer将按照指定顺序调用它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

自定义拦截器:

  1. 实现ProducerInterceptor接口
  2. 在KafkaProducer的设置中设置自定义的拦截器

Kafka_Page43_001

案例:

  1. 消息实体类:
package com.lagou.kafka.demo.entity;
public class User
{
    private Integer userId;
    private String username;
    public Integer getUserId()
    {
        return userId;
    }
    public void setUserId(Integer userId)
    {
        this.userId = userId;
    }
    public String getUsername()
    {
        return username;
    }
    public void setUsername(String username)
    {
        this.username = username;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
  1. 自定义序列化器
package com.lagou.kafka.demo.serializer;
import com.lagou.kafka.demo.entity.User;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.io.UnsupportedEncodingException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Map;
public class UserSerializer implements Serializer < User >
{
    @Override
    public void configure(Map < String, ? > configs, boolean isKey)
    {
        // do nothing
    }
    @Override
    public byte[] serialize(String topic, User data)
    {
        try
        {
            // 如果数据是null,则返回null
            if(data == null) return null;
            Integer userId = data.getUserId();
            String username = data.getUsername();
            int length = 0;
            byte[] bytes = null;
            if(null != username)
            {
                bytes = username.getBytes("utf-8");
                length = bytes.length;
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
            buffer.putInt(userId);
            buffer.putInt(length);
            buffer.put(bytes);
            return buffer.array();
        }
        catch (UnsupportedEncodingException e)
        {
            throw new SerializationException("序列化数据异常");
        }
    }
    @Override
    public void close()
    {
        // do nothing
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
  1. 自定义分区器
package com.lagou.kafka.demo.partitioner;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner
{
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
    {
        return 2;
    }
    @Override
    public void close()
    {}
    @Override
    public void configure(Map < String, ? > configs)
    {}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  1. 自定义拦截器1
package com.lagou.kafka.demo.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class InterceptorOne < KEY, VALUE > implements ProducerInterceptor < KEY, VALUE >
{
    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);
    @Override
    public ProducerRecord < KEY,
    VALUE > onSend(ProducerRecord < KEY, VALUE > record)
    {
        System.out.println("拦截器1---go");
        // 此处根据业务需要对相关的数据作修改
        String topic = record.topic();
        Integer partition = record.partition();
        Long timestamp = record.timestamp();
        KEY key = record.key();
        VALUE value = record.value();
        Headers headers = record.headers();
        // 添加消息头
        headers.add("interceptor", "interceptorOne".getBytes());
        ProducerRecord < KEY, VALUE > newRecord = new ProducerRecord < KEY, VALUE > (topic, partition, timestamp, key, value, headers);
        return newRecord;
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception)
    {
        System.out.println("拦截器1---back");
        if(exception != null)
        {
            // 如果发生异常,记录日志中
            LOGGER.error(exception.getMessage());
        }
    }
    @Override
    public void close()
    {}
    @Override
    public void configure(Map < String, ? > configs)
    {}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
  1. 自定义拦截器2
package com.lagou.kafka.demo.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class InterceptorTwo < KEY, VALUE > implements ProducerInterceptor < KEY, VALUE >
{
    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);
    @Override
    public ProducerRecord < KEY,
    VALUE > onSend(ProducerRecord < KEY, VALUE > record)
    {
        System.out.println("拦截器2---go");
        // 此处根据业务需要对相关的数据作修改
        String topic = record.topic();
        Integer partition = record.partition();
        Long timestamp = record.timestamp();
        KEY key = record.key();
        VALUE value = record.value();
        Headers headers = record.headers();
        // 添加消息头
        headers.add("interceptor", "interceptorTwo".getBytes());
        ProducerRecord < KEY, VALUE > newRecord = new ProducerRecord < KEY, VALUE > (topic, partition, timestamp, key, value, headers);
        return newRecord;
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception)
    {
        System.out.println("拦截器2---back");
        if(exception != null)
        {
            // 如果发生异常,记录日志中
            LOGGER.error(exception.getMessage());
        }
    }
    @Override
    public void close()
    {}
    @Override
    public void configure(Map < String, ? > configs)
    {}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
  1. 自定义拦截器3
package com.lagou.kafka.demo.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class InterceptorThree < KEY, VALUE > implements ProducerInterceptor < KEY, VALUE >
{
    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorThree.class);
    @Override
    public ProducerRecord < KEY,
    VALUE > onSend(ProducerRecord < KEY, VALUE > record)
    {
        System.out.println("拦截器3---go");
        // 此处根据业务需要对相关的数据作修改
        String topic = record.topic();
        Integer partition = record.partition();
        Long timestamp = record.timestamp();
        KEY key = record.key();
        VALUE value = record.value();
        Headers headers = record.headers();
        // 添加消息头
        headers.add("interceptor", "interceptorThree".getBytes());
        ProducerRecord < KEY, VALUE > newRecord = new ProducerRecord < KEY, VALUE > (topic, partition, timestamp, key, value, headers);
        return newRecord;
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception)
    {
        System.out.println("拦截器3---back");
        if(exception != null)
        {
            // 如果发生异常,记录日志中
            LOGGER.error(exception.getMessage());
        }
    }
    @Override
    public void close()
    {}
    @Override
    public void configure(Map < String, ? > configs)
    {}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
  1. 生产者
package com.lagou.kafka.demo.producer;
import com.lagou.kafka.demo.entity.User;
import com.lagou.kafka.demo.serializer.UserSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class MyProducer
{
    public static void main(String[] args)
    {
        Map < String, Object > configs = new HashMap < > ();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        // 设置自定义分区器
        // configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class);
        configs.put("partitioner.class", "com.lagou.kafka.demo.partitioner.MyPartitioner");
        // 设置拦截器
        configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.lagou.kafka.demo.interceptor.InterceptorOne," + "com.lagou.kafka.demo.interceptor.InterceptorTwo," + "com.lagou.kafka.demo.interceptor.InterceptorThree");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 设置自定义的序列化类
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);
        KafkaProducer < String, User > producer = new KafkaProducer < String, User > (configs);
        User user = new User();
        user.setUserId(1001);
        user.setUsername("张三");
        ProducerRecord < String, User > record = new ProducerRecord < > ("tp_user_01", 0, user.getUsername(), user);
        producer.send(record, (metadata, exception) - >
        {
            if(exception == null)
            {
                System.out.println("消息发送成功:" + metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());
            }
            else
            {
                System.out.println("消息发送异常");
            }
        });
        // 关闭生产者
        producer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  1. 运行结果:

Kafka_Page50_001

# 2.1.2 原理剖析

由上图可以看出:KafkaProducer有两个基本线程:

Kafka_Page51_001

  • 主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中;

    • 消息收集器RecoderAccumulator为每个分区都维护了一个Deque<ProducerBatch> 类型的双端队列。

    • ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低网络影响;

    • 由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用;该缓存池只针对特定大小( batch.size指定)的 ByteBuffer进行管理,对于消息过大的缓存,不能做到重复利用。

    • 每次追加一条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则写入;若不可以写入,则新建一个ProducerBatch,判断该消息大小是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建立新的ProducerBatch,这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的 ProducerBatch ,缺点就是该内存不能被复用了。

  • Sender线程:

    • 该线程从消息收集器获取缓存的消息,将其处理为 <Node, List<ProducerBatch>的形式, Node 表示集群的broker节点。

    • 进一步将<Node, List<ProducerBatch>转化为<Node, Request>形式,此时才可以向服务端发送数据。

    • 在发送之前,Sender线程将消息以 Map<NodeId, Deque<Request>> 的形式保存到InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压力最小的一个,以实现消息的尽快发出。

# 2.1.3 生产者参数配置补充

  1. 参数设置方式:

Kafka_Page52_001

Kafka_Page52_002

  1. 补充参数:

image-20230731111717671

image-20230731111736267

上次更新: 2025/04/03, 11:07:08
Kafka架构与实战
Kafka高级特性-消费者

← Kafka架构与实战 Kafka高级特性-消费者→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式