跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • Spring Data JPA
  • MyBatis

  • Spring

  • SpringBoot

  • Jdk

  • Tomcat

  • Netty

    • 基础

    • 进阶

      • Netty编解码器
      • TCP粘包拆包的问题及解决
        • 2.1、ReplayingDecoder
        • 2.2、什么是TCP粘包/拆包问题?
          • 2.2.1、案例:演示TCP粘包/拆包问题
        • 2.3、解决⽅法
        • 2.4、实战:解决TCP的粘包/拆包问题
          • 2.4.1、⾃定义协议
          • 2.4.2、编解码器
          • 2.4.3、客户端
          • 1.5.2、客户端
          • 2.4.4、服务端
          • 2.4.5、测试
      • 自研RPC实战
      • Netty核心源码剖析
      • Netty优化建议
    • 资料
  • 若依

  • Traefik

  • Openresty

  • 开源框架
  • Netty
  • 进阶
Revin
2023-06-17
目录

TCP粘包拆包的问题及解决

# 2.1、ReplayingDecoder

在前⾯案例中,当需要获取int数据时,需要进⾏判断是否够4个字节,如果解码业务过于复杂的话,这 样的判断会显得⾮常的繁琐,在Netty中提供了ReplayingDecoder就可以解决这样的问题。 ReplayingDecoder也是继承了ByteToMessageDecoder进⾏的扩展。

javadoc⽂档中的⼀个示例:

public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {

	@Override 
  protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {

    if (buf.readableBytes() < 4) {
      return;
    }

    buf.markReaderIndex(); int length = buf.readInt();

    if (buf.readableBytes() < length) {
      buf.resetReaderIndex(); return;
    }

    out.add(buf.readBytes(length));
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

01

使⽤ReplayingDecoder后:

public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {

protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {

	out.add(buf.readBytes(buf.readInt()));

	}
}
1
2
3
4
5
6
7
8

基本原理:

  • 使⽤了特殊的ByteBuf,叫做ReplayingDecoderByteBuf,扩展了ByteBuf
  • 重写了ByteBuf的readXxx()等⽅法,会先检查可读字节⻓度,⼀旦检测到不满⾜要求就直接抛出 REPLAY(REPLAY继承ERROR)
  • ReplayingDecoder重写了ByteToMessageDecoder的callDecode()⽅法,捕获Signal并在catch块 中重置ByteBuf的readerIndex。 继续等待数据,直到有了数据后继续读取,这样就可以保证读取到需要读取的数据。
  • 类定义中的泛型 S 是⼀个⽤于记录解码状态的状态机枚举类,在state(S s)、checkpoint(S s)等⽅ 法中会⽤到。在简单解码时也可以⽤java.lang.Void来占位。

需要注意:

  • buffer的部分操作(readBytes(ByteBuffer dst)、retain()、release()等⽅法会直接抛出异常)
  • 在某些情况下会影响性能(如多次对同⼀段消息解码)

TCP是基于流的,只保证接收到数据包分⽚顺序,⽽不保证接收到的数据包每个分⽚⼤⼩。因此在使⽤ ReplayingDecoder时,即使不存在多线程,同⼀个线程也可能多次调⽤decode()⽅法。在decode中修 改ReplayingDecoder的类变量时必须⼩⼼谨慎。

//这是⼀个错误的例⼦:
//消息中包含了2个integer,代码中decode⽅法会被调⽤两次,此时队列size不等于2,这段代码达不 到期望结果。 
public class MyDecoder extends ReplayingDecoder<Void> {

  private final Queue<Integer> values = new LinkedList<Integer>(); 
    @Override public void decode(ByteBuf buf, List<Object> out) throws Exception { // A message contains 2 integers.
      values.offer(buf.readInt()); 
      values.offer(buf.readInt()); 
      assert values.size() == 2; 
      out.add(values.poll() + values.poll());
  }
}

//正确的做法: 
public class MyDecoder extends ReplayingDecoder<Void> {
  private final Queue<Integer> values = new LinkedList<Integer>(); 
    @Override 
    public void decode(ByteBuf buf, List<Object> out) throws Exception {

      // Revert the state of the variable that might have been changed
      // since the last partial decode.
      values.clear();

      // A message contains 2 integers. 
      values.offer(buf.readInt()); 
      values.offer(buf.readInt()); // Now we know this assertion will never fail. 
      assert values.size() == 2; 
      out.add(values.poll() + values.poll());

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

ByteToIntegerDecoder2的实现:

package cn.itcast.netty.coder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
public class ByteToIntegerDecoder2 extends ReplayingDecoder < Void > {
        /** 
        * @param ctx 上下⽂ 
        * @param in 输⼊的ByteBuf消息数据 
        * @param out 转化后输出的容器 
        * @throws Exception 
        */
        @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in , List < Object > out) throws Exception {
            out.add(in.readInt()); //读取到int类型数据,放⼊到输出,完成数据类型的转化
        }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

ServerHandler:

package cn.itcast.netty.codec.obj;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class ServerHandler extends SimpleChannelInboundHandler < User > {
    @Override
    public void channelRead0(ChannelHandlerContext ctx, User user) throws
    Exception {
        //获取到user对象 System.out.println(user);
        ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 2.2、什么是TCP粘包/拆包问题?

TCP是流传递的,所谓流,就是⼀串没有界限的数据,服务端接收到客户端发来的数据,并不确定这是 ⼀条数据,还是多条数据,应该如何拆包,服务端是不知道的。

所以,客户端与服务端就需要约定好拆包的规则,客户端按照此规则进⾏粘包,⽽服务端按照此规则进 ⾏拆包,这就是TCP的粘包与拆包,如果不约定好,就会出现服务端不能按照期望拿到数据。

实际上,彼此约定的规则就是协议,⾃定义协议就是⾃定义规则。

# 2.2.1、案例:演示TCP粘包/拆包问题

客户端:向服务端发送10条消息,并且记录服务端返回的消息的数量。

package cn.itcast.netty.tcppackage.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 服务器启动类 
          Bootstrap bootstrap = new Bootstrap(); 
          bootstrap.group(worker); 
          bootstrap.channel(NioSocketChannel.class); 
          bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ClientHandler());
            }
        });
          ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync();
          future.channel().closeFuture().sync();
      } finally {
          worker.shutdownGracefully();
      }
		}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package cn.itcast.netty.tcppackage.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class ClientHandler extends SimpleChannelInboundHandler < ByteBuf > {
    private int count;
  
    @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
      
        System.out.println("接收到服务端的消息:" + msg.toString(CharsetUtil.UTF_8));
        System.out.println("接收到服务端的消息数量:" + (++count));
    }
  
    @Override 
  	public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for(int i = 0; i < 10; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer("from client a message!", CharsetUtil.UTF_8));
        }
    }
  
    @Override 
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

服务端:接收消息并且记录消息的数量,向客户端发送响应。

package cn.itcast.netty.tcppackage.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
    public static void main(String[] args) throws Exception {
        // 主线程,不处理任何业务逻辑,只是接收客户的连接请求 
      EventLoopGroup boss = new NioEventLoopGroup(1); 
      // ⼯作线程,线程数默认是:cpu*2 
      EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 服务器启动类 
          ServerBootstrap serverBootstrap = new ServerBootstrap(); 
          serverBootstrap.group(boss, worker); //配置server通道 
          serverBootstrap.channel(NioServerSocketChannel.class); 
          serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>
            () {
                @Override protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ServerHandler());
                }
            }); //worker线程的处理器
        ChannelFuture future = serverBootstrap.bind(5566).sync();
        System.out.println("服务器启动完成。。。。。");
        //等待服务端监听端⼝关闭 
          future.channel().closeFuture().sync(); } 
      finally {
        //优雅关闭 
        boss.shutdownGracefully();
        worker.shutdownGracefully(); 
      }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.itcast.netty.tcppackage.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class ServerHandler extends SimpleChannelInboundHandler < ByteBuf > {
    private int count;
  
    @Override
  	protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("服务端接收到消息:" + msg.toString(CharsetUtil.UTF_8));
        System.out.println("服务端接收到消息数量:" + (++count));
        ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

测试结果:

#服务端 
服务器启动完成。。。。。 
服务端接收到消息:from client a message!from client a message!from client a message!from client a message!from client a message!from client a message!from client a message!from client a message!from client a message!from client a message!

服务端接收到消息数量:1

#客户端
接收到服务端的消息:ok 接收到服务端的消息数量:1

#测试结果与期望不符
1
2
3
4
5
6
7
8
9
10

# 2.3、解决⽅法

⼀般来讲有3中⽅法解决TCP的粘包与拆包问题:

在发送的数据包中添加头,在头⾥存储数据的⼤⼩,服务端就可以按照此⼤⼩来读取数据,这样就 知道界限在哪⾥了。 以固定的⻓度发送数据,超出的分多次发送,不⾜的以0填充,接收端就以固定⻓度接收即可。 在数据包之间设置边界,如添加特殊符号,这样,接收端通过这个边界就可以将不同的数据包拆分 开。

# 2.4、实战:解决TCP的粘包/拆包问题

# 2.4.1、⾃定义协议

package cn.itcast.netty.tcppackage2;
public class MyProtocol {
    private Integer length; //数据头:⻓度
    private byte[] body; //数据体
    public Integer getLength() {
        return length;
    }
    public void setLength(Integer length) {
        this.length = length;
    }
    public byte[] getBody() {
        return body;
    }
    public void setBody(byte[] body) {
        this.body = body;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 2.4.2、编解码器

编码器:

package cn.itcast.netty.tcppackage2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class MyEncoder extends MessageToByteEncoder < MyProtocol > {
    @Override protected void encode(ChannelHandlerContext ctx, MyProtocol msg, ByteBuf out) throws Exception {
        out.writeInt(msg.getLength());
        out.writeBytes(msg.getBody());
    }
}
1
2
3
4
5
6
7
8
9
10

解码器:

package cn.itcast.netty.tcppackage2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
public class MyDecoder extends ReplayingDecoder < Void > {
    @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in , List < Object > out) throws Exception {
        int length = in.readInt(); //获取⻓度 byte[] data = new byte[length]; //根据⻓度定义byte数组 in.readBytes(data); //读取数据
        MyProtocol myProtocol = new MyProtocol();
        myProtocol.setLength(length);
        myProtocol.setBody(data);
        out.add(myProtocol);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 2.4.3、客户端

package cn.itcast.netty.tcppackage2.client;
import cn.itcast.netty.tcppackage2.MyProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class ClientHandler extends SimpleChannelInboundHandler < MyProtocol > {
    private int count;
    @Override protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception {
        System.out.println("接收到服务端的消息:" + new String(msg.getBody(), CharsetUtil.UTF_8));
        System.out.println("接收到服务端的消息数量:" + (++count));
    }
    @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for(int i = 0; i < 10; i++) {
            byte[] data = "from client a message!".getBytes(CharsetUtil.UTF_8);
            MyProtocol myProtocol = new MyProtocol();
            myProtocol.setLength(data.length);
            myProtocol.setBody(data);
            ctx.writeAndFlush(myProtocol);
        }
    }
    @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package cn.itcast.netty.tcppackage2.client;
import cn.itcast.netty.tcppackage2.MyDecoder;
import cn.itcast.netty.tcppackage2.MyEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 服务器启动类 
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer < SocketChannel > () {
                @Override protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new MyEncoder());
                    ch.pipeline().addLast(new MyDecoder());
                    ch.pipeline().addLast(new ClientHandler());
                }
            });
            ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync();
            future.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 1.5.2**、客户端**

NettyObjectClient:

package cn.itcast.netty.tcppackage2.client;
import cn.itcast.netty.tcppackage2.MyDecoder;
import cn.itcast.netty.tcppackage2.MyEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 服务器启动类
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer < SocketChannel > () {
                @Override protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new MyEncoder());
                    ch.pipeline().addLast(new MyDecoder());
                    ch.pipeline().addLast(new ClientHandler());
                }
            });
            ChannelFuture future = bootstrap.connect("127.0.0.1", 5566).sync();
            future.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 2.4.4、服务端

package cn.itcast.netty.tcppackage2.server;
import cn.itcast.netty.tcppackage2.MyProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class ServerHandler extends SimpleChannelInboundHandler < MyProtocol > {
    private int count;
    @Override protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception {
        System.out.println("服务端接收到消息:" + new String(msg.getBody(), CharsetUtil.UTF_8));
        System.out.println("服务端接收到消息数量:" + (++count));
        byte[] data = "ok".getBytes(CharsetUtil.UTF_8);
        MyProtocol myProtocol = new MyProtocol();
        myProtocol.setLength(data.length);
        myProtocol.setBody(data);
        ctx.writeAndFlush(myProtocol);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package cn.itcast.netty.tcppackage2.server;
import cn.itcast.netty.tcppackage2.MyDecoder;
import cn.itcast.netty.tcppackage2.MyEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
    public static void main(String[] args) throws Exception {
        // 主线程,不处理任何业务逻辑,只是接收客户的连接请求
        EventLoopGroup boss = new NioEventLoopGroup(1); // ⼯作线程,线程数默认是:cpu*2 
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 服务器启动类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker); //配置server通道 
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.childHandler(new ChannelInitializer < SocketChannel > () {
                @Override protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new MyDecoder()).addLast(new MyEncoder()).addLast(new ServerHandler());
                }
            }); //worker线程的处理器
            ChannelFuture future = serverBootstrap.bind(5566).sync();
            System.out.println("服务器启动完成。。。。。");
            //等待服务端监听端⼝关闭 
            future.channel().closeFuture().sync();
        } finally { //优雅关闭 
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 2.4.5、测试

#客户端 
接收到服务端的消息:ok 
接收到服务端的消息数量:1 
接收到服务端的消息:ok 
接收到服务端的消息数量:2 
接收到服务端的消息:ok 
接收到服务端的消息数量:3 
接收到服务端的消息:ok 
接收到服务端的消息数量:4 
接收到服务端的消息:ok 
接收到服务端的消息数量:5 
接收到服务端的消息:ok 
接收到服务端的消息数量:6 
接收到服务端的消息:ok 
接收到服务端的消息数量:7 
接收到服务端的消息:ok 
接收到服务端的消息数量:8 
接收到服务端的消息:ok 
接收到服务端的消息数量:9 
接收到服务端的消息:ok 
接收到服务端的消息数量:10
  
#服务端
服务器启动完成。。。。。 
服务端接收到消息:from client a message! 
服务端接收到消息数量:1 
服务端接收到消息:from client a message!
服务端接收到消息数量:2 
服务端接收到消息:from client a message!
服务端接收到消息数量:3 
服务端接收到消息:from client a message!
服务端接收到消息数量:4 
服务端接收到消息:from client a message!
服务端接收到消息数量:5 
服务端接收到消息:from client a message!
服务端接收到消息数量:6 
服务端接收到消息:from client a message!
服务端接收到消息数量:7 
服务端接收到消息:from client a message!
服务端接收到消息数量:8 
服务端接收到消息:from client a message!
服务端接收到消息数量:9 
服务端接收到消息:from client a message!
服务端接收到消息数量:10

#测试结果符合预期
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
上次更新: 2025/04/03, 11:07:08
Netty编解码器
自研RPC实战

← Netty编解码器 自研RPC实战→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式