跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
        • Kafka高级特性-消费者
        • Kafka高级特性-主题
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
        • 源码剖析-Topic创建流程
        • 源码剖析-Producer生产者流程
          • 4.4.1 Producer示例
            • 4.4.1.1 同步发送
            • 4.4.1.2 异步发送
          • 4.4.2 KafkaProducer实例化
          • 4.4.2 消息发送过程
            • 4.4.2.1 拦截器
            • 4.4.2.2 拦截器核心逻辑
            • 4.4.2.3 发送五步骤
            • 4.4.2.4 MetaData更新机制
        • 源码剖析-Consumer消费者流程
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
        • 源码剖析-ReplicaManager
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-31
目录

源码剖析-Producer生产者流程

# 4.4 Kafka源码剖析之Producer生产者流程

# 4.4.1 Producer示例

首先我们先通过一段代码来展示 KafkaProducer 的使用方法。在下面的示例中,我们使用KafkaProducer 实现向kafka发送消息的功能。在示例程序中,首先将 KafkaProduce 使用的配置写入到 Properties 中,每项配置的具体含义在注释中进行解释。之后以此 Properties 对象为参数构造KafkaProducer 对象,最后通过 send 方法完成发送,代码中包含同步发送、异步发送两种情况。

public static void main(String[] args) throws ExecutionException,InterruptedException {
   Properties props = new Properties();
   // 客户端id
   props.put("client.id", "KafkaProducerDemo");
   // kafka地址,列表格式为host1:port1,host2:port2,…,无需添加所有的集群地址,kafka会根据ᨀ供的地址发现其他的地址(建议多ᨀ供几个,以防ᨀ供的服务器关闭)
   props.put("bootstrap.servers", "localhost:9092");
   // 发送返回应答方式
   // 0:Producer 往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
   // 1:Producer 往集群发送数据只要 Leader 应答就可以发送下一条,只确保Leader接收成功。
   // -1或者all:Producer 往集群发送数据需要所有的ISR Follower都完成从Leader的同步才会发送下一条,确保Leader发送成功和所有的副本都成功接收。安全性最高,但是效率最低。
   props.put("acks", "all");
   // 重试次数
   props.put("retries", 0);
   // 重试间隔时间
   props.put("retries.backoff.ms", 100);
   // 批量发送的大小
   props.put("batch.size", 16384);
   // 一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去
   props.put("linger.ms", 10);
   // 缓冲区大小
   props.put("buffer.memory", 33554432);
   // key序列化方式
   props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
   // value序列化方式
   props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
   // topic
   String topic = "lagou_edu";
   Producer<String, String> producer = new KafkaProducer<>(props);
   AtomicInteger count = new AtomicInteger();
   while (true) {
   	int num = count.get();
   	String key = Integer.toString(num);
   	String value = Integer.toString(num);
   	ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
   	if (num % 2 == 0) {
   		// 偶数异步发送
   		// 第一个参数record封装了topic、key、value
   		// 第二个参数是一个callback对象,当生产者接收到kafka发来的ACK确认消息时,会调用此CallBack对象的onComplete方法
   		producer.send(record, (recordMetadata, e) -> {
   			System.out.println("num:" + num + " topic:" +recordMetadata.topic() + " offset:" + recordMetadata.offset());
   		}
   		);
   	} else {
   		// 同步发送
   		// KafkaProducer.send方法返回的类型是Future<RecordMetadata>,通过get方法阻塞当前线程,等待kafka服务端ACK响应
   		producer.send(record).get();
   	}
   	count.incrementAndGet();
   	TimeUnit.MILLISECONDS.sleep(100);
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

# 4.4.1.1 同步发送

  1. KafkaProducer.send方法返回的类型是Future<RecordMetadata>,通过get方法阻塞当前线程,等待kafka服务端ACK响应
producer.send(record).get()
1

# 4.4.1.2 异步发送

  1. 第一个参数record封装了topic、key、value

  2. 第二个参数是一个callback对象,当生产者接收到kafka发来的ACK确认消息时,会调用此

CallBack对象的onComplete方法

producer.send(record, (recordMetadata, e) -> {
 System.out.println("num:" + num + " topic:" +
 recordMetadata.topic() + " offset:" + recordMetadata.offset());
});
1
2
3
4

# 4.4.2 KafkaProducer实例化

了解了 KafkaProducer 的基本使用,开始深入了解的KafkaProducer原理和实现,先看一下构造方法核心逻辑

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,Serializer<V> valueSerializer) {
   try {
   	// 获取用户的配置
   	Map<String, Object> userProvidedConfigs = config.originals();
   	this.producerConfig = config;
   	// 系统时间
   	this.time = Time.SYSTEM;
   	// 获取client.id配置
   	String clientId =config.getString(ProducerConfig.CLIENT_ID_CONFIG);
   	// 如果client.id为空,设置默认值:producer-1
   	if (clientId.length() <= 0)
   	clientId = "producer-" +PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
   	this.clientId = clientId;
   	// 获取事务id,如果没有配置则为null
   	String transactionalId =userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
   	(String)userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
   	LogContext logContext;
   	if (transactionalId == null)
   	logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId)); else
   	logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
   	log = logContext.logger(KafkaProducer.class);
   	log.trace("Starting the Kafka producer");
   	// 创建client-id的监控map
   	Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
   	// 设置监控配置,包含样本量、取样时间窗口、记录级别
   	MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
   	.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),TimeUnit.MILLISECONDS)
   	.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
   	.tags(metricTags);
   	// 监控数据上报类
   	List<MetricsReporter> reporters =config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
   	MetricsReporter.class);
   	reporters.add(new JmxReporter(JMX_PREFIX));
   	this.metrics = new Metrics(metricConfig, reporters, time);
   	// 生成生产者监控
   	ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
   	// 分区类
   	this.partitioner =config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG,Partitioner.class);
   	// 重试时间 retry.backoff.ms 默认100ms
   	long retryBackoffMs =config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
   	if (keySerializer == null) {
   		// 反射生成key序列化方式
   		this.keySerializer =ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
   		Serializer.class));
   		this.keySerializer.configure(config.originals(), true);
   	} else {
   		config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
   		this.keySerializer = ensureExtended(keySerializer);
   	}
   	if (valueSerializer == null) {
   		// 反射生成key序列化方式
   		this.valueSerializer =ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
   		Serializer.class));
   		this.valueSerializer.configure(config.originals(), false);
   	} else {
   		config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
   		this.valueSerializer = ensureExtended(valueSerializer);
   	}
   	// load interceptors and make sure they get clientId
   	// 确认client.id添加到用户的配置里面
   	userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,clientId);
   	// 获取多个拦截器,为空则不处理
   	List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs,false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
   	ProducerInterceptor.class);
   	this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
   	// 集群资源监听器,在元数据变更时会有通知
   	ClusterResourceListeners clusterResourceListeners =configureClusterResourceListeners(keySerializer, valueSerializer,interceptorList, reporters);
   	// 生产者每隔一段时间都要去更新一下集群的元数据,默认5分钟
   	this.metadata = new Metadata(retryBackoffMs,config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
   	true, true, clusterResourceListeners);
   	// 生产者往服务端发送消息的时候,规定一条消息最大多大?
   	// 如果你超过了这个规定消息的大小,你的消息就不能发送过去。
   	// 默认是1M,这个值偏小,在生产环境中,我们需要修改这个值。
   	// 经验值是10M。但是大家也可以根据自己公司的情况来。
   	this.maxRequestSize =config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
   	//指的是缓存大小
   	//默认值是32M,这个值一般是够用,如果有特殊情况的时候,我们可以去修改这个值。
   	this.totalMemorySize =config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
   	// kafka是支持压缩数据的,可以设置压缩格式,默认是不压缩,支持gzip、snappy、lz4
   	// 一次发送出去的消息就更多。生产者这儿会消耗更多的cpu.
   	this.compressionType =CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
   	// 配置控制了KafkaProducer.send()并将
   	KafkaProducer.partitionsFor()被阻塞多长时间,由于缓冲区已满或元数据不可用,这些方法可能会被阻塞止
   	this.maxBlockTimeMs =config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
   	// 控制客户端等待请求响应的最长时间。如果在超时过去之前未收到响应,客户端将在必要时重新发送请求,或者如果重试耗尽,请求失败
   	this.requestTimeoutMs =config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
   	// 事务管理器
   	this.transactionManager = configureTransactionState(config,logContext, log);
   	// 重试次数
   	int retries = configureRetries(config, transactionManager !=null, log);
   	// 使用幂等性,需要将 enable.idempotence 配置项设置为true。并且它对单个分区的发送,一次性最多发送5条
   	int maxInflightRequests = configureInflightRequests(config,transactionManager != null);
   	// 如果开启了幂等性,但是用户指定的ack不为 -1,则会抛出异常
   	short acks = configureAcks(config, transactionManager != null,log);
   	this.apiVersions = new ApiVersions();
   	// 创建核心组件:记录累加器
   	this.accumulator = new RecordAccumulator(logContext,
   	config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
   	this.totalMemorySize,
   	this.compressionType,
   	config.getLong(ProducerConfig.LINGER_MS_CONFIG),
   	retryBackoffMs,
   	metrics,
   	time,
   	apiVersions,
   	transactionManager);
   	// 获取broker地址列表
   	List<InetSocketAddress> addresses =ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
   	// 更新元数据
   	this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
   	// 创建通道,是否需要加密
   	ChannelBuilder channelBuilder =ClientUtils.createChannelBuilder(config);
   	Sensor throttleTimeSensor =Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
   	// 初始化了一个重要的管理网路的组件
   	// connections.max.idle.ms: 默认值是9分钟, 一个网络连接最多空闲多久,超过这个空闲时间,就关闭这个网络连接。
   	// max.in.flight.requests.per.connection:默认是5, producer向broker发送数据的时候,其实是有多个网络连接。每个网络连接可以忍受 producer端发送给broker 消息然后消息没有响应的个数
   	NetworkClient client = new NetworkClient(
   	new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
   	this.metrics, time, "producer", channelBuilder,logContext),
   	this.metadata,
   	clientId,
   	maxInflightRequests,
   	config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
   	config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
   	config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
   	config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
   	this.requestTimeoutMs,
   	time,
   	true,
   	apiVersions,
   	throttleTimeSensor,
   	logContext);
   	// 发送线程
   	this.sender = new Sender(logContext,
   	client,
   	this.metadata,
   	this.accumulator,
   	maxInflightRequests == 1,
   	config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
   	acks,
   	retries,
   	metricsRegistry.senderMetrics,
   	Time.SYSTEM,
   	this.requestTimeoutMs,
   	config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
   	this.transactionManager,
   	apiVersions);
   	// 线程名称
   	String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
   	// 启动守护线程
   	this.ioThread = new KafkaThread(ioThreadName, this.sender,true);
   	this.ioThread.start();
   	this.errors = this.metrics.sensor("errors");
   	// 把用户配置的参数,但是没有用到的打印出来
   	config.logUnused();
   	AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
   	log.debug("Kafka producer started");
   }
   catch (Throwable t) {
   	// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
   	close(0, TimeUnit.MILLISECONDS, true);
   	// now propagate the exception
   	throw new KafkaException("Failed to construct kafka producer",t);
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165

# 4.4.2 消息发送过程

Kafka消息实际发送以 send 方法为入口:

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
   // intercept the record, which can be potentially modified; this method does not throw exceptions
   ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
   return doSend(interceptedRecord, callback);
}
1
2
3
4
5
6

# 4.4.2.1 拦截器

首先方法会先进入拦截器集合 ProducerInterceptors , onSend 方法是遍历拦截器 onSend 方法,拦截器的目的是将数据处理加工, kafka 本身并没有给出默认的拦截器的实现。如果需要使用拦截器功能,必须自己实现 ProducerInterceptor 接口。

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
   ProducerRecord<K, V> interceptRecord = record;
   // 遍历所有拦截器,顺序执行,如果有异常只打印日志,不会向上抛出
   for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
   	try {
   		interceptRecord = interceptor.onSend(interceptRecord);
   	}
   	catch (Exception e) {
   		// do not propagate interceptor exception, log and continue calling other interceptors
   		// be careful not to throw exception from here
   		if (record != null)
   		log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e); else
   		log.warn("Error executing interceptor onSend callback",e);
   	}
   }
   return interceptRecord;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 4.4.2.2 拦截器核心逻辑

ProducerInterceptor 接口包括三个方法:

  1. onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中的。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算

  2. onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率

  3. close:关闭interceptor,主要用于执行一些资源清理工作

  4. 拦截器可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。

# 4.4.2.3 发送五步骤

下面仔细来看一下 doSend 方法的运行过程:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
   // 首先创建一个主题分区类
   TopicPartition tp = null;
   try {
   	// first make sure the metadata for the topic is available
   	// 首先确保该topic的元数据可用
   	ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
   	long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
   	Cluster cluster = clusterAndWaitTime.cluster;
   	// 序列化 record 的 key 和 value
   	byte[] serializedKey;
   	try {
   		serializedKey = keySerializer.serialize(record.topic(),record.headers(), record.key());
   	}
   	catch (ClassCastException cce) {
   		throw new SerializationException("Can't convert key of class" + record.key().getClass().getName() +
   		" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
   		" specified in key.serializer", cce);
   	}
   	byte[] serializedValue;
   	try {
   		serializedValue = valueSerializer.serialize(record.topic(),record.headers(), record.value());
   	}
   	catch (ClassCastException cce) {
   		throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
   		" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
   		" specified in value.serializer", cce);
   	}
   	// 获取该 record 要发送到的 partition
   	int partition = partition(record, serializedKey,serializedValue, cluster);
   	tp = new TopicPartition(record.topic(), partition);
   	// 给header设置只读
   	setReadOnly(record.headers());
   	Header[] headers = record.headers().toArray();
   	int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
   	compressionType, serializedKey, serializedValue,headers);
   	ensureValidRecordSize(serializedSize);
   	long timestamp = record.timestamp() == null ?time.milliseconds() : record.timestamp();
   	log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
   	// producer callback will make sure to call both 'callback' and interceptor callback
   	Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
   	if (transactionManager != null && transactionManager.isTransactional())
   	transactionManager.maybeAddPartitionToTransaction(tp);
   	// 向 accumulator 中追加 record 数据,数据会先进行缓存
   	RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
   	serializedValue, headers, interceptCallback,remainingWaitMs);
   	// 如果追加完数据后,对应的 RecordBatch 已经达到了 batch.size 的大小(或者batch 的剩余空间不足以添加下一条 Record),则唤醒 sender 线程发送数据。
   	if (result.batchIsFull || result.newBatchCreated) {
   		log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
   		this.sender.wakeup();
   	}
   	return result.future;
   	// handling exceptions and record the errors;
   	// for API exceptions return them in the future,
   	// for other exceptions throw directly
   }
   catch (ApiException e) {
   	log.debug("Exception occurred during message send:", e);
   	if (callback != null)
   	callback.onCompletion(null, e);
   	this.errors.record();
   	if (this.interceptors != null)
   	this.interceptors.onSendError(record, tp, e);
   	return new FutureFailure(e);
   }
   catch (InterruptedException e) {
   	this.errors.record();
   	if (this.interceptors != null)
   	this.interceptors.onSendError(record, tp, e);
   	throw new InterruptException(e);
   }
   catch (BufferExhaustedException e) {
   	this.errors.record();
   	this.metrics.sensor("buffer-exhausted-records").record();
   	if (this.interceptors != null)
   	this.interceptors.onSendError(record, tp, e);
   	throw e;
   }
   catch (KafkaException e) {
   	this.errors.record();
   	if (this.interceptors != null)
   	this.interceptors.onSendError(record, tp, e);
   	throw e;
   }
   catch (Exception e) {
   	// we notify interceptor about all exceptions, since onSend is called before anything else in this method
   	if (this.interceptors != null)
   	this.interceptors.onSendError(record, tp, e);
   	throw e;
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
  1. Producer 通过 waitOnMetadata() 方法来获取对应 topic 的 metadata 信息,需要先该

topic 是可用的

  1. Producer 端对 record 的 key 和 value 值进行序列化操作,在 Consumer 端再进行相应的反序列化

  2. 获取partition值,具体分为下面三种情况:

    • 1.指明 partition 的情况下,直接将指明的值直接作为 partiton 值

    • 2.没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值

    • 3.既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到partition 值,也就是常说的 round-robin 算法

    • Producer 默认使用的 partitioner 是org.apache.kafka.clients.producer.internals.DefaultPartitioner

  3. 向 accumulator 写数据,先将 record 写入到 buffer 中,当达到一个 batch.size 的大小时,再唤起 sender线程去发送 RecordBatch,这里仔细分析一下Producer是如何向buffer写入数据的

    • 1.获取该 topic-partition 对应的 queue,没有的话会创建一个空的 queue
    • 2.向 queue 中追加数据,先获取 queue 中最新加入的那个 RecordBatch,如果不存在或者存在但剩余空余不足以添加本条 record 则返回 null,成功写入的话直接返回结果,写入成功
    • 3.创建一个新的 RecordBatch,初始化内存大小根据 max(batch.size,Records.LOG_OVERHEAD + Record.recordSize(key, value)) 来确定(防止单条record 过大的情况)
    • 4.向新建的 RecordBatch 写入 record,并将 RecordBatch 添加到 queue 中,返回结果,写入成功
  4. 发送 RecordBatch,当 record 写入成功后,如果发现 RecordBatch 已满足发送的条件(通常是 queue 中有多个 batch,那么最先添加的那些 batch 肯定是可以发送了),那么就会唤醒sender 线程,发送 RecordBatch 。sender 线程对 RecordBatch 的处理是在 run() 方法中进行的,该方法具体实现如下:

    • 1.获取那些已经可以发送的 RecordBatch 对应的 nodes
    • 2.如果与node 没有连接(如果可以连接,同时初始化该连接),就证明该 node 暂时不能发送数据,暂时移除该 node
    • 3.返回该 node 对应的所有可以发送的 RecordBatch 组成的 batches(key 是node.id),并将 RecordBatch 从对应的 queue 中移除
    • 4.将由于元数据不可用而导致发送超时的 RecordBatch 移除
    • 5.发送 RecordBatch

# 4.4.2.4 MetaData更新机制

  1. metadata.requestUpdate() 将 metadata 的 needUpdate 变量设置为 true(强制更新),并返回当前的版本号(version),通过版本号来判断 metadata 是否完成更新

  2. sender.wakeup() 唤醒 sender 线程,sender 线程又会去唤醒NetworkClient线程去更新

  3. metadata.awaitUpdate(version, remainingWaitMs) 等待 metadata 的更新

  4. 所以,每次 Producer 请求更新 metadata 时,会有以下几种情况:

    • 1.如果 node 可以发送请求,则直接发送请求
    • 2.如果该 node 正在建立连接,则直接返回
    • 3.如果该 node 还没建立连接,则向 broker 初始化链接
  5. NetworkClient的poll方法中判断是否需要更新meta数据, handleCompletedReceives 处理metadata 的更新,最终是调用的 DefaultMetadataUpdater 中的handleCompletedMetadataResponse 方法处理

上次更新: 2025/04/03, 11:07:08
源码剖析-Topic创建流程
源码剖析-Consumer消费者流程

← 源码剖析-Topic创建流程 源码剖析-Consumer消费者流程→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式