Netty编解码器
# 1.1、什么是编解码器
在网络中传输数据时,无论以什么格式发送(int、String、Long等),都会以字节流的方式进行传递。客户端将原来格式的数据转化为字节,称之为编码(encode);服务端将字节形式转化为原来的格式,称之为解码(decode)。编解码统称为codec。
编解码器包括编码器与解码器两部分,编码器负责出站数据操作,解码器负责入站数据操作。
# 1.2、解码器
解码器负责入站的数据操作,因此解码器一定实现了ChannelInboundHandler接口,所以编解码器本质上也是ChannelHandler。
Netty中提供了ByteToMessageDecoder的抽象实现,自定义解码器只需要继承该类,实现decode()即可。
Netty也提供了一些常用的解码器实现,基本都是开箱即用的。比如:
- RedisDecoder 基于Redis协议的解码器
- XmlDecoder 基于XML格式的解码器
- JsonObjectDecoder 基于json数据格式的解码器
- HttpObjectDecoder 基于http协议的解码器
Netty也提供了MessageToMessageDecoder,将一种格式转化为另一种格式的解码器,也提供了一些实现:
- StringDecoder 将接收到的ByteBuf转化为字符串
- ByteArrayDecoder 将接收到的ByteBuf转化为字节数组
- Base64Decoder 将Base64编码的ByteBuf(或US-ASCII字符串)解码为ByteBuf
# 1.2.1、案例
将传入的字节流转化为Integer类型。
package cn.itcast.netty.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class ByteToIntegerDecoder extends ByteToMessageDecoder {
/** *
* @param ctx 上下文
* @param in 输入的ByteBuf消息数据
* @param out 转化后输出的容器 * @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object>
out) throws Exception {
if(in.readableBytes() >= 4){ //int类型占用4个字节,所以需要判断是否存在有4个字 节,再进行读取
out.add(in.readInt()); //读取到int类型数据,放入到输出,完成数据类型的转化
}
}
在Handler中使用:
package cn.itcast.netty.codec;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Inbound handler that prints each received message, which it expects to be an {@link Integer}.
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Casts the inbound message to {@link Integer} and prints it to standard output.
     *
     * @param ctx the handler context
     * @param msg the inbound message; cast to Integer without a type check
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The message can be used as an Integer directly here.
        final Integer received = (Integer) msg;
        System.out.println("服务端接收到的消息为:" + received);
    }
}
在pipeline中添加解码器:
/**
 * Registers the decoder first and the business handler second on the channel's pipeline.
 *
 * @param ch the channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new ByteToIntegerDecoder());
    ch.pipeline().addLast(new ServerHandler());
}
# 1.3、编码器
编码器与解码器是相反的操作,是将原有的格式转化为字节的过程。Netty中提供了MessageToByteEncoder的抽象实现,它实现了ChannelOutboundHandler接口,本质上也是ChannelHandler。
一些实现类:
- ObjectEncoder 将对象(需要实现Serializable接口)编码为字节流
- SocksMessageEncoder 将SocksMessage编码为字节流
- HAProxyMessageEncoder 将HAProxyMessage编码成字节流
Netty也提供了MessageToMessageEncoder,将一种格式转化为另一种格式的编码器,也提供了一些实现:
- RedisEncoder 将Redis协议的对象进行编码
- StringEncoder 将字符串进行编码操作
- Base64Encoder 将ByteBuf编码为Base64格式的ByteBuf
# 1.3.1、案例
将Integer类型编码为字节进行传递。
自定义编码器:
package cn.itcast.netty.codec.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * Encoder that writes an outbound {@link Integer} message as a 4-byte int.
 */
public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> {

    /**
     * Writes the integer value of {@code msg} into {@code out}.
     *
     * @param ctx the handler context
     * @param msg the Integer to encode
     * @param out the buffer that receives the encoded bytes
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
        out.writeInt(msg.intValue());
    }
}
在Handler直接输出数字即可:
package cn.itcast.netty.codec.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
 * Client-side handler: writes the number 123 when the channel becomes active and prints
 * whatever the server sends back as UTF-8 text.
 */
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * Writes and flushes the number 123 once the channel is active.
     * NOTE(review): presumably an encoder earlier in the pipeline turns it into bytes.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(123);
    }

    /**
     * Prints the received buffer decoded as UTF-8 text.
     *
     * @param ctx the handler context
     * @param msg the buffer received from the server
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        final String reply = msg.toString(CharsetUtil.UTF_8);
        System.out.println("接收到服务端的消息:" + reply);
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
在pipeline中添加编码器:
/**
 * Registers the Integer encoder first and the client handler second on the channel's pipeline.
 *
 * @param ch the channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            .addLast(new IntegerToByteEncoder())
            .addLast(new ClientHandler());
}
# 1.4、案例:开发http服务器
在Netty中提供了http的解码器,我们通过该解码器进行http服务器的开发。
实现效果:
Server:
package cn.itcast.netty.codec.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
public class NettyHttpServer {
public static void main(String[] args) throws Exception {
() {
// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup(1);
// 工作线程,线程数默认是:cpu*2
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
//配置server通道
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new HttpRequestDecoder()) //http请求的解码器
//将http请求中的uri以及请求体聚合成一个完整的FullHttpRequest对象
.addLast(new HttpObjectAggregator(1024 * 128))
.addLast(new HttpResponseEncoder()) //http响应的编码器
.addLast(new ChunkedWriteHandler()) //支持异步的大文件传输,防止内存溢出
.addLast(new ServerHandler());
}
}); //worker线程的处理器
ChannelFuture future = serverBootstrap.bind(8080).sync();
System.out.println("服务器启动完成。。。。。");
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
ServerHandler:
package cn.itcast.netty.codec.http;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import java.util.Map;
/**
 * Handles an aggregated HTTP request by replying with a small HTML greeting built from the
 * {@code name} request parameter, then closes the connection.
 */
public class ServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    /**
     * Parses the request parameters, writes the greeting response and closes the channel
     * once the write completes.
     *
     * @param ctx     the handler context
     * @param request the fully aggregated HTTP request
     * @throws Exception if the request parameters cannot be parsed
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // Parse the FullHttpRequest to obtain the request parameters.
        Map<String, String> paramMap = new RequestParser(request).parse();
        String name = paramMap.get("name");

        // Build the response object.
        FullHttpResponse httpResponse =
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");

        // The name comes straight from the client, so escape it before embedding it in HTML.
        // String.valueOf keeps the previous rendering ("null") when the parameter is absent.
        StringBuilder sb = new StringBuilder();
        sb.append("<h1>");
        sb.append("你好," + escapeHtml(String.valueOf(name)));
        sb.append("</h1>");

        // Write the bytes directly instead of wrapping them in a temporary ByteBuf that
        // nobody would release.
        httpResponse.content().writeBytes(sb.toString().getBytes(CharsetUtil.UTF_8));

        // Close the channel once the write has completed.
        ctx.writeAndFlush(httpResponse).addListener(ChannelFutureListener.CLOSE);
    }

    /** Replaces the HTML-significant characters of {@code text} with character entities. */
    private static String escapeHtml(String text) {
        StringBuilder escaped = new StringBuilder(text.length());
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '&':
                    escaped.append("&amp;");
                    break;
                case '<':
                    escaped.append("&lt;");
                    break;
                case '>':
                    escaped.append("&gt;");
                    break;
                case '"':
                    escaped.append("&quot;");
                    break;
                case '\'':
                    escaped.append("&#39;");
                    break;
                default:
                    escaped.append(c);
            }
        }
        return escaped.toString();
    }
}
RequestParser:
package cn.itcast.netty.codec.http;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* HTTP请求参数解析器, 支持GET, POST
*/
public class RequestParser {
private FullHttpRequest fullReq;
/**
* 构造一个解析器
* @param req
*/
public RequestParser(FullHttpRequest req) {
this.fullReq = req;
}
/**
* 解析请求参数
* @return 包含所有请求参数的键值对, 如果没有参数, 则返回空Map *
* @throws IOException
*/
public Map<String, String> parse() throws Exception {
HttpMethod method = fullReq.method();
Map<String, String> parmMap = new HashMap<>();
if (HttpMethod.GET == method) {
// 是GET请求
QueryStringDecoder decoder = new QueryStringDecoder(fullReq.uri());
decoder.parameters().entrySet().forEach( entry -> {
// entry.getValue()是一个List, 只取第一个元素
parmMap.put(entry.getKey(), entry.getValue().get(0));
});
} else if (HttpMethod.POST == method) {
// 是POST请求
HttpPostRequestDecoder decoder = new
HttpPostRequestDecoder(fullReq);
decoder.offer(fullReq);
List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();
for (InterfaceHttpData parm : parmList) {
Attribute data = (Attribute) parm;
parmMap.put(data.getName(), data.getValue());
}
} else {
// 不支持其它方法
throw new RuntimeException("不支持其它方法"); // 这是个自定义的异常, 可删掉这一行
}
return parmMap;
}
}
# 1.5、对象的编解码
对于JavaBean对象,Netty也支持Object对象的编解码,其实也就是对象的序列化,要求java对象需要实现java.io.Serializable接口。
定义javabean对象:
package cn.itcast.netty.codec.obj;
/**
 * Serializable bean carrying an id, a name and an age.
 */
public class User implements java.io.Serializable {
    private static final long serialVersionUID = -89217070354741790L;

    private Long id;
    private String name;
    private Integer age;

    /** @return the id, possibly {@code null} */
    public Long getId() {
        return this.id;
    }

    /** @return the name, possibly {@code null} */
    public String getName() {
        return this.name;
    }

    /** @return the age, possibly {@code null} */
    public Integer getAge() {
        return this.age;
    }

    /** @param id the id to store */
    public void setId(Long id) {
        this.id = id;
    }

    /** @param name the name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @param age the age to store */
    public void setAge(Integer age) {
        this.age = age;
    }

    /**
     * Renders the bean as {@code User{id=..., name='...', age=...}}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User{");
        text.append("id=").append(id);
        text.append(", name='").append(name).append('\'');
        text.append(", age=").append(age);
        text.append('}');
        return text.toString();
    }
}
# 1.5.1、服务端
NettyObjectServer:
package cn.itcast.netty.codec.obj;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
public class NettyObjectServer {
public static void main(String[] args) throws Exception {
// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup(1);
// ⼯作线程,线程数默认是:cpu*2
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker); //配置server通道
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel> () {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new ObjectDecoder(ClassResolvers.weakCachingResolver(this.getClass().getClassLoader())))
.addLast(new ServerHandler()); } }); //worker线程的处理器
ChannelFuture future = serverBootstrap.bind(6677).sync();
System.out.println("服务器启动完成。。。。。");
//等待服务端监听端⼝关闭
future.channel().closeFuture().sync(); } finally {
//优雅关闭
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
ServerHandler:
package cn.itcast.netty.codec.obj;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
 * Prints every received {@code User} and answers with the text "ok".
 */
public class ServerHandler extends SimpleChannelInboundHandler<User> {

    /**
     * Prints the decoded user and writes "ok" (UTF-8) back to the peer.
     *
     * @param ctx  the handler context
     * @param user the decoded user object
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
        // The user object has already been decoded at this point.
        System.out.println(String.valueOf(user));
        ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
    }
}
# 1.5.2、客户端
NettyObjectClient:
package cn.itcast.netty.codec.obj;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class NettyObjectClient {
public static void main(String[] args) throws Exception{ E
ventLoopGroup worker = new NioEventLoopGroup();
try { // 服务器启动类
Bootstrap bootstrap = new Bootstrap(); bootstrap.group(worker);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 6677).sync();
future.channel().closeFuture().sync(); }
finally {
worker.shutdownGracefully();
}
}
}
ClientHandler:
package cn.itcast.netty.codec.obj;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
 * Client-side handler: sends one {@code User} when the channel becomes active and prints
 * the server's reply as UTF-8 text.
 */
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * Builds a sample user (id 1, age 20) and writes it to the channel.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        User sample = new User();
        sample.setId(1L);
        sample.setName("张三");
        sample.setAge(20);
        ctx.writeAndFlush(sample);
    }

    /**
     * Prints the received buffer decoded as UTF-8 text.
     *
     * @param ctx the handler context
     * @param msg the buffer received from the server
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        final String reply = msg.toString(CharsetUtil.UTF_8);
        System.out.println("接收到服务端的消息:" + reply);
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
# 1.6、Hessian编解码
JDK序列化使用起来比较方便,但是它的性能较差,序列化后的字节大小也比较大,所以一般在项目中不会使用自带的序列化,而是会采用第三方的序列化框架。
我们以Hessian为例,演示下如何与Netty整合进行编解码处理。
导入Hessian依赖:
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.63</version>
</dependency>
User对象:
package cn.itcast.netty.codec.hessian;
/**
 * Serializable bean carrying an id, a name and an age; used as the payload of the Hessian
 * codec example.
 */
public class User implements java.io.Serializable {
    // The 'L' suffix must directly follow the digits, otherwise the literal does not compile.
    private static final long serialVersionUID = -8200798627910162221L;

    private Long id;
    private String name;
    private Integer age;

    /** @return the id, possibly {@code null} */
    public Long getId() {
        return id;
    }

    /** @param id the id to store */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the name, possibly {@code null} */
    public String getName() {
        return name;
    }

    /** @param name the name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the age, possibly {@code null} */
    public Integer getAge() {
        return age;
    }

    /** @param age the age to store */
    public void setAge(Integer age) {
        this.age = age;
    }

    /**
     * Renders the bean as {@code User{id=..., name='...', age=...}}.
     */
    @Override
    public String toString() {
        return "User{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}';
    }
}
# 1.6.1、编解码器
Hessian序列化工具类:
package cn.itcast.netty.codec.hessian.codec;
import com.caucho.hessian.io.HessianInput;
import com.caucho.hessian.io.HessianOutput;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* Hessian序列化⼯具类
*
*/
public class HessianSerializer {

    /**
     * Serializes {@code obj} with Hessian.
     *
     * @param obj the object to serialize
     * @param <T> the type of the object
     * @return the serialized bytes
     * @throws RuntimeException wrapping any {@link IOException} raised while writing or closing
     */
    public <T> byte[] serialize(T obj) {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        HessianOutput ho = new HessianOutput(os);
        try {
            ho.writeObject(obj);
            ho.flush();
            return os.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            // Nested finally: the byte stream is closed even when closing the Hessian
            // output fails.
            try {
                ho.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            } finally {
                try {
                    os.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * Deserializes {@code bytes} with Hessian, passing {@code clazz} as the expected type.
     *
     * @param bytes the serialized data
     * @param clazz the expected class of the result
     * @param <T>   the expected type
     * @return the deserialized object
     * @throws RuntimeException wrapping any exception raised while reading or closing
     */
    public <T> Object deserialize(byte[] bytes, Class<T> clazz) {
        ByteArrayInputStream is = new ByteArrayInputStream(bytes);
        HessianInput hi = new HessianInput(is);
        try {
            // The declared return type is Object, so no (unchecked) cast is needed here.
            return hi.readObject(clazz);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            // Nested finally: the byte stream is closed even when closing the Hessian
            // input fails.
            try {
                hi.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                try {
                    is.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}
编码器:
package cn.itcast.netty.codec.hessian.codec;
import cn.itcast.netty.coder.hessian.User;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class HessianEncoder extends MessageToByteEncoder < User > {
private HessianSerializer hessianSerializer = new HessianSerializer();
protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out) throws Exception {
byte[] bytes = hessianSerializer.serialize(msg);
out.writeBytes(bytes);
}
}
解码器:
package cn.itcast.netty.codec.hessian.codec;
import cn.itcast.netty.coder.hessian.User;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class HessianDecoder extends ByteToMessageDecoder {
private HessianSerializer hessianSerializer = new HessianSerializer();
protected void decode(ChannelHandlerContext ctx, ByteBuf in , List < Object > out) throws Exception {
//复制⼀份ByteBuf数据,轻复制,⾮完全拷⻉
//避免出现异常:did not read anything but decoded a message
//Netty检测没有读取任何字节就会抛出该异常
ByteBuf in2 = in.retainedDuplicate();
byte[] dst;
if(in2.hasArray()) { //堆缓冲区模式
dst = in2.array();
} else {
dst = new byte[in2.readableBytes()];
in2.getBytes(in2.readerIndex(), dst);
}
in.skipBytes(in.readableBytes()); //跳过所有的字节,表示已经读取过了
Object obj = hessianSerializer.deserialize(dst, User.class); //反序列化
out.add(obj);
}
}
# 1.6.2、服务端
package cn.itcast.netty.codec.hessian;
import cn.itcast.netty.coder.hessian.codec.HessianDecoder;
import cn.itcast.netty.coder.hessian.handler.ServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyHessianServer {
public static void main(String[] args) throws Exception {
//System.setProperty("io.netty.noUnsafe", "true");
// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup(1);
// ⼯作线程,线程数默认是:cpu*2
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker); //配置server通道
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>
() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HessianDecoder()).addLast(new ServerHandler());
}
}); //worker线程的处理器
// serverBootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
ChannelFuture future = serverBootstrap.bind(6677).sync();
System.out.println("服务器启动完成。。。。。");
//等待服务端监听端⼝关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
package cn.itcast.netty.codec.hessian.handler;
import cn.itcast.netty.codec.hessian.User;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
 * Prints every received {@link User} and answers with the text "ok".
 */
public class ServerHandler extends SimpleChannelInboundHandler<User> {

    /**
     * Prints the decoded user and writes "ok" (UTF-8) back to the peer.
     *
     * @param ctx  the handler context
     * @param user the decoded user object
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
        // The user object has already been decoded at this point.
        System.out.println(user);
        ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
    }
}
# 1.6.3、客户端
package cn.itcast.netty.codec.hessian;
import cn.itcast.netty.coder.hessian.codec.HessianEncoder;
import cn.itcast.netty.coder.hessian.handler.ClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class NettyHessianClient {
public static void main(String[] args) throws Exception{
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HessianEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 6677).sync();
future.channel().closeFuture().sync(); }
finally {
worker.shutdownGracefully();
}
}
}
package cn.itcast.netty.codec.hessian.handler;
import cn.itcast.netty.coder.hessian.User;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class ClientHandler extends SimpleChannelInboundHandler < ByteBuf > {
@Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("接收到服务端的消息:" + msg.toString(CharsetUtil.UTF_8));
}
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
User user = new User();
user.setId(1 L);
user.setName("张三");
user.setAge(20);
ctx.writeAndFlush(user);
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}