跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

    • RabbitMQ

    • RocketMQ

      • RocketMQ基础

      • RocketMQ高级

        • RocketMQ架构与实战
        • RocketMQ高级特性及原理
        • RocketMQ高级实战
        • RocketMQ集群与运维
          • 4.1.1 集群特点
          • 4.1.2 集群模式
            • 1)单Master模式
            • 2)多Master模式
            • 3)多Master多Slave模式(异步)
            • 4)多Master多Slave模式(同步)
          • 4.2.0 前置配置
          • 4.2.1单Master模式
            • 4.2.1.1 启动NameServer
            • 4.2.1.2 启动Broker
          • 4.2.2 多Master模式
            • 4.2.2.1启动NameServer
            • 4.2.2.2 启动Broker集群
          • 4.2.3 多Master和Slave模式-异步复制
            • 4.2.3.1 启动NameServer
            • 4.2.3.2 启动Broker集群
          • 4.2.4 多Master和Slave模式-同步双写
            • 4.2.4.1 启动NameServer
            • 4.2.4.2 启动Broker集群
          • 4.3.1 Topic相关
            • 4.3.1.1 命令列表
            • 4.3.1.2 具体操作
          • 4.3.2 集群相关
            • 4.3.2.1 命令列表
            • 4.3.2.2 具体操作
          • 4.3.3 Broker相关
            • 4.3.3.1 命令列表
            • 4.3.3.2 具体操作
          • 4.3.4 消息相关
            • 4.3.4.1 命令列表
            • 4.3.4.2 具体操作
          • 4.3.5 消费者、消费组相关
            • 4.3.5.1 命令列表
            • 4.3.5.2 具体操作
          • 4.3.6 连接相关
            • 4.3.6.1 命令列表
            • 4.3.6.2 具体操作
          • 4.3.7 NameServer相关
            • 4.3.7.1 命令列表
            • 4.3.7.2 具体操作
          • 4.3.8 其他
            • 4.3.8.1 命令列表
            • 4.3.8.2 具体操作
          • 4.4.1 RocketMQ的mqadmin命令报错问题
          • 4.4.2 RocketMQ生产端和消费端版本不一致导致不能正常消费的问题
          • 4.4.3 新增一个topic的消费组时,无法消费历史消息的问题**
          • 4.4.4 如何开启从Slave读数据功能
          • 4.4.5 性能调优问题
          • 4.4.6 在RocketMQ中msgId和offsetMsgId的含义与区别
        • RocketMQ源码剖析
  • Zookeeper

  • java组件
  • 消息队列
  • RocketMQ
  • RocketMQ高级
Revin
2023-07-23
目录

RocketMQ集群与运维

# 4.1 集群搭建方式

# 4.1.1 集群特点

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

    每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。

  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

# 4.1.2 集群模式

# 1)单Master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

# 2)多Master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;

  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

# 3)多Master多Slave模式(异步)

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;

  • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。

# 4)多Master多Slave模式(同步)

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;

  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

# 4.2 集群的搭建

# 4.2.0 前置配置

  1. 安装JDK 11.0.5
  2. 修改RocketMQ的启动脚本:
bin/runserver.sh
bin/runbroker.sh
bin/tools.sh
1
2
3

bin/runserver.sh:

 #!/bin/sh
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements. See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
 #
 # http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.

 #===========================================================================================
 # Java Environment Setting #===========================================================================================
 error_exit ()
 {
 echo "ERROR: $1 !!"
 exit 1
 }

 [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
 [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
 [ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

 export JAVA_HOME
 export JAVA="$JAVA_HOME/bin/java"
 export BASE_DIR=$(dirname $0)/..
 export

CLASSPATH=.:${BASE_DIR}/conf:${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib/*
 #===========================================================================================
 # JVM Configuration
 #===========================================================================================
 JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=160m"
 JAVA_OPT="${JAVA_OPT} -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8"
 JAVA_OPT="${JAVA_OPT} -verbose:gc -Xlog:gc:/dev/shm/rmq_srv_gc.log -XX:+PrintGCDetails"
 JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
 JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
 # JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
 #JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
 JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
 JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

 $JAVA ${JAVA_OPT} $@
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

bin/runbroker.sh:

 #!/bin/sh
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements. See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
 #
 # http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.

 #===========================================================================================
 # Java Environment Setting
#===========================================================================================
 error_exit ()
 {
 echo "ERROR: $1 !!"
 exit 1
 }

 [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
 [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
 [ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

 export JAVA_HOME
 export JAVA="$JAVA_HOME/bin/java"
 export BASE_DIR=$(dirname $0)/..
 export

CLASSPATH=.${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}

 #===========================================================================================
 # JVM Configuration
 #===========================================================================================
 JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
 JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
 JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails"
 JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
 JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
 JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
 JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
 #JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
 JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
 JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

 numactl --interleave=all pwd > /dev/null 2>&1
 if [ $? -eq 0 ]
 then
 if [ -z "$RMQ_NUMA_NODE" ] ; then
 numactl --interleave=all $JAVA ${JAVA_OPT} $@
 else
 numactl --cpunodebind=$RMQ_NUMA_NODE --membind=$RMQ_NUMA_NODE $JAVA

${JAVA_OPT} $@
 fi
 else
 $JAVA ${JAVA_OPT} --add-exports=java.base/jdk.internal.ref=ALL-UNNAMED

$@
 fi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

bin/tools.sh:

 #!/bin/sh

 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements. See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
 #
 # http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.

 #===========================================================================================
 # Java Environment Setting
 #===========================================================================================
 error_exit ()
 {
 echo "ERROR: $1 !!"
 exit 1
 }

 [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
 [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
 [ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

 export JAVA_HOME
 export JAVA="$JAVA_HOME/bin/java"
 export BASE_DIR=$(dirname $0)/..
 # export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
 export CLASSPATH=.${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}
 #===========================================================================================
 # JVM Configuration
 #===========================================================================================
 JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m"
 # JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext"
 JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

 $JAVA ${JAVA_OPT} $@
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# 4.2.1单Master模式

这是最简单也是风险最大的模式,一旦broker重启或宕机,整个服务不可用。不推荐在生产环境使用,一般用于开发或本地测试。

步骤如下:

# 4.2.1.1 启动NameServer

### 首先启动NameServer
$ nohup sh mqnamesrv &

### 检查Name Server是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
1
2
3
4
5
6

只要在namesrv.log中看到“The Name Server boot success.. ”就表示NameServer启动成功了。

# 4.2.1.2 启动Broker

### 首先启动broker
$ nohup sh mqbroker -n localhost:9876 &

### 检查broker是否启动成功。如果看到broker的下面的语句表示启动成功
$ tail -f ~/logs/rocketmqlogs/broker.log
The broker[broker-a,192.169.1.2:10911] boot success...
1
2
3
4
5
6

如果在broker.log中看到“The broker[brokerName,ip:port] boot success..”就表示broker启动成功。

# 4.2.2 多Master模式

多Master模式意味着所有的节点都是Master节点,没有Slave节点。优缺点如下:

  • 优点:
  1. 配置简单。

  2. 一个服务器节点的停用或重启(维护目的)不会对应用造成大的影响。

  3. 如果磁盘配置了RAID10,就不会由消息的丢失。(因为RAID10非常可靠,即使服务器不可恢复,在异步刷盘时会丢失小量数据,同步刷盘不会丢数据)。

  4. 该模式性能最好。

  • 缺点:
  1. 在单个节点停用期间,消费者无法消费该节点上的数据,也不能订阅该节点上的数据,除非节点恢复。消息的实时性受到影响。

启动步骤如下:

# 4.2.2.1启动NameServer

### 首先启动NameServer 
$ nohup sh mqnamesrv &

### 检查NameServer是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success...
1
2
3
4
5

# 4.2.2.2 启动Broker集群

### 启动第一个broker,假定namesrv在192.168.1.1上。注意这里的配置文件的位置 
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m- noslave/broker-a.properties &

### 在第二台服务器上启动另一个broker。
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m- noslave/broker-b.properties &

...
1
2
3
4
5
6
7

上面的NameServer是一台,IP地址直接写,如果是多台NameServer,则需要在 -n 后接多个NameServer的地址,使用分号分开。由于shell对分号敏感,可以使用单引号引起来多个NameServer的地址,禁止shell对分号的解释。

# 4.2.3 多Master和Slave模式-异步复制

每个Master节点配置一个或多个Slave节点,组成一组,有多组这样的组合组成集群。HA使用异步复制,Master和Slave节点间有毫秒级的消息同步延迟。优缺点如下:

  • 优点:
  1. 磁盘坏掉,会有少量消息丢失,但是消息的实时性不会受到影响。

  2. 同时,如果Master宕机,消费者依然可以从Slave节点消费,并且这个转换是透明的,也无需运维手动介入。

  3. 性能和多Master模式差不多,弱一点点。

  • 缺点:
  1. 如果Master宕机,磁盘坏掉,会丢失少量消息。

启动步骤如下:

# 4.2.3.1 启动NameServer

### 首先启动NameServer
$ nohup sh mqnamesrv &

### 检查NameServer是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log

The Name Server boot success...
1
2
3
4
5
6
7

# 4.2.3.2 启动Broker集群

### 启动Master的broker:broker-a
 $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &

 ### 在另外一台服务器上启动另一个Master的broker:broker-b
 $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &

 ### 在另一台服务器上启动broker-a的Slave
 $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &

 ### 在另一台服务器上启动broker-b的Slave
 $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &
1
2
3
4
5
6
7
8
9
10
11

以上启动的是2M-2S-Async模式,双主双从,主从异步复制模式。

# 4.2.4 多Master和Slave模式-同步双写

该模式中,每个Master节点配置多个Slave节点,它们构成一组,多组构成集群。HA使用同步双写,即,只有消息在Master节点和多个Slave节点上写成功,才返回生产者消息发送成功。

ASYNC_MASTER
SYNC_MASTER
1
2

优缺点如下:

  • 优点:
  1. 数据和服务都没有单点故障
  2. 当Master主机宕机,消息的消费不会延迟。
  3. 服务和数据的高可用。
  • 缺点:
  1. 该模式的性能比异步复制的模式低10%左右。
  2. 发送消息的延迟稍微高一点。
  3. 当前版本中,如果Master节点宕机,Slave节点不能自动切换为Master模式。

启动步骤如下:

# 4.2.4.1 启动NameServer

### 首先启动NameServer
$ nohup sh mqnamesrv &

### 检查NameServer是否启动成功。
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
1
2
3
4
5
6

# 4.2.4.2 启动Broker集群

### 在一个节点启动broker-a(MASTER)
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &

### 在另一个节点启动broker-b(MASTER)
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &

### 在另一个节点启动broker-a的同步Slave节点:broker-a-s
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &

### 在另一个节点启动broker-b的同步Slave节点:broker-b-s
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &
1
2
3
4
5
6
7
8
9
10
11

上述配置中,通过相同的brokerName不同的brokerId将Master和Slave组合为一组。Master的brokerId必须是0,Slave的brokerId必须大于0,且不能相同。

# 4.3 mqadmin管理工具

注意:

  1. 执行命令方法: ./mqadmin {command} {args}

  2. 几乎所有命令都需要配置-n表示NameServer地址,格式为ip:port

  3. 几乎所有命令都可以通过-h获取帮助

  4. 如果既有Broker地址(-b)配置项又有clusterName(-c)配置项,则优先以Broker地址执行命令,如果不配置Broker地址,则对集群中所有主机执行命令,只支持一个Broker地址。-b 格式为ip:port,port默认是10911

  5. 在tools下可以看到很多命令,但并不是所有命令都能使用,只有在MQAdminStartup中初始化的命令才能使用,你也可以修改这个类,增加或自定义命令

  6. 由于版本更新问题,少部分命令可能未及时更新,遇到错误请直接阅读相关命令源码

# 4.3.1 Topic相关

启动集群:

[root@node1 ~]# nohup sh mqnamesrv &
[root@node1 ~]# nohup sh mqbroker -n node1:9876 -c /opt/rocketmq/conf/2m-2s-sync/broker-a.properties &
[root@node2 ~]# nohup sh mqbroker -n node1:9876 -c /opt/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
[root@node3 ~]# nohup sh mqbroker -n node1:9876 -c /opt/rocketmq/conf/2m-2s-sync/broker-b.properties &
[root@node4 ~]# nohup sh mqbroker -n node1:9876 -c /opt/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
1
2
3
4
5

# 4.3.1.1 命令列表

image-20230723210734058

image-20230723210800029

image-20230723210812481

# 4.3.1.2 具体操作

# 查看指定NameServer下的主题
[root@node4 ~]# mqadmin topicList -n node1:9876
# 查看指定NameServer,指定集群名称下的主题
[root@node4 ~]# mqadmin topicList -n node1:9876 -c DefaultCluster
# 创建主题,指定NameServer,指定Broker 指定主题名称,指定主题的写队列个数,指定读主题队列个数
[root@node4 ~]# mqadmin updateTopic -b node1:10911 -r 3 -w 3 -t tp_admin_01
# 描述主题,指定NameServer,指定主题名称
[root@node4 ~]# mqadmin topicStatus -t tp_admin_01 -n node1:9876
# 创建主题,指定NameServer,指定集群名称,指定主题名称,指定读主题队列个数,指定写主题队列个数
[root@node4 ~]# mqadmin updateTopic -c DefaultCluster -n node1:9876 -r 3 -w 3 -t tp_admin_02
# 查看指定主题的状态,指定NameServer地址,指定主题名称
[root@node4 ~]# mqadmin topicStatus -t tp_admin_02 -n node1:9876
#删除主题,指定NameServer地址,指定集群名称,指定主题名称
[root@node3 ~]# mqadmin deleteTopic -n node1:9876 -c DefaultCluster -t tp_admin_03
# 查看主题所在的集群,指定NameServer地址,指定主题名称。因为不同集群可以拥有同名的主题,并且不同集群可以注册到同一个NameServer
[root@node3 ~]# mqadmin topicClusterList -n node1:9876 -t tp_admin_02
# 计算消费的负载均衡,不同的-i列表,计算不同的消费平衡负载结果
[root@node3 ~]# mqadmin allocateMQ -n node1:9876 -t tp_admin_02 -i node1,node3
[root@node3 ~]# mqadmin allocateMQ -n node1:9876 -t tp_admin_02 -i node1,node2,node3,node4
# 打印Topic订阅关系、TPS、积累量、24h读写总量等信息
[root@node3 ~]# mqadmin statsAll -n node1:9876
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 4.3.2 集群相关

# 4.3.2.1 命令列表

image-20230723211211416

# 4.3.2.2 具体操作

# 查看集群信息,集群、BrokerName、BrokerId、TPS等信息
[root@node3 ~]# mqadmin clusterList -n node1:9876 -i 1 -m
# 检查集群中broker的延迟。
[root@node3 ~]# mqadmin clusterRT -a 5 -s 1048576 -c DefaultCluster -p true -i 2 -n node1:9876
1
2
3
4

# 4.3.3 Broker相关

# 4.3.3.1 命令列表

image-20230723211436270

image-20230723211457549

# 4.3.3.2 具体操作

# 查看broker状态
[root@node3 ~]# mqadmin brokerStatus -b node1:10911
[root@node3 ~]# mqadmin brokerStatus -n node1:9876 -b node2:10911
# 修改节点的配置,配置文件也会修改
[root@node3 ~]# mqadmin updateBrokerConfig -n node1:9876 -b node2:10911 -c DefaultCluster -k brokerRole -v ASYNC_MASTER -k brokerId -v 0 -k brokerName -v 'broker-c'
# 获取Broker配置
[root@node1 ~]# mqadmin getBrokerConfig -n node1:9876 -b node3:10911
# 清理Broker上不使用的Topic,从内存中释放Topic的Consume Queue,如果手动删除Topic会产生不使用的Topic
[root@node1 ~]# mqadmin cleanUnusedTopic -n node1:9876 -b node1:10911 -c DefaultCluster
# 向Broker发消息,返回发送状态和RT
[root@node3 ~]# mqadmin sendMsgStatus -n node1:9876 -b broker-b -s 128 -c 5
1
2
3
4
5
6
7
8
9
10
11

# 4.3.4 消息相关

# 4.3.4.1 命令列表

3

image-20230723211748464

image-20230723211803505

image-20230723211816999

# 4.3.4.2 具体操作

# 根据MsgKey查询消息,指定NameServer地址,指定主题名称,指定MsgKey
[root@node3 ~]# mqadmin queryMsgByKey -n node1:9876 -t tp_admin_01 -k 00100
# 根据UNIQ_KEY查询消息,指定NameServer地址,指定UNIQ_KEY,指定消费组名称,指定主题名称
[root@node3 ~]# mqadmin queryMsgByUniqueKey -n node1:9876 -i C0A864672C8B277050DC5F8417BA0000 -g test_grp_console -t tp_admin_01
# 发送检查消息延迟,指定NameServer,指定主题,指定测试次数,指定消息大小,默认128KB
[root@node3 ~]# mqadmin checkMsgSendRT -n node1:9876 -t tp_admin_01 -a 5 -s 128
# 指定发送消息的大小,以测试延迟 1MB
[root@node3 ~]# mqadmin checkMsgSendRT -n node1:9876 -t tp_admin_01 -a 5 -s 1048576
# 消费消息,指定NameServer,指定主题,指定broker名称,指定MQ的id。
[root@node3 ~]# mqadmin consumeMessage -n node1:9876 -t tp_admin_01 -o 0 -b broker-a -i 0
# 发送消息,指定主题,指定NameServer地址,指定消息体
[root@node3 ~]# mqadmin sendMessage -n node1:9876 -t tp_admin_01 -p 'hello lagou console'
# 发送消息,指定主题,指定NameServer地址,指定消息体,指定keys,指定tags
[root@node3 ~]# mqadmin sendMessage -n node1:9876 -t tp_admin_01 -p 'hello lagou console' -k '00100' -c 'test'
# 发送消息,指定NameServer,指定消息体,指定Broker,指定主题
[root@node3 ~]# mqadmin sendMessage -n node1:9876 -p 'hello lagou test 01' -b broker-a -i 0 -t 'tp_admin_01'
# 查看偏移量,指定NameServer,指定主题
[root@node1 ~]# mqadmin topicStatus -n node1:9876 -t tp_admin_01
# 打印消息,指定NameServer地址,指定主题,指定标签过滤,指定是否打印消息体
[root@node3 ~]# mqadmin printMsg -n node1:9876 -t tp_admin_01 -s "*" -d
true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 4.3.5 消费者、消费组相关

# 4.3.5.1 命令列表

image-20230723212038104

image-20230723212057340

# 4.3.5.2 具体操作

package com.lagou.rocket.demo.producer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class MyProducer
{
    public static void main(String[] args) throws MQClientException,
        RemotingException, InterruptedException, MQBrokerException
        {
            DefaultMQProducer producer = new DefaultMQProducer("mygrp");
            producer.setNamesrvAddr("node1:9876");
            producer.start();
            Message message = null;
            for(int i = 0; i < 1000; i++)
            {
                message = new Message("tp_admin_01", ("hello lagou - " + i).getBytes());
                producer.send(message);
                Thread.sleep(1000);
            }
            producer.shutdown();
        }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.lagou.rocket.demo.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class MyConsumer
{
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException
    {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("mygrp_consume");
        consumer.setNamesrvAddr("node1:9876");
        consumer.start();
        Map < MessageQueue, Long > offsetMap = new HashMap < > ();
        Set < MessageQueue > messageQueues = consumer.fetchSubscribeMessageQueues("tp_admin_01");
        while(true)
        {
            for(MessageQueue messageQueue: messageQueues)
            {
                Long aLong = offsetMap.get(messageQueue);
                if(aLong == null)
                {
                    offsetMap.put(messageQueue, 0 L);
                }
                PullResult pull = consumer.pull(messageQueue, "*", offsetMap.get(messageQueue), 1);
                System.out.println(pull.getMsgFoundList().size() + "\t" + pull.getMsgFoundList().get(0).getMsgId());
                offsetMap.put(messageQueue, pull.getNextBeginOffset());
                Thread.sleep(1000);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 查看消费者状态
[root@node3 ~]# mqadmin consumerStatus -n node1:9876 -g mygrp_consume -s
1
2

# 4.3.6 连接相关

# 4.3.6.1 命令列表

image-20230723212341977

# 4.3.6.2 具体操作

package com.lagou.rocket.demo.producer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class MyProducer
{
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException
    {
        DefaultMQProducer producer = new DefaultMQProducer("mygrp");
        producer.setNamesrvAddr("node1:9876");
        producer.start();
        Message message = null;
        for(int i = 0; i < 1000; i++)
        {
            message = new Message("tp_admin_01", ("hello lagou - " + i).getBytes());
            producer.send(message);
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.lagou.rocket.demo.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class MyConsumer
{
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException
    {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("mygrp_consume");
        consumer.setNamesrvAddr("node1:9876");
        consumer.start();
        Map < MessageQueue, Long > offsetMap = new HashMap < > ();
        Set < MessageQueue > messageQueues = consumer.fetchSubscribeMessageQueues("tp_admin_01");
        while(true)
        {
            for(MessageQueue messageQueue: messageQueues)
            {
                Long aLong = offsetMap.get(messageQueue);
                if(aLong == null)
                {
                    offsetMap.put(messageQueue, 0 L);
                }
                PullResult pull = consumer.pull(messageQueue, "*", offsetMap.get(messageQueue), 1);
                System.out.println(pull.getMsgFoundList().size() + "\t" + pull.getMsgFoundList().get(0).getMsgId());
                offsetMap.put(messageQueue, pull.getNextBeginOffset());
                Thread.sleep(1000);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 查看生产者连接
[root@node3 bin]# mqadmin producerConnection -g mygrp -t tp_admin_01 -n node1:9876
# 查看消费者连接
[root@node3 bin]# mqadmin consumerConnection -g mygrp_consume -n node1:9876
1
2
3
4

# 4.3.7 NameServer相关

# 4.3.7.1 命令列表

image-20230723212630457

# 4.3.7.2 具体操作

# 获取NameServer配置信息
[root@node2 ~]# mqadmin getNamesrvConfig -n node1:9876
# 修改NameServer的配置
[root@node2 ~]# mqadmin updateNamesrvConfig -n node1:9876 -k serverWorkerThreads -v 10
1
2
3
4

# 4.3.8 其他

# 4.3.8.1 命令列表

image-20230723212746130

# 4.3.8.2 具体操作

[root@node4 ~]# mqadmin startMonitoring -n node1:9876
1

# 4.4 运维常见问题

# 4.4.1 RocketMQ的mqadmin命令报错问题

问题描述:有时候在部署完RocketMQ集群后,尝试执行“mqadmin”一些运维命令,会出现下面的异常信息:

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed
1

解决方法:可以在部署RocketMQ集群的虚拟机上执行 export NAMESRV_ADDR=ip:9876 (ip指的是集群中部署NameServer组件的机器ip地址)命令之后再使用“mqadmin”的相关命令进行查询,即可得到结果。

# 4.4.2 RocketMQ生产端和消费端版本不一致导致不能正常消费的问题

问题描述:同一个生产端发出消息,A消费端可消费,B消费端却无法消费,rocketMQ Console中出现:

Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message的异常消息。
1

解决方案:RocketMQ 的jar包:rocketmq-client等包应该保持生产端,消费端使用相同的version。

# 4.4.3 新增一个topic的消费组时,无法消费历史消息的问题**

问题描述:当同一个topic的新增消费组启动时,消费的消息是当前的offset的消息,并未获取历史消息。

解决方案:rocketmq默认策略是从消息队列尾部,即跳过历史消息。如果想消费历史消息,则需要设置:

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#setConsumeFromWhere。常用的有以下三种配置:

默认配置,一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费,即跳过历史消息;

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
1

一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费,即消费Broker未过期的历史消息;

 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
1

一个新的订阅组第一次启动从指定时间点开始消费,后续再启动接着上次消费的进度开始消费,和consumer.setConsumeTimestamp()配合使用,默认是半个小时以前;

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
1

# 4.4.4 如何开启从Slave读数据功能

在某些情况下,Consumer需要将消费位点重置到1-2天前,这时在内存有限的Master Broker上,CommitLog会承载比较重的IO压力,影响到该Broker的其它消息的读与写。可以开启slaveReadEnable=true ,当Master Broker发现Consumer的消费位点与CommitLog的最新值的差值的容量超过该机器内存的百分比( accessMessageInMemoryMaxRatio=40% ),会推荐Consumer从Slave Broker中去读取数据,降低Master Broker的IO。

# 4.4.5 性能调优问题

异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁,调整Broker配置项useReentrantLockWhenPutMessage ,默认为false;异步刷盘建议开启TransientStorePoolEnable ;建议关闭transferMsgByHeap,提高拉消息效率;同步刷盘建议适当增大 sendMessageThreadPoolNums ,具体配置需要经过压测。

# 4.4.6 在RocketMQ中msgId和offsetMsgId的含义与区别

使用RocketMQ完成生产者客户端消息发送后,通常会看到如下日志打印信息:

SendResult [sendStatus=SEND_OK, msgId=0A42333A0DC818B4AAC246C290FD0000,offsetMsgId=0A42333A00002A9F000000000134F1F5, messageQueue=MessageQueue [topic=topicTest1, BrokerName=mac.local, queueId=3], queueOffset=4]
1
  • msgId,对于客户端来说msgId是由客户端producer实例端生成的,具体来说,调用方法MessageClientIDSetter.createUniqIDBuffer() 生成唯一的Id;

RocketMQ_Page128_001

RocketMQ_Page128_002

RocketMQ_Page128_003

RocketMQ_Page128_004

  • offsetMsgId,offsetMsgId是由Broker服务端在写入消息时生成的(采用”IP地址+Port端口”与“CommitLog的物理偏移量地址”做了一个字符串拼接),其中offsetMsgId就是在RocketMQ控制台直接输入查询的那个messageId。

RocketMQ_Page129_001

上次更新: 2025/04/03, 11:07:08
RocketMQ高级实战
RocketMQ源码剖析

← RocketMQ高级实战 RocketMQ源码剖析→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式