跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • Spring Data JPA
  • MyBatis

  • Spring

  • SpringBoot

  • Jdk

  • Tomcat

  • Netty

    • 基础

      • Netty是什么?
      • Netty的高性能设计
      • Netty快速入⻔
      • Netty核心组件
      • 详解ByteBuf
        • 5.1、⼯作原理
        • 5.2、基本使⽤
          • 5.2.1、读取操作
          • 5.2.2、写⼊操作
          • 5.2.3、丢弃已读字节
          • 5.2.4、clear()
        • 5.3、ByteBuf 使⽤模式
        • 5.4、ByteBuf 的分配
        • 5.5、ByteBuf的释放
          • 5.5.1、⼿动释放
          • 5.5.2、⾃动释放
          • 5.5.2.1、TailHandler
          • 5.5.2.2、SimpleChannelInboundHandler
          • 5.5.2.3、HeadHandler
          • 5.5.3、⼩结
    • 进阶

    • 资料
  • 若依

  • Traefik

  • Openresty

  • 开源框架
  • Netty
  • 基础
Revin
2023-06-17
目录

详解ByteBuf

# 5.1、⼯作原理

Java NIO 提供了ByteBuffer 作为它 的字节容器,但是这个类使⽤起来过于复杂,⽽且也有些繁琐。 Netty 的 ByteBuffer 替代品是 ByteBuf,⼀个强⼤的实现,既解决了JDK API 的局限性, ⼜为⽹络应⽤ 程序的开发者提供了更好的API。

从结构上来说,ByteBuf 由⼀串字节数组构成。数组中每个字节⽤来存放信息。 ByteBuf 提供了两个索引,⼀个⽤于读取数据,⼀个⽤于写⼊数据。这两个索引通过在字节数组中移 动,来定位需要读或者写信息的位置。

当从 ByteBuf 读取时,它的 readerIndex(读索引)将会根据读取的字节数递增。 同样,当写 ByteBuf 时,它的 writerIndex(写索引) 也会根据写⼊的字节数进⾏递增。

image-20230617231840516

如果 readerIndex 超过了 writerIndex 的时候,Netty 会抛出 IndexOutOf-BoundsException 异常。

# 5.2、基本使⽤

# 5.2.1、读取操作

package cn.itcast.myrpc.test;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

public class TestByteBuf01 {
    public static void main(String[] args) {
        //构造 
      ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
      
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
        
      while(byteBuf.isReadable()) { //⽅法⼀:内部通过移动readerIndex进⾏读取
            System.out.println((char) byteBuf.readByte());
       }
        //⽅法⼆:通过下标直接读取 
      for (int i = 0; i < byteBuf.readableBytes(); i++) { System.out.println((char)byteBuf.getByte(i));
      }
        //⽅法三:转化为byte[]进⾏读取 
      byte[] bytes = byteBuf.array(); 
      for (byte b : bytes) {
        System.out.println((char)b);
      }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 5.2.2、写⼊操作

package cn.itcast.myrpc.test;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

public class TestByteBuf02 {
    public static void main(String[] args) {
    //构造空的字节缓冲区,初始⼤⼩为10,最⼤为20
    ByteBuf byteBuf = Unpooled.buffer(10, 20);
    System.out.println("byteBuf的容量为:" + byteBuf.capacity());
    System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
    System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());

      for(int i = 0; i < 5; i++) {
          byteBuf.writeInt(i); //写⼊int类型,⼀个int占4个字节
      } 
        System.out.println("ok");

        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
      
        while(byteBuf.isReadable()) {
            System.out.println(byteBuf.readInt());
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 5.2.3、丢弃已读字节

image-20230617232215396

package cn.itcast.myrpc.test;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

public class TestByteBuf03 {
    public static void main(String[] args) {
        ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);

        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());

        while(byteBuf.isReadable()) {
            System.out.println((char) byteBuf.readByte());
        }

        byteBuf.discardReadBytes(); //丢弃已读的字节空间

        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 5.2.4、clear()

image-20230617232331742

package cn.itcast.myrpc.test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public class TestByteBuf04 {
    public static void main(String[] args) {
        ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);

        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
      
        byteBuf.clear(); //重置readerIndex 、 writerIndex 为0
      
        System.out.println("byteBuf的容量为:" + byteBuf.capacity());
        System.out.println("byteBuf的可读容量为:" + byteBuf.readableBytes());
        System.out.println("byteBuf的可写容量为:" + byteBuf.writableBytes());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 5.3、ByteBuf 使⽤模式

根据存放缓冲区的不同分为三类:

  • 堆缓冲区(HeapByteBuf),内存的分配和回收速度⽐较快,可以被JVM⾃动回收,缺点是,如 果进⾏socket的IO读写,需要额外做⼀次内存复制,将堆内存对应的缓冲区复制到内核Channel 中,性能会有⼀定程度的下降。

    由于在堆上被 JVM 管理,在不被使⽤时可以快速释放。可以通过 ByteBuf.array() 来获取 byte[] 数 据。

  • 直接缓冲区(DirectByteBuf),⾮堆内存,它在对外进⾏内存分配,相⽐堆内存,它的分配和回 收速度会慢⼀些,但是将它写⼊或从Socket Channel中读取时,由于减少了⼀次内存拷⻉,速度⽐ 堆内存块。

  • 复合缓冲区,顾名思义就是将上述两类缓冲区聚合在⼀起。Netty 提供了⼀个 CompsiteByteBuf, 可以将堆缓冲区和直接缓冲区的数据放在⼀起,让使⽤更加⽅便。

//默认使⽤的是DirectByteBuf,如果需要使⽤HeapByteBuf模式,则需要进⾏系统参数的设置

System.setProperty("io.netty.noUnsafe", "true"); //netty中IO操作都是基于Unsafe完 成的

//ByteBuf 的分配要设置为⾮池化,否则不能切换到堆缓冲器模式 
serverBootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
1
2
3
4
5
6

# 5.4、ByteBuf 的分配

Netty 提供了两种 ByteBufAllocator 的实现,分别是:

  • PooledByteBufAllocator,实现了 ByteBuf 的对象的池化,提⾼性能减少并最⼤限度地减少内存 碎⽚。
  • UnpooledByteBufAllocator,没有实现对象的池化,每次会⽣成新的对象实例。
//通过ChannelHandlerContext获取ByteBufAllocator实例 
ctx.alloc();

//通过channel也可以获取 
channel.alloc();

//Netty默认使⽤了PooledByteBufAllocator

//可以在引导类中设置⾮池化模式 
serverBootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);

//或通过系统参数设置 
System.setProperty("io.netty.allocator.type", "pooled"); System.setProperty("io.netty.allocator.type", "unpooled");
1
2
3
4
5
6
7
8
9
10
11
12
13

# 5.5、ByteBuf的释放

ByteBuf如果采⽤的是堆缓冲区模式的话,可以由GC回收,但是如果采⽤的是直接缓冲区,就不受GC的 管理,就得⼿动释放,否则会发⽣内存泄露。

关于ByteBuf的释放,分为⼿动释放与⾃动释放。

# 5.5.1、⼿动释放

⼿动释放,就是在使⽤完成后,调⽤ReferenceCountUtil.release(byteBuf); 进⾏释放。 通过release⽅法减去 byteBuf 的使⽤计数,Netty 会⾃动回收 byteBuf 。

示例:

/** 
* 获取客户端发来的数据 
* 
* @param ctx 
* @param msg 
* @throws Exception 
*/
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf byteBuf = (ByteBuf) msg;
  
    String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
    System.out.println("客户端发来数据:" + msgStr);
  
    //释放资源 
  	ReferenceCountUtil.release(byteBuf);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

⼿动释放可以达到⽬的,但是这种⽅式会⽐较繁琐,如果⼀旦忘记释放就可能会造成内存泄露。

# 5.5.2、⾃动释放

⾃动释放有三种⽅式,分别是:⼊站的TailHandler、继承SimpleChannelInboundHandler、 HeadHandler的出站释放。

# 5.5.2.1、TailHandler

Netty的ChannelPipleline的流⽔线的末端是TailHandler,默认情况下如果每个⼊站处理器Handler都把 消息往下传,TailHandler会释放掉ReferenceCounted类型的消息。

/** 
* 获取客户端发来的数据 
* 
* @param ctx 
* @param msg 
* @throws Exception 
*/
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf byteBuf = (ByteBuf) msg;
  
    String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
    System.out.println("客户端发来数据:" + msgStr);
  
    //向客户端发送数据 
  ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
  
    ctx.fireChannelRead(msg); //将ByteBuf向下传递
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

在DefaultChannelPipeline中的TailContext内部类会在最后执⾏:

@Override 
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    onUnhandledInboundMessage(ctx, msg);
}
//最后会执⾏ 
protected void onUnhandledInboundMessage(Object msg) {
  try {
      logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg);
  } finally {
      ReferenceCountUtil.release(msg); //释放资源
  }
}
1
2
3
4
5
6
7
8
9
10
11
12

需要注意的是,如果没有进⾏向下传递,那么在TailHandler中是不会进⾏释放操作的。

# 5.5.2.2、SimpleChannelInboundHandler

当ChannelHandler继承了SimpleChannelInboundHandler后,在SimpleChannelInboundHandler的 channelRead()⽅法中,将会进⾏资源的释放,我们的业务代码也需要写⼊到channelRead0()中。

//SimpleChannelInboundHandler中的channelRead() 
@Override 
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;
    try {
        if(acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked") 
          	I imsg = (I) msg;
            channelRead0(ctx, imsg);
        } else {
            release = false;
            ctx.fireChannelRead(msg);
        }
    } finally {
        if(autoRelease && release) {
            ReferenceCountUtil.release(msg); //在这⾥释放 }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

使⽤:

package cn.itcast.myrpc.client.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class MyClientHandler extends SimpleChannelInboundHandler < ByteBuf > {
    @Override 
  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("接收到服务端的消息:" + msg.toString(CharsetUtil.UTF_8));
    }
    
  @Override 
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 向服务端发送数据 
        String msg = "hello"; 
        ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
    }
    
  @Override 
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 5.5.2.3、HeadHandler

出站处理流程中,申请分配到的 ByteBuf,通过 HeadHandler 完成⾃动释放。

出站处理⽤到的 Bytebuf 缓冲区,⼀般是要发送的消息,通常由应⽤所申请。在出站流程开始的时候, 通过调⽤ ctx.writeAndFlush(msg),Bytebuf 缓冲区开始进⼊出站处理的 pipeline 流⽔线 。

在每⼀个出站Handler中的处理完成后,最后消息会来到出站的最后⼀棒 HeadHandler,再经过⼀轮复 杂的调⽤,在flush完成后终将被release掉。

示例:

package cn.itcast.myrpc.client.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class MyClientHandler extends SimpleChannelInboundHandler < ByteBuf > {
    @Override 
  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("接收到服务端的消息:" + msg.toString(CharsetUtil.UTF_8));
    }
  
    @Override 
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 向服务端发送数据
        String msg = "hello";
        ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
    }
  
    @Override 
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

执⾏⽅法调⽤链:

Page37_01

# 5.5.3、⼩结

  • ⼊站处理流程中,如果对原消息不做处理,调⽤ ctx.fireChannelRead(msg) 把原消息往下传,由 流⽔线最后⼀棒 TailHandler 完成⾃动释放。
  • 如果截断了⼊站处理流⽔线,则可以继承 SimpleChannelInboundHandler ,完成⼊站ByteBuf ⾃ 动释放。
  • 出站处理过程中,申请分配到的 ByteBuf,通过 HeadHandler 完成⾃动释放。
  • ⼊站处理中,如果将原消息转化为新的消息并调⽤ ctx.fireChannelRead(newMsg)往下传,那必须把原消息release掉;
  • ⼊站处理中,如果已经不再调⽤ ctx.fireChannelRead(msg) 传递任何消息,也没有继承 SimpleChannelInboundHandler 完成⾃动释放,那更要把原消息release掉;
上次更新: 2025/04/03, 11:07:08
Netty核心组件
Netty编解码器

← Netty核心组件 Netty编解码器→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式