RabbitMQ高级特性解析
# 2.1 消息可靠性
# 一、案例
你用支付宝给商家支付,如果是个仔细的人,会考虑我转账的话,会不会把我的钱扣了,商家没有收到我的钱?
一般我们使用支付宝或微信转账支付的时候,都是扫码,支付,然后立刻得到结果,说你支付了多少钱,如果你绑定的是银行卡,可能这个时候你并没有收到支付的确认消息。往往是在一段时间之后, 你会收到银行卡发来的短信,告诉你支付的信息。
小伙伴有没有想过:支付平台如何保证这笔帐不出问题?
支付平台必须保证数据正确性,保证数据并发安全性,保证数据最终一致性。
支付平台通过如下几种方式保证数据一致性:
1、分布式锁
这个比较容易理解,就是在操作某条数据时先锁定,可以用redis或zookeeper等常用框架来 实现。 比如我们在修改账单时,先锁定该账单,如果该账单有并发操作,后面的操作只能等待上一个操作的锁释放后再依次执行。
优点:能够保证数据强一致性。
缺点:高并发场景下可能有性能问题。
2、消息队列
消息队列是为了保证最终一致性,我们需要确保消息队列有ack机制,客户端收到消息并消费处理完成后,客户端发送ack消息给消息中间件,如果消息中间件超过指定时间还没收到ack消息,则定时去重发消息。
比如我们在用户充值完成后,会发送充值消息给账户系统,账户系统再去更改账户余额。
优点:异步、高并发
缺点:有一定延时、数据弱一致性,并且必须能够确保该业务操作肯定能够成功完成,不可能失败。
我们可以从以下几方面来保证消息的可靠性:
- 客户端代码中的异常捕获,包括生产者和消费者
- AMQP/RabbitMQ的事务机制
- 发送端确认机制
- 消息持久化机制
- Broker端的高可用集群
- 消费者确认机制
- 消费端限流
- 消息幂等性
# 二、异常捕获机制
先执行行业务操作,业务操作成功后执行行消息发送,消息发送过程通过try catch 方式捕获异常,在异常处理理的代码块中执行行回滚业务操作或者执行行重发操作等。这是一种最大努力确保的方式,并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。
另外,可以通过spring.rabbitmq.template.retry.enabled=true 配置开启发送端的重试。
# 三、AMQP/RabbitMQ的事务机制
没有捕获到异常并不能代表消息就一定投递成功了。
一直到事务提交后都没有异常,确实就说明消息是投递成功了。但是,这种方式在性能方面的开销比较大,一般也不推荐使用。
# 四、发送端确认机制
RabbitMQ后来引入了一种轻量量级的方式,叫发送方确认(publisher confirm)机制。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上面面发布的消息都会被指派一个唯一的ID(从1 开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这样生产者就知道消息已经正确送达了。
RabbitMQ 回传给生产者的确认消息中的deliveryTag 字段包含了确认消息的序号,另外,通过设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息是否都已经得到了处理了。生产者投递消息后并不需要一直阻塞着,可以继续投递下一条消息并通过回调方式处理理ACK响应。如果 RabbitMQ 因为自身内部错误导致消息丢失等异常情况发生,就会响应一条nack(Basic.Nack) 命令,生产者应用程序同样可以在回调方法中处理理该 nack 命令。
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 向RabbitMQ服务器发送AMQP命令,将当前通道标记为发送方确认通道
final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
channel.queueDeclare("queue.pc", true, false, false, null);
channel.exchangeDeclare("ex.pc", "direct", true, false, null);
channel.queueBind("queue.pc", "ex.pc", "key.pc");
String message = "hello world";
// 发送消息
channel.basicPublish("ex.pc", "key.pc", null, message.getBytes());
try {
// 同步的方式等待RabbitMQ的确认消息
channel.waitForConfirmsOrDie(5_000);
System.out.println("发送的消息已经得到确认");
} catch (IOException ex) {
System.out.println("消息被拒收");
} catch (IllegalStateException ex) {
System.out.println("发送消息的通道不是PublisherConfirms通道");
} catch (TimeoutException ex) {
System.out.println("等待消息确认超时");
}
channel.close();
connection.close();
waitForConfirm方法有个重载的,可以自定义timeout超时时间,超时后会抛 TimeoutException。类似的有几个waitForConfirmsOrDie方法,Broker端在返回nack(Basic.Nack)之后该方法会抛出java.io.IOException。需要根据异常类型来做区别处理理, TimeoutException超时是属于第三状态(无法确定成功还是失败),而返回Basic.Nack抛出IOException这种是明确的失败。上面的代码主要只是演示confirm机制,实际上还是同步阻塞模式的,性能并不不是太好。
实际上,我们也可以通过“批处理理”的方式来改善整体的性能(即批量量发送消息后仅调用一次 waitForConfirms方法)。正常情况下这种批量处理的方式效率会高很多,但是如果发生了超时或者 nack(失败)后那就需要批量量重发消息或者通知上游业务批量回滚(因为我们只知道这个批次中有消息没投递成功,而并不知道具体是那条消息投递失败了,所以很难针对性处理),如此看来,批量重发消息肯定会造成部分消息重复。另外,我们可以通过异步回调的方式来处理Broker的响应。 addConfirmListener 方法可以添加ConfirmListener 这个回调接口,这个 ConfirmListener 接口包含两个方法:handleAck 和handleNack,分别用来处理 RabbitMQ 回传的 Basic.Ack 和 Basic.Nack。
# 1、原生API案例
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 向RabbitMQ服务器发送AMQP命令,将当前通道标记为发送方确认通道
final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
channel.queueDeclare("queue.pc", true, false, false, null);
channel.exchangeDeclare("ex.pc", "direct", true, false, null);
channel.queueBind("queue.pc", "ex.pc", "key.pc");
String message = "hello-";
// 批处理的大小
int batchSize = 10;
// 用于对需要等待确认消息的计数
int outstrandingConfirms = 0;
for (int i = 0; i < 103; i++) {
channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes());
outstrandingConfirms++;
if (outstrandingConfirms == batchSize) {
// 此时已经有一个批次的消息需要同步等待broker的确认消息
// 同步等待
channel.waitForConfirmsOrDie(5_000);
System.out.println("消息已经被确认了");
outstrandingConfirms = 0;
}
}
if (outstrandingConfirms > 0) {
channel.waitForConfirmsOrDie(5_000);
System.out.println("剩余消息已经被确认了");
}
channel.close();
connection.close();
# 2、使用回调方法
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 向RabbitMQ服务器发送AMQP命令,将当前通道标记为发送方确认通道
final AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
channel.queueDeclare("queue.pc", true, false, false, null);
channel.exchangeDeclare("ex.pc", "direct", true, false, null);
channel.queueBind("queue.pc", "ex.pc", "key.pc");
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback clearOutstandingConfirms = (deliveryTag, multiple) -> {
if (multiple) {
System.out.println("编号小于等于 " + deliveryTag + " 的消息都已经被确认了");
final ConcurrentNavigableMap<Long, String> headMap
= outstandingConfirms.headMap(deliveryTag, true);
// 清空outstandingConfirms中已经被确认的消息信息
headMap.clear();
} else {
// 移除已经被确认的消息
outstandingConfirms.remove(deliveryTag);
System.out.println("编号为:" + deliveryTag + " 的消息被确认");
}
};
// 设置channel的监听器,处理确认的消息和不确认的消息
channel.addConfirmListener(clearOutstandingConfirms, (deliveryTag, multiple) -> {
if (multiple) {
// 将没有确认的消息记录到一个集合中
// 此处省略实现
System.out.println("消息编号小于等于:" + deliveryTag + " 的消息 不确认");
} else {
System.out.println("编号为:" + deliveryTag + " 的消息不确认");
}
});
String message = "hello-";
for (int i = 0; i < 1000; i++) {
// 获取下一条即将发送的消息的消息ID
final long nextPublishSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("ex.pc", "key.pc", null, (message + i).getBytes());
System.out.println("编号为:" + nextPublishSeqNo + " 的消息已经发送成功,尚未确认");
outstandingConfirms.put(nextPublishSeqNo, (message + i));
}
// 等待消息被确认
Thread.sleep(10000);
channel.close();
connection.close();
# 3、springboot案例
# 3.1 pom.xml添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
# 3.2 application.properties添加RabbitMQ配置信息
spring.application.name=rabbitmq_publisher_message_confirm
spring.rabbitmq.host=47.113.82.142
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
# 3.3 主入口类
@SpringBootApplication
public class PublisherMessageConfirmApplication {
public static void main(String[] args) {
SpringApplication.run(PublisherMessageConfirmApplication.class, args);
}
}
# 3.4 RabbitConfig类
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
return new Queue("queue.biz.publisher.message.confirm", false, false, false, null);
}
@Bean
public Exchange exchange() {
return new DirectExchange("ex.biz.publisher.message.confirm", false, false, null);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("biz.publisher.message.confirm").noargs();
}
}
# 3.5 BizController类
@RestController
public class PublisherMessageConfirm {
private RabbitTemplate rabbitTemplate;
@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback((correlationData, flag, cause) -> {
if (flag) {
try {
System.out.println("消息确认:" + correlationData.getId() + " "
+ new String(correlationData.getReturnedMessage().getBody(), "utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
} else {
System.out.println(cause);
}
});
}
@RequestMapping("/biz")
public String doBiz() throws UnsupportedEncodingException {
MessageProperties props = new MessageProperties();
props.setCorrelationId("1234");
props.setConsumerTag("msg1");
props.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
props.setContentEncoding("utf-8");
// props.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
// props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
CorrelationData cd = new CorrelationData();
cd.setId("msg1");
cd.setReturnedMessage(new Message("这是msg1的响应".getBytes("utf-8"), null));
Message message = new Message("这是等待确认的消息".getBytes("utf-8"), props);
rabbitTemplate.convertAndSend("ex.biz.publisher.message.confirm", "biz.publisher.message.confirm", message, cd);
return "ok";
}
@RequestMapping("/bizFalse")
public String doBizFalse() throws UnsupportedEncodingException {
MessageProperties props = new MessageProperties();
props.setCorrelationId("1234");
props.setConsumerTag("msg1");
props.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
props.setContentEncoding("utf-8");
Message message = new Message("这是等待确认的消息".getBytes("utf-8"), props);
rabbitTemplate.convertAndSend("ex.bizFalse.publisher.message.confirm", "biz.publisher.message.confirm", message);
return "ok";
}
}
# 3.6 结果
# 五、持久化存储机制
持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时(如:重启、断电、停机等)数据将会丢失。主要从以下几个方面来保障消息的持久性:
Exchange的持久化。通过定义时设置durable 参数为ture来保证Exchange相关的元数据不丢失。
Queue的持久化。也是通过定义时设置durable 参数为ture来保证Queue相关的元数据不丢失。
消息的持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为 2 即可实现消息的持久化,保证消息自身不丢失。
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@riemann:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// durable:true表示是持久化消息队列
channel.queueDeclare("queue.persistent", true, false, false, null);
// 持久化的交换器
channel.exchangeDeclare("ex.persistent", "direct", true, false, null);
channel.queueBind("queue.persistent", "ex.persistent", "key.persistent");
final AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 表示是持久化消息
.build();
channel.basicPublish("ex.persistent",
"key.persistent",
properties, // 设置消息的属性,此时消息是持久化消息
"hello world".getBytes());
channel.close();
connection.close();
}
RabbitMQ中的持久化消息都需要写入磁盘(当系统内存不不足时,非持久化的消息也会被刷盘处理),这些处理理动作都是在“持久层”中完成的。持久层是一个逻辑上的概念,实际包含两个部分:
- 队列索引(rabbit_queue_index),rabbit_queue_index 负责维护Queue中消息的信息,包括消息的存储位置、是否已交给消费者、是否已被消费及Ack确认等,每个Queue都有与之对应的rabbit_queue_index。
- 消息存储(rabbit_msg_store),rabbit_msg_store 以键值对的形式存储消息,它被所有队列列共享,在每个节点中有且只有一个。
下图中,RabbitMQ Home的/var/lib/mnesia/rabbit@HOSTNAME/msg_stores/vhosts/$VHostId 这个路路径下包含 queues、msg_store_persistent、 msg_store_transient 这 3 个目录,这是实际存储消息的位置。其中queues目录中保存着 rabbit_queue_index相关的数据,而msg_store_persistent保存着持久化消息数据, msg_store_transient保存着非非持久化相关的数据。
另外,RabbitMQ通过配置queue_index_embed_msgs_below可以根据消息大小决定存储位置, 默认queue_index_embed_msgs_below是4096字节(包含消息体、属性及headers),小于该值的消息存在rabbit_queue_index中。
# 六、Consumer ACK
如何保证消息被消费者成功消费?
前面我们讲了生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我们又没有任何重试,那结果跟消息丢失没什么分别。
RabbitMQ在消费端会有Ack机制,即消费端消费消息后需要发送Ack确认报文给Broker端,告知自己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)。
这也是我们之前一直在讲的“最终一致性”、“可恢复性” 的基础。
一般而言,我们有如下处理手段:
- 采用NONE模式,消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表, 再通过后台定时任务扫描异常恢复表尝试做重试动作。如果业务不自行处理则有丢失数据的风险。
- 采用AUTO(自动Ack)模式,不主动捕获异常,当消费过程中出现异常时会将消息放回 Queue中,然后消息会被重新分配到其他消费者节点(如果没有则还是选择当前节点)重新被消费,默认会一直重发消息并直到消费完成返回Ack或者一直到过期。
- 采用MANUAL(手动Ack)模式,消费者自行控制流程并手动调用channel相关的方法返回 Ack。
/**
* NONE模式,则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
* AUTO模式,看情况确认,如果此时消费者抛出异常则消息会返回到队列中
* MANUAL模式,需要显式的调用当前channel的basicAck方法
* @param channel
* @param deliveryTag
* @param message
*/
@RabbitListener(queues = "lagou.topic.queue", ackMode = "AUTO")
public void handleMessageTopic(Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, @Payload byte[] message) {
System.out.println("RabbitListener消费消息,消息内容:" + new
String((message)));
try {
// 手动ack,deliveryTag表示消息的唯一标志,multiple表示是否是批量确认
channel.basicAck(deliveryTag, false);
// 手动nack,告诉broker消费者处理失败,最后一个参数表示是否需要将消息重新入列
channel.basicNack(deliveryTag, false, true);
// 手动拒绝消息。第二个参数表示是否重新入列
channel.basicReject(deliveryTag, true);
} catch (IOException e) {
e.printStackTrace();
}
}
上面是通过在消费端直接配置指定ackMode,在一些比较老的spring项目中一般是通过xml方式去定义、声明和配置的,不管是XML还是注解,相关配置、属性这些其实都是大同小异,触类旁通。然后需要注意的是channel.basicAck这几个手工Ack确认的方法。
SpringBoot项目中支持如下的一些配置:
#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
#是否开启消费者重试(为false时关闭消费者重试,意思不是“不重试”,而是一直收到消息直到jack确认或者一直到超时)
spring.rabbitmq.listener.simple.retry.enabled=true
#重试间隔时间(单位毫秒)
spring.rabbitmq.listener.simple.retry.initial-interval=5000
# 重试超过最大次数后是否拒绝
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#ack模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
本小节的内容总结起来就如图所示,本质上就是“请求/应答”确认模式
# springboot完整案例
# 1、pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
# 2、application.properties
# datasource相关配置
spring.datasource.druid.db-type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver=com.mysql.jdbc.Driver
spring.datasource.druid.url=jdbc:mysql://localhost:3306/spring?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false
spring.datasource.druid.username=root
spring.datasource.druid.password=root
# 连接池配置
spring.datasource.druid.initial-size=5
spring.datasource.druid.min-idle=5
spring.datasource.druid.max-active=20
# 连接等待超时时间
spring.datasource.druid.max-wait=30000
# 配置监测可以关闭的空闲连接间隔时间
spring.datasource.druid.time-between-eviction-runs-millis=60000
# 配置连接在池中的最小生存时间
spring.datasource.druid.min-evictable-idle-time-millis=300000
spring.datasource.druid.filter.stat.log-slow-sql=true
spring.application.name=rabbitmq_consumer_message_ack
spring.rabbitmq.host=47.113.82.142
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
# 最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
# 是否开启消费者重试(为false时关闭消费者重试,意思不是“不重试”,而是一直收到消息直到ack确认或者一直到超时)
spring.rabbitmq.listener.simple.retry.enabled=true
# 重试间隔时间(单位毫秒)
spring.rabbitmq.listener.simple.retry.initial-interval=5000
# 重试超过最大次数后是否拒绝
spring.rabbitmq.listener.simple.default-requeue-rejected=false
# ack模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 3、主入口类
@SpringBootApplication
public class ConsumerMessageAckApplication {
@Autowired
private RabbitTemplate rabbitTemplate;
public static void main(String[] args) {
SpringApplication.run(ConsumerMessageAckApplication.class, args);
}
@Bean
public ApplicationRunner runner() {
return args -> {
Thread.sleep(5000);
for (int i = 0; i < 10; i++) {
MessageProperties props = new MessageProperties();
props.setDeliveryTag(i);
Message message = new Message(("消息:" + i).getBytes("utf-8"), props);
this.rabbitTemplate.convertAndSend("ex.biz", "biz", message);
}
};
}
}
# 4、RabbitConfig
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
return new Queue("q.biz", false, false, false, null);
}
@Bean
public Exchange exchange() {
return new DirectExchange("ex.biz", false, false, null);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("biz").noargs();
}
}
# 5、MessageListener
public class MessageListener {
private Random random = new Random();
/**
* NONE模式,则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
* AUTO模式,看情况确认,如果此时消费者抛出异常则消息会返回到队列中
* MANUAL模式,需要显式的调用当前channel的basicAck方法
*
* 在配置文件配置
*/
// @RabbitListener(queues = "q.biz", ackMode = "MANUAL")
@RabbitListener(queues = "q.biz")
public void handleMessageTopic(Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
@Payload String message) {
System.out.println("RabbitListener消费消息,消息内容:" + message);
try {
if (random.nextInt(10) % 3 != 0) {
// 手动nack,告诉broker消费者处理失败,最后一个参数表示是否需要将消息重新入列
// channel.basicNack(deliveryTag, false, true);
// 手动拒绝消息。第二个参数表示是否重新入列
channel.basicReject(deliveryTag, true);
} else {
// 手动ack,deliveryTag表示消息的唯一标志,multiple表示是否是批量确认
channel.basicAck(deliveryTag, false);
System.err.println("已确认消息:" + message);
}
} catch (IOException e) {
}
}
}
# 6、BizController
@RestController
public class BizController {
@Autowired
private RabbitTemplate rabbitTemplate;
private Random random = new Random();
@RequestMapping("/biz")
public String getBizMessage() {
String message = rabbitTemplate.execute(new ChannelCallback<String>() {
@Override
public String doInRabbit(Channel channel) throws Exception {
final GetResponse getResponse = channel.basicGet("q.biz", false);
if (getResponse == null) return "你已消费完所有的消息";
String message = new String(getResponse.getBody(), "utf-8");
if (random.nextInt(10) % 3 == 0) {
channel.basicAck(getResponse.getEnvelope().getDeliveryTag(),false);
return "已确认的消息:" + message;
} else {
// 拒收一条消息
channel.basicReject(getResponse.getEnvelope().getDeliveryTag(),true);
// 可以拒收多条消息
// channel.basicNack(getResponse.getEnvelope().getDeliveryTag(), false, true);
return "拒绝的消息:" + message;
}
}
});
return message;
}
}
# 七、消费端限流
在电商的秒杀活动中,活动一开始会有大量并发写请求到达服务端,需要对消息进行削峰处理,如何削峰?
当消息投递速度远快于消费速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩溃
,而分布式系统的故障往往会发生上下游传递,连锁反应那就会很悲剧。。。
下面我将从多个角度介绍QoS与限流,防止上面的悲剧发生。
1、RabbitMQ 可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接客户端的套接字读取数据。连接心跳监视也将被禁用。所有网络连接将在rabbitmqctl和管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以继续或被阻止,这意味着它们已发布,现在已暂停。兼容的客户端被阻止时将收到通知。
在/etc/rabbitmq/rabbitmq.conf中配置磁盘可用空间大小:
2、RabbitMQ 还默认提供了一种基于credit flow 的流控机制,面向每一个连接进行流控。当单个队列达到最大流速时,或者多个队列达到总流速时,都会触发流控。触发单个链接的流控可能是因为connection、channel、queue的某一个过程处于flow状态,这些状态都可以从监控平台看到。
3、RabbitMQ中有一种QoS保证机制,可以限制Channel上接收到的未被Ack的消息数量,如果超过这个数量限制RabbitMQ将不会再往消费端推送消息。这是一种流控手段,可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)。比较值得注意的是 QoS机制仅对于消费端推模式有效,对拉模式无效。而且不支持NONE Ack模式。执行channel.basicConsume 方法之前通过 channel.basicQoS 方法可以设置该数量。消息的发送是异步的,消息的确认也是异步的。在消费者消费慢的时候,可以设置Qos的 prefetchCount,它表示broker在向消费者发送消息的时候,一旦发送了prefetchCount个消息而没有一个消息确认的时候,就停止发送。消费者确认一个,broker就发送一个,确认两个就发送两个。换句话说,消费者确认多少,broker就发送多少,消费者等待处理的个数永远限制在prefetchCount个。
如果对于每个消息都发送确认,增加了网络流量,此时可以批量确认消息。如果设置了 multiple为true,消费者在确认的时候,比如说id是8的消息确认了,则在8之前的所有消息都确认了。
生产者往往是希望自己产生的消息能快速投递出去,而当消息投递太快且超过了下游的消费速度时就容易出现消息积压/堆积,所以,从上游来讲我们应该在生产端应用程序中加入限流、应急开关等控制手段,避免超过Broker端的极限承载能力或者压垮下游消费者。
再看看下游,我们期望下游消费端能尽快消费完消息,而且还要防止瞬时大量消息压垮消费端(推
模式),我们期望消费端处理速度是最快、最稳定而且还相对均匀(比较理想化)。
提升下游应用的吞吐量和缩短消费过程的耗时,优化主要以下几种方式:
- 优化应用程序的性能,缩短响应时间(需要时间)
- 增加消费者节点实例(成本增加,而且底层数据库操作这些也可能是瓶颈)
- 调整并发消费的线程数(线程数并非越大越好,需要大量压测调优至合理值)
@Bean
public RabbitListenerContainerFactory
rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
// SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其
// 转换为String类型的,没有content_type都按byte[]类型
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 设置并发线程数
factory.setConcurrentConsumers(10);
// 设置最大并发线程数
factory.setMaxConcurrentConsumers(20);
return factory;
}
代码demo
public class MessageQosConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@riemann:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("queue.qos", false, false, false, null);
// 使用basic做限流,仅对消息推送模式生效。
// 表示Qos是10个消息,最多有10个消息等待确认
channel.basicQos(10);
// 表示最多10个消息等待确认。如果global设置为true,则表示只要是使用当前的channel的Consumer,该设置都生效
// false表示仅限于当前Consumer
channel.basicQos(10, false);
// 第一个参数表示未确认消息的大小,Rabbit没有实现,不用管。
channel.basicQos(1000, 10, true);
channel.basicConsume("queue.qos", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
// some code going on
// 可以批量确认消息,减少每个消息都发送确认带来的网络流量负载。
channel.basicAck(envelope.getDeliveryTag(), true);
}
});
channel.close();
connection.close();
}
}
# 八、消息可靠性保障
在讲高级特性的时候几乎已经都涉及到了,这里简单回顾总结下:
- 消息传输保障
- 各种限流、应急手段
- 业务层面的一些容错、补偿、异常重试等手段
消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障分为三个层级:
- At most once:最多一次。消息可能会丢失,但绝不会重复传输
- At least once:最少一次。消息绝不会丢失,但可能会重复传输
- Exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次
RabbitMQ 支持其中的“最多一次”和“最少一次”。
1、其中“最少一次”投递实现需要考虑以下这个几个方面的内容:
消息生产者需要开启事务机制或者publisher confirm 机制,以确保消息可以可靠地传输到 RabbitMQ 中。
消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。
消息和队列都需要进行持久化处理,以确保RabbitMQ 服务器在遇到异常情况时不会造成消息丢失。
消费者在消费消息的同时需要将autoAck 设置为false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失。
2、“最多一次”的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这样很难确保消息不会丢失。(估计有不少公司的业务系统都是这样的,想想都觉得可怕)
3、**“恰好一次”**是RabbitMQ目前无法保障的。
考虑这样一种情况,消费者在消费完一条消息之后向RabbitMQ 发送确认Basic.Ack 命令,此时由于网络断开或者其他原因造成RabbitMQ 并没有收到这个确认命令,那么RabbitMQ 不会将此条消息标记删除。在重新建立连接之后,消费者还是会消费到这一条消息,这就造成了重复消费。
再考虑一种情况,生产者在使用publisher confirm机制的时候,发送完一条消息等待RabbitMQ返回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样 RabbitMQ 中就有两条同样的消息,在消费的时候消费者就会重复消费。
# 九、消息幂等性处理
刚刚我们讲到,追求高性能就无法保证消息的顺序,而追求可靠性那么就可能产生重复消息,从而导致重复消费。。。真是应证了那句老话:做架构就是权衡取舍。
RabbitMQ层面有实现**“去重机制”来保证“恰好一次”吗?答案是并没有**。而且这个在目前主流的消息中间件都没有实现。
借用淘宝沈洵的一句话:最好的解决办法就是不去解决。当为了在基础的分布式中间件中实现某种相对不太通用的功能,需要牺牲到性能、可靠性、扩展性时,并且会额外增加很多复杂度,最简单的办法就是交给业务自己去处理。事实证明,很多业务场景下是可以容忍重复消息的。例如:操作日志收集,而对一些金融类的业务则要求比较严苛。
一般解决重复消息的办法是,在消费端让我们消费消息的操作具备幂等性。
幂等性问题并不是消息系统独有,而是(分布式)系统中普遍存在的问题。例如:RPC框架调用超 后会重试,HTTP请求会重复发起(用户手抖多点了几下按钮)
幂等(Idempotence)是一个数学上的概念,它是这样定义的:
如果一个函数f(x) 满足:f(f(x)) = f(x),则函数f(x) 满足幂等性。这个概念被拓展到计算机领域,被用来描述一个操作、方法或者服务。
一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。
对于幂等的方法,不用担心重复执行会对系统造成任何改变。
举个简单的例子(在不考虑并发问题的情况下):
select * from xx where id=1
delete from xx where id=1
这两条sql语句就是天然幂等的,它本身的重复执行并不会引起什么改变。而update就要看情况的,
update xxx set amount=100 where id=1
这条语句执行1次和100次都是一样的结果(最终余额都还是100),所以它是满足幂等性的。 而
update xxx set amount=amount+100 whereid=1
它就不满足幂等性的。
业界对于幂等性的一些常见做法:
- 借助数据库唯一索引,重复插入直接报错,事务回滚。还是举经典的转账的例子,为了保证不重复扣款或者重复加钱,我们这边维护一张“资金变动流水表”,里面至少需要交易单号、变动账户、变动金额等3个字段。我们选择交易单号和变动账户做联合唯一索引(单号是上游生成 的可保证唯一性),这样如果同一笔交易发生重复请求时就会直接报索引冲突,事务直接回滚。现实中,数据库唯一索引的方式通常做为兜底保证。
- 前置检查机制。这个很容易理解,并且有几种实现办法。还是引用上面转账的例子,当我在执行更改账户余额这个动作之前,我得先检查下资金变动流水表(或者Tair中)中是否已经存在这笔交易相关的记录了,select * from xx where account=xxx and orderId=yyy,如果已经存在,那么直接返回,否则执行正常的更新余额的动作。为了防止并发问题,我们通常需要借助“排他锁”来完成。在支付宝有一条铁律叫:一锁、二判、三操作。当然,我们也可以使用乐观锁或CAS机制,乐观锁一般会使用扩展一个版本号字段做判断条件。
- 唯一Id机制,比较通用的方式。对于每条消息我们都可以生成唯一Id,消费前判断Tair中是否存在(MsgId做Tair排他锁的key),消费成功后将状态写入Tair中,这样就可以防止重复消费 了。
对于接口请求类的幂等性保证要相对更复杂,我们通常要求上游请求时传递一个类GUID的请求号 (或TOKEN),如果我们发现已经存在了并且上一次请求处理结果是成功状态的(有时候上游的重试请求是正常诉求,我们不能将上一次异常/失败的处理结果返回或者直接提示“请求异常”,如果这样重试就变得没意义了)则不继续往下执行,直接返回“重复请求”的提示和上次的处理结果(上游通常是由于请 求超时等未知情况才发起重试的,所以直接返回上次请求的处理结果就好了)。如果请求ID都不存在或者上次处理结果是失败/异常的,那就继续处理流程,并最终记录最终的处理结果。这个请求序号由上游自己生成,上游通用需要根据请求参数、时间间隔等因子来生成请求ID。同样也需要利用这个请求ID做分布式锁的KEY实现排他。
# 2.2 可靠性分析
在使用任何消息中间件的过程中,难免会出现消息丢失等异常情况,这个时候就需要有一个良好的
机制来跟踪记录消息的过程(轨迹溯源),帮助我们排查问题。
在RabbitMQ 中可以使用Firehose 功能来实现消息追踪,Firehose 可以记录每一次发送或者消费
消息的记录,方便RabbitMQ 的使用者进行调试、排错等。
Firehose 的原理是将生产者投递给RabbitMQ 的消息,或者RabbitMQ 投递给消费者的消息按照指定的格式发送到默认的交换器上。这个默认的交换器的名称为 amq.rabbitmq.trace ,它是一个topic 类型的交换器。发送到这个交换器上的消息的路由键为 publish.{exchangename} 和 deliver.{queuename} 。其中 exchangename 和 queuename 为交换器和队列的名称,分别对应生产者投递到交换器的消息和消费者从队列中获取的消息。
开启Firehose命令:
rabbitmqctl trace_on [-p vhost]
其中[-p vhost]是可选参数,用来指定虚拟主机vhost。
对应的关闭命令为:
rabbitmqctl trace_off [-p vhost]
Firehose 默认情况下处于关闭状态,并且Firehose 的状态是非持久化的,会在RabbitMQ服务重启
的时候还原成默认的状态。Firehose 开启之后多少会影响RabbitMQ 整体服务性能,因为它会引起额外的消息生成、路由和存储。
rabbitmq_tracing 插件相当于Firehose 的GUI 版本,它同样能跟踪RabbitMQ 中消息的流入流出
情况。rabbitmq_tracing 插件同样会对流入流出的消息进行封装,然后将封装后的消息日志存入相应的trace 文件中。
可以使用
rabbitmq-plugins enable rabbitmq_tracing
命令来启动rabbitmq_ tracing 插件
使用
rabbitmq-plugins disable rabbitmq_tracing
命令关闭该插件。
Name表示rabbitmq_tracing的一个条目的名称,Format可以选择Text或JSON,连接的用户名写
root,密码写123456。
Pattern:发布的消息:publish.<exname>
Pattern:消费的消息:deliver.<queuename>
# 2.3 TTL机制
在京东下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内
用户没有支付,则默认订单取消。
该如何实现?
- 定期轮询(数据库等)
用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更
改数据库状态。定期轮询数据库支付状态,如果超过30分钟就将该订单取消。
优点:设计实现简单
缺点:需要对数据库进行大量的IO操作,效率低下。
- Timer
SimpleDateFormat simpleDateFormat = new
SimpleDateFormat("HH:mm:ss");
Timer timer = new Timer();
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
System.out.println("用户没有付款,交易取消:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
timer.cancel();
}
};
System.out.println("等待用户付款:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
// 10秒后执行timerTask
timer.schedule(timerTask, 10 * 1000);
缺点:
Timers没有持久化机制.
Timers不灵活 (只可以设置开始时间和重复间隔,对等待支付貌似够用)
Timers 不能利用线程池,一个timer一个线程
Timers没有真正的管理计划
- ScheduledExecutorService
SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
// 线程工厂
ThreadFactory factory = Executors.defaultThreadFactory();
// 使用线程池
ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10, factory);
System.out.println("开始等待用户付款10秒:" + format.format(new Date()));
service.schedule(new Runnable() {
@Override
public void run() {
System.out.println("用户未付款,交易取消:" + format.format(new Date()));
}// 等待10s 单位秒
}, 10, TimeUnit.SECONDS);
优点:可以多线程执行,一定程度上避免任务间互相影响,单个任务异常不影响其
它任务。
在高并发的情况下,不建议使用定时任务去做,因为太浪费服务器性能,不建议。
- RabbitMQ
使用TTL
- Quartz
- Redis Zset
- JCronTab
- SchedulerX
- 。。。
TTL,Time to Live 的简称,即过期时间。
RabbitMQ 可以对消息和队列两个维度来设置TTL。
任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种
过期的机制来做兜底。
目前有两种方法可以设置消息的TTL。
- 通过Queue属性设置,队列中所有消息都有相同的过期时间。
- 对消息自身进行单独设置,每条消息的TTL 可以不同。
如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准。通常来讲,消息在队列中的生存
时间一旦超过设置的TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。当然,“死信”也是可以被取出来消费的,下一小节我们会讲解。
# 原生API案例
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 创建队列(实际上使用的是AMQP default这个direct类型的交换器)
// 设置队列属性
Map<String, Object> arguments = new HashMap<>();
// 设置队列的TTL
arguments.put("x-message-ttl", 30000);
// 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久)
arguments.put("x-expires", 10000);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
for (int i = 0; i < 1000000; i++) {
String message = "Hello World!" + i;
channel.basicPublish(
"",
QUEUE_NAME,
new AMQP.BasicProperties().builder().expiration("30000").build(),
message.getBytes()
System.out.println(" [X] Sent '" + message + "'");
);
}
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
此外,还可以通过命令行方式设置全局TTL,执行如下命令:
rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues
还可以通过restful api方式设置,这里不做过多介绍。
默认规则:
- 如果不设置TTL,则表示此消息不会过期;
- 如果TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢
弃;
注意理解 message-ttl 、 x-expires 这两个参数的区别,有不同的含义。但是这两个参数属性都
遵循上面的默认规则。一般TTL相关的参数单位都是毫秒(ms)
# springboot案例
# 1. pom.xml添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
# 2. application.properties添加rabbitmq连接信息
spring.application.name=ttl
spring.rabbitmq.host=node1
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
# 3. 主入口类
package com.lagou.rabbitmq.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqDemo {
public static void main(String[] args) {
SpringApplication.run(RabbitmqDemo07.class, args);
}
}
# 4. RabbitConfig类
package com.lagou.rabbitmq.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
@Bean
public Queue queueTTLWaiting() {
Map<String, Object> props = new HashMap<>();
// 对于该队列中的消息,设置都等待10s
props.put("x-message-ttl", 10000);
Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props);
return queue;
}
@Bean
public Queue queueWaiting() {
Queue queue = new Queue("q.pay.waiting", false, false, false);
return queue;
}
@Bean
public Exchange exchangeTTLWaiting() {
DirectExchange exchange = new DirectExchange("ex.pay.ttl-waiting", false, false);
return exchange;
}
/**
* 该交换器使用的时候,需要给每个消息设置有效期
* @return
*/
@Bean
public Exchange exchangeWaiting() {
DirectExchange exchange = new DirectExchange("ex.pay.waiting", false, false);
return exchange;
}
@Bean
public Binding bindingTTLWaiting() {
return BindingBuilder.bind(queueTTLWaiting()).to(exchangeTTLWaiting()).with("pay.ttl-waiting").noargs();
}
@Bean
public Binding bindingWaiting() {
return BindingBuilder.bind(queueWaiting()).to(exchangeWaiting()).with("pay.waiting").noargs();
}
}
# 5. PayController类
package com.lagou.rabbitmq.demo.controller;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;
@RestController
public class PayController {
@Autowired
private AmqpTemplate rabbitTemplate;
@RequestMapping("/pay/queuettl")
public String sendMessage() {
rabbitTemplate.convertAndSend("ex.pay.ttl-waiting", "pay.ttl-waiting", "发送了TTL-WAITING-MESSAGE");
return "queue-ttl-ok";
}
@RequestMapping("/pay/msgttl")
public String sendTTLMessage() throws UnsupportedEncodingException {
MessageProperties properties = new MessageProperties();
properties.setExpiration("5000");
Message message = new Message("发送了WAITING-MESSAGE".getBytes("utf-8"), properties);
rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
return "msg-ttl-ok";
}
}
# 2.4 死信队列
用户下单,调用订单服务,然后订单服务调用派单系统通知外卖人员送单,这时候订单系统与派单
系统 采用 MQ异步通讯。
在定义业务队列时可以考虑指定一个 死信交换机,并绑定一个死信队列。当消息变成死信时,该消
息就会被发送到该死信队列上,这样方便我们查看消息失败的原因。
DLX,全称为Dead-Letter-Exchange,死信交换器。消息在一个队列中变成死信(Dead Letter)
之后,被重新发送到一个特殊的交换器(DLX)中,同时,绑定DLX的队列就称为**“死信队列**”。
以下几种情况导致消息变为死信:
- 消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;
- 消息过期;
- 队列达到最大长度。
对于RabbitMQ 来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被
消费者正确消费(消费者调用了Basic.Nack 或者Basic.Reject)而被置入死信队列中的情况,后
续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善
和优化系统。
# 原生API案例
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 定义一个死信交换器(也是一个普通的交换器)
channel.exchangeDeclare("exchange.dlx", "direct", true);
// 定义一个正常业务的交换器
channel.exchangeDeclare("exchange.biz", "fanout", true);
Map<String, Object> arguments = new HashMap<>();
// 设置队列TTL
arguments.put("x-message-ttl", 10000);
// 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列)
arguments.put("x-dead-letter-exchange", "exchange.dlx");
// 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey
arguments.put("x-dead-letter-routing-key", "routing.key.dlx.test");
channel.queueDeclare("queue.biz", true, false, false, arguments);
channel.queueBind("queue.biz", "exchange.biz", "");
channel.queueDeclare("queue.dlx", true, false, false, null);
// 死信队列和死信交换器
channel.queueBind("queue.dlx", "exchange.dlx", "routing.key.dlx.test");
channel.basicPublish("exchange.biz", "",
MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx.test".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
# springboot案例
# 1. pom.xml添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
# 2. application.properties添加RabbitMQ连接信息
spring.application.name=dlx
spring.rabbitmq.host=node1
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
# 3. 主入口类:
package com.lagou.rabbitmq.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqDemo {
public static void main(String[] args) {
SpringApplication.run(RabbitmqDemo08.class, args);
}
}
# 4. RabbitConfig类:
package com.lagou.rabbitmq.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
Map<String, Object> props = new HashMap<>();
// 消息的生存时间 10s
props.put("x-message-ttl", 10000);
// 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列)
props.put("x-dead-letter-exchange", "ex.go.dlx");
// 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey
props.put("x-dead-letter-routing-key", "go.dlx");
Queue queue = new Queue("q.go", true, false, false, props);
return queue;
}
@Bean
public Queue queueDlx() {
Queue queue = new Queue("q.go.dlx", true, false, false);
return queue;
}
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("ex.go", true,false, null);
return exchange;
}
/**
* 死信交换器
* @return
*/
@Bean
public Exchange exchangeDlx() {
DirectExchange exchange = new DirectExchange("ex.go.dlx",true, false, null);
return exchange;
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("go").noargs();
}
/**
* 死信交换器绑定死信队列
* @return
*/
@Bean
public Binding bindingDlx() {
return BindingBuilder.bind(queueDlx()).to(exchangeDlx()).with("go.dlx").noargs();
}
}
# 5. GoController类:
package com.lagou.rabbitmq.demo.controller;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class GoController {
@Autowired
private AmqpTemplate rabbitTemplate;
@RequestMapping("/go")
public String distributeGo() {
rabbitTemplate.convertAndSend("ex.go", "go", "送单到石景山x小区,请在10秒内接受任务");
return "任务已经下发,等待送单。。。";
}
@RequestMapping("/notgo")
public String getAccumulatedTask() {
String notGo = (String) rabbitTemplate.receiveAndConvert("q.go.dlx");
return notGo;
}
}
# 2.5 延迟队列
延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消
费。
例如下面的业务场景:在支付宝上面买电影票,锁定了一个座位后系统默认会帮你保留15分钟时
间,如果15分钟后还没付款那么不好意思系统会自动把座位释放掉。怎么实现类似的功能呢?
- 可以用定时任务每分钟扫一次,发现有占座超过15分钟还没付款的就释放掉。但是这样做很
低效,很多时候做的都是些无用功;
- 可以用分布式锁、分布式缓存的被动过期时间,15分钟过期后锁也释放了,缓存key也不存在
了;
- 还可以用延迟队列,锁座成功后会发送1条延迟消息,这条消息15分钟后才会被消费,消费的
过程就是检查这个座位是否已经是“已付款”状态;
你在公司的协同办公系统上面预约了一个会议,邀请汪产品和陈序员今晚22点准时参加会有。系统还比较智能,除了默认发会议邀请的邮件告知参会者以外,到了今晚21:45分的时候(提前15分钟)就会通知提醒参会人员做好参会准备,会议马上开始...
同样的,这也可以通过轮询“会议预定表”来实现,比如我每分钟跑一次定时任务看看当前有哪些会议即将开始了。当然也可以通过延迟消息来实现,预定会议以后系统投递一条延迟消息,而这条消息比较特殊不会立马被消费,而是延迟到指定时间后再触发消费动作(发通知提醒参会人准备)。不过遗憾的是,在AMQP协议和RabbitMQ中都没有相关的规定和实现。不过,我们似乎可以借助上一小节介绍的“死信队列”来变相的实现。
可以使用rabbitmq_delayed_message_exchange插件实现。
这里和TTL方式有个很大的不同就是TTL存放消息在死信队列(delayqueue)里,二基于插件存放消息在延时交换机里(x-delayed-message exchange)。
- 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
- 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列
(queue)并把消息给它
- 队列(queue)再把消息发送给监听它的消费者(customer)
# 1. 下载插件
# 2. 安装插件
将插件拷贝到rabbitmq-server的安装路径:/usr/lib/rabbitmq/lib/rabbitmq_server-
3.8.4/plugins
# 3. 启用插件
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 4. 重启rabbitmq-server
systemctl restart rabbitmq-server
# 5. 编写代码,首先是SpringBootApplication主入口类
package com.lagou.rabbitmq.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqDemo {
public static void main(String[] args) {
SpringApplication.run(RabbitmqDemo.class, args);
}
}
# RabbitMQ的对象配置
package com.lagou.rabbitmq.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
Queue queue = new Queue("q.delayed", false, false, false, null);
return queue;
}
@Bean
public Exchange exchange() {
Map<String, Object> props = new HashMap<>();
props.put("x-delayed-type", ExchangeTypes.FANOUT);
Exchange exchange = new CustomExchange("ex.delayed", "x-delayed-message", true, false, props);
return exchange;
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("key.delayed").noargs();
}
}
# 使用推消息模式接收延迟队列的广播
package com.lagou.rabbitmq.demo.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class MeetingListener {
@RabbitListener(queues = "q.delayed")
public void broadcastMeetingAlarm(Message message, Channel channel) throws IOException {
System.err.println("提醒:5秒后:" + newS tring(message.getBody(), "utf-8"));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
# 开发RestController,用于向延迟队列发送消息,并指定延迟的时长
package com.lagou.rabbitmq.demo.controller;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.UnsupportedEncodingException;
@RestController
public class PublishController {
@Autowired
private AmqpTemplate rabbitTemplate;
@RequestMapping("/prepare/{seconds}")
public String toMeeting(@PathVariable Integer seconds) throws
UnsupportedEncodingException {
// RabbitMQ只会检查队列头部的消息是否过期,如果过期就放到死信队列
// 假如第一个过期时间很长,10s,第二个消息3s,则系统先看第一个消息,等到第一个消息过期,放到DLX
// 此时才会检查第二个消息,但实际上此时第二个消息早已经过期了,但是并没有先于第一个消息放到DLX。
// 插件rabbitmq_delayed_message_exchange帮我们搞定这个。
MessageProperties properties = new MessageProperties();
properties.setHeader("x-delay", (seconds - 10) * 1000);
Message message = new Message((seconds + "秒后召开销售部门会议。").getBytes("utf-8"), properties);
// 如果不设置message的properties,也可以使用下述方法设置x-delay属性的值
// rabbitTemplate.convertAndSend("ex.delayed", "key.delayed", message, msg -> {
// // 使用定制的属性x-delay设置过期时间,也就是提前5s提醒
// // 当消息转换完,设置消息头字段
// msg.getMessageProperties().setHeader("x-delay", (seconds - 5) * 1000);
// return msg;
// });
rabbitTemplate.convertAndSend("ex.delayed", "key.delayed",
message);
return "已经定好闹钟了,到时提前告诉大家";
}
}
# application.properties中添加rabbitmq的配置
spring.application.name=delayedqueue
spring.rabbitmq.host=node1
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
# pom.xml添加依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>