RocketMQ高级特性及原理
# 2.1 消费发送
生产者向消息队列里写入消息,不同的业务场景需要生产者采用不同的写入策略。比如同步发送、异步发送、Oneway发送、延迟发送、发送事务消息等。 默认使用的是DefaultMQProducer类,发送消息要经过五个步骤:
- 1)设置Producer的GroupName。
- 2)设置InstanceName,当一个Jvm需要启动多个Producer的时候,通过设置不同的InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”。
- 3)设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。想保证不丢消息,可以设置多重试几次。
- 4)设置NameServer地址
- 5)组装消息并发送。
消息发生返回状态(SendResult#SendStatus)有如下四种:
- FLUSH_DISK_TIMEOUT
- FLUSH_SLAVE_TIMEOUT
- SLAVE_NOT_AVAILABLE
- SEND_OK
不同状态在不同的刷盘策略和同步策略的配置下含义是不同的:
- **FLUSH_DISK_TIMEOUT:**表示没有在规定时间内完成刷盘(需要Broker的刷盘策略被设置成SYNC_FLUSH才会报这个错误)。
- FLUSH_SLAVE_TIMEOUT:表示在主备方式下,并且Broker被设置成SYNC_MASTER方式,没有在设定时间内完成主从同步。
- **SLAVE_NOT_AVAILABLE:**这个状态产生的场景和FLUSH_SLAVE_TIMEOUT类似,表示在主备方式下,并且Broker被设置成SYNC_MASTER,但是没有找到被配置成Slave的Broker。
- **SEND_OK:**表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave上?消息在Slave上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK。
写一个高质量的生产者程序,重点在于对发送结果的处理,要充分考虑各种异常,写清对应的处理逻辑。
# 提升写入的性能
发送一条消息出去要经过三步
- 客户端发送请求到服务器。
- 服务器处理该请求。
- 服务器向客户端返回应答
一次消息的发送耗时是上述三个步骤的总和。
在一些对速度要求高,但是可靠性要求不高的场景下,比如日志收集类应用, 可以采用Oneway方式发送
Oneway方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果。
用这种方式发送消息的耗时可以缩短到微秒级。
另一种提高发送速度的方法是增加Producer的并发量,使用多个Producer同时发送,
我们不用担心多Producer同时写会降低消息写磁盘的效率,RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写入DirectMem中,然后异步地将连续一段无空洞的数据刷入文件系统当中。
顺序写CommitLog可让RocketMQ无论在HDD还是SSD磁盘情况下都能保持较高的写入性能。
目前在阿里内部经过调优的服务器上,写入性能达到90万+的TPS,我们可以参考这个数据进行系统优化。
在Linux操作系统层级进行调优,推荐使用EXT4文件系统,IO调度算法使用deadline算法。
# 2.2 消息消费
简单总结消费的几个要点:
- 消息消费方式(Pull和Push)
- 消息消费的模式(广播模式和集群模式)
- 流量控制(可以结合sentinel来实现,后面单独讲)
- 并发线程数设置
- 消息的过滤(Tag、Key) TagA||TagB||TagC * null
当Consumer的处理速度跟不上消息的产生速度,会造成越来越多的消息积压,这个时候首先查看消费逻辑本身有没有优化空间,除此之外还有三种方法可以提高Consumer的处理能力。
1. 提高消费并行度
在同一个ConsumerGroup下(Clustering方式),可以通过增加Consumer实例的数量来提
高并行度。
通过加机器,或者在已有机器中启动多个Consumer进程都可以增加Consumer实例数。
注意:总的Consumer数量不要超过Topic下Read Queue数量,超过的Consumer实例接收
不到消息。
此外,通过提高单个Consumer实例中的并行处理的线程数,可以在同一个Consumer内增加
并行度来提高吞吐量(设置方法是修改consumeThreadMin和consumeThreadMax)。
2. 以批量方式进行消费
某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中
涉及update某个数据库,一次update10条的时间会大大小于十次update1条数据的时间。
可以通过批量方式消费来提高消费的吞吐量。实现方法是设置Consumer的
consumeMessageBatchMaxSize这个参数,默认是1,如果设置为N,在消息多的时候每次
收到的是个长度为N的消息链表。
3. 检测延时情况,跳过非重要消息
Consumer在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆
积,这个时候可以选择丢弃不重要的消息,使Consumer尽快追上Producer的进度。
# 2.3 消息存储
# 2.3.1 存储介质
# 关系型数据库DB
Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障
# 文件系统
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。
# 2.3.2 性能对比
文件系统>关系型数据库DB
# 2.3.3 消息的存储和发送
# 1) 消息存储
目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。
但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!
因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。
RocketMQ的消息用顺序写,保证了消息存储的速度。
# 2) 存储结构
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。
- 消息存储架构图中主要有下面三个跟消息存储相关的文件构成。
(1) CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消
息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移
量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为
1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏
移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文
件;
(2) ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能
RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行
如果要遍历commitlog文件根据topic检索消息是非常低效。
Consumer即可根据ConsumeQueue来查找待消费的消息。
其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引:
- 保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset
- 消息大小size
- 消息Tag的HashCode值。
consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:
topic/queue/file三层组织结构
具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。
consumequeue文件采取定长设计,每个条目共20个字节,分别为:
- 8字节的commitlog物理偏移量
- 4字节的消息长度
- 8字节tag hashcode
单个文件由30W个条目组成,可以像数组一样随机访问每一个条目
每个ConsumeQueue文件大小约5.72M;
(3) IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。
- Index文件的存储位置是: $HOME/store/index/${fileName}
- 文件名fileName是以创建时的时间戳命名的
- 固定的单个IndexFile文件大小约为400M
- 一个IndexFile可以保存 2000W个索引
- IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底
层实现为hash索引。
# 2.4 过滤消息
RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。
RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。
其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正式基于这个字段值的。
主要支持如下2种的过滤方式
(1) Tag过滤方式:Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。
- Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。
- Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。
- Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤。
- 在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。
(2) SQL92的过滤方式:
仅对push的消费者起作用。
Tag方式虽然效率高,但是支持的过滤逻辑比较简单。
SQL表达式可以更加灵活的支持复杂过滤逻辑,这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样
真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。
每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。
SQL92的表达式上下文为消息的属性。
conf/broker.conf
首先需要开启支持SQL92的特性,然后重启broker:
mqbroker -n localhost:9876 -c /opt/rocket/conf/broker.conf
RocketMQ仅定义了几种基本的语法,用户可以扩展:
- 数字比较:
>
,>=
,<
,<=
,BETWEEN
,=
- 字符串比较:
=
,<>
,IN
;IS NULL
或者IS NOT NULL
; - 逻辑比较:
AND
,OR
,NOT
; - Constant types are: 数字如:123, 3.1415; 字符串如:'abc',必须是单引号引起来 NULL,特
殊常量 布尔型如:TRUE or FALSE;
(3) Filter Server方式。这是一种比SQL表达式更灵活的过滤方式,允许用户自定义Java函数,根据Java函数的逻辑对消息进行过滤。
要使用Filter Server,首先要在启动Broker前在配置文件里加上 filterServer-Nums=3 这样的配置,Broker在启动的时候,就会在本机启动3个Filter Server进程。Filter Server类似一个RocketMQ的Consumer进程,它从本机Broker获取消息,然后根据用户上传过来的Java函数进行过滤,过滤后的消息再传给远端的Consumer。
这种方式会占用很多Broker机器的CPU资源,要根据实际情况谨慎使用。上传的java代码也要经过检查,不能有申请大内存、创建线程等这样的操作,否则容易造成Broker服务器宕机。
# 2.5 零拷贝原理
# 2.5.1 PageCache
- 由内存中的物理page组成,其内容对应磁盘上的block。
- page cache的大小是动态变化的。
- backing store: cache缓存的存储设备
- 一个page通常包含多个block, 而block不一定是连续的。
# 2.5.1.1 读Cache
当内核发起一个读请求时, 先会检查请求的数据是否缓存到了page cache中。
- 如果有,那么直接从内存中读取,不需要访问磁盘, 此即 cache hit(缓存命中)
- 如果没有, 就必须从磁盘中读取数据, 然后内核将读取的数据再缓存到cache中, 如此后续的读请求就可以命中缓存了。
page可以只缓存一个文件的部分内容, 而不需要把整个文件都缓存进来。
# 2.5.1.2 写Cache
- 当内核发起一个写请求时, 也是直接往cache中写入, 后备存储中的内容不会直接更新。
- 内核会将被写入的page标记为dirty, 并将其加入到dirty list中。
- 内核会周期性地将dirty list中的page写回到磁盘上, 从而使磁盘上的数据和内存中缓存的数据一致。
# 2.5.1.3 cache回收
Page cache的另一个重要工作是释放page, 从而释放内存空间。
cache回收的任务是选择合适的page释放
- 如果page是dirty的, 需要将page写回到磁盘中再释放。
# 2.5.2 cache和buffer的区别
- Cache:缓存区,是高速缓存,是位于CPU和主内存之间的容量较小但速度很快的存储器,因为CPU的速度远远高于主内存的速度,CPU从内存中读取数据需等待很长的时间,而 Cache
保存着CPU刚用过的数据或循环使用的部分数据,这时从Cache中读取数据会更快,减少了
CPU等待的时间,提高了系统的性能。
Cache并不是缓存文件的,而是缓存块的(块是I/O读写最小的单元);Cache一般会用在I/O请求上,如果多个进程要访问某个文件,可以把此文件读入Cache中,这样下一个进程获取CPU控制权并访问此文件直接从Cache读取,提高系统性能。
- Buffer:缓冲区,用于存储速度不同步的设备或优先级不同的设备之间传输数据;通过buffer
可以减少进程间通信需要等待的时间,当存储速度快的设备与存储速度慢的设备进行通信时,
存储慢的数据先把数据存放到buffer,达到一定程度存储快的设备再读取buffer的数据,在此
期间存储快的设备CPU可以干其他的事情。
Buffer:一般是用在写入磁盘的,例如:某个进程要求多个字段被读入,当所有要求的字段被读入之前已经读入的字段会先放到buffer中。
# 2.5.3 HeapByteBuffer和DirectByteBuffer
HeapByteBuffer,是在jvm堆上面一个buffer,底层的本质是一个数组,用类封装维护了很多的索引(limit/position/capacity等)。
DirectByteBuffer,底层的数据是维护在操作系统的内存中,而不是jvm里,DirectByteBuffer里维护了一个引用address指向数据,进而操作数据。
HeapByteBuffer优点:内容维护在jvm里,把内容写进buffer里速度快;更容易回收
DirectByteBuffer优点:跟外设(IO设备)打交道时会快很多,因为外设读取jvm堆里的数据时,不是直接读取的,而是把jvm里的数据读到一个内存块里,再在这个块里读取的,如果使用
DirectByteBuffer,则可以省去这一步,实现zero copy(零拷贝)
外设之所以要把jvm堆里的数据copy出来再操作,不是因为操作系统不能直接操作jvm内存,而是因为jvm在进行gc(垃圾回收)时,会对数据进行移动,一旦出现这种问题,外设就会出现数据错乱的情况。
所有的通过allocate方法创建的buffer都是HeapByteBuffer.
堆外内存实现零拷贝
- 前者分配在JVM堆上(ByteBuffer.allocate()),后者分配在操作系统物理内存上
(ByteBuffer.allocateDirect(),JVM使用C库中的malloc()方法分配堆外内存);
- DirectByteBuffer可以减少JVM GC压力,当然,堆中依然保存对象引用,fullgc发生时也会回收直接内存,也可以通过system.gc主动通知JVM回收,或者通过 cleaner.clean主动清理。
Cleaner.create()方法需要传入一个DirectByteBuffer对象和一个Deallocator(一个堆外内存
回收线程)。GC发生时发现堆中的DirectByteBuffer对象没有强引用了,则调用Deallocator
的run()方法回收直接内存,并释放堆中DirectByteBuffer的对象引用;
- 底层I/O操作需要连续的内存(JVM堆内存容易发生GC和对象移动),所以在执行write操作时需要将HeapByteBuffer数据拷贝到一个临时的(操作系统用户态)内存空间中,会多一次额外
拷贝。而DirectByteBuffer则可以省去这个拷贝动作,这是Java层面的 “零拷贝” 技术,在
netty中广泛使用;
- MappedByteBuffer底层使用了操作系统的mmap机制,FileChannel#map()方法就会返回
MappedByteBuffer。DirectByteBuffer虽然实现了MappedByteBuffer,不过
DirectByteBuffer默认并没有直接使用mmap机制。
# 2.5.4 缓冲IO和直接IO
# 2.5.4.1 缓存IO
缓存I/O又被称作标准I/O,大多数文件系统的默认I/O操作都是缓存I/O。在Linux的缓存I/O机制中,数据先从磁盘复制到内核空间的缓冲区,然后从内核空间缓冲区复制到应用程序的地址空间。
读操作:操作系统检查内核的缓冲区有没有需要的数据,如果已经缓存了,那么就直接从缓存中返回;否则从磁盘中读取,然后缓存在操作系统的缓存中。
写操作:将数据从用户空间复制到内核空间的缓存中。这时对用户程序来说写操作就已经完成,至于什么时候再写到磁盘中由操作系统决定,除非显示地调用了sync同步命令。
缓存I/O的优点:
- 在一定程度上分离了内核空间和用户空间,保护系统本身的运行安全;
- 可以减少读盘的次数,从而提高性能。
缓存I/O的缺点:
- 在缓存 I/O 机制中,DMA 方式可以将数据直接从磁盘读到页缓存中,或者将数据从页缓存直
接写回到磁盘上,而不能直接在应用程序地址空间和磁盘之间进行数据传输。数据在传输过程
中就需要在应用程序地址空间(用户空间)和缓存(内核空间)之间进行多次数据拷贝操作,
这些数据拷贝操作所带来的CPU以及内存开销是非常大的。
# 2.5.4.2 直接IO
直接IO就是应用程序直接访问磁盘数据,而不经过内核缓冲区,这样做的目的是减少一次从内核缓冲区到用户程序缓存的数据复制。比如说数据库管理系统这类应用,它们更倾向于选择它们自己的缓存机制,因为数据库管理系统往往比操作系统更了解数据库中存放的数据,数据库管理系统可以提供一种更加有效的缓存机制来提高数据库中数据的存取性能。
直接IO的缺点:如果访问的数据不在应用程序缓存中,那么每次数据都会直接从磁盘加载,这种直接加载会非常缓慢。通常直接IO与异步IO结合使用,会得到比较好的性能。
下图分析了写场景下的DirectIO和BufferIO:
# 2.5.5 内存映射文件(Mmap)
在LINUX中我们可以使用mmap用来在进程虚拟内存地址空间中分配地址空间,创建和物理内存的映射关系。
映射关系可以分为两种
- 文件映射 磁盘文件映射进程的虚拟地址空间,使用文件内容初始化物理内存。
- 匿名映射 初始化全为0的内存空间。
而对于映射关系是否共享又分为
- 私有映射(MAP_PRIVATE) 多进程间数据共享,修改不反应到磁盘实际文件,是一个copy-on-write(写时复制)的映射方式。
- 共享映射(MAP_SHARED) 多进程间数据共享,修改反应到磁盘实际文件中。
因此总结起来有4种组合
- 私有文件映射 多个进程使用同样的物理内存页进行初始化,但是各个进程对内存文件的修改不会共享,也不会反应到物理文件中
- 私有匿名映射 mmap会创建一个新的映射,各个进程不共享,这种使用主要用于分配内存(malloc分配大内存会调用mmap)。 例如开辟新进程时,会为每个进程分配虚拟的地址空间,这些虚拟地址映射的物理内存空间各个进程间读的时候共享,写的时候会copy-on-write。
- 共享文件映射 多个进程通过虚拟内存技术共享同样的物理内存空间,对内存文件 的修改会反应到实际物理文件中,他也是进程间通信(IPC)的一种机制。
- 共享匿名映射 这种机制在进行fork的时候不会采用写时复制,父子进程完全共享同样的物理内存页,这也就实现了父子进程通信(IPC).
mmap只是在虚拟内存分配了地址空间,只有在第一次访问虚拟内存的时候才分配物理内存。
在mmap之后,并没有在将文件内容加载到物理页上,只上在虚拟内存中分配了地址空间。当进程在访问这段地址时,通过查找页表,发现虚拟内存对应的页没有在物理内存中缓存,则产生"缺页",由内核的缺页异常处理程序处理,将文件对应内容,以页为单位(4096)加载到物理内存,注意是只加载缺页,但也会受操作系统一些调度策略影响,加载的比所需的多。
# 2.5.6 直接内存读取并发送文件的过程
# 2.5.7 Mmap读取并发送文件的过程
零拷贝(zero copy)小结
- 虽然叫零拷贝,实际上sendfile有2次数据拷贝的。第1次是从磁盘拷贝到内核缓冲区,第二次是从内核缓冲区拷贝到网卡(协议引擎)。如果网卡支持 SG-DMA(The Scatter-Gather
Direct Memory Access)技术,就无需从PageCache拷贝至 Socket 缓冲区;
- 之所以叫零拷贝,是从内存角度来看的,数据在内存中没有发生过拷贝,只是在内存和I/O设
备之间传输。很多时候我们认为sendfile才是零拷贝,mmap严格来说不算;
- Linux中的API为sendfile、mmap,Java中的API为FileChanel.transferTo()、
FileChannel.map()等;
- Netty、Kafka(sendfile)、Rocketmq(mmap)、Nginx等高性能中间件中,都有大量利用操作系统零拷贝特性。
# 2.6 同步复制和异步复制
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。
# 1)同步复制
同步复制方式是等Master和Slave均写 成功后才反馈给客户端写成功状态;
在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
# 2)异步复制
异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失;
# 3)配置
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。
/opt/rocket/conf/broker.conf 文件:Broker的配置文件
参数名 | 默认值 | 说明 |
---|---|---|
listenPort | 10911 | 接受客户端连接的监听端口 |
namesrvAddr | null | nameServer 地址 |
brokerIP1 | 网卡的 InetAddress | 当前 broker 监听的 IP |
brokerIP2 | 跟 brokerIP1 一样 | 存在主从 broker 时,如果在 broker主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的brokerIP2 进行同步 |
brokerName | null | broker 的名称 |
brokerClusterName | DefaultCluster | 本 broker 所属的 Cluser 名称 |
brokerId | 0 | broker id, 0 表示 master, 其他的正整数表示 slave |
storePathCommitLog | $HOME/store/commitlog/ | 存储 commit log 的路径 |
storePathConsumerQueue | $HOME/store/consumequeue/ | 存储 consume queue 的路径 |
mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |
deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log |
fileReserverdTime | 72 | 以小时计算的文件保留时间 |
brokerRole | ASYNC_MASTER | SYNC_MASTER或者ASYNC_MASTER或者SLAVE SYNC_MASTER表示当前broker是一个同步复制的Master ASYNC_MASTER表示当前broker是一个异步复制的Master SLAVE表示当前borker是一个Slave。 |
flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保证在收到确认生产者之前将消息刷盘。 ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。 |
# 4)总结
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘 方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。
# 2.7 高可用机制
RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的。
Master和Slave的区别:
- 在Broker的配置文件中,参数brokerId的值为0表明这个Broker是Master,
- 大于0表明这个Broker是Slave,
- brokerRole参数也说明这个Broker是Master还是Slave。
(SYNC_MASTER/ASYNC_MASTER/SALVE)
- Master角色的Broker支持读和写,Slave角色的Broker仅支持读。
- Consumer可以连接Master角色的Broker,也可以连接Slave角色的Broker来读取消息。
# 2.7.1 消息消费高可用
在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。
有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。
这就达到了消费端的高可用性。
# 2.7.2 消息发送高可用
如何达到发送端的高可用性呢?
在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同brokerId的机器组成一个Broker组),这样既可以在性能方面具有扩展性,也可以降低主节点故障对整体上带来的影响,而且当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息的。
RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足,需要把Slave转成Master。
- 手动停止Slave角色的Broker。
- 更改配置文件。
- 用新的配置文件启动Broker。
#
这种早期方式在大多数场景下都可以很好的工作,但也面临一些问题。
比如,在需要保证消息严格顺序的场景下,由于在主题层面无法保证严格顺序,所以必须指定队列来发送消息,对于任何一个队列,它一定是落在一组特定的主从节点上,如果这个主节点宕机,其他的主节点是无法替代这个主节点的,否则就无法保证严格顺序。
在这种复制模式下,严格顺序和高可用只能选择一个。
RocketMQ 在 2018 年底迎来了一次重大的更新,引入 Dledger,增加了一种全新的复制方式。
RocketMQ 引入 Dledger,使用新的复制方式,可以很好地解决这个问题。
Dledger 在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客户端返回写入成功,并且它是支持通过选举来动态切换主节点的。
举例:
假如有3个节点,当主节点宕机的时候,2 个从节点会通过投票选出一个新的主节点来继续提供服务,相比主从的复制模式,解决了可用性的问题。
由于消息要至少复制到 2 个节点上才会返回写入成功,即使主节点宕机了,也至少有一个节点上的消息是和主节点一样的。
Dledger在选举时,总会把数据和主节点一样的从节点选为新的主节点,这样就保证了数据的一致性,既不会丢消息,还可以保证严格顺序。
存在问题:
当然,Dledger的复制方式也不是完美的,依然存在一些不足:
- 比如,选举过程中不能提供服务。
- 最少需要 3 个节点才能保证数据一致性,3 节点时,只能保证 1 个节点宕机时可用,如果 2个节点同时宕机,即使还有 1 个节点存活也无法提供服务,资源的利用率比较低。
- 另外,由于至少要复制到半数以上的节点才返回写入成功,性能上也不如主从异步复制的方式快。
# 2.8 刷盘机制
RocketMQ 的所有消息都是持久化的,先写入系统 PageCache,然后刷盘,可以保证内存与磁盘都有一份数据, 访问时,直接从内存读取。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。
# 2.8.1 同步刷盘
同步刷盘与异步刷盘的唯一区别是异步刷盘写完 PageCache直接返回,而同步刷盘需要等待刷盘完成才返回, 同步刷盘流程如下:
(1). 写入 PageCache后,线程等待,通知刷盘线程刷盘。
(2). 刷盘线程刷盘后,唤醒前端等待线程,可能是一批线程。
(3). 前端等待线程向用户返回成功
# 2.8.2 异步刷盘
在有 RAID 卡,SAS 15000 转磁盘测试顺序写文件,速度可以达到 300M 每秒左右,而线上的网卡一般都为千兆 网卡,写磁盘速度明显快于数据网络入口速度,那么是否可以做到写完内存就向用户返回,由后台线程刷盘呢?
- 由于磁盘速度大于网卡速度,那么刷盘的进度肯定可以跟上消息的写入速度。
- 万一由于此时系统压力过大,可能堆积消息,除了写入 IO,还有读取 IO,万一出现磁盘读取
落后情况, 会不会导致系统内存溢出,答案是否定的,原因如下:
- 写入消息到 PageCache时,如果内存不足,则尝试丢弃干净的 PAGE,腾出内存供新消息使用,策略是LRU 方式。
- 如果干净页不足,此时写入 PageCache会被阻塞,系统尝试刷盘部分数据,大约每次尝试 32个 PAGE , 来找出更多干净 PAGE。
综上,内存溢出的情况不会出现。
# 2.9 负载均衡
RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。
# 2.9.1 Producer的负载均衡
如图所示,5 个队列可以部署在一台机器上,也可以分别部署在 5 台不同的机器上,发送消息通过轮询队列的方式 发送,每个队列接收平均的消息量。通过增加机器,可以水平扩展队列容量。 另外也可以自定义方式选择发往哪个队列。
# 创建主题
[root@node1 ~]# mqadmin updateTopic -n localhost:9876 -t tp_demo_02 -w 6 -b
localhost:10911
DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = new Message();
message.setTopic("tp_demo_02");
message.setBody("hello lagou".getBytes());
// 指定MQ
SendResult result = producer.send(message,
new MessageQueue("tp_demo_06", "node1", 5),
1_000
);
System.out.println(result.getSendStatus());
producer.shutdown();
# 2.9.2 Consumer的负载均衡
如图所示,如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二
consumer 消费 2 个队列。 这样即可达到平均消费的目的,可以水平扩展 Consumer 来提高消费能力。但是 Consumer 数量要小于等于队列数 量,如果 Consumer 超过队列数量,那么多余的Consumer 将不能消费消息 。
在RocketMQ中,Consumer端的两种消费模式(Push/Pull)底层都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。
如果未拉取到消息,则延迟一下又继续拉取。
在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端在知道从Broker端的哪一个消息队列中去获取消息。
因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个
ConsumerGroup中的哪些Consumer消费。
要做负载均衡,必须知道一些全局信息,也就是一个ConsumerGroup里到底有多少个
Consumer。
知道了全局信息,才可以根据某种算法来分配,比如简单地平均分到各个Consumer。
在RocketMQ中,负载均衡或者消息分配是在Consumer端代码中完成的,Consumer从Broker处获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息。
Pull Consumer可以看到所有的Message Queue,而且从哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。
DefaultMQPullConsumer有两个辅助方法可以帮助实现负载均衡,一个是
registerMessageQueueListener函数,一个是MQPullConsumerScheduleService(使用这个Class类似使用DefaultMQPushConsumer,但是它把Pull消息的主动性留给了使用者)
public class MyConsumer {
public static void main(String[] args) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
DefaultMQPullConsumer consumer = new
DefaultMQPullConsumer("consumer_pull_grp_01");
consumer.setNamesrvAddr("node1:9876");
consumer.start();
Set<MessageQueue> messageQueues =
consumer.fetchSubscribeMessageQueues("tp_demo_01");
for (MessageQueue messageQueue : messageQueues) {
// 指定从哪个MQ拉取数据
PullResult result = consumer.pull(messageQueue, "*", 0L, 10);
List<MessageExt> msgFoundList = result.getMsgFoundList();
for (MessageExt messageExt : msgFoundList) {
System.out.println(messageExt);
}
}
consumer.shutdown();
}
}
DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个
DefaultMQPushConsumer启动后,会马上会触发一个doRebalance动作;而且在同一个
ConsumerGroup里加入新的DefaultMQPush-Consumer时,各个Consumer都会被触发
doRebalance动作。
负载均衡的分配粒度只到Message Queue,把Topic下的所有Message Queue分配到不同的
Consumer中
如下图所示,具体的负载均衡算法有几种,默认用的是AllocateMessageQueueAveragely。
我们可以设置负载均衡的算法:
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_push_grp_01");
consumer.setNamesrvAddr("node1:9876");
// 设置负载均衡算法
consumer.setAllocateMessageQueueStrategy(new
AllocateMessageQueueAveragely());
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// todo 处理接收到的消息
return null;
}
});
consumer.start();
以AllocateMessageQueueAveragely策略为例,如果创建Topic的时候,把Message Queue数设为3,当Consumer数量为2的时候,有一个Consumer需要处理Topic三分之二的消息,另一个处理三分之一的消息;当Consumer数量为4的时候,有一个Consumer无法收到消息,其他3个Consumer各处理Topic三分之一的消息。
可见Message Queue数量设置过小不利于做负载均衡,通常情况下,应把一个Topic的Message Queue数设置为16。
1、Consumer端的心跳包发送
在Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载均衡提供可以依据的元数据信息。
2、Consumer端实现负载均衡的核心类—RebalanceImpl
在Consumer实例的启动流程中启动MQClientInstance实例的部分,会完成负载均衡服务线程—RebalanceService的启动(每隔20s执行一次)。
通过查看源码可以发现,RebalanceService线程的run()方法最终调用的是RebalanceImpl类的
rebalanceByTopic()方法,该方法是实现Consumer端负载均衡的核心。
这里,rebalanceByTopic()方法根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。
对于集群模式:
case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic,
consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.",
consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed",
consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 对MQ进行排序
Collections.sort(mqAll);
// 对消费者ID进行排序
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy =
this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 计算当前消费者应该分配的MQ集合
allocateResult = strategy.allocate(
// 当前消费者所属的消费组
this.consumerGroup,
// 当前消费者ID
this.mQClientFactory.getClientId(),
// MQ集合
mqAll,
// 消费组中消费者ID集合
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception.
allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic,
allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed.
allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={},
mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet=
{}",
strategy.getName(), consumerGroup, topic,
this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
默认的负载均衡算法:
AllocateMessageQueueAveragely是默认的MQ分配对象。
算法:
// 获取当前消费者在cidAll集合中的下标
int index = cidAll.indexOf(currentCID);
// mqAll对cidAll大小取模
int mod = mqAll.size() % cidAll.size();
// 计算每个消费者应该分配到的mq数量
// 如果mq个数小于等于消费者个数,每个消费者最多分配一个mq
// 如果mq个数大于消费者个数,
int averageSize =
// 如果mq个数小于等于消费组中消费者个数
mqAll.size() <= cidAll.size() ?
// 平均数就是1
1
:
// 否则,看mod和index大小
(
mod > 0 && index < mod ?
// 如果余数大于0并且当前消费者下标小于余数,则当前消费者应该消费平
均数个mq+1
mqAll.size() / cidAll.size() + 1
:
// 如果余数大于0并且当前消费者下标大于等于余数,则当前消费者应该消
费平均数个mq
mqAll.size() / cidAll.size()
);
// 计算当前消费者消费mq的起始位置
int startIndex = (mod > 0 && index < mod)
?
index * averageSize
:
index * averageSize + mod;
// 计算当前消费者消费mq的跨度,即当前消费者分几个MQ
int range = Math.min(averageSize, mqAll.size() - startIndex);
// 分配MQ,放到result集合中返回
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。
# 2.10 消息重试
# 2.10.1 顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_grp_04_01");
consumer.setNamesrvAddr("node1:9876");
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
// 消息订阅
consumer.subscribe("tp_demo_04", "*");
// 并发消费
// consumer.setMessageListener(new MessageListenerConcurrently() {
// @Override
// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs, ConsumeConcurrentlyContext context) {
// return null;
// }
// });
// 顺序消费
consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg.getMsgId() + "\t" + msg.getQueueId() +
"\t" + new String(msg.getBody()));
}
return null;
}
});
consumer.start();
# 2.10.2 无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
1)重试次数
消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:
如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。
2)配置方式
消费失败后,重试配置方式
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置
(三种方式任选一种):
- 返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; (推荐)
- 返回 Null
- 抛出异常
public class MyConcurrentlyMessageListener implements
MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
//处理消息
doConsumeMessage(msgs);
//方式1:返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,消息将重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
//方式2:返回 null,消息将重试
return null;
//方式3:直接抛出异常, 消息将重试
throw new RuntimeException("Consumer Message exceotion");
}
}
消费失败后,不重试配置方式
集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS,此后这条消息将不会再重试。
public class MyConcurrentlyMessageListener implements
MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
doConsumeMessage(msgs);
} catch (Throwable e) {
//捕获消费逻辑中的所有异常,并返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//消息处理正常,直接返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
自定义消息最大重试次数
消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:
- 最大重试次数小于等于 16 次,则重试时间间隔同上表描述。
- 最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_grp_04_01");
// 设置重新消费的次数
// 共16个级别,大于16的一律按照2小时重试
consumer.setMaxReconsumeTimes(20);
注意:
- 消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。
- 如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了
MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。
- 配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置
获取消息重试次数
消费者收到消息后,可按照如下方式获取消息的重试次数:
public class MyConcurrentlyMessageListener implements
MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg.getReconsumeTimes());
}
doConsumeMessage(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
# 2.11 死信队列
RocketMQ中消息重试超过一定次数后(默认16次)就会被放到死信队列中,在消息队列
RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。可以在控制台Topic列表中看到“DLQ”相关的
Topic,默认命名是:
%RETRY%消费组名称(重试Topic)
%DLQ%消费组名称(死信Topic)
死信队列也可以被订阅和消费,并且也会过期
可视化工具:rocketmq-console下载地址:
https://github.com/apache/rocketmq-externals/archive/rocketmq-console-1.0.0.zip
使用jdk8:
# 编译打包
mvn clean package -DskipTests
# 运行工具
java -jar target/rocketmq-console-ng-1.0.0.jar
页面设置NameSrv地址即可。如果不生效,就直接修改项目的application.properties中的
namesrv地址选项的值。
# 2.11.1 死信特性
死信消息具有以下特性
- 不会再被消费者正常消费。
- 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3天内及时处理。
死信队列具有以下特性:
- 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
- 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队- 列。
- 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
# 2.11.2 查看死信信息
1.在控制台查询出现死信队列的主题信息
2.在消息界面根据主题查询死信消息
3.选择重新发送消息
一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。
# 2.12 延迟消息
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的 topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:
msg.setDelayLevel(level)。level有以下三种情况:
level == 0
,消息为非延迟消息1<=level<=maxLevel
,消息延迟特定时间,例如level==1
,延迟1slevel > maxLevel
,则level== maxLevel
,例如level==20
,延迟2h
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
查看SCHEDULE_TOPIC_XXXX主题信息:
生产者:
public class MyProducer {
public static void main(String[] args) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new
DefaultMQProducer("producer_grp_06_01");
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = null;
for (int i = 0; i < 20; i++) {
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
message = new Message("tp_demo_06", ("hello lagou - " +
i).getBytes());
// 设置延迟级别,0表示不延迟,大于18的总是延迟2h
message.setDelayTimeLevel(i);
producer.send(message);
}
producer.shutdown();
}
}
消费者:
public class MyConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_grp_06_01");
consumer.setNamesrvAddr("node1:9876");
consumer.subscribe("tp_demo_06", "*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(System.currentTimeMillis() / 1000);
for (MessageExt msg : msgs) {
System.out.println(
msg.getTopic() + "\t"
+ msg.getQueueId() + "\t"
+ msg.getMsgId() + "\t"
+ msg.getDelayTimeLevel() + "\t"
+ new String(msg.getBody())
);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
# 2.13 顺序消息
顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序。比如订单的生成、付款、发货,这3个消息必须按顺序处理才行。
顺序消息分为全局顺序消息和部分顺序消息:
- 全局顺序消息指某个Topic下的所有消息都要保证顺序;
- 部分顺序消息只要保证每一组消息被顺序消费即可,比如上面订单消息的例子,只要保证同一个订单ID的三个消息能按顺序消费即可。
在多数的业务场景中实际上只需要局部有序就可以了。
RocketMQ在默认情况下不保证顺序,比如创建一个Topic,默认八个写队列,八个读队列。这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个Consumer,每个
Consumer也可能启动多个线程并行处理,所以消息被哪个Consumer消费,被消费的顺序和写入的顺序是否一致是不确定的。
要保证全局顺序消息,需要先把Topic的读写队列数设置为一,然后Producer和Consumer的并发设置也要是一。简单来说,为了保证整个Topic的全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理。
原理如上图所示:
要保证部分消息有序,需要发送端和消费端配合处理。在发送端,要做到把同一业务ID的消息发送到同一个Message Queue;在消费过程中,要做到从同一个Message Queue读取的消息不被并发处理,这样才能达到部分有序。消费端通过使用MessageListenerOrderly类来解决单Message Queue的消息被并发处理的问题。
Consumer使用MessageListenerOrderly的时候,下面四个Consumer的设置依旧可以使用:
- setConsumeThreadMin
- setConsumeThreadMax
- setPullBatchSize
- setConsumeMessageBatchMaxSize。
前两个参数设置Consumer的线程数;
PullBatchSize指的是一次从Broker的一个Message Queue获取消息的最大数量,默认值是32;
ConsumeMessageBatchMaxSize指的是这个Consumer的Executor(也就是调用
MessageListener处理的地方)一次传入的消息数(List<MessageExt>msgs
这个链表的最大长度),默认值是1。
上述四个参数可以使用,说明MessageListenerOrderly并不是简单地禁止并发处理。在
MessageListenerOrderly的实现中,为每个Consumer Queue加个锁,消费每个消息前,需要先获得这个消息对应的Consumer Queue所对应的锁,这样保证了同一时间,同一个
Consumer Queue的消息不被并发消费,但不同Consumer Queue的消息可以并发处理。
部分有序:
顺序消息的生产和消费:
# 创建主题,8写8读
[root@node1 ~]# mqadmin updateTopic -b node1:10911 -n localhost:9876 -r 8 -t
tp_demo_07 -w 8
# 删除主题的操作:
[root@node1 ~]# mqadmin deleteTopic -c DefaultCluster deleteTopic -n
localhost:9876 -t tp_demo_07
# 主题描述
[root@node1 ~]# mqadmin topicStatus -n localhost:9876 -t tp_demo_07
OrderProducer.java
package com.lagou.rocket.demo.producer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.List;
public class OrderProducer {
public static void main(String[] args) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new
DefaultMQProducer("producer_grp_07_01");
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = null;
List<MessageQueue> queues =
producer.fetchPublishMessageQueues("tp_demo_07");
System.err.println(queues.size());
MessageQueue queue = null;
for (int i = 0; i < 100; i++) {
queue = queues.get(i % 8);
message = new Message("tp_demo_07", ("hello lagou - order
create" + i).getBytes());
producer.send(message, queue);
message = new Message("tp_demo_07", ("hello lagou - order
payed" + i).getBytes());
producer.send(message, queue);
message = new Message("tp_demo_07", ("hello lagou - order ship"
+ i).getBytes());
producer.send(message, queue);
}
producer.shutdown();
}
}
OrderConsumer.java
package com.lagou.rocket.demo.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.List;
import java.util.Set;
public class OrderConsumer {
public static void main(String[] args) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
DefaultMQPullConsumer consumer = new
DefaultMQPullConsumer("consumer_grp_07_01");
consumer.setNamesrvAddr("node1:9876");
consumer.start();
Set<MessageQueue> messageQueues =
consumer.fetchSubscribeMessageQueues("tp_demo_07");
System.err.println(messageQueues.size());
for (MessageQueue messageQueue : messageQueues) {
long nextBeginOffset = 0;
System.out.println("===============================");
do {
PullResult pullResult = consumer.pull(messageQueue, "*",
nextBeginOffset, 1);
if (pullResult == null || pullResult.getMsgFoundList() ==
null) break;
nextBeginOffset = pullResult.getNextBeginOffset();
List<MessageExt> msgFoundList =
pullResult.getMsgFoundList();
System.out.println(messageQueue.getQueueId() + "\t" +
msgFoundList.size());
for (MessageExt messageExt : msgFoundList) {
System.out.println(
messageExt.getTopic() + "\t" +
messageExt.getQueueId() + "\t" +
messageExt.getMsgId() + "\t" +
new String(messageExt.getBody())
);
}
} while (true);
}
consumer.shutdown();
}
}
全局有序:
顺序消息的生产和消费:
# 创建主题,8写8读
[root@node1 ~]# mqadmin updateTopic -b node1:10911 -n localhost:9876 -r 1 -t
tp_demo_07_01 -w 1
# 删除主题的操作:
[root@node1 ~]# mqadmin deleteTopic -c DefaultCluster deleteTopic -n
localhost:9876 -t tp_demo_07_01
# 主题描述
[root@node1 ~]# mqadmin topicStatus -n localhost:9876 -t tp_demo_07_01
GlobalOrderProduer.java
package com.lagou.rocket.demo.producer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class GlobalOrderProducer {
public static void main(String[] args) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new
DefaultMQProducer("producer_grp_07_02");
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = null;
for (int i = 0; i < 100; i++) {
message = new Message("tp_demo_07_01", ("hello lagou" +
i).getBytes());
producer.send(message);
}
producer.shutdown();
}
}
GlobalOrderConsumer.java
package com.lagou.rocket.demo.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class GlobalOrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_grp_07_03");
consumer.setNamesrvAddr("node1:9876");
consumer.subscribe("tp_demo_07_01", "*");
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setPullBatchSize(1);
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt>
msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
# 2.14 事务消息
RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账,A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银行账户扣除一万元”这个操作同时成功或者同时失败。
RocketMQ采用两阶段提交的方式实现事务消息,TransactionMQProducer处理上面情况的流程是,先发一个“准备从B银行账户增加一万元”的消息,发送成功后做从A银行账户扣除一万元的操作,根据操作结果是否成功,确定之前的“准备从B银行账户增加一万元”的消息是做commit还是rollback,具体流程如下:
- 1)发送方向RocketMQ发送“待确认”消息。
- 2)RocketMQ将收到的“待确认”消息持久化成功后,向发送方回复消息已经发送成功,此时第一阶段消息发送完成。
- 3)发送方开始执行本地事件逻辑。
- 4)发送方根据本地事件执行结果向RocketMQ发送二次确认(Commit或是Rollback)消息,RocketMQ收到Commit状态则将第一阶段消息标记为可投递,订阅方将能够收到该消息;收到Rollback状态则删除第一阶段的消息,订阅方接收不到该消息。
- 5)如果出现异常情况,步骤4)提交的二次确认最终未到达RocketMQ,服务器在经过固定时间段后将对“待确认”消息发起回查请求。
- 6)发送方收到消息回查请求后(如果发送一阶段消息的Producer不能工作,回查请求将被发送到和Producer在同一个Group里的其他Producer),通过检查对应消息的本地事件执行结果返回Commit或Roolback状态。
- 7)RocketMQ收到回查请求后,按照步骤4)的逻辑处理。
上面的逻辑似乎很好地实现了事务消息功能,它也是RocketMQ之前的版本实现事务消息的逻辑。
但是因为RocketMQ依赖将数据顺序写到磁盘这个特征来提高性能,步骤4)却需要更改第一阶段消息的状态,这样会造成磁盘Catch的脏页过多,降低系统的性能。所以RocketMQ在4.x的版本中将这部分功能去除。系统中的一些上层Class都还在,用户可以根据实际需求实现自己的事务功能。
客户端有三个类来支持用户实现事务消息,第一个类是LocalTransaction-Executer,用来实例化步骤3)的逻辑,根据情况返回LocalTransactionState.ROLLBACK_MESSAGE或者
LocalTransactionState.COMMIT_MESSAGE状态。第二个类是TransactionMQProducer,它的用法和DefaultMQProducer类似,要通过它启动一个Producer并发消息,但是比DefaultMQProducer多设置本地事务处理函数和回查状态函数。第三个类是TransactionCheckListener,实现步骤5)中MQ服务器的回查请求,返回LocalTransactionState.ROLLBACK_MESSAGE或者
或者LocalTransactionState.COMMIT_MESSAGE
# 2.14.1 RocketMQ事务消息流程概要
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
# 1.事务消息发送及提交:
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
# 2.补偿流程:
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
# 2.12.2 RocketMQ事务消息设计
# 1.事务消息在一阶段对用户不可见
在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息。然后二阶段会显示执行提交或者回滚half消息(逻辑删除)。当然,为了防止二阶段操作失败,RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。
在RocketMQ中,消息在服务端的存储结构如下,每条消息都会有对应的索引信息,Consumer通过ConsumeQueue这个二级索引来读取消息实体内容,其流程如下:
RocketMQ的具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费。其实改变消息主题是RocketMQ的常用“套路”,回想一下延时消息的实现机制。RMQ_SYS_TRANS_HALF_TOPIC
# 2.Commit和Rollback操作以及Op消息的引入
在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。
# 3.Op消息的存储和对应关系
RocketMQ将Op消息写入到全局一个特定的Topic中通过源码中的方法—
TransactionalMessageUtil.buildOpTopic();这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。
# 4.Half消息的索引构建
在执行二阶段Commit操作时,需要构建出Half消息的索引。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。
# 5.如何处理二阶段失败的消息?
如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。
Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。
值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。
事务消息:
TxProducer.java
package com.lagou.rocket.demo.producer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class TxProducer {
public static void main(String[] args) throws MQClientException {
TransactionListener listener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message
msg, Object arg) {
// 当发送事务消息prepare(half)成功后,调用该方法执行本地事务
System.out.println("执行本地事务,参数为:" + arg);
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// return LocalTransactionState.ROLLBACK_MESSAGE;
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt
msg) {
// 如果没有收到生产者发送的Half Message的响应,broker发送请求到生产
者回查生产者本地事务的状态
// 该方法用于获取本地事务执行的状态。
System.out.println("检查本地事务的状态:" + msg);
return LocalTransactionState.COMMIT_MESSAGE;
// return LocalTransactionState.ROLLBACK_MESSAGE;
}
};
TransactionMQProducer producer = new
TransactionMQProducer("tx_producer_grp_08");
producer.setTransactionListener(listener);
producer.setNamesrvAddr("node1:9876");
producer.start();
Message message = null;
message = new Message("tp_demo_08", "hello lagou - tx".getBytes());
producer.sendMessageInTransaction(message, "
{\"name\":\"zhangsan\"}");
}
}
TxConsumer.java
package com.lagou.rocket.demo.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TxConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("txconsumer_grp_08_01");
consumer.setNamesrvAddr("node1:9876");
consumer.subscribe("tp_demo_08", "*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
# 2.15 消息查询
区别于消息消费:先尝后买
尝就是消息查询
买:消息的消费
RocketMQ支持按照下面两种维度(“按照Message Id查询消息”、“按照Message Key查询消息”)进行消息查询。
# 2.15.1 按照MessageId查询消息
MsgId 总共 16 字节,包含消息存储主机地址(ip/port),消息 Commit Log offset。从 MsgId 中解析出 Broker 的地址和 Commit Log 的偏移地址,然后按照存储格式所在位置将消息 buffer 解析成一个完整的消息。
在RocketMQ中具体做法是:Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC请求后,通过Remoting通信层发送(业务请求码:
VIEW_MESSAGE_BY_ID)。Broker使用QueryMessageProcessor,使用请求中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回。
# 2.15.2 按照Message Key查询消息
“按照Message Key查询消息”,主要是基于RocketMQ的IndexFile索引文件来实现的。RocketMQ的索引文件逻辑结构,类似JDK中HashMap的实现。索引文件的具体结构如下:
- 1.根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目, 例如图中所示 slotNum=5000000)。
- 2.根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是指向最新的一个索引项)。
- 3.遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)
- 4.Hash 冲突;
第一种,key 的 hash 值不同但模数相同,此时查询的时候会再比较一次 key 的 hash 值(每个索引项保存了 key 的 hash 值),过滤掉 hash 值不相等的项。
第二种,hash 值相等但 key 不等, 出于性能的考虑冲突的检测放到客户端处理(key 的原始值是存储在消息文件中的,避免对数据文件的解析), 客户端比较一次消息体的 key 是否相同。
- 5.存储;为了节省空间索引项中存储的时间是时间差值(存储时间-开始时间,开始时间存储在索引文件头中), 整个索引文件是定长的,结构也是固定的。
API的使用:
package com.lagou.rocket.demo.query;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class QueryingMessageDemo {
public static void main(String[] args) throws InterruptedException,
RemotingException, MQClientException, MQBrokerException {
DefaultMQPullConsumer consumer = new
DefaultMQPullConsumer("consumer_grp_09_01");
consumer.setNamesrvAddr("node1:9876");
consumer.start();
MessageExt message = consumer.viewMessage("tp_demo_08",
"0A4E00A7178878308DB150A780BB0000");
System.out.println(message);
System.out.println(message.getMsgId());
consumer.shutdown();
}
}
# 2.16 消息优先级
有些场景,需要应用程序处理几种类型的消息,不同消息的优先级不同。RocketMQ是个先入先出的队列,不支持消息级别或者Topic级别的优先级。业务中简单的优先级需求,可以通过间接的方式解决,下面列举三种优先级相关需求的具体处理方法。
# 第一种
多个不同的消息类型使用同一个topic时,由于某一个种消息流量非常大,导致其他类型的消息无法及时消费,造成不公平,所以把流量大的类型消息在一个单独的 Topic,其他类型消息在另外一个Topic,应用程序创建两个 Consumer,分别订阅不同的 Topic,这样就可以了。
# 第二种
情况和第一种情况类似,但是不用创建大量的 Topic。举个实际应用场景: 一个订单处理系统,接收从 100家快递门店过来的请求,把这些请求通过 Producer 写入RocketMQ;订单处理程序通过Consumer 从队列里读取消 息并处理,每天最多处理 1 万单 。 如果这 100 个快递门店中某几个门店订单量 大增,比如门店一接了个大客户,一个上午就发出 2万单消息请求,这样其他 的 99 家门店可能被迫等待门店一的 2 万单处理完,也就是两天后订单才能被处 理,显然很不公平 。
这时可以创建 一 个 Topic, 设置 Topic 的 MessageQueue 数 量 超过 100 个,Producer根据订单的门店号,把每个门店的订单写人 一 个 MessageQueue。
DefaultMQPushConsumer默认是采用循环的方式逐个读取一个 Topic 的所有 MessageQueue,这样如果某家门店订单量大增,这家门店对应的 MessageQueue 消息数增多,等待时间增长,但不会造成其他家门店等待时间增长。
DefaultMQPushConsumer 默认的 pullBatchSize 是 32,也就是每次从某个 MessageQueue 读取消息的时候,最多可以读 32 个 。 在上面的场景中,为了更 加公平,可以把 pullBatchSize 设置成1。
# 第三种
强制优先级
TypeA、 TypeB、 TypeC 三类消息 。 TypeA 处于第一优先级,要确保只要有TypeA消息,必须优先处理; TypeB处于第二优先 级; TypeC 处于第三优先级 。 对这种要求,或者逻辑更复杂的要求,就要用 户自己编码实现优先级控制,如果上述的 三 类消息在一个 Topic 里,可以使 用 PullConsumer,自主控制 MessageQueue 的遍历,以及消息的读取;如果上述三类消息在三个 Topic下,需要启动三个Consumer, 实现逻辑控制三个 Consumer 的消费 。
# 2.17 底层网络通信 - Netty高性能之道
RocketMQ底层通信的实现是在Remoting模块里,因为借助了Netty而没有重复造轮子,
RocketMQ的通信部分没有很多的代码,就是用Netty实现了一个自定义协议的客户端/服务器程序。
- 自定义ByteBuf可以从底层解决ByteBuffer的一些问题,并且通过“内存池”的设计来提升性能
- Reactor主从多线程模型
- 充分利用了零拷贝,CAS/volatite高效并发编程特性
- 无锁串行化设计
- 管道责任链的编程模型
- 高性能序列化框架的支持
- 灵活配置TCP协议参数
RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:
(1) Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向
NameServer上报Topic路由信息。
(2) 消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的
TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。
(3) 消息生产者Producer根据2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息并落盘存储。
(4) 消息消费者Consumer根据2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。
从上面1)~3)中可以看出在消息生产者, Broker和NameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。
rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。
RocketMQ中惯用的套路:
请求报文和响应都使用RemotingCommand,然后在Processor处理器中根据RequestCode请求码来匹配对应的处理方法。
处理器通常继承至NettyRequestProcessor,使用前需要先注册才行,注册方式
remotingServer.registerDefaultProcessor
网络通信核心的东西无非是:
线程模型
私有协议定义
编解码器
序列化/反序列化
…
既然是基于Netty的网络通信,当然少不了一堆自定义实现的Handler,
例如继承至:SimpleChannelInboundHandler ChannelDuplexHandler
# 2.17.1 Remoting通信类结构
# 2.17.2 协议设计与编解码
在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。
可见传输内容主要可以分为以下4部分:
(1) 消息长度:总长度,四个字节存储,占用一个int类型;
(2) 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
(3) 消息头数据:经过序列化后的消息头数据;
(4) 消息主体数据:消息主体的二进制字节数据内容;
# 2.17.3 消息的通信方式和流程
在RocketMQ消息队列中支持通信的方式主要有同步(sync)、异步(async)、单向(oneway) 三种。其中“单向”通信模式相对简单,一般用在发送心跳包场景下,无需关注其Response。这里,主要介绍RocketMQ的异步通信流程。
# 2.17.4 Reactor主从多线程模型
RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。
上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。
一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。
RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。
拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。
处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码code去
processorTable这个本地缓存变量中找到对应的 processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。
从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。
# 2.18 限流
RocketMQ消费端中我们可以:
- 设置最大消费线程数
- 每次拉取消息条数等
同时:
- PushConsumer会判断获取但还未处理的消息个数、消息总大小、Offset的跨度,
- 任何一个值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的。
在 Apache RocketMQ 中,当消费者去消费消息的时候,无论是通过 pull 的方式还是 push 的方式,都可能会出现大批量的消息突刺。如果此时要处理所有消息,很可能会导致系统负载过高,影响稳定性。但其实可能后面几秒之内都没有消息投递,若直接把多余的消息丢掉则没有充分利用系统处理消息的能力。我们希望可以把消息突刺均摊到一段时间内,让系统负载保持在消息处理水位之下的同时尽可能地处理更多消息,从而起到“削峰填谷”的效果:
上图中红色的部分代表超出消息处理能力的部分。我们可以看到消息突刺往往都是瞬时的、不规律的,其后一段时间系统往往都会有空闲资源。我们希望把红色的那部分消息平摊到后面空闲时去处理,这样既可以保证系统负载处在一个稳定的水位,又可以尽可能地处理更多消息。
# 2.18.1 Sentinel 介绍
Sentinel 是阿里中间件团队开源的,面向分布式服务架构的轻量级流量控制产品,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定性。
# 2.18.2 Sentinel原理
Sentinel 专门为这种场景提供了匀速器 (opens new window)的特性,可以把突然到来的大量请求以匀速的形式均摊,以固定的间隔时间让请求通过,以稳定的速度逐步处理这些请求,起到“削峰填谷”的效果,从而避免流量突刺造成系统负载过高。同时堆积的请求将会排队,逐步进行处理;当请求排队预计超过最大超时时长的时候则直接拒绝,而不是拒绝全部请求。
比如在 RocketMQ 的场景下配置了匀速模式下请求 QPS 为 5,则会每 200 ms 处理一条消息,多余的处理任务将排队;同时设置了超时时间为 5 s,预计排队时长超过 5s 的处理任务将会直接被拒绝。
示意图如下图所示:
RocketMQ 用户可以根据不同的 group 和不同的 topic 分别设置限流规则,限流控制模式设置为匀速器模式(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER),比如:
private void initFlowControlRule() {
FlowRule rule = new FlowRule();
rule.setResource(KEY); // 对应的 key 为 groupName:topicName
rule.setCount(5);
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setLimitApp("default");
// 匀速器模式下,设置了 QPS 为 5,则请求每 200 ms 允许通过 1 个
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
// 如果更多的请求到达,这些请求会被置于虚拟的等待队列中。等待队列有一个 max timeout,
如果请求预计的等待时间超过这个时间会直接被 block
// 在这里,timeout 为 5s
rule.setMaxQueueingTimeMs(5 * 1000);
FlowRuleManager.loadRules(Collections.singletonList(rule));
}
参考:
https://github.com/alibaba/Sentinel/wiki/Sentinel-%E4%B8%BA-RocketMQ-
%E4%BF%9D%E9%A9%BE%E6%8A%A4%E8%88%AA
package com.lagou.rocket.demo.consumer;
import com.alibaba.csp.sentinel.Constants;
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class PullDemo {
private static final String GROUP_NAME = "consumer_grp_13_05";
private static final String TOPIC_NAME = "tp_demo_13";
private static final String KEY = String.format("%s:%s", GROUP_NAME,
TOPIC_NAME);
private static final Map<MessageQueue, Long> OFFSET_TABLE = new
HashMap<MessageQueue, Long>();
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final ExecutorService pool =
Executors.newFixedThreadPool(32);
private static final AtomicLong SUCCESS_COUNT = new AtomicLong(0);
private static final AtomicLong FAIL_COUNT = new AtomicLong(0);
public static void main(String[] args) throws MQClientException {
// 初始化哨兵的流控
initFlowControlRule();
DefaultMQPullConsumer consumer = new
DefaultMQPullConsumer(GROUP_NAME);
consumer.setNamesrvAddr("node1:9876");
consumer.start();
Set<MessageQueue> mqs =
consumer.fetchSubscribeMessageQueues(TOPIC_NAME);
for (MessageQueue mq : mqs) {
System.out.printf("Consuming messages from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
if (pullResult.getMsgFoundList() != null) {
for (MessageExt msg : pullResult.getMsgFoundList()) {
doSomething(msg);
}
}
long nextOffset = pullResult.getNextBeginOffset();
// 将每个mq对应的偏移量记录在本地HashMap中
putMessageQueueOffset(mq, nextOffset);
consumer.updateConsumeOffset(mq, nextOffset);
switch (pullResult.getPullStatus()) {
case NO_NEW_MSG:
break SINGLE_MQ;
case FOUND:
case NO_MATCHED_MSG:
case OFFSET_ILLEGAL:
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
/**
* 对每个收到的消息使用一个线程提交任务
* @param message
*/
private static void doSomething(MessageExt message) {
pool.submit(() -> {
Entry entry = null;
try {
ContextUtil.enter(KEY);
entry = SphU.entry(KEY, EntryType.OUT);
// 在这里处理业务逻辑,此处只是打印
System.out.printf("[%d][%s][Success: %d] Receive New Messages: %s %n", System.currentTimeMillis(),
Thread.currentThread().getName(),
SUCCESS_COUNT.addAndGet(1), new String(message.getBody()));
} catch (BlockException ex) {
// Blocked.
System.out.println("Blocked: " + FAIL_COUNT.addAndGet(1));
} finally {
if (entry != null) {
entry.exit();
}
ContextUtil.exit();
}
});
}
private static void initFlowControlRule() {
FlowRule rule = new FlowRule();
// 消费组名称:主题名称 字符串
rule.setResource(KEY);
// 根据QPS进行流控
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// 1表示QPS为1,请求间隔1000ms。
// 如果是5,则表示每秒5个消息,请求间隔200ms
rule.setCount(1);
rule.setLimitApp("default");
// 调用使用固定间隔。如果qps为1,则请求之间间隔为1s
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
// 如果请求太多,就将这些请求放到等待队列中
// 该队列有超时时间。如果等待队列中请求超时,则丢弃
// 此处设置超时时间为5s
rule.setMaxQueueingTimeMs(5 * 1000);
FlowRuleManager.loadRules(Collections.singletonList(rule));
}
// 获取指定MQ的偏移量
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSET_TABLE.get(mq);
if (offset != null) {
return offset;
}
return 0;
}
// 在本地HashMap中记录偏移量
private static void putMessageQueueOffset(MessageQueue mq, long
offset) {
OFFSET_TABLE.put(mq, offset);
}
}