RabbitMQ架构与实战
lg
# 1.1 RabbitMQ介绍、概念、基本架构
# 1.1.1 RabbitMQ介绍
RabbitMQ,俗称“兔子MQ”(可见其轻巧,敏捷),是目前非常热门的一款开源消息中间件,不管
是互联网行业还是传统行业都广泛使用(最早是为了解决电信行业系统之间的可靠通信而设计)。
- 高可靠性、易扩展、高可用、功能丰富等
- 支持大多数(甚至冷门)的编程语言客户端。
- RabbitMQ遵循AMQP协议,自身采用Erlang(一种由爱立信开发的通用面向并发编程的语
言)编写。
- RabbitMQ也支持MQTT等其他协议。
RabbitMQ具有很强大的插件扩展能力,官方和社区提供了非常丰富的插件可供选择:
https://www.rabbitmq.com/community-plugins.html
# 1.1.2 RabbitMQ整体逻辑架构
# 1.1.3 RabbitMQ Exchange类型
RabbitMQ常用的交换器类型有: fanout 、 direct 、 topic 、 headers 四种。
# Fanout
会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,如图:
direct类型的交换器路由规则很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中,如下图:
# Topic
topic类型的交换器在direct匹配规则上进行了扩展,也是将消息路由到BindingKey和RoutingKey相匹配的队列中,这里的匹配规则稍微不同,它约定:
BindingKey和RoutingKey一样都是由"."分隔的字符串;BindingKey中可以存在两种特殊字符“”和“#”,用于模糊匹配,其中""用于匹配一个单词,"#"用于匹配多个单词(可以是0个)。
# Headers
headers类型的交换器不依赖于路由键的匹配规则来路由信息,而是根据发送的消息内容中的
headers属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送的消息到交换器时,
RabbitMQ会获取到该消息的headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键
值对,如果匹配,消息就会路由到该队列。headers类型的交换器性能很差,不实用。
# 1.1.4 RabbitMQ数据存储
# 存储机制
RabbitMQ消息有两种类型:
- 持久化消息和非持久化消息。
- 这两种消息都会被写入磁盘。
持久化消息在到达队列时写入磁盘,同时会内存中保存一份备份,当内存吃紧时,消息从内存中清
除。这会提高一定的性能。
非持久化消息一般只存于内存中,当内存压力大时数据刷盘处理,以节省内存空间。
RabbitMQ存储层包含两个部分:队列索引和消息存储。
# 队列索引:rabbit_queue_index
索引维护队列的落盘消息的信息,如存储地点、是否已被给消费者接收、是否已被消费者ack等。
每个队列都有相对应的索引。
索引使用顺序的段文件来存储,后缀为.idx,文件名从0开始累加,每个段文件中包含固定的
segment_entry_count 条记录,默认值是16384。每个index从磁盘中读取消息的时候,**至少要在内存中维护一个段文件,**所以设置 queue_index_embed_msgs_below 值得时候要格外谨慎,一点点增大也可能会引起内存爆炸式增长。
# 消息存储:rabbit_msg_store
消息以键值对的形式存储到文件中,一个虚拟主机上的所有队列使用同一块存储,每个节点只有一
个。存储分为持久化存储(msg_store_persistent)和短暂存储(msg_store_transient)。持久化存
储的内容在broker重启后不会丢失,短暂存储的内容在broker重启后丢失。
store使用文件来存储,后缀为.rdq,经过store处理的所有消息都会以追加的方式写入到该文件
中,当该文件的大小超过指定的限制(file_size_limit)后,将会关闭该文件并创建一个新的文件以供新的消息写入。文件名从0开始进行累加。在进行消息的存储时,RabbitMQ会在ETS(Erlang Term Storage)表中记录消息在文件中的位置映射和文件的相关信息。
channel.queueDeclare("q.demo1", true, false, false, null);
channel.exchangeDeclare("ex.demo1", BuiltinExchangeType.DIRECT,
true, false, null);
channel.queueBind("q.demo1", "ex.demo1", "key.demo1", null);
Map<String, Object> headers = new HashMap<>();
headers.put("mykey", "myvalue");
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.contentEncoding("utf-8")
.deliveryMode(2)
.headers(headers).build();
// 4096个数字
String message =
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"
+
"123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456";
// message = message.substring(0, message.length() - 25);
// message = message.substring(0, message.length() - 38);
message = message.substring(0, message.length() - 45);
// message = message.substring(0, message.length() - 50);
// message = message.substring(0, message.length() - 100);
channel.basicPublish("ex.demo1", "key.demo1", props,
message.getBytes("UTF-8"));
消息(包括消息头、消息体、属性)可以直接存储在index中,也可以存储在store中。最佳的方式
是较小的消息存在index中,而较大的消息存在store中。这个消息大小的界定可以通过
queue_index_embed_msgs_below 来配置,默认值为4096B。当一个消息小于设定的大小阈值时,就可以存储在index中,这样性能上可以得到优化。一个完整的消息大小小于这个值,就放到索引中,否则放到持久化消息文件中。
rabbitmq.conf中的配置信息:
## Size in bytes below which to embed messages in the queue index.
## Related doc guide: https://rabbitmq.com/persistence-conf.html
##
# queue_index_embed_msgs_below = 4096
## You can also set this size in memory units
##
# queue_index_embed_msgs_below = 4kb
如果消息小于这个值,就在索引中存储,如果消息大于这个值就在store中存储:
大于这个值的消息存储于msg_store_persistent目录中的<num>.rdq
文件中:
小于这个值的消息存储于<num>.idx
索引文件中:
读取消息时,先根据消息的ID(msg_id)找到对应存储的文件,如果文件存在并且未被锁住,则直
接打开文件,从指定位置读取消息内容。如果文件不存在或者被锁住了,则发送请求由store进行处
理。
删除消息时,只是从ETS表删除指定消息的相关信息,同时更新消息对应的存储文件和相关信息。
在执行消息删除操作时,并不立即对文件中的消息进行删除,也就是说消息依然在文件中,仅仅是标记为垃圾数据而已。当一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件中的有效数据可以合并成一个文件,并且所有的垃圾数据的大小和所有文件(至少有3个文件存在的情况下)的数据大小的比值超过设置的阈值garbage_fraction(默认值0.5)时,才会触发垃圾回收,将这两个文件合并,执行合并的两个文件一定是逻辑上相邻的两个文件。合并逻辑:
- 锁定这两个文件
- 先整理前面的文件的有效数据,再整理后面的文件的有效数据
- 将后面文件的有效数据写入到前面的文件中
- 更新消息在ETS表中的记录
- 删除后面文件
# 队列结构
通常队列由rabbit_amqqueue_process和backing_queue这两部分组成,
rabbit_amqqueue_process负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm和消费端的ack)等。backing_queue是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。
如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费
者,不会经过队列这一步。当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投
递。
rabbit_variable_queue.erl 源码中定义了RabbitMQ队列的4种状态:
- alpha:消息索引和消息内容都存内存,最耗内存,很少消耗CPU
- beta:消息索引存内存,消息内存存磁盘
- gama:消息索引内存和磁盘都有,消息内容存磁盘
- delta:消息索引和内容都存磁盘,基本不消耗内存,消耗更多CPU和I/O操作
消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断流动,消息的状态会不断发
送变化。
持久化的消息,索引和内容都必须先保存在磁盘上,才会处于上述状态中的一种
gama状态只有持久化消息才会有的状态。
在运行时,RabbitMQ会根据消息传递的速度定期计算一个当前内存中能够保存的最大消息数量
(target_ram_count),如果alpha状态的消息数量大于此值,则会引起消息的状态转换,多余的消息
可能会转换到beta、gama或者delta状态。区分这4种状态的主要作用是满足不同的内存和CPU需求。
对于普通没有设置优先级和镜像的队列来说,backing_queue的默认实现是
rabbit_variable_queue,其内部通过5个子队列Q1、Q2、delta、Q3、Q4来体现消息的各个状态。
消费者获取消息也会引起消息的状态转换。
当消费者获取消息时
- 首先会从Q4中获取消息,如果获取成功则返回。
- 如果Q4为空,则尝试从Q3中获取消息,系统首先会判断Q3是否为空,如果为空则返回队列
为空,即此时队列中无消息。
- 如果Q3不为空,则取出Q3中的消息;进而再判断此时Q3和Delta中的长度,如果都为空,则
可以认为 Q2、Delta、 Q3、Q4 全部为空,此时将Q1中的消息直接转移至Q4,下次直接从
Q4 中获取消息。
- 如果Q3为空,Delta不为空,则将Delta的消息转移至Q3中,下次可以直接从Q3中获取消息。
在将消息从Delta转移到Q3的过程中,是按照索引分段读取的,首先读取某一段,然后判断读
取的消息的个数与Delta中消息的个数是否相等,如果相等,则可以判定此时Delta中己无消
息,则直接将Q2和刚读取到的消息一并放入到Q3中,如果不相等,仅将此次读取到的消息转
移到Q3。
这里就有两处疑问,第一个疑问是:为什么Q3为空则可以认定整个队列为空?
- 试想一下,如果Q3为空,Delta不为空,那么在Q3取出最后一条消息的时候,Delta 上的消息
就会被转移到Q3这样与 Q3 为空矛盾;
- 如果Delta 为空且Q2不为空,则在Q3取出最后一条消息时会将Q2的消息并入到Q3中,这样
也与Q3为空矛盾;
- 在Q3取出最后一条消息之后,如果Q2、Delta、Q3都为空,且Q1不为空时,则Q1的消息会
被转移到Q4,这与Q4为空矛盾。
其实这一番论述也解释了另一个问题:为什么Q3和Delta都为空时,则可以认为 Q2、Delta、Q3、
Q4全部为空?
通常在负载正常时,如果消费速度大于生产速度,对于不需要保证可靠不丢失的消息来说,极有可
能只会处于alpha状态。
对于持久化消息,它一定会进入gamma状态,在开启publisher confirm机制时,只有到了
gamma 状态时才会确认该消息己被接收,若消息消费速度足够快、内存也充足,这些消息也不会继续走到下一个状态。
为什么消息的堆积导致性能下降?
在系统负载较高时,消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,这样会增加
处理每个消息的平均开销。因为要花更多的时间和资源处理“堆积”的消息,如此用来处理新流入的消息的能力就会降低,使得后流入的消息又被积压到很深的队列中,继续增大处理每个消息的平均开销,继而情况变得越来越恶化,使得系统的处理能力大大降低。
应对这一问题一般有3种措施:
- 增加prefetch_count的值,即一次发送多条消息给消费者,加快消息被消费的速度。
- 采用multiple ack,降低处理 ack 带来的开销
- 流量控制
# 1.2 安装和配置RabbitMQ
# 安装环境:
- 虚拟机软件:VMWare 15.1.0
- 操作系统:CentOS Linux release 7.7.1908
- Erlang:erlang-23.0.2-1.el7.x86_64
- RabbitMQ:rabbitmq-server-3.8.4-1.el7.noarch
RabbitMQ的安装需要首先安装Erlang,因为它是基于Erlang的VM运行的。
RabbitMQ需要的依赖:socat和logrotate,logrotate操作系统中已经存在了,只需要安装socat就
可以了。
RabbitMQ与Erlang的兼容关系详见:https://www.rabbitmq.com/which-erlang.html
# 安装过程:
1、安装依赖:
yum install socat -y
2、安装Erlang
erlang-23.0.2-1.el7.x86_64.rpm下载地址:
https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.2/erlang-23.0.2-1.el7.x86_64.rpm
首先将erlang-23.0.2-1.el7.x86_64.rpm上传至服务器,然后执行下述命令:
rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm
3、安装RabbitMQ
rabbitmq-server-3.8.4-1.el7.noarch.rpm下载地址:
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm
首先将rabbitmq-server-3.8.4-1.el7.noarch.rpm上传至服务器,然后执行下述命令:
rpm -ivh rabbitmq-server-3.8.4-1.el7.noarch.rpm
4、启用RabbitMQ的管理插件
rabbitmq-plugins enable rabbitmq_management
5、开启RabbitMQ
systemctl start rabbitmq-server
或者
rabbitmq-server
或者后台启动
rabbitmq-server -detached
6、添加用户
rabbitmqctl add_user root 123456
7、给用户添加权限
给root用户在虚拟主机"/"上的配置、写、读的权限
rabbitmqctl set_permissions root -p / ".*" ".*" ".*"
8、给用户设置标签
rabbitmqctl set_user_tags root administrator
用户的标签和权限:
Tag | Capabilities |
---|---|
(None) | 没有访问management插件的权限 |
management | 可以使用消息协议做任何操作的权限,加上:1. 可以使用AMQP协议登录的虚拟主机的权限management2. 查看它们能登录的所有虚拟主机中所有队列、交换器和绑定的权限3. 查看和关闭它们自己的通道和连接的权限4. 查看它们能访问的虚拟主机中的全局统计信息,包括其他用户的活动 |
policymaker | 所有management标签可以做的,加上:1. 在它们能通过AMQP协议登录的虚拟主机上,查看、创建和删除策略以及虚拟主机参数的权限 |
monitoring | 所有management能做的,加上:1. 列出所有的虚拟主机,包括列出不能使用消息协议访问的虚拟主机的权限2. 查看其他用户连接和通道的权限3. 查看节点级别的数据如内存使用和集群的权限4. 查看真正的全局所有虚拟主机统计数据的权限 |
administrator | 所有policymaker和monitoring能做的,加上:1. 创建删除虚拟主机的权限2. 查看、创建和删除用户的权限3. 查看、创建和删除权限的权限4. 关闭其他用户连接的权限 |
9、打开浏览器,访问http://<安装了CentOS的VMWare虚拟机IP地址>:15672
10、使用刚才创建的用户登录:
# 1.3 RabbitMQ常用操作命令
# 前台启动Erlang VM和RabbitMQ
rabbitmq-server
# 后台启动
rabbitmq-server -detached
# 停止RabbitMQ和Erlang VM
rabbitmqctl stop
# 查看所有队列
rabbitmqctl list_queues
# 查看所有虚拟主机
rabbitmqctl list_vhosts
# 在Erlang VM运行的情况下启动RabbitMQ应用
rabbitmqctl start_app
rabbitmqctl stop_app
# 查看节点状态
rabbitmqctl status
# 查看所有可用的插件
rabbitmq-plugins list
# 启用插件
rabbitmq-plugins enable <plugin-name>
# 停用插件
rabbitmq-plugins disable <plugin-name>
# 添加用户
rabbitmqctl add_user username password
# 列出所有用户:
rabbitmqctl list_users
# 删除用户:
rabbitmqctl delete_user username
# 清除用户权限:
rabbitmqctl clear_permissions -p vhostpath username
# 列出用户权限:
rabbitmqctl list_user_permissions username
# 修改密码:
rabbitmqctl change_password username newpassword
# 设置用户权限:
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
# 创建虚拟主机:
rabbitmqctl add_vhost vhostpath
# 列出所以虚拟主机:
rabbitmqctl list_vhosts
# 列出虚拟主机上的所有权限:
rabbitmqctl list_permissions -p vhostpath
# 删除虚拟主机:
rabbitmqctl delete_vhost vhost vhostpath
# 移除所有数据,要在 rabbitmqctl stop_app 之后使用:
rabbitmqctl reset
# 1.4 RabbitMQ工作流程详解
# 1.4.1 生产者发送消息的流程
- 生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel)
- 生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
- 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
- 生产者通过 bindingKey (绑定Key)将交换器和队列绑定( binding )起来
- 生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息
- 相应的交换器根据接收到的 routingKey 查找相匹配的队列。
- 如果找到,则将从生产者发送过来的消息存入相应的队列中。
- 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
- 关闭信道。
- 关闭连接。
# 1.4.2 消费者接收消息的过程
- 消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。
- 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及
做一些准备工作
- 等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
- 消费者确认( ack) 接收到的消息。
- RabbitMQ 从队列中删除相应己经被确认的消息。
- 关闭信道。
- 关闭连接。
# 1.4.3 案例
Hello World一对一的简单模式。生产者直接发送消息给RabbitMQ,另一端消费。未定义和指定
Exchange的情况下,使用的是AMQP default这个内置的Exchange。
HelloWorldSender.java
package com.lagou.rabbitmq.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* Rabbitmq是一个消息broker:接收消息,传递给下游应用
*
* 术语:
* Producing就是指发送消息,发送消息的程序是Producer
* Queue指的是RabbitMQ内部的一个组件,消息存储于queue中。queue使用主机的内存和磁盘存储,收到内存和磁盘空间的限制
* 可以想象为一个大的消息缓冲。很多Producer可以向同一个queue发送消息,很多消费者可以从同一个queue消费消息。
*
*
*
Consuming就是接收消息。一个等待消费消息的应用程序称为Consumer
生产者、消费者、队列不必在同一台主机,一般都是在不同的主机上的应用。一个应用可以同时是
生产者和消费者。
*
*/
public class HelloWorldSender {
private static String QUEUE_NAME = "hello";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("node1");
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setPort(5672);
try (Connection conn = factory.newConnection();
Channel channel = conn.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
}
}
}
HelloWorldReceiver.java
package com.lagou.rabbitmq.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class HelloWorldReceiver {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务器主机名或IP地址
factory.setHost("node1");
// 设置Erlang的虚拟主机名称
factory.setVirtualHost("/");
// 设置用户名
factory.setUsername("root");
// 设置密码
factory.setPassword("123456");
// 设置客户端与服务器的通信端口,默认值为5672
factory.setPort(5672);
// 获取连接
Connection connection = factory.newConnection();
// 从连接获取通道
Channel channel = connection.createChannel();
// 声明一个队列
// 第一个参数是队列名称,第二个参数false表示在rabbitmq-server重启后就没有了
// 第三个参数表示该队列不是一个排外队列,否则一旦客户端断开,队列就删除了
// 第四个参数表示该队列是否自动删除,true表示一旦不使用了,系统删除该队列
// 第五个参数表示该队列的参数,该参数是Map集合,用于指定队列的属性
// channel.queueDeclare(QUEUE_NAME, false, false, true, null);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 消息的推送回调函数
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
/*
使用服务器生成的consumerTag启动本地,非排他的使用者。
启动一个
仅提供了basic.deliver和basic.cancel AMQP方法(对大多数情形够用了)
第一个参数:队列名称
autoAck – true 只要服务器发送了消息就表示消息已经被消费者确认; false服务端等待客户端显式地发送确认消息
deliverCallback – 服务端推送过来的消息回调函数
cancelCallback – 客户端忽略该消息的回调方法
Returns:
服务端生成的consumerTag
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
# 1.4.4 Connection 和Channel关系
生产者和消费者,需要与RabbitMQ Broker 建立TCP连接,也就是Connection 。一旦TCP 连接建
立起来,客户端紧接着创建一个AMQP 信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection 之上的虚拟连接, RabbitMQ 处理的每条AMQP 指令都是通过信道完成的。
为什么不直接使用TCP连接,而是使用信道?
RabbitMQ 采用类似NIO的做法,复用TCP 连接,减少性能开销,便于管理。
当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省
TCP 连接资源。
当信道本身的流量很大时,一个Connection 就会产生性能瓶颈,流量被限制。需要建立多个
Connection ,分摊信道。具体的调优看业务需要。
信道在AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面进行的。
channel.exchangeDeclare
channel.queueDeclare
channel.basicPublish
channel.basicConsume
// ...
RabbitMQ 相关的API与AMQP紧密相连,比如channel.basicPublish 对应AMQP 的Basic.Publish
命令。
# 1.5 RabbitMQ工作模式详解
官网地址:https://www.rabbitmq.com/getstarted.htm
# 1.5.1 Work Queue
生产者发消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的
效果。
NewTask.java
package com.lagou.rabbitmq.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class NewTask {
private static final String QUEUE_NAME = "";
private static final String[] works = {
"hello.",
"hello..",
"hello...",
"hello....",
"hello.....",
"hello......",
"hello.......",
"hello........",
"hello.........",
"hello.........."
};
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("node1");
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setPort(5672);
try (Connection conn = factory.newConnection();
Channel channel = conn.createChannel()) {
channel.queueDeclare("taskQueue", false, false, false, null);
for (String work : works) {
// BuiltinExchangeType.DIRECT
// BuiltinExchangeType.FANOUT
// BuiltinExchangeType.HEADERS
// BuiltinExchangeType.TOPIC
// channel.exchangeDeclare("ex1", BuiltinExchangeType.DIRECT);
// 将消息路由到taskQueue队列
channel.basicPublish("", "taskQueue", null,
work.getBytes("UTF-8"));
// channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, work.getBytes());
System.out.println(" [x] Sent '" + work + "'");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Worker.java
package com.lagou.rabbitmq.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Worker {
private static final String TASK_QUEUE_NAME = "taskQueue";
public static void main(String[] args) throws IOException,TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("node1");
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setPort(5672);
// true表示不需要手动确认消息,false表示需要手动确认消息: channel.basicAck(xxx, yyy);
boolean autoAck = true;
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
// 预取指定个数的消息。此处每次获取一个消息
// channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String task = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + task + "'");
try {
doWork(task);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
// 手动确认消息
// channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
};
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
}
private static void doWork(String task) throws InterruptedException {
System.out.println("task = " + task);
for (char ch : task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
}
# 1.5.2 发布订阅模式
使用fanout类型交换器,routingKey忽略。每个消费者定义生成一个队列并绑定到同一个
Exchange,每个消费者都可以消费到完整的消息。
消息广播给所有订阅该消息的消费者。
在RabbitMQ中,生产者不是将消息直接发送给消息队列,实际上生产者根本不知道一个消息被发
送到哪个队列。
生产者将消息发送给交换器。交换器非常简单,从生产者接收消息,将消息推送给消息队列。交换
器必须清楚地知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器类型。
交换器的类型前面已经介绍过了: direct 、 topic 、 headers 和 fanout 四种类型。发布订阅使
用fanout。创建交换器,名字叫 logs :
channel.exchangeDeclare("logs", "fanout");
fanout 交换器很简单,从名字就可以看出来(用风扇吹出去),将所有收到的消息发送给它知道
的所有的队列。
rabbitmqctl list_exchanges
列出RabbitMQ的交换器,包括了 amq.* 的和默认的(未命名)的交换器。
未命名交换器
在前面的那里中我们没有指定交换器,但是依然可以向队列发送消息。这是因为我们使用了默认的交换器。
channel.basicPublish("", "hello", null, message.getBytes());
第一个参数就是交换器名称,为空字符串。直接使用routingKey向队列发送消息,如果该
routingKey指定的队列存在的话。
现在,向指定的交换器发布消息:
channel.basicPublish("logs", "", null, message.getBytes());
临时队列
前面我们使用队列的名称,生产者和消费者都是用该名称来发送和接收该队列中的消息。
首先,我们无论何时连接RabbitMQ的时候,都需要一个新的,空的队列。我们可以使用随机的名
字创建队列,也可以让服务器帮我们生成随机的消息队列名字。
其次,一旦我们断开到消费者的连接,该队列应该自动删除。
String queueName = channel.queueDeclare().getQueue();
上述代码我们声明了一个非持久化的、排他的、自动删除的队列,并且名字是服务器随机生成的。
queueName一般的格式类似: amq.gen-JzTY20BRgKO-HjmUJj0wLg 。
绑定
在创建了消息队列和 fanout 类型的交换器之后,我们需要将两者进行绑定,让交换器将消息发送
给该队列。
channel.queueBind(queueName, "logs", "");
此时, logs 交换器会将接收到的消息追加到我们的队列中。
可以使用下述命令列出RabbitMQ中交换器的绑定关系:
rabbitmqctl list_bindings
发布订阅模式的整体代码如下:
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = argv.length < 1 ? "info: Hello World!" :
String.join(" ", argv);
channel.basicPublish(EXCHANGE_NAME, "", null,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
当消费者启动起来之后,命令 rabbitmqctl list_bindings 列出绑定关系:
消息的推拉:
实现RabbitMQ的消费者有两种模式,推模式(Push)和拉模式(Pull)。 实现推模式推荐的方式
是继承 DefaultConsumer 基类,也可以使用Spring AMQP的 SimpleMessageListenerContainer 。 推模式是最常用的,但是有些情况下推模式并不适用的,比如说: 由于某些限制,消费者在某个条件成立时才能消费消息 需要批量拉取消息进行处理 实现拉模式 RabbitMQ的Channel提供了 basicGet 方法用于拉取消息。
# 1.5.3 路由模式
使用 direct 类型的Exchange,发N条消费并使用不同的 routingKey ,消费者定义队列并将队
列、 routingKey 、Exchange绑定。此时使用 direct 模式Exchagne必须要 routingKey 完全匹配的情况下消息才会转发到对应的队列中被消费。
上一个模式中,可以将消息广播到很多接收者。
现在我们想让接收者只接收部分消息,如,我们通过直接模式的交换器将关键的错误信息记录到
log文件,同时在控制台正常打印所有的日志信息。
绑定
上一模式中,交换器的使用方式:
channel.queueBind(queueName, EXCHANGE_NAME, "");
绑定语句中还有第三个参数: routingKey :
channel.queueBind(queueName, EXCHANGE_NAME, "black");
bindingKey 的作用与具体使用的交换器类型有关。对于 fanout 类型的交换器,此参数设置无效,系统直接忽略。
# 1.5.4 direct交换器
分布式系统中有很多应用,这些应用需要运维平台的监控,其中一个重要的信息就是服务器的日志
记录。
我们需要将不同日志级别的日志记录交给不同的应用处理。
如何解决?
使用direct交换器
如果要对不同的消息做不同的处理,此时不能使用 fanout 类型的交换器,因为它只会盲目的广播
消息。
我们需要使用 direct 类型的交换器。 direct 交换器的路由算法很简单:只要消息的
routingKey 和队列的 bindingKey 对应,消息就可以推送给该队列。
上图中的交换器 X 是 direct 类型的交换器,绑定的两个队列中,一个队列的 bindingKey 是
orange ,另一个队列的 bindingKey 是 black 和 green 。
如此,则 routingKey 是 orange 的消息发送给队列Q1, routingKey 是 black 和 green 的消息发
送给Q2队列,其他消息丢弃。
多重绑定
上图中,我们使用 direct 类型的交换器 X ,建立了两个绑定:队列Q1根据 bindingKey 的值
black 绑定到交换器 X ,队列Q2根据 bindingKey 的值 black 绑定到交换器 X ;交换器 X 会将消息发送给队列Q1和队列Q2。交换器的行为跟 fanout 的行为类似,也是广播。
在案例中,我们将日志级别作为 routingKey 。
EmitLogsDirect.java
package com.lagou.rabbitmq.demo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class EmitLogsDirect {
public static void main(String[] args) throws IOException,
TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("node1");
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String servrity = null;
// 声明direct类型的交换器logs
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
for (int i = 0; i < 100; i++) {
switch (i % 3) {
case 0:
servrity = "info";
break;
case 1:
servrity = "warn";
break;
case 2:
servrity = "error";
break;
default:
System.err.println("log错误,程序退出");
System.exit(-1);
}
String logStr = "这是 【" + servrity + "】 的消息";
channel.basicPublish("direct_logs", servrity, null,
logStr.getBytes("UTF-8"));
}
}
}
ReceiveErrorLogsDirect.java
package com.lagou.rabbitmq.demo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveErrorLogsDirect {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("node1");
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
// 将logs交换器和queueName队列通过bindingKey:error绑定
channel.queueBind(queueName, "direct_logs", "error");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, deliverCallback, consumerTag ->{});
}
}
ReceiveWarnInfoLogsDirect.java
package com.lagou.rabbitmq.demo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveWarnInfoLogsDirect {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("node1");
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
// 将logs交换器和queueName队列通过bindingKey:warn绑定
channel.queueBind(queueName, "direct_logs", "warn");
// 将logs交换器和queueName队列通过bindingKey:info绑定
channel.queueBind(queueName, "direct_logs", "info");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, deliverCallback, consumerTag ->{});
}
}
# 1.5.5 主题模式
使用 topic 类型的交换器,队列绑定到交换器、 bindingKey 时使用通配符,交换器将消息路由转
发到具体队列时会根据消息 routingKey 模糊匹配,比较灵活。
上个模式中,我们通过 direct 类型的交换器做到了根据日志级别的不同,将消息发送给了不同队
列的。
这里有一个限制,加入现在我不仅想根据日志级别划分日志消息,还想根据日志来源划分日志,怎
么做?
比如,我想监听cron服务发送的 error 消息,又想监听从kern服务发送的所有消息。
此时可以使用RabbitMQ的主题模式( Topic )。
要想 topic 类型的交换器, routingKey 就不能随便写了,它必须得是点分单词。单词可以随便
写,生产中一般使用消息的特征。如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”等。该点分单词字符串最长255字节。
bindingKey 也必须是这种形式。 topic 类型的交换器背后原理跟 direct 类型的类似:只要队列
的 bindingKey 的值与消息的 routingKey 匹配,队列就可以收到该消息。有两个不同:
- (star)匹配一个单词
# 匹配0到多个单词
上图中,我们发送描述动物的消息。消息发送的时候指定的 routingKey 包含了三个词,两个点。
第一个单词表示动物的速度,第二个是颜色,第三个是物种:<speed>.<color>.<species>
。
创建三个绑定:Q1绑定到" .orange. "Q2绑定到" ..rabbit "和" lazy.# "。
- Q1关注orange颜色动物的消息
- Q2关注兔子的消息,以及所有懒的动物消息
如果不能匹配,就丢弃消息。
如果发送的消息 routingKey 是" lazy.orange.male.rabbit ",则会匹配最后一个绑定。
如果在 topic 类型的交换器中 bindingKey 使用 # ,则就是 fanout 类型交换器的行为。
如果在 topic 类型的交换器中 bindingKey 中不使用 * 和 # ,则就是 direct 类型交换器的行为。
EmitLogTopic.java
package com.lagou.rabbitmq.demo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String[] SPEED = {
"lazy",
"quick",
"normal"
};
private static final String[] COLOR = {
"black",
"orange",
"red",
"yellow",
"blue",
"white",
"pink"
};
private static final String[] SPECIES = {
"dog",
"rabbit",
"chicken",
"horse",
"bear",
"cat"
};
private static final Random RANDOM = new Random();
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("node1");
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String message = null;
String routingKey = null;
String speed = null;
String color = null;
String species = null;
for (int i = 0; i < 10; i++) {
speed = SPEED[RANDOM.nextInt(SPEED.length)];
color = COLOR[RANDOM.nextInt(COLOR.length)];
species = SPECIES[RANDOM.nextInt(SPECIES.length)];
message = speed + "-" + color + "-" + species;
routingKey = speed + "." + color + "." + species;
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
}
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
EmitLogTopic1.java
package com.lagou.rabbitmq.demo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
public class EmitLogTopic1 {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String[] SPECIES = {
"dog",
"rabbit",
"chicken",
"horse",
"bear",
"cat"
};
private static final Random RANDOM = new Random();
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("node1");
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String message = null;
String routingKey = null;
String speed = null;
String species = null;
for (int i = 0; i < 10; i++) {
speed = "lazy";
species = SPECIES[RANDOM.nextInt(SPECIES.length)];
message = speed + "-" + species;
routingKey = speed + "." + species;
channel.basicPublish(EXCHANGE_NAME, routingKey, null,
message.getBytes());
}
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
ReceiveLogTopic.java
package com.lagou.rabbitmq.demo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("node1");
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
DeliverCallback callback = (consumerTag, message) -> {
System.out.println("*.*.rabbit 匹配到的消息:" + new String(message.getBody(), "UTF-8"));
};
channel.basicConsume(queueName, true, callback, consumerTag -> {});
}
}
ReceiveLogTopic1.java
package com.lagou.rabbitmq.demo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLogsTopic1 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("node1");
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
DeliverCallback callback = (consumerTag, message) -> {
System.out.println("*.orange.* 匹配到的消息:" + new String(message.getBody(), "UTF-8"));
};
channel.basicConsume(queueName, true, callback, consumerTag -> {});
}
}
ReceiveLopTopic2.java
package com.lagou.rabbitmq.demo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLogsTopic2 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("node1");
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.*.*");
DeliverCallback callback = (consumerTag, message) -> {
System.out.println("lazy.*.* 匹配到的消息:" + new String(message.getBody(), "UTF-8"));
};
channel.basicConsume(queueName, true, callback, consumerTag -> {});
}
}
ReceiveLogTopic3.java
package com.lagou.rabbitmq.demo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLogsTopic3 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("node1");
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
DeliverCallback callback = (consumerTag, message) -> {
System.out.println("lazy.*.* 匹配到的消息:" + new String(message.getBody(), "UTF-8"));
};
channel.basicConsume(queueName, true, callback, consumerTag -> {});
}
}
# 1.6 Spring整合RabbitMQ
spring-amqp是对AMQP的一些概念的一些抽象,spring-rabbit是对RabbitMQ操作的封装实
现。
主要有几个核心类 RabbitAdmin 、 RabbitTemplate 、 SimpleMessageListenerContainer 等
RabbitAdmin 类完成对Exchange,Queue,Binding的操作,在容器中管理了 RabbitAdmin 类的
时候,可以对Exchange,Queue,Binding进行自动声明。
RabbitTemplate 类是发送和接收消息的工具类。
SimpleMessageListenerContainer 是消费消息的容器。
目前比较新的一些项目都会选择基于注解方式,而比较老的一些项目可能还是基于配置文件的。
# 1.6.1 基于配置文件的整合
- 创建maven项目
- 配置pom.xml,添加rabbit的spring依赖
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
</dependencies>
- rabbit-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory"
host="node1"
virtual-host="/"
username="root"
password="123456"
port="5672"/>
<!--创建一个rabbit的
template对象
(org.springframework.amqp.rabbit.core.RabbitTemplate),
以便于访问broker-->
<rabbit:template id="amqpTemplate" connection-
factory="connectionFactory"/>
<!--
自动查找类型是Queue、Exchange、Binding的bean,并为用户向
RabbitMQ声明 -->
<!-- 因此,我们不需要显式地在java中声明 -->
<rabbit:admin id="rabbitAdmin" connection-
factory="connectionFactory"/>
<!-- 为消费者创建一个队列,如果broker中存在,则使用同名存在的队列,否则创
建一个新的。 -->
<!-- 如果要发送消息,得使用交换器 -->
<!-- 这里使用的是默认的交换器 -->
<rabbit:queue name="myqueue"/>
<rabbit:direct-exchange name="direct.biz.ex" auto-
declare="true" auto-delete="false" durable="false">
<rabbit:bindings>
<!--exchange:其他绑定到该交换器的交换器名称-->
<!--queue:绑定到该交换器的queue的bean名称-->
<!--key:显式声明的路由key-->
<rabbit:binding queue="myqueue" key="dir.ex">
</rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
启动RabbitMQ之后,直接运行即可。
# 1.6.2 基于注解的整合
- 创建maven项目
- 配置pom.xml,添加rabbit的spring依赖
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
</dependencies>
- 添加配置类RabbitConfiguration.java
package com.lagou.rabbitmq.demo;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfiguration {
@Bean
public com.rabbitmq.client.ConnectionFactory rabbitFactory() {
com.rabbitmq.client.ConnectionFactory rabbitFactory = new com.rabbitmq.client.ConnectionFactory();
rabbitFactory.setHost("node1");
rabbitFactory.setVirtualHost("/");
rabbitFactory.setUsername("root");
rabbitFactory.setPassword("123456");
rabbitFactory.setPort(5672);
return rabbitFactory;
}
@Bean
public ConnectionFactory
connectionFactory(com.rabbitmq.client.ConnectionFactory rabbitFactory) {
ConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitFactory);
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin(ConnectionFactory factory) {
AmqpAdmin amqpAdmin = new RabbitAdmin(factory);
return amqpAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory factory)
{
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
return rabbitTemplate;
}
@Bean
public Queue queue() {
Queue myqueue = new Queue("myqueue");
return myqueue;
}
}
- 主入口类App.java
package com.lagou.rabbitmq.demo;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationC
ontext;
import org.springframework.context.support.AbstractApplicationContext;
/**
* 使用spring的注解方式发送和接收消息
*/
public class SpringAnnotationDemo {
public static void main(String[] args) {
AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
System.out.println(foo);
context.close();
}
}
# 1.7 SpringBoot整合RabbitMQ
- 添加starter依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- application.properties中添加连接信息
spring.application.name=springboot_rabbitmq
spring.rabbitmq.host=node1
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
- 主入口类
package com.lagou.rabbitmq.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitqmDemo {
public static void main(String[] args) {
SpringApplication.run(RabbitqmDemo.class, args);
}
}
- RabbitConfig类
package com.lagou.rabbitmq.demo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
@Bean
public Exchange myExchange() {
// new Exchange()
// return new TopicExchange("topic.biz.ex", false, false, null);
// return new DirectExchange("direct.biz.ex", false, false, null);
// return new FanoutExchange("fanout.biz.ex", false, false,null);
//
return new HeadersExchange("header.biz.ex", false, false,
null);
// 交换器名称,交换器类型(),是否是持久化的,是否自动删除,交换器属性Map集合
// return new CustomExchange("custom.biz.ex", ExchangeTypes.DIRECT, false, false, null);
return new DirectExchange("myex", false, false, null);
}
@Bean
public Binding myBinding() {
// 绑定的目的地,绑定的类型:到交换器还是到队列,交换器名称,路由key, 绑定的属性
// new Binding("", Binding.DestinationType.EXCHANGE, "", "",null);
// 绑定的目的地,绑定的类型:到交换器还是到队列,交换器名称,路由key,绑定的属性
// new Binding("", Binding.DestinationType.QUEUE, "", "",null);
// 绑定了交换器direct.biz.ex到队列myqueue,路由key是
direct.biz.ex
return new Binding(
"myqueue",
Binding.DestinationType.QUEUE,
"myex",
"direct.biz.ex",
null);
}
}
- 使用RestController发送消息
package com.lagou.rabbitmq.demo;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private AmqpTemplate rabbitTemplate;
@RequestMapping("/send/{message}")
public String sendMessage(@PathVariable String message) {
rabbitTemplate.convertAndSend("myex", "direct.biz.ex", message);
return "ok";
}
}
- 使用监听器,用于推消息
package com.lagou.rabbitmq.demo;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class HelloConsumer {
@RabbitListener(queues = "myqueue")
public void service(String message) {
System.out.println("消息队列推送来的消息:" + message);
}
}