源码剖析-Producer生产者流程
# 4.4 Kafka源码剖析之Producer生产者流程
# 4.4.1 Producer示例
首先我们先通过一段代码来展示 KafkaProducer 的使用方法。在下面的示例中,我们使用KafkaProducer 实现向kafka发送消息的功能。在示例程序中,首先将 KafkaProduce 使用的配置写入到 Properties 中,每项配置的具体含义在注释中进行解释。之后以此 Properties 对象为参数构造KafkaProducer 对象,最后通过 send 方法完成发送,代码中包含同步发送、异步发送两种情况。
public static void main(String[] args) throws ExecutionException,InterruptedException {
Properties props = new Properties();
// 客户端id
props.put("client.id", "KafkaProducerDemo");
// kafka地址,列表格式为host1:port1,host2:port2,…,无需添加所有的集群地址,kafka会根据ᨀ供的地址发现其他的地址(建议多ᨀ供几个,以防ᨀ供的服务器关闭)
props.put("bootstrap.servers", "localhost:9092");
// 发送返回应答方式
// 0:Producer 往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
// 1:Producer 往集群发送数据只要 Leader 应答就可以发送下一条,只确保Leader接收成功。
// -1或者all:Producer 往集群发送数据需要所有的ISR Follower都完成从Leader的同步才会发送下一条,确保Leader发送成功和所有的副本都成功接收。安全性最高,但是效率最低。
props.put("acks", "all");
// 重试次数
props.put("retries", 0);
// 重试间隔时间
props.put("retries.backoff.ms", 100);
// 批量发送的大小
props.put("batch.size", 16384);
// 一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去
props.put("linger.ms", 10);
// 缓冲区大小
props.put("buffer.memory", 33554432);
// key序列化方式
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
// value序列化方式
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
// topic
String topic = "lagou_edu";
Producer<String, String> producer = new KafkaProducer<>(props);
AtomicInteger count = new AtomicInteger();
while (true) {
int num = count.get();
String key = Integer.toString(num);
String value = Integer.toString(num);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
if (num % 2 == 0) {
// 偶数异步发送
// 第一个参数record封装了topic、key、value
// 第二个参数是一个callback对象,当生产者接收到kafka发来的ACK确认消息时,会调用此CallBack对象的onComplete方法
producer.send(record, (recordMetadata, e) -> {
System.out.println("num:" + num + " topic:" +recordMetadata.topic() + " offset:" + recordMetadata.offset());
}
);
} else {
// 同步发送
// KafkaProducer.send方法返回的类型是Future<RecordMetadata>,通过get方法阻塞当前线程,等待kafka服务端ACK响应
producer.send(record).get();
}
count.incrementAndGet();
TimeUnit.MILLISECONDS.sleep(100);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# 4.4.1.1 同步发送
- KafkaProducer.send方法返回的类型是
Future<RecordMetadata>
,通过get方法阻塞当前线程,等待kafka服务端ACK响应
producer.send(record).get()
# 4.4.1.2 异步发送
第一个参数record封装了topic、key、value
第二个参数是一个callback对象,当生产者接收到kafka发来的ACK确认消息时,会调用此
CallBack对象的onComplete方法
producer.send(record, (recordMetadata, e) -> {
System.out.println("num:" + num + " topic:" +
recordMetadata.topic() + " offset:" + recordMetadata.offset());
});
2
3
4
# 4.4.2 KafkaProducer实例化
了解了 KafkaProducer 的基本使用,开始深入了解的KafkaProducer原理和实现,先看一下构造方法核心逻辑
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,Serializer<V> valueSerializer) {
try {
// 获取用户的配置
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
// 系统时间
this.time = Time.SYSTEM;
// 获取client.id配置
String clientId =config.getString(ProducerConfig.CLIENT_ID_CONFIG);
// 如果client.id为空,设置默认值:producer-1
if (clientId.length() <= 0)
clientId = "producer-" +PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
// 获取事务id,如果没有配置则为null
String transactionalId =userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
(String)userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
LogContext logContext;
if (transactionalId == null)
logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId)); else
logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
log = logContext.logger(KafkaProducer.class);
log.trace("Starting the Kafka producer");
// 创建client-id的监控map
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
// 设置监控配置,包含样本量、取样时间窗口、记录级别
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
// 监控数据上报类
List<MetricsReporter> reporters =config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
// 生成生产者监控
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
// 分区类
this.partitioner =config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG,Partitioner.class);
// 重试时间 retry.backoff.ms 默认100ms
long retryBackoffMs =config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
if (keySerializer == null) {
// 反射生成key序列化方式
this.keySerializer =ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class));
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = ensureExtended(keySerializer);
}
if (valueSerializer == null) {
// 反射生成key序列化方式
this.valueSerializer =ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class));
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = ensureExtended(valueSerializer);
}
// load interceptors and make sure they get clientId
// 确认client.id添加到用户的配置里面
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,clientId);
// 获取多个拦截器,为空则不处理
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs,false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
// 集群资源监听器,在元数据变更时会有通知
ClusterResourceListeners clusterResourceListeners =configureClusterResourceListeners(keySerializer, valueSerializer,interceptorList, reporters);
// 生产者每隔一段时间都要去更新一下集群的元数据,默认5分钟
this.metadata = new Metadata(retryBackoffMs,config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
true, true, clusterResourceListeners);
// 生产者往服务端发送消息的时候,规定一条消息最大多大?
// 如果你超过了这个规定消息的大小,你的消息就不能发送过去。
// 默认是1M,这个值偏小,在生产环境中,我们需要修改这个值。
// 经验值是10M。但是大家也可以根据自己公司的情况来。
this.maxRequestSize =config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
//指的是缓存大小
//默认值是32M,这个值一般是够用,如果有特殊情况的时候,我们可以去修改这个值。
this.totalMemorySize =config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
// kafka是支持压缩数据的,可以设置压缩格式,默认是不压缩,支持gzip、snappy、lz4
// 一次发送出去的消息就更多。生产者这儿会消耗更多的cpu.
this.compressionType =CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
// 配置控制了KafkaProducer.send()并将
KafkaProducer.partitionsFor()被阻塞多长时间,由于缓冲区已满或元数据不可用,这些方法可能会被阻塞止
this.maxBlockTimeMs =config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
// 控制客户端等待请求响应的最长时间。如果在超时过去之前未收到响应,客户端将在必要时重新发送请求,或者如果重试耗尽,请求失败
this.requestTimeoutMs =config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
// 事务管理器
this.transactionManager = configureTransactionState(config,logContext, log);
// 重试次数
int retries = configureRetries(config, transactionManager !=null, log);
// 使用幂等性,需要将 enable.idempotence 配置项设置为true。并且它对单个分区的发送,一次性最多发送5条
int maxInflightRequests = configureInflightRequests(config,transactionManager != null);
// 如果开启了幂等性,但是用户指定的ack不为 -1,则会抛出异常
short acks = configureAcks(config, transactionManager != null,log);
this.apiVersions = new ApiVersions();
// 创建核心组件:记录累加器
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time,
apiVersions,
transactionManager);
// 获取broker地址列表
List<InetSocketAddress> addresses =ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
// 更新元数据
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
// 创建通道,是否需要加密
ChannelBuilder channelBuilder =ClientUtils.createChannelBuilder(config);
Sensor throttleTimeSensor =Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
// 初始化了一个重要的管理网路的组件
// connections.max.idle.ms: 默认值是9分钟, 一个网络连接最多空闲多久,超过这个空闲时间,就关闭这个网络连接。
// max.in.flight.requests.per.connection:默认是5, producer向broker发送数据的时候,其实是有多个网络连接。每个网络连接可以忍受 producer端发送给broker 消息然后消息没有响应的个数
NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
this.metrics, time, "producer", channelBuilder,logContext),
this.metadata,
clientId,
maxInflightRequests,
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs,
time,
true,
apiVersions,
throttleTimeSensor,
logContext);
// 发送线程
this.sender = new Sender(logContext,
client,
this.metadata,
this.accumulator,
maxInflightRequests == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
retries,
metricsRegistry.senderMetrics,
Time.SYSTEM,
this.requestTimeoutMs,
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);
// 线程名称
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
// 启动守护线程
this.ioThread = new KafkaThread(ioThreadName, this.sender,true);
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
// 把用户配置的参数,但是没有用到的打印出来
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka producer started");
}
catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
close(0, TimeUnit.MILLISECONDS, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer",t);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# 4.4.2 消息发送过程
Kafka消息实际发送以 send 方法为入口:
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
2
3
4
5
6
# 4.4.2.1 拦截器
首先方法会先进入拦截器集合 ProducerInterceptors , onSend 方法是遍历拦截器 onSend 方法,拦截器的目的是将数据处理加工, kafka 本身并没有给出默认的拦截器的实现。如果需要使用拦截器功能,必须自己实现 ProducerInterceptor 接口。
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
ProducerRecord<K, V> interceptRecord = record;
// 遍历所有拦截器,顺序执行,如果有异常只打印日志,不会向上抛出
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
interceptRecord = interceptor.onSend(interceptRecord);
}
catch (Exception e) {
// do not propagate interceptor exception, log and continue calling other interceptors
// be careful not to throw exception from here
if (record != null)
log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e); else
log.warn("Error executing interceptor onSend callback",e);
}
}
return interceptRecord;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 4.4.2.2 拦截器核心逻辑
ProducerInterceptor 接口包括三个方法:
onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中的。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率
close:关闭interceptor,主要用于执行一些资源清理工作
拦截器可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。
# 4.4.2.3 发送五步骤
下面仔细来看一下 doSend 方法的运行过程:
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
// 首先创建一个主题分区类
TopicPartition tp = null;
try {
// first make sure the metadata for the topic is available
// 首先确保该topic的元数据可用
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
// 序列化 record 的 key 和 value
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(),record.headers(), record.key());
}
catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class" + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(),record.headers(), record.value());
}
catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
// 获取该 record 要发送到的 partition
int partition = partition(record, serializedKey,serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
// 给header设置只读
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue,headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ?time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
// 向 accumulator 中追加 record 数据,数据会先进行缓存
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback,remainingWaitMs);
// 如果追加完数据后,对应的 RecordBatch 已经达到了 batch.size 的大小(或者batch 的剩余空间不足以添加下一条 Record),则唤醒 sender 线程发送数据。
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
}
catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
}
catch (InterruptedException e) {
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
}
catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
}
catch (KafkaException e) {
this.errors.record();
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
}
catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
if (this.interceptors != null)
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
- Producer 通过 waitOnMetadata() 方法来获取对应 topic 的 metadata 信息,需要先该
topic 是可用的
Producer 端对 record 的 key 和 value 值进行序列化操作,在 Consumer 端再进行相应的反序列化
获取partition值,具体分为下面三种情况:
1.指明 partition 的情况下,直接将指明的值直接作为 partiton 值
2.没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
3.既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到partition 值,也就是常说的 round-robin 算法
Producer 默认使用的 partitioner 是
org.apache.kafka.clients.producer.internals.DefaultPartitioner
向 accumulator 写数据,先将 record 写入到 buffer 中,当达到一个 batch.size 的大小时,再唤起 sender线程去发送 RecordBatch,这里仔细分析一下Producer是如何向buffer写入数据的
- 1.获取该 topic-partition 对应的 queue,没有的话会创建一个空的 queue
- 2.向 queue 中追加数据,先获取 queue 中最新加入的那个 RecordBatch,如果不存在或者存在但剩余空余不足以添加本条 record 则返回 null,成功写入的话直接返回结果,写入成功
- 3.创建一个新的 RecordBatch,初始化内存大小根据 max(batch.size,Records.LOG_OVERHEAD + Record.recordSize(key, value)) 来确定(防止单条record 过大的情况)
- 4.向新建的 RecordBatch 写入 record,并将 RecordBatch 添加到 queue 中,返回结果,写入成功
发送 RecordBatch,当 record 写入成功后,如果发现 RecordBatch 已满足发送的条件(通常是 queue 中有多个 batch,那么最先添加的那些 batch 肯定是可以发送了),那么就会唤醒sender 线程,发送 RecordBatch 。sender 线程对 RecordBatch 的处理是在 run() 方法中进行的,该方法具体实现如下:
- 1.获取那些已经可以发送的 RecordBatch 对应的 nodes
- 2.如果与node 没有连接(如果可以连接,同时初始化该连接),就证明该 node 暂时不能发送数据,暂时移除该 node
- 3.返回该 node 对应的所有可以发送的 RecordBatch 组成的 batches(key 是node.id),并将 RecordBatch 从对应的 queue 中移除
- 4.将由于元数据不可用而导致发送超时的 RecordBatch 移除
- 5.发送 RecordBatch
# 4.4.2.4 MetaData更新机制
metadata.requestUpdate() 将 metadata 的 needUpdate 变量设置为 true(强制更新),并返回当前的版本号(version),通过版本号来判断 metadata 是否完成更新
sender.wakeup() 唤醒 sender 线程,sender 线程又会去唤醒NetworkClient线程去更新
metadata.awaitUpdate(version, remainingWaitMs) 等待 metadata 的更新
所以,每次 Producer 请求更新 metadata 时,会有以下几种情况:
- 1.如果 node 可以发送请求,则直接发送请求
- 2.如果该 node 正在建立连接,则直接返回
- 3.如果该 node 还没建立连接,则向 broker 初始化链接
NetworkClient的poll方法中判断是否需要更新meta数据, handleCompletedReceives 处理metadata 的更新,最终是调用的 DefaultMetadataUpdater 中的handleCompletedMetadataResponse 方法处理