RabbitMQ源码剖析
# 4.1 队列
声明队列记录:
-record(amqqueue, {
name :: rabbit_amqqueue:name() | '_', %% immutable
durable :: boolean() | '_', %% immutable
auto_delete :: boolean() | '_', %% immutable
exclusive_owner = none :: pid() | none | '_', %% immutable
arguments = [] :: rabbit_framing:amqp_table() | '_', %% immutable
pid :: pid() | ra_server_id() | none | '_', %% durable (just so we
%% know home node)
slave_pids = [] :: [pid()] | none | '_', %% transient
sync_slave_pids = [] :: [pid()] | none| '_',%% transient
recoverable_slaves = [] :: [atom()] | none | '_', %% durable
policy :: binary() | none | undefined | '_', %% durable, implicit
%% update as above
operator_policy :: binary() | none | undefined | '_', %% durable,
%% implicit
%% update
%% as above
gm_pids = [] :: [{pid(), pid()} | pid()] | none | '_', %% transient
decorators :: [atom()] | none | undefined | '_', %% transient,
%% recalculated
%% as above
state = live :: atom() | none | '_', %% durable (have we crashed?)
policy_version = 0 :: non_neg_integer() | '_',
slave_pids_pending_shutdown = [] :: [pid()] | '_',
vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index
options = #{} :: map() | '_',
type = ?amqqueue_v1_type :: module() | '_',
type_state = #{} :: map() | '_'
}).
类型声明:
-type amqqueue() :: amqqueue_v1:amqqueue_v1() | amqqueue_v2().
-type amqqueue_v2() :: #amqqueue{
name :: rabbit_amqqueue:name(), %% 队列名称
durable :: boolean(), %% 是否为持久化队列
auto_delete :: boolean(), %% 是否自动删除
exclusive_owner :: pid() | none,
arguments :: rabbit_framing:amqp_table(), %% 属性参数
pid :: pid() | ra_server_id() | none,
slave_pids :: [pid()] | none,
sync_slave_pids :: [pid()] | none,
recoverable_slaves :: [atom()] | none,
policy :: binary() | none | undefined,
operator_policy :: binary() | none | undefined,
gm_pids :: [pid()] | none,
decorators :: [atom()] | none | undefined,
state :: atom() | none,
policy_version :: non_neg_integer(), %% 策略版本
slave_pids_pending_shutdown :: [pid()],
vhost :: rabbit_types:vhost() | undefined, %% 所在的虚拟主机
options :: map(),
type :: atom(), %% 什么类型:disk还是ram的
type_state :: #{}
}.
消息的确认:
消息队列消费函数的声明:
消费消息实现1:
消费消息实现2:
消费消息实现3:
basic_consume(Q, _NoAck, _ChPid,
_LimiterPid, true, _ConsumerPrefetchCount, _ConsumerTag,
_ExclusiveConsume, _Args, _OkMsg, _ActingUser, _QStates)
when ?amqqueue_is_quorum(Q) ->
{error, global_qos_not_supported_for_queue_type};
主动拉消息函数声明:
声明实现1:
声明实现2:
消息的确认:
消息的重新入列:
消息失效时间的计算:
消息的发送确认:
confirm_messages(MsgIds, MTC) ->
{CMs, MTC1} =
lists:foldl(
fun(MsgId, {CMs, MTC0}) ->
case maps:get(MsgId, MTC0, none) of
none ->
{CMs, MTC0};
{SenderPid, MsgSeqNo} ->
{maps:update_with(SenderPid,
fun(MsgSeqNos) ->
[MsgSeqNo | MsgSeqNos]
end,
[MsgSeqNo],
CMs),
maps:remove(MsgId, MTC0)}
end
end, {#{}, MTC}, MsgIds),
maps:fold(
fun(Pid, MsgSeqNos, _) ->
rabbit_misc:confirm_to_sender(Pid, MsgSeqNos)
end,
ok,
CMs),
MTC1.
死信:
# 4.2 交换器
# direct交换器
rabbit_exchange_type_direct.erl
看其中的路由方法:
# fanout交换器
rabbit_exchange_type_fanout.erl
看其中的路由方法:
# head交换器
rabbit_exchange_type_headers.erl
看其中的路由方法:
# topic交换器
rabbit_exchange_type_topic.erl
看其中的路由方法:
# 4.3 持久化
消息流转示意图:
rabbit_channel进程确定了消息将要投递的目标队列,rabbit_amqqueue_process是队列进程,每个队列都有一个对应的进程,实际上rabbit_amqqueue_process进程只是提供了逻辑上对队列的相关操作,他的真正操作是通过调用指定的backing_queue模块提供的相关接口实现的,默认情况该backing_queue的实现模块为rabbit_variable_queue。 RabbitMQ队列中的消息随着系统的负载会不断的变化,一个消息可能会处于以下4种状态:
%% Definitions:
%%
%% alpha: this is a message where both the message itself, and its
%% position within the queue are held in RAM(消息本身和消息位置索引都只在内存中)
%%
%% beta: this is a message where the message itself is only held on
%% disk (if persisted to the message store) but its position
%% within the queue is held in RAM.(消息本身存储在磁盘中,但是消息的位置索引存在内存中)
%%
%% gamma: this is a message where the message itself is only held on
%% disk, but its position is both in RAM and on disk.(消息本身存储在磁盘中,但是消息的 位置索引存在内存中和磁盘中)
%%
%% delta: this is a collection of messages, represented by a single
%% term, where the messages and their position are only held on
%% disk.(消息本身和消息的位置索引都值存储在磁盘中)
对于普通的没有设置优先级和镜像的队列来说,backing_queue的默认实现是
rabbit_variable_queue,其内部通过5个子队列Q1、Q2、Delta、Q3和Q4来实现这4个状态的转换,其关系如下图所示:
其中Q1、Q4只包含alpha状态的消息,Q2和Q3包含Beta和gamma状态的消息,Delta只包含
delta状态的消息。具体消息的状态转换后续会进行源码分析。
# 1 消息入队分析
rabbit_amqqueue_process对消息的主要处理逻辑位于deliver_or_enqueue函数,该方法将消息直接传递给消费者,或者将消息存储到队列当中。
整体处理逻辑如下:
- 首先处理消息的mandory标志,和confirm属性。mandatory标志告诉服务器至少将该消息
route到一个队列中,否则将消息返还给生产者。confirm则是消息的发布确认。
- 然后判断队列中是否有消费者正在等待,如果有则直接调用backing_queue的接口给客户端发
送消息。
- 如果队列上没有消费者,根据当前相关设置判断消息是否需要丢弃,不需要丢弃的情况下调用
backing_queue的接口将消息入队。
deliver_or_enqueue函数代码:
如果调用到该方法的BQ:publish则说明当前队列没有消费者正在等待,消息将进入到队列。
backing_queue实现了消息的存储,他会尽力会durable=true的消息做持久化存储。初始默认情况下,非持久化消息直接进入内存队列,此时效率最高,当内存占用逐渐达到一个阈值时,消息和消息索引逐渐往磁盘中移动,随着消费者的不断消费,内存占用的减少,消息逐渐又从磁盘中被转到内存队列中。
消息在这些Queue中传递的"一般"过程q1->q2->delta->q3->q4,一般负载较轻的情况消息不需要
走完每个Queue,大部分都可以跳过。rabbit_variable_queue中消息的入队接口源码如下:
消息入队时先判断Q3是否为空,如果Q3为空,则直接进入Q4,否则进入Q1,这里思考下为什么?
假如Q3为空,Delta一定为空,因为假如Delta不为空,那么Q3取出最后一个消息的时候Delta已经
把消息转移到Q3了,这样Q3就不是空了,前后矛盾因此Delta一定是空的。同理可以推测出Q2、Q1都是空的,直接把消息放入Q4即可。
消息入队后,需要判断内存使用,调用reduce_memory_use函数:
每次入队消息后,判断RabbitMQ系统中使用的内存是否过多,此操作是尝试将内存中的队列数据
写入到磁盘中. 内存中的消息数量(RamMsgCount)及内存中的等待ack的消息数量(RamAckIndex)的和大于允许的内存消息数量(TargetRamCount)时,多余数量的消息内容会被写到磁盘中.
# 2 消息出队源码分析
获取消息:
- 尝试从q4队列中获取一个消息,如果成功,则返回获取到的消息,如果失败,则尝试通过试
用fetch_from_q3/1从q3队列获取消息,成功则返回,如果为空则返回空;
- 注意fetch_from_q3从Q3获取消息,如果Q3为空,则说明整个队列都是空的,无消息,消费
者等待即可。
取出消息后:
- 如果Q4不为空,取出消息后直接返回;
- 如果Q4为空,Q3不为空,从Q3取出消息后,判断Q3是否为空,如果Q3为空,Delta不为
空,则将Delta中的消息转移到Q3中,下次直接从Q3消费;
- 如果Q3和Delta都是空的,则可以任务Delta和Q2的消息都是空的,此时将Q1的消息转移到
Q4,下次直接从Q4消费即可。
# 3 总结
节点消息堆积较多时,这些堆积的消息很快就会进入很深的队列中去,这样会增加处理每个消息的
平均开销,整个系统的处理能力就会降低。因为要花更多的时间和资源处理堆积的消息,后流入的消息
又被挤压到很深的队列中了,系统负载越来越恶化。
因此RabbitMQ使用时一定要注意磁盘占用监控和流控监控,这些在控制台上都可以看到,一般来
说如果消息堆积过多建议增加消费者或者增强每个消费者的消费能力(比如调高prefetch_count消费者一次收到的消息可以提高单个消费者消费能力)。
# 4.4 启动过程
看启动过程源码:
首先我们从一个脚本开始:启动RabbitMQ需要使用脚本:rabbitmq-server
在172行调用了start_rabbitmq_server函数
该函数在92行设置了RABBITMQ_START_RABBIT变量的值。
为什么是92行?因为在81行判断了环境变量USE_RABBIT_BOOT_SCRIPT的值,false。因为没有
USE_RABBIT_BOOT_SCRIPT环境变量。
这里在执行rabbitmq-server脚本的时候,在该脚本执行了一遍rabbitmq-env脚本,在rabbitmq-env中执行了一遍rabbitmq-defaults脚本。看完后发现没有USE_RABBIT_BOOT_SCRIPT。
在第110行使用了RABBITMQ_START_RABBIT的扩展,也就是 -s rabbit boot ,它表示erlang要
调用rabbit模块的boot函数。
模块中的boot函数:
在模块中导出了boot/0函数。
boot/0函数的具体实现:
调用了start_it(transient)函数,参数的值就是transient。
start_it函数首先调用了spawn_boot_marker()函数,然后对其结果做分支匹配。
spawn_boot_marker()函数:
该函数什么也不做,仅仅是注册了一个进程,标志着RabbitMQ正在启动中。。。远程RabbitMQ
节点可以访问到这个状态。
该函数中调用了register函数,注册进程。如果注册成功了,则开始启动RabbitMQ,如果注册失
败,则表示RabbitMQ已经在启动中了。
首先确保该模块已经启动成功了。
Erlang内核的application.erl:
ensure_all_started函数如下:
函数中调用了ensure_all_started函数:
首先start该应用:rabbitmq_prelaunch
启动成功了,就返回{ok, [Application|Started]}
rabbit应用的ensure_all_started也是这个流程。
如果一切正常,rabbit和rabbitmq_prelaunch就都启动成功了。
启动的时候要回调rabbit的方法:start
该方法中:
调用了run_prelaunch_second_phase()函数:
# 4.5 消息的发送
我的架构梦:(七十二) 消息中间件之RabbitMQ的消息发送的源码分析 (opens new window)
使用channel.basicPublish()方法发送消息:
该抽象方法有如下实现:
究竟是AutorecoveringChannel还是ChannelN还是PublisherCallbackChannelImpl,要看设置。
我们经常用的是ChannelN:
比如发送消息:
要看channel的来源:
查看createChannel方法的具体实现:
究竟是AMQConnection还是AutorecoveringConnection?
需要看
打开该方法的实现:
看newConnection方法的实现:
看newConnection方法的实现:
看newConnection方法的实现:
看哪里返回了Connection对象:
1131行返回Connection的AutorecoveringConnection对象。
前提是isAutomaticRecoveryEnabled()方法返回true。
该方法何时返回true?
如果在创建ConnectioFactory的时候设置了setAutomaticRecoveryEnabled为true,则1130行的
AutorecoveringConnection对象返回。
1141行返回AMQConnection对象。 在AMQConnection类中,查看createChannel()方法返回的Channel是哪个实现:
上述源码中,需要查看_channelManager的createChannel方法的返回值。
首先需要知道_channelManager是哪个类的对象:
通过搜索发现只有414行给_channelManager赋值。通过调用instantiateChannelManager方法赋值的。
看instantiateChannelManager的实现:
该方法有两个实现,我们查看AMQConnection中的实现:
此处使用的是ChannelManager类。
回到前面:
看该类的createChannel方法返回的是哪个对象:
实现一:
实现二:
两个实现的区别在于有没有传递通道编号。
回到前面:
我们在发送消息的时候调用basicPublish方法,实际上就是ChannelN的方法:
ChannelN中三个重载的basicPublish方法:
第一个方法:
第二个方法:
第三个方法:
最终调用的都是第三个实现。
在第三个实现中,
如果没有设置消息头,则设置最基本的消息头设置:
其中,
- contentType表示内容类型,也就是MIME类型。
- contentEncoding表示编码类型:如UTF-8
- headers表示用户自定义的消息属性,键值对形式,Map<String, Object>
- deliveryMode表示消息投递的模式,1表示瞬时消息,2表示持久化消息
- priority表示消息的优先级,0~9,数字越大,优先级越高。
- correlationId表示关联ID,一般用在RabbitMQ的请求/响应模式中,关联请求消息的ID
- replyTo表示RabbitMQ的请求/响应模式中,响应消息要发送到的消息队列。
- expiration表示消息的过期时间
- messageId每个消息都有一个消息ID,该ID值要么手动设置,要么由系统自动生成,用于唯一标识消息。
- timestamp表示消息被发送的时间戳。这个时间戳并不是精确的消息被发送出的时间,而是在消息放到发送队列到发送完成之间的任何时间。
- type消息的类型,通常用语指定消息的序列化反序列化类型。
- userId使用user-id属性来标识已登录的用户
- appId在处理消息之前检查app-id允许应用程序丢弃那些来源不明或者不受支持的消息
- clusterId:AMQP 0-9-1将cluster-id属性重新命名为reserved,并声明它必须为空,虽然RabbitMQ目前没有根据规范要求它是空的,但是最好规避这个属性。
AMQCommand是AMQP规定的命令,用于跟RabbitMQ交互。命令中指定具体的操作,比如上文
中命令的属性是Basic.Publish,也就是AMQP的发布消息。
mandatory表示如果一个消息无法被交换器路由,则如果该值设置为0,则服务器悄无声息的丢
弃,否则使用AMQP的Return退还给发布者。
immediate如果该值设置为0,则当消息一到达交换器,就立即投递给消费者。如果消费者不在线
或不能立即投递给消费者,则服务器无法保证该消息被消费。如果设置为1,则如果消息不能被立即投递给消费者,则使用AMQP的Return命令退还给发布者。
transmit用于执行该命令,发布消息。
最后一行用于发送统计消息。
transmit方法的实现:
quiescingTransmit()用于执行AMQP命令:
要在通道上执行命令,首先获取通道的共享锁,实际上就是一个Connection可以有多个通道来操
作,每个通道属于一个线程,连接是多线程共享的,因此需要获取该共享锁,以操作Connection。在
获取锁之后,如果此时发送线程需要阻塞,就让共享锁等待,直到被唤醒。
c.transmit(this)用于通过通道执行命令:
public void transmit(AMQChannel channel) throws IOException {
int channelNumber = channel.getChannelNumber();
AMQConnection connection = channel.getConnection();
synchronized(this.assembler) {
com.rabbitmq.client.impl.Method m = this.assembler.getMethod();
if (m.hasContent()) {
byte[] body = this.assembler.getContentBody();
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, (long)body.length);
// 获取协议协商的帧大小
int frameMax = connection.getFrameMax();
boolean cappedFrameMax = frameMax > 0;
int bodyPayloadMax = cappedFrameMax ? frameMax - 8 : body.length;
// 如果消息头帧大小大于协议协商的帧大小,则抛异常。
if (cappedFrameMax && headerFrame.size() > frameMax) {
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
throw new IllegalArgumentException(msg);
}
// 发送要执行的AMQP方法
connection.writeFrame(m.toFrame(channelNumber));
// 发送消息头帧
connection.writeFrame(headerFrame);
// 封装消息帧,有可能有多个消息帧需要发送
for(int offset = 0; offset < body.length; offset += bodyPayloadMax) {
int remaining = body.length - offset;
int fragmentLength = remaining < bodyPayloadMax ? remaining : bodyPayloadMax;
Frame frame = Frame.fromBodyFragment(channelNumber, body, offset, fragmentLength);
// 发送消息体帧,有可能多个
connection.writeFrame(frame);
}
} else {
// 如果要执行的AMQP方法没有数据,则只发送命令帧。
connection.writeFrame(m.toFrame(channelNumber));
}
}
connection.flush();
}
connection.writeFrame将消息帧发送到哪里了?
注释说直接将消息帧发送给broker,但实际上并非如此。
_frameHandler.writeFrame(f)用于写消息帧,写到哪里了?
有两个实现,究竟是哪个?需要判断。且看_frameHandler的源码:
该属性只在构造器中初始化过,传过来的frameHandler是哪个?
要看在哪里创建AMQConnection对象的。
调用方法createConnection创建AMQConnection对象:
下图的FrameHandler对象是哪个?
两个实现,该用哪个?
看fhFactory是哪个:
看createFrameHandlerFactory的实现:
如果使用nio,则是SocketChannelFrameHandlerFactory,否则死
SocketFrameHandlerFactory。
看nio的值:
究竟调用了什么方法?
useNio还是useBlockingIo?
在我们的代码中可以手动调用。默认nio的值是:
默认是false,非nio。
默认使用的就是SocketFrameHandlerFactory这个类。
首先查看SocketFrameHandler的writeFrame实现:
由于是阻塞IO,此处直接使用输出流输出:
输出流:_outputStream的赋值:
平淡无奇。
frame.writeTo(_outputStream)的实现:
# 4.6 消息的消费
我的架构梦:(七十三) 消息中间件之RabbitMQ的消息消费的源码分析 (opens new window)
# 两种方式:推拉
拉消息:
推消息:
# 拉消息的代码实现
上图中,basicGet的具体实现是哪个?
现在的Channel究竟是哪个类型?ChannelN还是AurecoveringChannel?
看该方法的返回值
该方法在两个类中都存在,需要查看ConnectionFactory的方法返回的是哪个Connection:
如果isAutomaticRecoveryEnabled()返回true,则返回的Connection是
AutorecoveringConnection的实例。
如果isAutomaticRecoveryEnabled()返回false, 则返回的是:
看createConnection方法的返回值是什么类型的:
就是AMQConnection类型的对象。
最简单的判断方式就是直接打印connection的class信息:
发现connection是AutoreceoveringConnection类型的对象。
isAutomaticRecoveryEnabled()返回的是true还是false?
ConnectionFactory类中该属性的默认值是true。
还有两处涉及到变量automaticRecovery的:
上图中的方法表示可以使用ConnectionFactory对象设置是否启用自动恢复特性。
默认Connection是AutorecoveringConnection类型的对象。
看下面的代码中channel的类型:
最终的返回值是wrapChannel方法调用的返回值:
我们使用的channel的默认类型是AutorecoveringChannel。
看AutorecoveringChannel的basicGet实现:
delegate是哪个?
ChannelN.java中1149行是该方法的实现:
public GetResponse basicGet(String queue, boolean autoAck) throws IOException {
validateQueueNameLength(queue);
// 发送RPC请求,返回AMQCommand响应信息。
AMQCommand replyCommand = this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Basic.Get.Builder()).queue(queue).noAck(autoAck).build());
// 获取响应的方法
Method method = replyCommand.getMethod();
// 如果响应的方法是Basic.Ok类型的,则表示获取消息成功
if (method instanceof GetOk) {
// 向下转型
GetOk getOk = (GetOk)method;
// 使用Envelop封装响应的信息,包括消息ID,是否是重发的,交换器,路由键。
Envelope envelope = new Envelope(getOk.getDeliveryTag(), getOk.getRedelivered(), getOk.getExchange(), getOk.getRoutingKey());
// 获取消息的属性
BasicProperties props = (BasicProperties)replyCommand.getContentHeader();
// 获取消息体内容
byte[] body = replyCommand.getContentBody();
// 获取basic.getok.messagecount的值,此处是5
int messageCount = getOk.getMessageCount();
this.metricsCollector.consumedMessage(this, getOk.getDeliveryTag(), autoAck);
// 实例化GetResponse对象并赋值返回。
return new GetResponse(envelope, props, body, messageCount);
} else if (method instanceof GetEmpty) {
return null;
} else {
throw new UnexpectedMethodError(method);
}
}
如何获取消息的?发送RPC请求:
看该方法的实现:
privateRpc(m)的实现:
上述代码中,rpc(m, k)发送请求消息。
k.getReply()方法是一个阻塞的方法,等待broker返回响应。
rpc方法的具体私实现:
quiescingRpc(m, k)的具体实现:
enqueueRpc(k)具体实现:
=
我们使用的channel的默认实现是:AutorecoveringChannel,该类中包含
RecoveryAwareChannelN是
ChannelN的子类。
ChannelN又是AMQChannel的子类。
所以enqueueRpc方法应该看ChannelN的实现方式:
调用了父类的enqueueRpc方法是父类的:
看doEnqueueRpc的具体实现:
# 推消息的代码实现
看basicConsume的具体实现:
ChannelN中1343行代码:
public String basicConsume(String queue, final boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, final Consumer callback) throws IOException {
// 构建请求方法
Method m = (new com.rabbitmq.client.AMQP.Basic.Consume.Builder()).queue(queue).consumerTag(consumerTag).noLocal(noLocal).noAck(autoAck).exclusive(exclusive).arguments(arguments).build();
BlockingRpcContinuation<String> k = new BlockingRpcContinuation<String>(m) {
public String transformReply(AMQCommand replyCommand) {
String actualConsumerTag = ((ConsumeOk)replyCommand.getMethod()).getConsumerTag();
ChannelN.this._consumers.put(actualConsumerTag, callback);
// need to register consumer in stats before it actually starts consuming
ChannelN.this.metricsCollector.basicConsume(ChannelN.this, actualConsumerTag, autoAck);
ChannelN.this.dispatcher.handleConsumeOk(callback, actualConsumerTag);
return actualConsumerTag;
}
};
this.rpc(m, k);
try {
if (this._rpcTimeout == 0) {
return (String)k.getReply();
} else {
try {
return (String)k.getReply(this._rpcTimeout);
} catch (TimeoutException var11) {
throw this.wrapTimeoutException(m, var11);
}
}
} catch (ShutdownSignalException var12) {
throw wrap(var12);
}
}