跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • Spring Data JPA
  • MyBatis

  • Spring

  • SpringBoot

  • Jdk

  • Tomcat

  • Netty

    • 基础

    • 进阶

      • Netty编解码器
        • 1.1、什么是编解码器
        • 1.2、解码器
          • 1.2.1、案例
        • 1.3、编码器
          • 1.3.1、案例
        • 1.4、案例:开发http服务器
        • 1.5、对象的编解码
          • 1.5.1、服务端
          • 1.5.2、客户端
        • 1.6、Hessian编解码
          • 1.6.1、编解码器
          • 1.6.2、服务端
          • 1.6.3、客户端
      • TCP粘包拆包的问题及解决
      • 自研RPC实战
      • Netty核心源码剖析
      • Netty优化建议
    • 资料
  • 若依

  • Traefik

  • Openresty

  • 开源框架
  • Netty
  • 进阶
Revin
2023-06-17
目录

Netty编解码器

# 1.1、什么是编解码器

在网络中传输数据时,无论以什么的格式发送(int、String、Long等)都会以字节流的方式进行传递, 客户端将原来的格式数据转化为字节,称之为编码(encode),服务端将字节形式转化为原来的格式, 称之为解码(decode),编解码统称为codec。

编解码器包括编码器与解码器两部分,编码器负责出站数据操作,解码器负责入站数据操作。

# 1.2、解码器

解码器是负责入站的数据操作,那么解码器也一定实现了ChannelInboundHandler接口,所以编解码

器本质上也是ChannelHandler。 Netty中提供了ByteToMessageDecoder的抽象实现,自定义解码器只需要继承该类,实现decode()即

可。Netty也提供了一些常用的解码器实现,基本都是开箱即用的。比如:

  • RedisDecoder 基于Redis协议的解码器
  • XmlDecoder 基于XML格式的解码器
  • JsonObjectDecoder 基于json数据格式的解码器
  • HttpObjectDecoder 基于http协议的解码器

Netty也提供了MessageToMessageDecoder,将一种格式转化为另一种格式的解码器,也提供了一些 实现:

  • StringDecoder 将接收到ByteBuf转化为字符串

  • ByteArrayDecoder 将接收到ByteBuf转化字节数组

  • Base64Decoder 将由ByteBuf或US-ASCII字符串编码的Base64解码为ByteBuf。

# 1.2.1、案例

将传入的字节流转化为Integer类型。

package cn.itcast.netty.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;

public class ByteToIntegerDecoder extends ByteToMessageDecoder {
/** *
* @param ctx 上下文
* @param in 输入的ByteBuf消息数据
* @param out 转化后输出的容器 * @throws Exception
*/
@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object>
out) throws Exception {
  if(in.readableBytes() >= 4){ //int类型占用4个字节,所以需要判断是否存在有4个字 节,再进行读取
    out.add(in.readInt()); //读取到int类型数据,放入到输出,完成数据类型的转化
  } 
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

在Handler中使用:

package cn.itcast.netty.codec;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
      Integer i = (Integer) msg; //这里可以直接拿到Integer类型的数据
      System.out.println("服务端接收到的消息为:" + i);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

在pipeline中添加解码器:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
      .addLast(new ByteToIntegerDecoder())
			.addLast(new ServerHandler());
}
1
2
3
4
5
6

# 1.3、编码器

编码器与解码器是相反的操作,将原有的格式转化为字节的过程,在Netty中提供了 MessageToByteEncoder的抽象实现,它实现了ChannelOutboundHandler接口,本质上也是 ChannelHandler。

一些实现类:

  • ObjectEncoder 将对象(需要实现Serializable接口)编码为字节流
  • SocksMessageEncoder 将SocksMessage编码为字节流
  • HAProxyMessageEncoder 将HAProxyMessage编码成字节流

Netty也提供了MessageToMessageEncoder,将一种格式转化为另一种格式的编码器,也提供了一些 实现:

  • RedisEncoder 将Redis协议的对象进行编码
  • StringEncoder 将字符串进行编码操作
  • Base64Encoder 将Base64字符串进行编码操作

# 1.3.1、案例

将Integer类型编码为字节进行传递。

自定义编码器:

package cn.itcast.netty.codec.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> {

   @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out)
throws Exception {
        out.writeInt(msg);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

在Handler直接输出数字即可:

package cn.itcast.netty.codec.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

   @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws
Exception {
	System.out.println("接收到服务端的消息:" + msg.toString(CharsetUtil.UTF_8));
}
  
		@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(123);
}
  
	@Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

在pipeline中添加编码器:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new IntegerToByteEncoder());
    ch.pipeline().addLast(new ClientHandler());
}
1
2
3
4
5

# 1.4、案例:开发http服务器

在Netty中提供了http的解码器,我们通过该解码器进行http服务器的开发。

实现效果:

01

Server:

package cn.itcast.netty.codec.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;

public class NettyHttpServer {
    public static void main(String[] args) throws Exception {
() {
  
	// 主线程,不处理任何业务逻辑,只是接收客户的连接请求 
  EventLoopGroup boss = new NioEventLoopGroup(1); 
  // 工作线程,线程数默认是:cpu*2
	EventLoopGroup worker = new NioEventLoopGroup();
  
try {
  // 服务器启动类
  ServerBootstrap serverBootstrap = new ServerBootstrap(); 	  	
  serverBootstrap.group(boss, worker);
  //配置server通道 
  serverBootstrap.channel(NioServerSocketChannel.class); 
  serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  				@Override
          protected void initChannel(SocketChannel ch) throws Exception {
              ch.pipeline()
                .addLast(new HttpRequestDecoder()) //http请求的解码器 
                //将http请求中的uri以及请求体聚合成一个完整的FullHttpRequest对象
                .addLast(new HttpObjectAggregator(1024 * 128)) 
                .addLast(new HttpResponseEncoder()) //http响应的编码器
								.addLast(new ChunkedWriteHandler()) //支持异步的大文件传输,防止内存溢出
                
                .addLast(new ServerHandler()); 
          	}
          }); //worker线程的处理器
					
  			ChannelFuture future = serverBootstrap.bind(8080).sync(); 				
  			System.out.println("服务器启动完成。。。。。");

  			//等待服务端监听端口关闭
    		future.channel().closeFuture().sync();
      } finally {
  			//优雅关闭 
  			boss.shutdownGracefully(); 
  			worker.shutdownGracefully();
      }
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

ServerHandler:

package cn.itcast.netty.codec.http;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import java.util.Map;

public class ServerHandler extends SimpleChannelInboundHandler<FullHttpRequest>
{
    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest
request) throws Exception {
      
    //解析FullHttpRequest,得到请求参数
    Map<String, String> paramMap = new RequestParser(request).parse();
      String name = paramMap.get("name");

    //构造响应对象
      FullHttpResponse httpResponse = new
DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,
"text/html;charset=utf-8");

      StringBuilder sb = new StringBuilder();
      sb.append("<h1>");
      sb.append("你好," + name);
      sb.append("</h1>");
        
      httpResponse.content().writeBytes(Unpooled.copiedBuffer(sb,
CharsetUtil.UTF_8));

      ctx.writeAndFlush(httpResponse) .addListener(ChannelFutureListener.CLOSE); //操作完成后,将channel关闭 
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

RequestParser:

package cn.itcast.netty.codec.http;

import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* HTTP请求参数解析器, 支持GET, POST 
*/
public class RequestParser {
    private FullHttpRequest fullReq;
  
    /**
    * 构造一个解析器 
    * @param req 
    */
    public RequestParser(FullHttpRequest req) {
        this.fullReq = req;
		}
  
  /**
  * 解析请求参数
  * @return 包含所有请求参数的键值对, 如果没有参数, 则返回空Map *
  * @throws IOException
  */
    public Map<String, String> parse() throws Exception {
        HttpMethod method = fullReq.method();
      
        Map<String, String> parmMap = new HashMap<>();
      
				if (HttpMethod.GET == method) {
          	// 是GET请求
            QueryStringDecoder decoder = new QueryStringDecoder(fullReq.uri());
            decoder.parameters().entrySet().forEach( entry -> {
                // entry.getValue()是一个List, 只取第一个元素
                parmMap.put(entry.getKey(), entry.getValue().get(0));
            });
				} else if (HttpMethod.POST == method) { 
          // 是POST请求
          HttpPostRequestDecoder decoder = new
HttpPostRequestDecoder(fullReq);
            decoder.offer(fullReq);
          
            List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();
          
            for (InterfaceHttpData parm : parmList) {
                Attribute data = (Attribute) parm;
                parmMap.put(data.getName(), data.getValue());
            }
        } else {
        // 不支持其它方法
        throw new RuntimeException("不支持其它方法"); // 这是个自定义的异常, 可删掉这一行
        }
         return parmMap;
		}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

# 1.5、对象的编解码

对于JavaBean对象,Netty也支持了Object对象的编解码,其实也就是对象的序列化,要求java对象需 要java.io.Serializable接口。

定义javabean对象:

package cn.itcast.netty.codec.obj;
public class User implements java.io.Serializable {
    private static final long serialVersionUID = -89217070354741790L;
    private Long id;
    private String name;
    private Integer age;
  
    public Long getId() {
        return id;
		}
  
    public void setId(Long id) {
        this.id = id;
		}
  
    public String getName() {
        return name;
		}
  
    public void setName(String name) {
        this.name = name;
		}
  
    public Integer getAge() {
        return age;
		}
  
    public void setAge(Integer age) {
        this.age = age;
		}

		@Override
    public String toString() {
        return "User{" +
          "id=" + id +
          ", name='" + name + '\'' +
          ", age=" + age +
          '}';
	} 
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# 1.5.1、服务端

NettyObjectServer:

package cn.itcast.netty.codec.obj;

import io.netty.bootstrap.ServerBootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import io.netty.handler.codec.serialization.ClassResolvers; 
import io.netty.handler.codec.serialization.ObjectDecoder;

public class NettyObjectServer {

public static void main(String[] args) throws Exception {

// 主线程,不处理任何业务逻辑,只是接收客户的连接请求 
  EventLoopGroup boss = new NioEventLoopGroup(1); 
  // ⼯作线程,线程数默认是:cpu*2 
  EventLoopGroup worker = new NioEventLoopGroup();

try {

// 服务器启动类 
  ServerBootstrap serverBootstrap = new ServerBootstrap(); 
  serverBootstrap.group(boss, worker); //配置server通道 
  serverBootstrap.channel(NioServerSocketChannel.class); 
  serverBootstrap.childHandler(new ChannelInitializer<SocketChannel> () {


    @Override 
    protected void initChannel(SocketChannel ch) throws Exception { 
      
      ch.pipeline() 
      .addLast(new ObjectDecoder(ClassResolvers.weakCachingResolver(this.getClass().getClassLoader())))
      .addLast(new ServerHandler()); } }); //worker线程的处理器


  
  ChannelFuture future = serverBootstrap.bind(6677).sync(); 
  System.out.println("服务器启动完成。。。。。");


  //等待服务端监听端⼝关闭 
  future.channel().closeFuture().sync(); } finally { 
  //优雅关闭 
  boss.shutdownGracefully(); 
  worker.shutdownGracefully(); 
	} 
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

ServerHandler:

package cn.itcast.netty.codec.obj;

import io.netty.buffer.Unpooled; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.SimpleChannelInboundHandler; 
import io.netty.util.CharsetUtil;

public class ServerHandler extends SimpleChannelInboundHandler<User> {

@Override public void channelRead0(ChannelHandlerContext ctx, User user) throws Exception { 
  
  //获取到user对象 
  System.out.println(user);

  ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));

	}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 1.5.2、客户端

NettyObjectClient:

package cn.itcast.netty.codec.obj;

import io.netty.bootstrap.Bootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioSocketChannel; 
import io.netty.handler.codec.serialization.ObjectEncoder;

public class NettyObjectClient {

public static void main(String[] args) throws Exception{ E
  ventLoopGroup worker = new NioEventLoopGroup();

      try { // 服务器启动类
        Bootstrap bootstrap = new Bootstrap(); bootstrap.group(worker); 
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {

             @Override 
             protected void initChannel(SocketChannel ch) throws Exception {

               ch.pipeline().addLast(new ObjectEncoder());
               ch.pipeline().addLast(new ClientHandler());
             }

        });

        ChannelFuture future = bootstrap.connect("127.0.0.1", 6677).sync();

        future.channel().closeFuture().sync(); }


      finally {

        worker.shutdownGracefully();
      }
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

ClientHandler:

package cn.itcast.netty.codec.obj;

import io.netty.buffer.ByteBuf; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.SimpleChannelInboundHandler; 
import io.netty.util.CharsetUtil;

public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

  @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
    System.out.println("接收到服务端的消息:" + msg.toString(CharsetUtil.UTF_8)); }

  @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
    User user = new User(); 
    user.setId(1L); user.setName("张三");
    user.setAge(20);
    ctx.writeAndFlush(user);

  }

@Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
    cause.printStackTrace();
    ctx.close(); 
  } 
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 1.6、Hessian编解码

JDK序列化使⽤是⽐较⽅便,但是它的性能较差,序列化后的字节⼤⼩也⽐较⼤,所以⼀般在项⽬中不 会使⽤⾃带的序列化,⽽是会采⽤第三⽅的序列化框架。

我们以Hessian为例,演示下如何与Netty整合进⾏编解码处理。

导⼊Hessian依赖:

<dependency> 
  <groupId>com.caucho</groupId> 
  <artifactId>hessian</artifactId> 
  <version>4.0.63</version> 
</dependency>
1
2
3
4
5

User对象:

package cn.itcast.netty.codec.hessian;
public class User implements java.io.Serializable {
    private static final long serialVersionUID = -8200798627910162221 L;
    private Long id;
    private String name;
    private Integer age;
    public Long getId() {
        return id;
    }
    public void setId(Long id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public Integer getAge() {
        return age;
    }
    public void setAge(Integer age) {
        this.age = age;
    }
    @Override public String toString() {
        return "User{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}';
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 1.6.1、编解码器

Hessian序列化⼯具类:

package cn.itcast.netty.codec.hessian.codec;
import com.caucho.hessian.io.HessianInput;
import com.caucho.hessian.io.HessianOutput;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/** 
* Hessian序列化⼯具类 
* 
*/
public class HessianSerializer {
    public < T > byte[] serialize(T obj) {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        HessianOutput ho = new HessianOutput(os);
        try {
            ho.writeObject(obj);
            ho.flush();
            return os.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                ho.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            try {
                os.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
    public < T > Object deserialize(byte[] bytes, Class < T > clazz) {
        ByteArrayInputStream is = new ByteArrayInputStream(bytes);
        HessianInput hi = new HessianInput(is);
        try {
            return (T) hi.readObject(clazz);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                hi.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            try {
                is.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

编码器:

package cn.itcast.netty.codec.hessian.codec;
import cn.itcast.netty.coder.hessian.User;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class HessianEncoder extends MessageToByteEncoder < User > {
    private HessianSerializer hessianSerializer = new HessianSerializer();
    protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out) throws Exception {
        byte[] bytes = hessianSerializer.serialize(msg);
        out.writeBytes(bytes);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

解码器:

package cn.itcast.netty.codec.hessian.codec;
import cn.itcast.netty.coder.hessian.User;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class HessianDecoder extends ByteToMessageDecoder {
    private HessianSerializer hessianSerializer = new HessianSerializer();
    protected void decode(ChannelHandlerContext ctx, ByteBuf in , List < Object > out) throws Exception { 
      //复制⼀份ByteBuf数据,轻复制,⾮完全拷⻉ 
      //避免出现异常:did not read anything but decoded a message 
      //Netty检测没有读取任何字节就会抛出该异常 
      ByteBuf in2 = in.retainedDuplicate();
        byte[] dst;
        if(in2.hasArray()) { //堆缓冲区模式
            dst = in2.array();
        } else {
            dst = new byte[in2.readableBytes()];
            in2.getBytes(in2.readerIndex(), dst);
        }
        in.skipBytes(in.readableBytes()); //跳过所有的字节,表示已经读取过了
        Object obj = hessianSerializer.deserialize(dst, User.class); //反序列化 
      out.add(obj);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 1.6.2、服务端

package cn.itcast.netty.codec.hessian;

import cn.itcast.netty.coder.hessian.codec.HessianDecoder;
import cn.itcast.netty.coder.hessian.handler.ServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyHessianServer {
    public static void main(String[] args) throws Exception {
        //System.setProperty("io.netty.noUnsafe", "true");
            
      // 主线程,不处理任何业务逻辑,只是接收客户的连接请求 
      EventLoopGroup boss = new NioEventLoopGroup(1); 
      // ⼯作线程,线程数默认是:cpu*2
      EventLoopGroup worker = new NioEventLoopGroup();
            try {
                // 服务器启动类 
              ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker); //配置server通道 
              serverBootstrap.channel(NioServerSocketChannel.class); 
              serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>
        () {               
                @Override 
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new HessianDecoder()).addLast(new ServerHandler());
                }
            }); //worker线程的处理器

        // serverBootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
              
        ChannelFuture future = serverBootstrap.bind(6677).sync();
        System.out.println("服务器启动完成。。。。。");
              
        //等待服务端监听端⼝关闭 
              future.channel().closeFuture().sync(); 
            } finally {
              //优雅关闭 
              boss.shutdownGracefully();
              worker.shutdownGracefully(); 
            }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package cn.itcast.netty.codec.hessian.handler;

import cn.itcast.netty.coder.hessian.User; 
import io.netty.buffer.Unpooled; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.SimpleChannelInboundHandler; 
import io.netty.util.CharsetUtil;

public class ServerHandler extends SimpleChannelInboundHandler<User> {

	@Override 
  public void channelRead0(ChannelHandlerContext ctx, User user) throws Exception { 
    //获取到user对象 
    System.out.println(user);

		ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 1.6.3、客户端

package cn.itcast.netty.codec.hessian;

import cn.itcast.netty.coder.hessian.codec.HessianEncoder; 
import cn.itcast.netty.coder.hessian.handler.ClientHandler; 
import io.netty.bootstrap.Bootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioSocketChannel; 
import io.netty.handler.codec.serialization.ObjectEncoder;

public class NettyHessianClient {

public static void main(String[] args) throws Exception{
  EventLoopGroup worker = new NioEventLoopGroup();

try {

	// 服务器启动类 
  Bootstrap bootstrap = new Bootstrap(); 
  bootstrap.group(worker); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new HessianEncoder());
    ch.pipeline().addLast(new ClientHandler());
    }
	});

		ChannelFuture future = bootstrap.connect("127.0.0.1", 6677).sync();
		future.channel().closeFuture().sync(); } 
  finally {
		worker.shutdownGracefully();
   }
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.itcast.netty.codec.hessian.handler;

import cn.itcast.netty.coder.hessian.User;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class ClientHandler extends SimpleChannelInboundHandler < ByteBuf > {
  
    @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("接收到服务端的消息:" + msg.toString(CharsetUtil.UTF_8));
    }
  
    @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
        User user = new User();
        user.setId(1 L);
        user.setName("张三");
        user.setAge(20);
        ctx.writeAndFlush(user);
    }
  
    @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
上次更新: 2025/04/03, 11:07:08
详解ByteBuf
TCP粘包拆包的问题及解决

← 详解ByteBuf TCP粘包拆包的问题及解决→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式