源码剖析-同步发送模式
# 4.18 Kafka源码剖析之同步发送模式
消息同步发送的代码:
所谓同步,就是调用Future的get方法同步等待。
send方法是异步的:
send方法将消息发送给broker,当前线程同步等待broker返回的消息。
send发的实现:
看doSend:
该方法首先将消息放到累加器中
判断是否需要发起请求,如果需要,则唤醒sender线程发送消息
该方法的返回值:RecordApendResult.future:
RecordApendResult类:
累加器的append方法将消息追加到累加器,并返回追加到累加器的结果:
其中主要实现:
tryAppend的实现:
上述方法的返回值是FutureRecordMetadata,而该类的实现:
上述方法中,await方法等待broker端返回结果。
result实际上是tryAppend方法赋值的produceFuture对象:
produceFuture对象是:
该类中有一个CountDownLatch,future的get方法中的等待实际上就是该CountDownLatch的等待。
最终我们的producer.send方法的返回值就是FutureRecordMetadata对象。
future.get就是在等待该CountDownLatch的countDown的触发:
该方法何时调用?
completeFutureAndFireCallbacks方法调用
(Alt+F7 查看元素的使用位置)
completeFutureAndFireCallbacks方法何时调用?
done方法何时调用?
在completeBatch方法的最后,如果batch.done,则释放累加器的空间。
completeBatch方法何时调用?
在该方法中:
completeBatch何时调用?
在handleProduceResponse中如果有响应,则解析,并调用completeBatch方法
如果没有响应,表示是acks=0的情形,不需要解析响应,直接调用completeBatch方法即可。
handleProduceResponse何时调用?
Sender线程创建回调,回调中调用了handleProduceResponse方法,创建生产请求对象,该对象中封装了回调对象
发送请求,等待回调的触发。
sendProduceRequest的调用:
sendProducerData的调用:
总结:
所谓同步调用,指的是生产者调用 producer.send(record).get() 方法。
该方法首先将要发送的消息发送到消息累加器
判断累加器中的消息批次是否达到,或者当前批次没写满,但是加入当前消息会让消息批大于消息批最大值,则创建新的批次。
如果需要发送消息批次,则唤醒sender线程,让sender线程发送消息。
sender线程会返回一个future对象给生产者客户端线程。
若生产者调用该future的get方法,则该方法使用CountDownLatch阻塞,直到收到broker响应,触发CountDownLatch的countDown方法
此时生产者线程的get方法返回,得到发送的结果。