跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

    • RabbitMQ

      • RabbitMQ基础

      • RabbitMQ高级

        • RabbitMQ架构与实战
        • RabbitMQ高级特性解析
        • RabbitMQ集群与运维
        • RabbitMQ源码剖析
          • direct交换器
          • fanout交换器
          • head交换器
          • topic交换器
          • 1 消息入队分析
          • 2 消息出队源码分析
          • 3 总结
          • 两种方式:推拉
          • 拉消息的代码实现
          • 推消息的代码实现
    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • RabbitMQ
  • RabbitMQ高级
Revin
2023-07-23
目录

RabbitMQ源码剖析

# 4.1 队列

RabbitMQ_Page127_002

声明队列记录:

-record(amqqueue, {
      name :: rabbit_amqqueue:name() | '_', %% immutable
      durable :: boolean() | '_', %% immutable
      auto_delete :: boolean() | '_', %% immutable
      exclusive_owner = none :: pid() | none | '_', %% immutable
      arguments = [] :: rabbit_framing:amqp_table() | '_', %% immutable
      pid :: pid() | ra_server_id() | none | '_', %% durable (just so we
      %% know home node)
      slave_pids = [] :: [pid()] | none | '_', %% transient
      sync_slave_pids = [] :: [pid()] | none| '_',%% transient
      recoverable_slaves = [] :: [atom()] | none | '_', %% durable
      policy :: binary() | none | undefined | '_', %% durable, implicit
      %% update as above
      operator_policy :: binary() | none | undefined | '_', %% durable,
      %% implicit
      %% update
      %% as above
      gm_pids = [] :: [{pid(), pid()} | pid()] | none | '_', %% transient
      decorators :: [atom()] | none | undefined | '_', %% transient,
      %% recalculated
      %% as above
      state = live :: atom() | none | '_', %% durable (have we crashed?)
      policy_version = 0 :: non_neg_integer() | '_',
      slave_pids_pending_shutdown = [] :: [pid()] | '_',
      vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index
      options = #{} :: map() | '_',
      type = ?amqqueue_v1_type :: module() | '_',
      type_state = #{} :: map() | '_'
}).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

类型声明:

-type amqqueue() :: amqqueue_v1:amqqueue_v1() | amqqueue_v2().
-type amqqueue_v2() :: #amqqueue{
    name :: rabbit_amqqueue:name(), %% 队列名称
    durable :: boolean(), %% 是否为持久化队列
    auto_delete :: boolean(), %% 是否自动删除
    exclusive_owner :: pid() | none,
    arguments :: rabbit_framing:amqp_table(), %% 属性参数
    pid :: pid() | ra_server_id() | none,
    slave_pids :: [pid()] | none,
    sync_slave_pids :: [pid()] | none,
    recoverable_slaves :: [atom()] | none,
    policy :: binary() | none | undefined,
    operator_policy :: binary() | none | undefined,
    gm_pids :: [pid()] | none,
    decorators :: [atom()] | none | undefined,
    state :: atom() | none,
    policy_version :: non_neg_integer(), %% 策略版本
    slave_pids_pending_shutdown :: [pid()],
    vhost :: rabbit_types:vhost() | undefined, %% 所在的虚拟主机
    options :: map(),
    type :: atom(), %% 什么类型:disk还是ram的
    type_state :: #{}
}.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

RabbitMQ_Page129_001

消息的确认:

RabbitMQ_Page129_002

消息队列消费函数的声明:

RabbitMQ_Page129_003

消费消息实现1:

RabbitMQ_Page130_001

消费消息实现2:

image-20230723193439440

消费消息实现3:

 basic_consume(Q, _NoAck, _ChPid,
           _LimiterPid, true, _ConsumerPrefetchCount, _ConsumerTag,
           _ExclusiveConsume, _Args, _OkMsg, _ActingUser, _QStates)
          when ?amqqueue_is_quorum(Q) ->
          {error, global_qos_not_supported_for_queue_type};
1
2
3
4
5

主动拉消息函数声明:

RabbitMQ_Page131_001

声明实现1:

RabbitMQ_Page131_002

声明实现2:

RabbitMQ_Page131_003

消息的确认:

RabbitMQ_Page131_004

消息的重新入列:

RabbitMQ_Page132_001

消息失效时间的计算:

RabbitMQ_Page132_002

消息的发送确认:

confirm_messages(MsgIds, MTC) ->
    {CMs, MTC1} =
        lists:foldl(
          fun(MsgId, {CMs, MTC0}) ->
                  case maps:get(MsgId, MTC0, none) of
                      none ->
                          {CMs, MTC0};
                      {SenderPid, MsgSeqNo} ->
                          {maps:update_with(SenderPid,
                                            fun(MsgSeqNos) ->
                                                [MsgSeqNo | MsgSeqNos]
                                            end,
                                            [MsgSeqNo],
                                            CMs),
                           maps:remove(MsgId, MTC0)}
                  end
          end, {#{}, MTC}, MsgIds),
    maps:fold(
        fun(Pid, MsgSeqNos, _) ->
            rabbit_misc:confirm_to_sender(Pid, MsgSeqNos)
        end,
        ok,
        CMs),
  MTC1.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

死信:

RabbitMQ_Page133_001

# 4.2 交换器

# direct交换器

rabbit_exchange_type_direct.erl

看其中的路由方法:

RabbitMQ_Page133_002

# fanout交换器

rabbit_exchange_type_fanout.erl

看其中的路由方法:

RabbitMQ_Page133_003

# head交换器

rabbit_exchange_type_headers.erl

看其中的路由方法:

RabbitMQ_Page133_004

# topic交换器

rabbit_exchange_type_topic.erl

看其中的路由方法:

RabbitMQ_Page134_001

# 4.3 持久化

消息流转示意图:

RabbitMQ_Page134_002

rabbit_channel进程确定了消息将要投递的目标队列,rabbit_amqqueue_process是队列进程,每个队列都有一个对应的进程,实际上rabbit_amqqueue_process进程只是提供了逻辑上对队列的相关操作,他的真正操作是通过调用指定的backing_queue模块提供的相关接口实现的,默认情况该backing_queue的实现模块为rabbit_variable_queue。 RabbitMQ队列中的消息随着系统的负载会不断的变化,一个消息可能会处于以下4种状态:

%% Definitions:
%%
%% alpha: this is a message where both the message itself, and its
%% 		  position within the queue are held in RAM(消息本身和消息位置索引都只在内存中)
%%
%% beta: this is a message where the message itself is only held on
%%       disk (if persisted to the message store) but its position
%%       within the queue is held in RAM.(消息本身存储在磁盘中,但是消息的位置索引存在内存中)
%%
%% gamma: this is a message where the message itself is only held on
%% 		  disk, but its position is both in RAM and on disk.(消息本身存储在磁盘中,但是消息的  	位置索引存在内存中和磁盘中)
%%
%% delta: this is a collection of messages, represented by a single
%%        term, where the messages and their position are only held on
%% 		  disk.(消息本身和消息的位置索引都值存储在磁盘中)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

对于普通的没有设置优先级和镜像的队列来说,backing_queue的默认实现是

rabbit_variable_queue,其内部通过5个子队列Q1、Q2、Delta、Q3和Q4来实现这4个状态的转换,其关系如下图所示:

RabbitMQ_Page135_001

其中Q1、Q4只包含alpha状态的消息,Q2和Q3包含Beta和gamma状态的消息,Delta只包含

delta状态的消息。具体消息的状态转换后续会进行源码分析。

# 1 消息入队分析

rabbit_amqqueue_process对消息的主要处理逻辑位于deliver_or_enqueue函数,该方法将消息直接传递给消费者,或者将消息存储到队列当中。

整体处理逻辑如下:

  1. 首先处理消息的mandory标志,和confirm属性。mandatory标志告诉服务器至少将该消息

route到一个队列中,否则将消息返还给生产者。confirm则是消息的发布确认。

  1. 然后判断队列中是否有消费者正在等待,如果有则直接调用backing_queue的接口给客户端发

送消息。

  1. 如果队列上没有消费者,根据当前相关设置判断消息是否需要丢弃,不需要丢弃的情况下调用

backing_queue的接口将消息入队。

deliver_or_enqueue函数代码:

image

image

image

如果调用到该方法的BQ:publish则说明当前队列没有消费者正在等待,消息将进入到队列。

backing_queue实现了消息的存储,他会尽力会durable=true的消息做持久化存储。初始默认情况下,非持久化消息直接进入内存队列,此时效率最高,当内存占用逐渐达到一个阈值时,消息和消息索引逐渐往磁盘中移动,随着消费者的不断消费,内存占用的减少,消息逐渐又从磁盘中被转到内存队列中。

消息在这些Queue中传递的"一般"过程q1->q2->delta->q3->q4,一般负载较轻的情况消息不需要

走完每个Queue,大部分都可以跳过。rabbit_variable_queue中消息的入队接口源码如下:

image

消息入队时先判断Q3是否为空,如果Q3为空,则直接进入Q4,否则进入Q1,这里思考下为什么?

假如Q3为空,Delta一定为空,因为假如Delta不为空,那么Q3取出最后一个消息的时候Delta已经

把消息转移到Q3了,这样Q3就不是空了,前后矛盾因此Delta一定是空的。同理可以推测出Q2、Q1都是空的,直接把消息放入Q4即可。

消息入队后,需要判断内存使用,调用reduce_memory_use函数:

image

image

image

image

image

每次入队消息后,判断RabbitMQ系统中使用的内存是否过多,此操作是尝试将内存中的队列数据

写入到磁盘中. 内存中的消息数量(RamMsgCount)及内存中的等待ack的消息数量(RamAckIndex)的和大于允许的内存消息数量(TargetRamCount)时,多余数量的消息内容会被写到磁盘中.

# 2 消息出队源码分析

获取消息:

  1. 尝试从q4队列中获取一个消息,如果成功,则返回获取到的消息,如果失败,则尝试通过试

用fetch_from_q3/1从q3队列获取消息,成功则返回,如果为空则返回空;

  1. 注意fetch_from_q3从Q3获取消息,如果Q3为空,则说明整个队列都是空的,无消息,消费

者等待即可。

取出消息后:

  1. 如果Q4不为空,取出消息后直接返回;
  2. 如果Q4为空,Q3不为空,从Q3取出消息后,判断Q3是否为空,如果Q3为空,Delta不为

空,则将Delta中的消息转移到Q3中,下次直接从Q3消费;

  1. 如果Q3和Delta都是空的,则可以任务Delta和Q2的消息都是空的,此时将Q1的消息转移到

Q4,下次直接从Q4消费即可。

image

img

image

image

image

# 3 总结

节点消息堆积较多时,这些堆积的消息很快就会进入很深的队列中去,这样会增加处理每个消息的

平均开销,整个系统的处理能力就会降低。因为要花更多的时间和资源处理堆积的消息,后流入的消息

又被挤压到很深的队列中了,系统负载越来越恶化。

因此RabbitMQ使用时一定要注意磁盘占用监控和流控监控,这些在控制台上都可以看到,一般来

说如果消息堆积过多建议增加消费者或者增强每个消费者的消费能力(比如调高prefetch_count消费者一次收到的消息可以提高单个消费者消费能力)。

# 4.4 启动过程

看启动过程源码:

首先我们从一个脚本开始:启动RabbitMQ需要使用脚本:rabbitmq-server

RabbitMQ_Page143_001

在172行调用了start_rabbitmq_server函数

RabbitMQ_Page143_002

RabbitMQ_Page144_001

该函数在92行设置了RABBITMQ_START_RABBIT变量的值。

为什么是92行?因为在81行判断了环境变量USE_RABBIT_BOOT_SCRIPT的值,false。因为没有

USE_RABBIT_BOOT_SCRIPT环境变量。

这里在执行rabbitmq-server脚本的时候,在该脚本执行了一遍rabbitmq-env脚本,在rabbitmq-env中执行了一遍rabbitmq-defaults脚本。看完后发现没有USE_RABBIT_BOOT_SCRIPT。

RabbitMQ_Page144_002

在第110行使用了RABBITMQ_START_RABBIT的扩展,也就是 -s rabbit boot ,它表示erlang要

调用rabbit模块的boot函数。

RabbitMQ_Page145_001

RabbitMQ_Page145_002

模块中的boot函数:

RabbitMQ_Page145_003

在模块中导出了boot/0函数。

boot/0函数的具体实现:

RabbitMQ_Page145_004

调用了start_it(transient)函数,参数的值就是transient。

RabbitMQ_Page145_005

start_it函数首先调用了spawn_boot_marker()函数,然后对其结果做分支匹配。

spawn_boot_marker()函数:

RabbitMQ_Page146_001

该函数什么也不做,仅仅是注册了一个进程,标志着RabbitMQ正在启动中。。。远程RabbitMQ

节点可以访问到这个状态。

该函数中调用了register函数,注册进程。如果注册成功了,则开始启动RabbitMQ,如果注册失

败,则表示RabbitMQ已经在启动中了。

RabbitMQ_Page146_002

RabbitMQ_Page147_001

首先确保该模块已经启动成功了。

Erlang内核的application.erl:

ensure_all_started函数如下:

RabbitMQ_Page147_002

函数中调用了ensure_all_started函数:

RabbitMQ_Page147_003

首先start该应用:rabbitmq_prelaunch

启动成功了,就返回{ok, [Application|Started]}

rabbit应用的ensure_all_started也是这个流程。

如果一切正常,rabbit和rabbitmq_prelaunch就都启动成功了。

启动的时候要回调rabbit的方法:start

RabbitMQ_Page148_001

该方法中:

调用了run_prelaunch_second_phase()函数:

RabbitMQ_Page148_002

RabbitMQ_Page149_001

# 4.5 消息的发送

我的架构梦:(七十二) 消息中间件之RabbitMQ的消息发送的源码分析 (opens new window)

使用channel.basicPublish()方法发送消息:

RabbitMQ_Page149_002

该抽象方法有如下实现:

RabbitMQ_Page150_001

究竟是AutorecoveringChannel还是ChannelN还是PublisherCallbackChannelImpl,要看设置。

我们经常用的是ChannelN:

比如发送消息:

RabbitMQ_Page150_002

要看channel的来源:

RabbitMQ_Page150_003

查看createChannel方法的具体实现:

RabbitMQ_Page150_004

究竟是AMQConnection还是AutorecoveringConnection?

需要看

RabbitMQ_Page150_005

打开该方法的实现:

RabbitMQ_Page151_001

看newConnection方法的实现:

RabbitMQ_Page151_002

看newConnection方法的实现:

RabbitMQ_Page151_003

看newConnection方法的实现:

RabbitMQ_Page151_004

看哪里返回了Connection对象:

RabbitMQ_Page151_005

1131行返回Connection的AutorecoveringConnection对象。

前提是isAutomaticRecoveryEnabled()方法返回true。

该方法何时返回true?

RabbitMQ_Page152_001

如果在创建ConnectioFactory的时候设置了setAutomaticRecoveryEnabled为true,则1130行的

AutorecoveringConnection对象返回。

RabbitMQ_Page152_002

1141行返回AMQConnection对象。 在AMQConnection类中,查看createChannel()方法返回的Channel是哪个实现:

RabbitMQ_Page152_003

上述源码中,需要查看_channelManager的createChannel方法的返回值。

首先需要知道_channelManager是哪个类的对象:

RabbitMQ_Page152_004

通过搜索发现只有414行给_channelManager赋值。通过调用instantiateChannelManager方法赋值的。

看instantiateChannelManager的实现:

RabbitMQ_Page153_001

该方法有两个实现,我们查看AMQConnection中的实现:

RabbitMQ_Page153_002

此处使用的是ChannelManager类。

回到前面:

看该类的createChannel方法返回的是哪个对象:

实现一:

RabbitMQ_Page153_003

实现二:

RabbitMQ_Page153_004

两个实现的区别在于有没有传递通道编号。

回到前面:

我们在发送消息的时候调用basicPublish方法,实际上就是ChannelN的方法:

RabbitMQ_Page154_001

ChannelN中三个重载的basicPublish方法:

第一个方法:

RabbitMQ_Page154_002

第二个方法:

RabbitMQ_Page154_003

第三个方法:

RabbitMQ_Page154_004

最终调用的都是第三个实现。

在第三个实现中,

RabbitMQ_Page155_001

如果没有设置消息头,则设置最基本的消息头设置:

RabbitMQ_Page155_002

其中,

  • contentType表示内容类型,也就是MIME类型。
  • contentEncoding表示编码类型:如UTF-8
  • headers表示用户自定义的消息属性,键值对形式,Map<String, Object>
  • deliveryMode表示消息投递的模式,1表示瞬时消息,2表示持久化消息
  • priority表示消息的优先级,0~9,数字越大,优先级越高。
  • correlationId表示关联ID,一般用在RabbitMQ的请求/响应模式中,关联请求消息的ID
  • replyTo表示RabbitMQ的请求/响应模式中,响应消息要发送到的消息队列。
  • expiration表示消息的过期时间
  • messageId每个消息都有一个消息ID,该ID值要么手动设置,要么由系统自动生成,用于唯一标识消息。
  • timestamp表示消息被发送的时间戳。这个时间戳并不是精确的消息被发送出的时间,而是在消息放到发送队列到发送完成之间的任何时间。
  • type消息的类型,通常用语指定消息的序列化反序列化类型。
  • userId使用user-id属性来标识已登录的用户
  • appId在处理消息之前检查app-id允许应用程序丢弃那些来源不明或者不受支持的消息
  • clusterId:AMQP 0-9-1将cluster-id属性重新命名为reserved,并声明它必须为空,虽然RabbitMQ目前没有根据规范要求它是空的,但是最好规避这个属性。

RabbitMQ_Page156_001

AMQCommand是AMQP规定的命令,用于跟RabbitMQ交互。命令中指定具体的操作,比如上文

中命令的属性是Basic.Publish,也就是AMQP的发布消息。

mandatory表示如果一个消息无法被交换器路由,则如果该值设置为0,则服务器悄无声息的丢

弃,否则使用AMQP的Return退还给发布者。

immediate如果该值设置为0,则当消息一到达交换器,就立即投递给消费者。如果消费者不在线

或不能立即投递给消费者,则服务器无法保证该消息被消费。如果设置为1,则如果消息不能被立即投递给消费者,则使用AMQP的Return命令退还给发布者。

transmit用于执行该命令,发布消息。

最后一行用于发送统计消息。

transmit方法的实现:

RabbitMQ_Page156_002

quiescingTransmit()用于执行AMQP命令:

RabbitMQ_Page157_001

要在通道上执行命令,首先获取通道的共享锁,实际上就是一个Connection可以有多个通道来操

作,每个通道属于一个线程,连接是多线程共享的,因此需要获取该共享锁,以操作Connection。在

获取锁之后,如果此时发送线程需要阻塞,就让共享锁等待,直到被唤醒。

c.transmit(this)用于通过通道执行命令:

RabbitMQ_Page157_002

public void transmit(AMQChannel channel) throws IOException {
    int channelNumber = channel.getChannelNumber();
    AMQConnection connection = channel.getConnection();
    synchronized(this.assembler) {
        com.rabbitmq.client.impl.Method m = this.assembler.getMethod();
        if (m.hasContent()) {
            byte[] body = this.assembler.getContentBody();
            Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, (long)body.length);
            // 获取协议协商的帧大小
            int frameMax = connection.getFrameMax();
            boolean cappedFrameMax = frameMax > 0;
            int bodyPayloadMax = cappedFrameMax ? frameMax - 8 : body.length;
            // 如果消息头帧大小大于协议协商的帧大小,则抛异常。
            if (cappedFrameMax && headerFrame.size() > frameMax) {
                String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
                throw new IllegalArgumentException(msg);
            }
			// 发送要执行的AMQP方法
            connection.writeFrame(m.toFrame(channelNumber));
            // 发送消息头帧
            connection.writeFrame(headerFrame);
			// 封装消息帧,有可能有多个消息帧需要发送
            for(int offset = 0; offset < body.length; offset += bodyPayloadMax) {
                int remaining = body.length - offset;
                int fragmentLength = remaining < bodyPayloadMax ? remaining : bodyPayloadMax;
                Frame frame = Frame.fromBodyFragment(channelNumber, body, offset, fragmentLength);
                // 发送消息体帧,有可能多个
                connection.writeFrame(frame);
            }
        } else {
        	// 如果要执行的AMQP方法没有数据,则只发送命令帧。
            connection.writeFrame(m.toFrame(channelNumber));
        }
    }
    connection.flush();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

connection.writeFrame将消息帧发送到哪里了?

RabbitMQ_Page158_001

注释说直接将消息帧发送给broker,但实际上并非如此。

_frameHandler.writeFrame(f)用于写消息帧,写到哪里了?

RabbitMQ_Page159_001

有两个实现,究竟是哪个?需要判断。且看_frameHandler的源码:

RabbitMQ_Page159_002

该属性只在构造器中初始化过,传过来的frameHandler是哪个?

要看在哪里创建AMQConnection对象的。

RabbitMQ_Page159_003

调用方法createConnection创建AMQConnection对象:

RabbitMQ_Page159_004

下图的FrameHandler对象是哪个?

RabbitMQ_Page159_005

RabbitMQ_Page160_001

两个实现,该用哪个?

看fhFactory是哪个:

RabbitMQ_Page160_002

看createFrameHandlerFactory的实现:

RabbitMQ_Page160_003

如果使用nio,则是SocketChannelFrameHandlerFactory,否则死

SocketFrameHandlerFactory。

看nio的值:

RabbitMQ_Page160_004

究竟调用了什么方法?

useNio还是useBlockingIo?

RabbitMQ_Page161_001

在我们的代码中可以手动调用。默认nio的值是:

RabbitMQ_Page161_002

默认是false,非nio。

默认使用的就是SocketFrameHandlerFactory这个类。

RabbitMQ_Page161_003

首先查看SocketFrameHandler的writeFrame实现:

RabbitMQ_Page161_004

由于是阻塞IO,此处直接使用输出流输出:

输出流:_outputStream的赋值:

RabbitMQ_Page161_005

平淡无奇。

frame.writeTo(_outputStream)的实现:

RabbitMQ_Page162_001

# 4.6 消息的消费

我的架构梦:(七十三) 消息中间件之RabbitMQ的消息消费的源码分析 (opens new window)

# 两种方式:推拉

拉消息:

RabbitMQ_Page162_002

推消息:

RabbitMQ_Page162_003

# 拉消息的代码实现

RabbitMQ_Page162_004

上图中,basicGet的具体实现是哪个?

RabbitMQ_Page163_001

现在的Channel究竟是哪个类型?ChannelN还是AurecoveringChannel?

RabbitMQ_Page163_002

看该方法的返回值

RabbitMQ_Page163_003

该方法在两个类中都存在,需要查看ConnectionFactory的方法返回的是哪个Connection:

RabbitMQ_Page163_004

RabbitMQ_Page163_005

如果isAutomaticRecoveryEnabled()返回true,则返回的Connection是

AutorecoveringConnection的实例。

如果isAutomaticRecoveryEnabled()返回false, 则返回的是:

RabbitMQ_Page163_006

看createConnection方法的返回值是什么类型的:

RabbitMQ_Page163_007

就是AMQConnection类型的对象。

最简单的判断方式就是直接打印connection的class信息:

RabbitMQ_Page164_001

发现connection是AutoreceoveringConnection类型的对象。

isAutomaticRecoveryEnabled()返回的是true还是false?

RabbitMQ_Page164_002

ConnectionFactory类中该属性的默认值是true。

还有两处涉及到变量automaticRecovery的:

RabbitMQ_Page164_003

RabbitMQ_Page164_004

上图中的方法表示可以使用ConnectionFactory对象设置是否启用自动恢复特性。

默认Connection是AutorecoveringConnection类型的对象。

看下面的代码中channel的类型:

RabbitMQ_Page164_005

RabbitMQ_Page165_001

最终的返回值是wrapChannel方法调用的返回值:

RabbitMQ_Page165_002

我们使用的channel的默认类型是AutorecoveringChannel。

RabbitMQ_Page165_003

看AutorecoveringChannel的basicGet实现:

RabbitMQ_Page165_004

delegate是哪个?

RabbitMQ_Page165_005

ChannelN.java中1149行是该方法的实现:

public GetResponse basicGet(String queue, boolean autoAck) throws IOException {
    validateQueueNameLength(queue);
    // 发送RPC请求,返回AMQCommand响应信息。
    AMQCommand replyCommand = this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Basic.Get.Builder()).queue(queue).noAck(autoAck).build());
    // 获取响应的方法
    Method method = replyCommand.getMethod();
    // 如果响应的方法是Basic.Ok类型的,则表示获取消息成功
    if (method instanceof GetOk) {
    	// 向下转型
        GetOk getOk = (GetOk)method;
        // 使用Envelop封装响应的信息,包括消息ID,是否是重发的,交换器,路由键。
        Envelope envelope = new Envelope(getOk.getDeliveryTag(), getOk.getRedelivered(), getOk.getExchange(), getOk.getRoutingKey());
        // 获取消息的属性
        BasicProperties props = (BasicProperties)replyCommand.getContentHeader();
        // 获取消息体内容
        byte[] body = replyCommand.getContentBody();
        // 获取basic.getok.messagecount的值,此处是5
        int messageCount = getOk.getMessageCount();
        this.metricsCollector.consumedMessage(this, getOk.getDeliveryTag(), autoAck);
        // 实例化GetResponse对象并赋值返回。
        return new GetResponse(envelope, props, body, messageCount);
    } else if (method instanceof GetEmpty) {
        return null;
    } else {
        throw new UnexpectedMethodError(method);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

如何获取消息的?发送RPC请求:

RabbitMQ_Page166_001

看该方法的实现:

RabbitMQ_Page167_001

privateRpc(m)的实现:

RabbitMQ_Page167_002

上述代码中,rpc(m, k)发送请求消息。

k.getReply()方法是一个阻塞的方法,等待broker返回响应。

RabbitMQ_Page167_003

rpc方法的具体私实现:

RabbitMQ_Page168_001

quiescingRpc(m, k)的具体实现:

RabbitMQ_Page168_002

enqueueRpc(k)具体实现:

RabbitMQ_Page168_003=

我们使用的channel的默认实现是:AutorecoveringChannel,该类中包含

RabbitMQ_Page168_004

RecoveryAwareChannelN是

RabbitMQ_Page169_001

ChannelN的子类。

ChannelN又是AMQChannel的子类。

所以enqueueRpc方法应该看ChannelN的实现方式:

RabbitMQ_Page169_002

调用了父类的enqueueRpc方法是父类的:

RabbitMQ_Page169_003

看doEnqueueRpc的具体实现:

# 推消息的代码实现

RabbitMQ_Page169_004

看basicConsume的具体实现:

ChannelN中1343行代码:

public String basicConsume(String queue, final boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, final Consumer callback) throws IOException {
	// 构建请求方法
    Method m = (new com.rabbitmq.client.AMQP.Basic.Consume.Builder()).queue(queue).consumerTag(consumerTag).noLocal(noLocal).noAck(autoAck).exclusive(exclusive).arguments(arguments).build();
    BlockingRpcContinuation<String> k = new BlockingRpcContinuation<String>(m) {
        public String transformReply(AMQCommand replyCommand) {
            String actualConsumerTag = ((ConsumeOk)replyCommand.getMethod()).getConsumerTag();
            ChannelN.this._consumers.put(actualConsumerTag, callback);
            // need to register consumer in stats before it actually starts consuming
            ChannelN.this.metricsCollector.basicConsume(ChannelN.this, actualConsumerTag, autoAck);
            ChannelN.this.dispatcher.handleConsumeOk(callback, actualConsumerTag);
            return actualConsumerTag;
        }
    };
    this.rpc(m, k);
    try {
        if (this._rpcTimeout == 0) {
            return (String)k.getReply();
        } else {
            try {
                return (String)k.getReply(this._rpcTimeout);
            } catch (TimeoutException var11) {
                throw this.wrapTimeoutException(m, var11);
            }
        }
    } catch (ShutdownSignalException var12) {
        throw wrap(var12);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
上次更新: 2025/04/03, 11:07:08
RabbitMQ集群与运维
RocketMQ基础入门

← RabbitMQ集群与运维 RocketMQ基础入门→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式