跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

  • 消息队列

    • 消息中间件(MQ)介绍
    • Kafka

      • Kafka基础使用
      • Kafka深入

        • Kafka架构与实战
        • Kafka高级特性解析
        • Kafka高级特性-消费者
        • Kafka高级特性-主题
        • Kafka高级特性-分区
        • Kafka高级特性-物理存储
        • Kafka高级特性-稳定性
        • Kafka高级特性-延时队列
        • Kafka高级特性-重试队列
        • Kafka集群与运维
          • 第1节 消息传递
          • 第2节 网站活动路由
          • 第3节 监控指标
          • 第4节 日志汇总
          • 第5节 流处理
          • 第6节 活动采集
          • 第7节 提交日志
          • 3.2.1 Zookeeper 集群搭建
          • 3.2.2 Kafka 集群搭建
          • 3.3.1 监控度量指标
            • 3.3.1.1 JMX
            • 3.3.1.1.1 Kafka开启Jmx端口
            • 3.3.1.1.2 验证JMX开启
            • 3.3.1.2 使用JConsole链接JMX端口
            • 3.3.1.3 编程手段来获取监控指标
          • 3.3.2 监控工具 Kafka Eagle
        • Kafka源码剖析
        • 源码剖析-Broker启动流程
        • 源码剖析-Topic创建流程
        • 源码剖析-Producer生产者流程
        • 源码剖析-Consumer消费者流程
        • 源码剖析-消息存储机制
        • 源码剖析-SocketServer
        • 源码剖析-KafkaRequestHandlerPool
        • 源码剖析-LogManager
        • 源码剖析-ReplicaManager
        • 源码剖析-OffsetManager
        • 源码剖析-KafkaApis
        • 源码剖析-KafkaController
        • 源码剖析-KafkaHealthcheck
        • 源码剖析-DynamicConfigManager
        • 源码剖析-分区消费模式
        • 源码剖析-组消费模式
        • 源码剖析-同步发送模式
        • 源码剖析-异步发送模式
    • RabbitMQ

    • RocketMQ

  • Zookeeper

  • java组件
  • 消息队列
  • Kafka
  • Kafka深入
Revin
2023-07-23
目录

Kafka集群与运维

# 3.1 集群应用场景

# 第1节 消息传递

Kafka可以很好地替代传统邮件代理。消息代理的使用有多种原因(将处理与数据生产者分离,缓冲未处理的消息等)。与大多数邮件系统相比,Kafka具有更好的吞吐量,内置的分区,复制和容错功能,这使其成为大规模邮件处理应用程序的理想解决方案。

根据我们的经验,消息传递的使用通常吞吐量较低,但是可能需要较低的端到端延迟,并且通常取决于Kafka提供的强大的持久性保证。

在这个领域,Kafka与ActiveMQ或 RabbitMQ等传统消息传递系统相当。

# 第2节 网站活动路由

Kafka最初的用例是能够将用户活动跟踪管道重建为一组实时的发布-订阅。这意味着将网站活动(页面浏览,搜索或用户可能采取的其他操作)发布到中心主题,每种活动类型只有一个主题。这些提要可用于一系列用例的订阅,包括实时处理,实时监控,以及加载到Hadoop或脱机数据仓库系统中以进行脱机处理和报告。

活动跟踪通常量很大,因为每个用户页面视图都会生成许多活动消息。

# 第3节 监控指标

Kafka通常用于操作监控数据。这涉及汇总来自分布式应用程序的统计信息,以生成操作数据的集中。

# 第4节 日志汇总

许多人使用Kafka代替日志聚合解决方案。日志聚合通常从服务器收集物理日志文件,并将它们放在中央位置(也许是文件服务器或HDFS)以进行处理。Kafka提取文件的详细信息,并以日志流的形式更清晰地抽象日志或事件数据。这允许较低延迟的处理,并更容易支持多个数据源和分布式数据消耗。与以日志为中心的系统(例如Scribe或Flume)相比,Kafka具有同样出色的性能,由于复制而提供的更强的耐用性保证以及更低的端到端延迟。

# 第5节 流处理

Kafka的许多用户在由多个阶段组成的处理管道中处理数据,其中原始输入数据从Kafka主题中使用,然后进行汇总,充实或以其他方式转换为新主题,以供进一步使用或后续处理。例如,用于推荐新闻文章的处理管道可能会从RSS提要中检索文章内容,并将其发布到“文章”主题中。进一步的处理可能会使该内容规范化或重复数据删除,并将清洗后的文章内容发布到新主题中;最后的处理阶段可能会尝试向用户推荐此内容。这样的处理管道基于各个主题创建实时数据流的图形。从0.10.0.0开始,一个轻量但功能强大的流处理库称为Kafka Streams 可以在Apache Kafka中使用来执行上述数据处理。除了Kafka Streams以外,其他开源流处理工具还包括Apache Storm和 Apache Samza。

# 第6节 活动采集

事件源是一种应用程序,其中状态更改以时间顺序记录记录。Kafka对大量存储的日志数据的支持使其成为以这种样式构建的应用程序的绝佳后端。

# 第7节 提交日志

Kafka可以用作分布式系统的一种外部提交日志。该日志有助于在节点之间复制数据,并充当故障节点恢复其数据的重新同步机制。Kafka中的日志压缩功能有助于支持此用法。在这种用法中,Kafka类似于Apache BookKeeper项目。

  1. 横向扩展,提高Kafka的处理能力
  2. 镜像,副本,提供高可用。

# 3.2 集群搭建

  1. 搭建设计

Kafka_Page214_001

  1. 分配三台Linux,用于安装拥有三个节点的Kafka集群。
node2(192.168.100.102)
node3(192.168.100.103)
node4(192.168.100.104)
1
2
3

以上三台主机的/etc/hosts配置:

192.168.100.101 node1
192.168.100.102 node2
192.168.100.103 node3
192.168.100.104 node4
1
2
3
4

# 3.2.1 Zookeeper 集群搭建

  1. Linux安装JDK,三台Linux都安装。
  • 上传JDK到linux

  • 安装并配置JDK

# 使用rpm安装JDK
rpm -ivh jdk-8u261-linux-x64.rpm
# 默认的安装路径是/usr/java/jdk1.8.0_261-amd64
# 配置JAVA_HOME
vim /etc/profile
# 文件最后添加两行
export JAVA_HOME=/usr/java/jdk1.8.0_261-amd64
export PATH=$PATH:$JAVA_HOME/bin
# 退出vim,使配置生效
source /etc/profile
1
2
3
4
5
6
7
8
9
10
  • 查看JDK是否正确安装
 java -version
1

Kafka_Page215_001

  1. Linux 安装Zookeeper,三台Linux都安装,以搭建Zookeeper集群
  • 上传zookeeper-3.4.14.tar.gz到Linux

  • 解压并配置zookeeper

# node2操作
# 解压到/opt目录
tar -zxf zookeeper-3.4.14.tar.gz -C /opt
# 配置
cd /opt/zookeeper-3.4.14/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
# 设置
dataDir=/var/lagou/zookeeper/data
# 添加
server.1=node2:2881:3881
server.2=node3:2881:3881
server.3=node4:2881:3881

# 退出vim
mkdir -p /var/lagou/zookeeper/data
echo 1 > /var/lagou/zookeeper/data/myid

# 配置环境变量
vim /etc/profile
# 添加
export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export ZOO_LOG_DIR=/var/lagou/zookeeper/log

# 退出vim,让配置生效
source /etc/profile

# 将/opt/zookeeper-3.4.14拷贝到node3,node4
scp -r /opt/zookeeper-3.4.14/ node3:/opt
scp -r /opt/zookeeper-3.4.14/ node4:/opt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

node3配置

# 配置环境变量
vim /etc/profile
# 在配置JDK环境变量基础上,添加内容
export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export ZOO_LOG_DIR=/var/lagou/zookeeper/log

# 退出vim,让配置生效
source /etc/profile

mkdir -p /var/lagou/zookeeper/data
echo 2 > /var/lagou/zookeeper/data/myid
1
2
3
4
5
6
7
8
9
10
11
12

node4配置

# 配置环境变量
vim /etc/profile
# 在配置JDK环境变量基础上,添加内容
export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
export ZOO_LOG_DIR=/var/lagou/zookeeper/log

# 退出vim,让配置生效
source /etc/profile

mkdir -p /var/lagou/zookeeper/data
echo 3 > /var/lagou/zookeeper/data/myid
1
2
3
4
5
6
7
8
9
10
11
12

启动zookeeper

# 在三台Linux上启动Zookeeper
[root@node2 ~]# zkServer.sh start
[root@node3 ~]# zkServer.sh start
[root@node4 ~]# zkServer.sh start

# 在三台Linux上查看Zookeeper的状态
[root@node2 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: follower

[root@node3 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: leader

[root@node4 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: follower
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 3.2.2 Kafka 集群搭建

  1. 安装Kafka
  • 上传并解压Kafka到/opt
# 解压到/opt
tar -zxf kafka_2.12-1.0.2.tgz -C /opt

# 拷贝到node3和node4
scp -r /opt/kafka_2.12-1.0.2/ node3:/opt
scp -r /opt/kafka_2.12-1.0.2/ node4:/opt
1
2
3
4
5
6
  • 配置Kafka
# 配置环境变量,三台Linux都要配置
vim /etc/profile
# 添加以下内容:
export KAFKA_HOME=/opt/kafka_2.12-1.0.2
export PATH=$PATH:$KAFKA_HOME/bin

# 让配置生效
source /etc/profile

# node2配置
vim /opt/kafka_2.12-1.0.2/config/server.properties

broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://node2:9092
log.dirs=/var/lagou/kafka/kafka-logs
zookeeper.connect=node2:2181,node3:2181,node4:2181/myKafka
# 其他使用默认配置


# node3配置
vim /opt/kafka_2.12-1.0.2/config/server.properties

broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://node3:9092
log.dirs=/var/lagou/kafka/kafka-logs
zookeeper.connect=node2:2181,node3:2181,node4:2181/myKafka
# 其他使用默认配置

# node4配置
vim /opt/kafka_2.12-1.0.2/config/server.properties

broker.id=2
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://node4:9092
log.dirs=/var/lagou/kafka/kafka-logs
zookeeper.connect=node2:2181,node3:2181,node4:2181/myKafka
# 其他使用默认配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
  • 启动Kafka
[root@node2 ~]# kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties
[root@node3 ~]# kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties
[root@node4 ~]# kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties
1
2
3
  • 验证Kafka

node2节点的Cluster Id:

Kafka_Page218_001

node3节点的Cluster Id:

Kafka_Page218_002

node4节点的Cluster Id:

Kafka_Page218_003

  1. Cluster Id是一个唯一的不可变的标志符,用于唯一标志一个Kafka集群。
  2. 该Id最多可以有22个字符组成,字符对应于URL-safe Base64。
  3. Kafka 0.10.1版本及之后的版本中,在集群第一次启动的时候,Broker从Zookeeper的<Kafka_ROOT>/cluster/id节点获取。如果该Id不存在,就自动生成一个新的。
zkCli.sh
# 查看每个Broker的信息
get /brokers/ids/0
get /brokers/ids/1
get /brokers/ids/2
1
2
3
4
5

node2节点在Zookeeper上的信息:

Kafka_Page218_004

node3节点在Zookeeper上的信息:

Kafka_Page218_005

node4节点在Zookeeper上的信息:

Kafka_Page219_001

# 3.3 集群监控

# 3.3.1 监控度量指标

Kafka使用Yammer Metrics在服务器和Scala客户端中报告指标。Java客户端使用Kafka Metrics,它是一个内置的度量标准注册表,可最大程度地减少拉入客户端应用程序的传递依赖项。两者都通过JMX公开指标,并且可以配置为使用可插拔的统计报告器报告统计信息,以连接到您的监视系统。

具体的监控指标可以查看官方文档 (opens new window)。

# 3.3.1.1 JMX

# 3.3.1.1.1 Kafka开启Jmx端口

 [root@node4 bin]# vim /opt/kafka_2.12-1.0.2/bin/kafka-server-start.sh
1

Kafka_Page219_002

所有kafka机器添加一个 JMX_PORT ,并重启kafka

# 3.3.1.1.2 验证JMX开启

首先打印9581端口占用的进程信息,然后使用进程编号对应到Kafka的进程号,搞定。

Kafka_Page219_003

也可以查看Kafka启动日志,确定启动参数-Dcom.sun.management.jmxremote.port=9581存在即可

# 3.3.1.2 使用JConsole链接JMX端口

  1. win/mac,找到jconsole工具并打开, 在 ${JAVA_HOEM}/bin/ Mac电脑可以直接命令行输入 jconsole

Kafka_Page220_001

Kafka_Page221_001

Kafka_Page221_002

Kafka_Page222_001

详细的监控指标

相见官方文档:http://kafka.apache.org/10/documentation.html#monitoring

这里列出常用的:

OS监控项

image-20230731165128504

broker指标

image-20230731165201654

producer以及opic指标

image-20230731165313312

consumer指标

image-20230731165333107

# 3.3.1.3 编程手段来获取监控指标

查看要监控哪个指标:

Kafka_Page225_001

代码实现:

package com.lagou.kafka.demo.monitor;
import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
public class JMXMonitorDemo
{
    public static void main(String[] args) throws IOException, MalformedObjectNameException, AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException
    {
        String jmxServiceURL = "service:jmx:rmi:///jndi/rmi://192.168.100.103:9581/jmxrmi";
        JMXServiceURL jmxURL = null;
        JMXConnector jmxc = null;
        MBeanServerConnection jmxs = null;
        ObjectName mbeanObjName = null;
        Iterator sampleIter = null;
        Set sampleSet = null;
        // 创建JMXServiceURL对象,参数是
        jmxURL = new JMXServiceURL(jmxServiceURL);
        // 建立到指定URL服务器的连接
        jmxc = JMXConnectorFactory.connect(jmxURL);
        // 返回代表远程MBean服务器的MBeanServerConnection对象
        jmxs = jmxc.getMBeanServerConnection();
        // 根据传入的字符串,创建ObjectName对象
        // mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
        mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=tp_eagle_01");
        // 获取指定ObjectName对应的MBeans
        sampleSet = jmxs.queryMBeans(null, mbeanObjName);
        // 迭代器
        sampleIter = sampleSet.iterator();
        if(sampleSet.isEmpty())
        {}
        else
        {
            // 如果返回了,则打印信息
            while(sampleIter.hasNext())
            {
                // Used to represent the object name of an MBean and its class name.
                // If the MBean is a Dynamic MBean the class name should be retrieved from the MBeanInfo it provides.
                // 用于表示MBean的ObjectName和ClassName
                ObjectInstance sampleObj = (ObjectInstance) sampleIter.next();
                ObjectName objectName = sampleObj.getObjectName();
                // 查看指定MBean指定属性的值
                String count = jmxs.getAttribute(objectName, "Count").toString();
                System.out.println(count);
            }
        }
        // 关闭
        jmxc.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

# 3.3.2 监控工具 Kafka Eagle

我们可以使用Kafka-eagle管理Kafka集群

核心模块:

  • 面板可视化

  • 主题管理,包含创建主题、删除主题、主题列举、主题配置、主题查询等

  • 消费者应用:对不同消费者应用进行监控,包含Kafka API、Flink API、Spark API、Storm API、Flume API、LogStash API等

  • 集群管理:包含对Kafka集群和Zookeeper集群的详情展示,其内容包含Kafka启动时间、Kafka端口号、Zookeeper Leader角色等。同时,还有多集群切换管理,Zookeeper Client操作入口

  • 集群监控:包含对Broker、Kafka核心指标、Zookeeper核心指标进行监控,并绘制历史趋势图

  • 告警功能:对消费者应用数据积压情况进行告警,以及对Kafka和Zookeeper监控度进行告警。同时,支持邮件、微信、钉钉告警通知

  • 系统管理:包含用户创建、用户角色分配、资源访问进行管理

架构:

  • 可视化:负责展示主题列表、集群健康、消费者应用等

  • 采集器:数据采集的来源包含Zookeeper、Kafka JMX & 内部Topic、Kafka API(Kafka 2.x以后版本)

  • 数据存储:目前Kafka Eagle存储采用MySQL或SQLite,数据库和表的创建均是自动完成的,按照官方文档进行配置好,启动Kafka Eagle就会自动创建,用来存储元数据和监控数据

  • 监控:负责见消费者应用消费情况、集群健康状态

  • 告警:对监控到的异常进行告警通知,支持邮件、微信、钉钉等方式

  • 权限管理:对访问用户进行权限管理,对于管理员、开发者、访问者等不同角色的用户,分配不用的访问权限

需要Kafka节点开启JMX。前面讲过了。

# 下载编译好的包
wget http://pkgs-linux.cvimer.com/kafka-eagle.zip

# 配置kafka-eagle
unzip kafka-eagle.zip
cd kafka-eagle/kafka-eagle-web/target
mkdir -p test
cp kafka-eagle-web-2.0.1-bin.tar.gz test/
tar xf kafka-eagle-web-2.0.1-bin.tar.gz
cd kafka-eagle-web-2.0.1
1
2
3
4
5
6
7
8
9
10

需要配置环境变量:

KE_HOME=
PATH=
1
2

conf下的配置文件:system-config.properties

######################################
# multi zookeeper & kafka cluster list
######################################
# 集群的别名,用于在kafka-eagle中进行区分。
# 可以配置监控多个集群,别名用逗号隔开
# kafka.eagle.zk.cluster.alias=cluster1,cluster2,cluster3
kafka.eagle.zk.cluster.alias=cluster1
# cluster1.zk.list=10.1.201.17:2181,10.1.201.22:2181,10.1.201.23:2181
# 配置当前集群的zookeeper地址,此处的值要与Kafka的server.properties中的zookeeper.connect的值一致
# 此处的前缀就是集群的别名
cluster1.zk.list=node2:2181,node3:2181,node4:2181/myKafka
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.kafka.eagle.broker.size=20

######################################
# zookeeper客户端连接数限制
######################################
kafka.zk.limit.size=25

######################################
# kafka eagle网页端口号
######################################
kafka.eagle.webui.port=8048

######################################
# kafka 消费信息存储位置,用来兼容kafka低版本
######################################
cluster1.kafka.eagle.offset.storage=kafka
cluster2.kafka.eagle.offset.storage=zk

######################################
# kafka metrics, 15 days by default
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=15


######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.fix.error=true

######################################
# 管理员删除kafka中topic的口令
######################################
kafka.eagle.topic.token=keadmin

######################################
# kafka 集群是否开启了认证模式,此处是cluster1集群的配置,禁用
######################################
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.kafka.eagle.sasl.client.id=
cluster1.kafka.eagle.sasl.cgroup.enable=false
cluster1.kafka.eagle.sasl.cgroup.topics=

######################################
# kafka ssl authenticate,示例配置
######################################
cluster2.kafka.eagle.sasl.enable=false
cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster2.kafka.eagle.sasl.mechanism=PLAIN
cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
cluster2.kafka.eagle.sasl.client.id=
cluster2.kafka.eagle.sasl.cgroup.enable=false
cluster2.kafka.eagle.sasl.cgroup.topics=

######################################
# kafka ssl authenticate,示例配置
######################################
cluster3.kafka.eagle.ssl.enable=false
cluster3.kafka.eagle.ssl.protocol=SSL
cluster3.kafka.eagle.ssl.truststore.location=
cluster3.kafka.eagle.ssl.truststore.password=
cluster3.kafka.eagle.ssl.keystore.location=
cluster3.kafka.eagle.ssl.keystore.password=
cluster3.kafka.eagle.ssl.key.password=
cluster3.kafka.eagle.ssl.cgroup.enable=false
cluster3.kafka.eagle.ssl.cgroup.topics=

######################################
# 存储监控数据的数据库地址
# kafka默认使用sqlite存储,需要指定和创建sqlite的目录
# 如 /home/lagou/hadoop/kafka-eagle/db
######################################
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/home/lagou/hadoop/kafka-eagle/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org

######################################
# 还可以使用MySLQ存储监控数据
######################################
#kafka.eagle.driver=com.mysql.jdbc.Driver
#kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#kafka.eagle.username=root
#kafka.eagle.password=123456

######################################
# kafka eagle 设置告警邮件服务器
######################################
kafka.eagle.mail.enable=true
kafka.eagle.mail.sa=kafka_lagou_alert
kafka.eagle.mail.username=kafka_lagou_alert@163.com
kafka.eagle.mail.password=Pas2W0rd
kafka.eagle.mail.server.host=smtp.163.com
kafka.eagle.mail.server.port=25
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122

也可以自行编译, https://github.com/smartloli/kafka-eagle

创建Eagel的存储目录: mkdir -p /hadoop/kafka-eagle

# 启动kafka-eagle
./bin/ke.sh start
1
2

会提示我们登陆地址和账号密码

上次更新: 2025/04/03, 11:07:08
Kafka高级特性-重试队列
Kafka源码剖析

← Kafka高级特性-重试队列 Kafka源码剖析→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式