分布式架构网络通信
在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现远程通讯的技术,例如:RMI、Hessian、SOAP、ESB和JMS等,它们背后到底是基于什么原理实现的呢
# 1 基本原理
要实现网络机器间的通讯,首先得来看看计算机系统网络通信的基本原理,在底层层面去看,网络通信需要做的就是将流从一台计算机传输到另外一台计算机,基于传输协议和网络IO来实现,其中传输协议比较出名的有tcp、udp等等,tcp、udp都是在基于Socket概念上为某类应用场景而扩展出的传输协议,网络IO,主要有bio、nio、aio三种方式,所有的分布式应用通讯都基于这个原理而实现,只是为了应用的易用,各种语言通常都会提供一些更为贴近应用易用的应用层协议。
# 2 什么是RPC
RPC全称为remote procedure call,即远程过程调用。
借助RPC可以做到像本地调用一样调用远程服务,是一种进程间的通信方式
比如两台服务器A和B,A服务器上部署一个应用,B服务器上部署一个应用,A服务器上的应用想调用B服务器上的应用提供的方法,由于两个应用不在一个内存空间,不能直接调用,所以需要通过网络来表达调用的语义和传达调用的数据。
需要注意的是RPC并不是一个具体的技术,而是指整个网络远程调用过程。
RPC架构
一个完整的RPC架构里面包含了四个核心的组件,分别是Client,Client Stub,Server以及Server Stub,这个Stub可以理解为存根。
客户端(Client),服务的调用方。
客户端存根(Client Stub),存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后通过网络远程发送给服务方。
服务端(Server),真正的服务提供者。
服务端存根(Server Stub),接收客户端发送过来的消息,将消息解包,并调用本地的方法。
(1) 客户端(client)以本地调用方式(即以接口的方式)调用服务;
(2) 客户端存根(client stub)接收到调用后,负责将方法、参数等组装成能够进行网络传输的消息体(将消息体对象序列化为二进制);
(3) 客户端通过sockets将消息发送到服务端;
(4) 服务端存根( server stub)收到消息后进行解码(将消息对象反序列化);
(5) 服务端存根( server stub)根据解码结果调用本地的服务;
(6) 本地服务执行并将结果返回给服务端存根( server stub);
(7) 服务端存根( server stub)将返回结果打包成消息(将结果消息对象序列化);
(8) 服务端(server)通过sockets将消息发送到客户端;
(9) 客户端存根(client stub)接收到结果消息,并进行解码(将结果消息发序列化);
(10) 客户端(client)得到最终结果。
RPC的目标是要把2、3、4、7、8、9这些步骤都封装起来。
注意:无论是何种类型的数据,最终都需要转换成二进制流在网络上进行传输,数据的发送方需要将对象转换为二进制流,而数据的接收方则需要把二进制流再恢复为对象。
在java中RPC框架比较多,常见的有Hessian、gRPC、Thrix、HSF (High Speed Service Framework)、Dubbo 等,其实对 于RPC框架而言,核心模块 就是通讯和序列化
# 3 RMI
# 3.1 简介
Java RMI 指的是远程方法调用 (Remote Method Invocation),是java原生支持的远程调用 ,采用JRMP(Java Remote Messageing protocol)作为通信协议,可以认为是纯java版本的分布式远程调用解决方案, RMI主要用于不同虚拟机之间的通信,这些虚拟机可以在不同的主机上、也可以在同一个主机上,这里的通信可以理解为一个虚拟机上的对象调用另一个虚拟机上对象的方法。
1.客户端:
1)存根/桩(Stub):远程对象在客户端上的代理;
2)远程引用层(Remote Reference Layer):解析并执行远程引用协议;
3)传输层(Transport):发送调用、传递远程方法参数、接收远程方法执行结果。
2.服务端:
1)骨架(Skeleton):读取客户端传递的方法参数,调用服务器方的实际对象方法,并接收方法执行后的返回值;
2)远程引用层(Remote Reference Layer):处理远程引用后向骨架发送远程方法调用;
3)传输层(Transport):监听客户端的入站连接,接收并转发调用到远程引用层。
3.注册表(Registry):以URL形式注册远程对象,并向客户端回复对远程对象的引用。
远程调用过程:
1)客户端从远程服务器的注册表中查询并获取远程对象引用。
2)桩对象与远程对象具有相同的接口和方法列表,当客户端调用远程对象时,实际上是由相应的桩对象代理完成的。
3) 远程引用层在将桩的本地引用转换为服务器上对象的远程引用后,再将调用传递给传输层(Transport),由传输层通过TCP协议发送调用;
4)在服务器端,传输层监听入站连接,它一旦接收到客户端远程调用后,就将这个引用转发给其上层的远程引用层;
5)服务器端的远程引用层将客户端发送的远程应用转换为本地虚拟机的引用后,再将请求传递给骨架(Skeleton);
6)骨架读取参数,又将请求传递给服务器,最后由服务器进行实际的方法调用。
结果返回过程:
1)如果远程方法调用后有返回值,则服务器将这些结果又沿着“骨架->远程引用层->传输层”向下传递;
2)客户端的传输层接收到返回值后,又沿着“传输层->远程引用层->桩”向上传递,然后由桩来反序列化这些返回值,并将最终的结果传递给客户端程序。
# 3.2 开发流程
1.服务端:
1)定义Remote子接口,在其内部定义要发布的远程方法,并且这些方法都要Throws RemoteException;
2)定义实现远程接口,并且继承:UnicastRemoteObject
3)启动服务器:依次完成注册表的启动和远程对象绑定。
2.客户端:
1)通过符合JRMP规范的URL字符串在注册表中获取并强转成Remote子接口对象;
2)调用这个Remote子接口对象中的某个方法就是为一次远程方法调用行为。
# 3.3 代码实现
1.创建远程接口
import java.rmi.Remote;
import java.rmi.RemoteException;
/**
* 远程服务对象接口必须继承Remote接口;同时方法必须抛出RemoteExceptino异常
*/
public interface Hello extends Remote {
public String sayHello(User user) throws RemoteException;
}
其中有一个引用对象作为参数
import java.io.Serializable;
/**
* 引用对象应该是可序列化对象,这样才能在远程调用的时候:1. 序列化对象 2. 拷贝 3. 在网络中传输
* 4. 服务端反序列化 5. 获取参数进行方法调用; 这种方式其实是将远程对象引用传递的方式转化为值传递的方式
*/
public class User implements Serializable {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
- 实现远程服务对象
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
/**
* 远程服务对象实现类写在服务端;必须继承UnicastRemoteObject或其子类
**/
public class HelloImpl extends UnicastRemoteObject implements Hello {
/**
* 因为UnicastRemoteObject的构造方法抛出了RemoteException异常,因此这里默认的构造方法必须写,必须声明抛出RemoteException异常
*
* @throws RemoteException
*/
private static final long serialVersionUID = 3638546195897885959 L;
protected HelloImpl() throws RemoteException {
super();
// TODO Auto-generated constructor stub
}
@Override
public String sayHello(User user) throws RemoteException {
System.out.println("this is server, hello:" + user.getName());
return "success";
}
}
- 服务端程序
import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
/**
* 服务端程序
**/
public class Server {
public static void main(String[] args) {
try {
Hello hello = new HelloImpl(); // 创建一个远程对象,同时也会创建stub对象、skeleton对象
//本地主机上的远程对象注册表Registry的实例,并指定端口为8888,这一步必不可少(Java默认端口是
1099), 必不可缺的一步, 缺少注册表创建, 则无法绑定对象到远程注册表上
LocateRegistry.createRegistry(8080); //启动注册服务
try {
//绑定的URL标准格式为:rmi://host:port/name(其中协议名可以省略,下面两种写法都是正确的)
Naming.bind("//127.0.0.1:8080/zm", hello); //将stub引用绑定到服务地址上
} catch (MalformedURLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("service bind already!!");
} catch (RemoteException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
- 客户端程序
import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
/**
* 客户端程序
* @author zm
*
*/
public class Client {
public static void main(String[] args) {
try {
//在RMI服务注册表中查找名称为RHello的对象,并调用其上的方法
Hello hello = (Hello) Naming.lookup("//127.0.0.1:8080/zm"); //获取远程对象
User user = new User();
user.setName("james");
System.out.println(hello.sayHello(user));
} catch (MalformedURLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (RemoteException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (NotBoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
- 启动服务端程序
- 客户端调用
服务端:
客户端
# 4 BIO、NIO、AIO
# 4.1 同步和异步
同步(synchronize)、异步(asychronize)是指应用程序和内核的交互而言的.
同步:
指用户进程触发IO操作等待或者轮训的方式查看IO操作是否就绪。
同步举例:
银行取钱,我自己去取钱,取钱的过程中等待.
异步:
当一个异步进程调用发出之后,调用者不会立刻得到结果。而是在调用发出之后,被调用者通过状态、通知来通知调用者,或者通过回调函数来处理这个调用。
使用异步IO时,Java将IO读写委托给OS处理,需要将数据缓冲区地址和大小传给OS,OS需要支持异步IO操作
举个例子:
异步举例:
我请朋友帮我取钱,他取到钱后返回给我. (委托给操作系统OS, OS需要支持IO异步API)
# 4.2 阻塞和非阻塞
阻塞和非阻塞是针对于进程访问数据的时候,根据IO操作的就绪状态来采取不同的方式.
简单点说就是一种读写操作方法的实现方式. 阻塞方式下读取和写入将一直等待, 而非阻塞方式下,读取和写入方法会理解返回一个状态值.
举个例子:
阻塞:
ATM机排队取款,你只能等待排队取款(使用阻塞IO的时候,Java调用会一直阻塞到读写完成才返回。)
非阻塞:
柜台取款,取个号,然后坐在椅子上做其他事,等广播通知,没到你的号你就不能去,但你可以不断的问大堂经理排到了没有。(使用非阻塞IO时,如果不能读写Java调用会马上返回,当IO事件分发器会通知可读写时再继续进行读写,不断循环直到读写完成)
例子
老张煮开水。 老张,水壶两把(普通水壶,简称水壶;会响的水壶,简称响水壶)。
1 老张把水壶放到火上,站立着等水开。(同步阻塞)
2 老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。(同步非阻塞)
3 老张把响水壶放到火上,立等水开。(异步阻塞)
4 老张把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞)
# 4.3 BIO
同步阻塞IO。B代表blocking
服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。
适用场景:Java1.4之前唯一的选择,简单易用但资源开销太高
服务端代码
package com.lagou.bio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
public class IOServer {
public static void main(String[] args) throws Exception {
//首先创建了一个serverSocket
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress("127.0.0.1", 8081));
while(true) {
Socket socket = serverSocket.accept(); //同步阻塞
new Thread(() - > {
try {
byte[] bytes = new byte[1024];
int len = socket.getInputStream().read(bytes); //同步阻塞
System.out.println(new String(bytes, 0, len));
socket.getOutputStream().write(bytes, 0, len);
socket.getOutputStream().flush();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
}
客户端代码
package com.lagou.bio;
import java.io.IOException;
import java.net.Socket;
public class IOClient {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 8081);
socket.getOutputStream().write("hello".getBytes());
socket.getOutputStream().flush();
System.out.println("server send back data =====");
byte[] bytes = new byte[1024];
int len = socket.getInputStream().read(bytes);
System.out.println(new String(bytes, 0, len));
socket.close();
}
}
执行结果:
# 4.4 NIO
# 4.4.1 NIO介绍
同步非阻塞IO (non-blocking IO / new io)是指JDK 1.4 及以上版本。
服务器实现模式为一个请求一个通道,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有IO请求时才启动一个线程进行处理。
通道(Channels)
NIO 新引入的最重要的抽象是通道的概念。Channel 数据连接的通道。 数据可以从Channel读到Buwer中,也可以从Buwer 写到Channel中 .
缓冲区(Buwers)
通道channel可以向缓冲区Buwer中写数据,也可以像buwer中存数据。
选择器(Selector)
使用选择器,借助单一线程,就可对数量庞大的活动 I/O 通道实时监控和维护。
# 4.4.2 特点
当一个连接创建后,不会需要对应一个线程,这个连接会被注册到多路复用器,所以一个连接只需要一个线程即可,所有的连接需要一个线程就可以操作,该线程的多路复用器会轮训,发现连接有请求时,才开启一个线程处理。
如上图所示,IO模型中,一个连接来了,会创建一个线程,对应一个while死循环,死循环的目的就是不断监测这条连接上是否有数据可以读,大多数情况下,1w个连接里面同一时刻只有少量的连接有数据可读,因此,很多个while死循环都白白浪费掉了,因为读不出啥数据。
而在NIO模型中,他把这么多while死循环变成一个死循环,这个死循环由一个线程控制,那么他又是如何做到一个线程,一个while死循环就能监测1w个连接是否有数据可读的呢? 这就是NIO模型中selector的作用,一条连接来了之后,现在不创建一个while死循环去监听是否有数据可读了,而是直接把这条连接注册到selector上,然后,通过检查这个selector,就可以批量监测出有数据可读的连接,进而读取数据,下面我再举个非常简单的生活中的例子
说明IO与NIO的区别
在一家幼儿园里,小朋友有上厕所的需求,小朋友都太小以至于你要问他要不要上厕所,他才会告诉你。幼儿园一共有100个小朋友,有两种方案可以解决小朋友上厕所的问题:
每个小朋友配一个老师。每个老师隔段时间询问小朋友是否要上厕所,如果要上,就领他去厕所,100个小朋友就需要100个老师来询问,并且每个小朋友上厕所的时候都需要一个老师领着他去上,这就是IO模型,一个连接对应一个线程。
所有的小朋友都配同一个老师。这个老师隔段时间询问所有的小朋友是否有人要上厕所,然后每一时刻把所有要上厕所的小朋友批量领到厕所,这就是NIO模型,所有小朋友都注册到同一个老师,对应的就是所有的连接都注册到一个线程,然后批量轮询。
# 4.4.3 NIO使用
简单讲完了JDK NIO的解决方案之后,我们接下来使用NIO的方案替换掉IO的方案,我们先来看看,如果用JDK原生的NIO来实现服务端,该怎么做
NIOServer.java
package com.lagou.nio.server;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Scanner;
public class NIOServer extends Thread {
//1.声明多路复用器
private Selector selector;
//2.定义读写缓冲区
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
//3.定义构造方法初始化端口
public NIOServer(int port) {
init(port);
}
//4.main方法启动线程
public static void main(String[] args) {
new Thread(new NIOServer(8888)).start();
}
//5.初始化
private void init(int port) {
try {
System.out.println("服务器正在启动......");
//1)开启多路复用器
this.selector = Selector.open();
//2) 开启服务通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//3)设置为非阻塞
serverSocketChannel.configureBlocking(false);
//4)绑定端口
serverSocketChannel.bind(new InetSocketAddress(port));
//5)注册,标记服务通标状态
serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器启动完毕");
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
while(true) {
try {
//1.当有至少一个通道被选中,执行此方法
this.selector.select();
//2.获取选中的通道编号集合
Iterator < SelectionKey > keys = this.selector.selectedKeys().iterator();
//3.遍历keys
while(keys.hasNext()) {
SelectionKey key = keys.next();
//4.当前key需要从动刀集合中移出,如果不移出,下次循环会执行对应的逻辑,造成业务错乱
keys.remove();
//5.判断通道是否有效
if(key.isValid()) {
try {
//6.判断是否可读
if(key.isAcceptable()) {
accept(key);
}
} catch (CancelledKeyException e) {
//出现异常断开连接
key.cancel();
}
try {
//7.判断是否可读
if(key.isReadable()) {
read(key);
}
} catch (CancelledKeyException e) {
//出现异常断开连接
key.cancel();
}
try {
//8.判断是否可写
if(key.isWritable()) {
write(key);
}
} catch (CancelledKeyException e) {
//出现异常断开连接
key.cancel();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void accept(SelectionKey key) {
try {
//1.当前通道在init方法中注册到了selector中的ServerSocketChannel
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
//2.阻塞方法, 客户端发起后请求返回.
SocketChannel channel = serverSocketChannel.accept();
///3.serverSocketChannel设置为非阻塞
channel.configureBlocking(false);
//4.设置对应客户端的通道标记,设置次通道为可读时使用
channel.register(this.selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
//使用通道读取数据
private void read(SelectionKey key) {
try {
//清空缓存
this.readBuffer.clear();
//获取当前通道对象
SocketChannel channel = (SocketChannel) key.channel();
//将通道的数据(客户发送的data)读到缓存中.
int readLen = channel.read(readBuffer);
//如果通道中没有数据
if(readLen == -1) {
//关闭通道
key.channel().close();
//关闭连接
key.cancel();
return;
}
//Buffer中有游标,游标不会重置,需要我们调用flip重置. 否则读取不一致
this.readBuffer.flip();
//创建有效字节长度数组
byte[] bytes = new byte[readBuffer.remaining()];
//读取buffer中数据保存在字节数组
readBuffer.get(bytes);
System.out.println("收到了从客户端 " + channel.getRemoteAddress() + " : " + new String(bytes, "UTF-8"));
//注册通道,标记为写操作
channel.register(this.selector, SelectionKey.OP_WRITE);
} catch (Exception e) {}
}
//给通道中写操作
private void write(SelectionKey key) {
//清空缓存
this.readBuffer.clear();
//获取当前通道对象
SocketChannel channel = (SocketChannel) key.channel();
//录入数据
Scanner scanner = new Scanner(System.in);
try {
System.out.println("即将发送数据到客户端..");
String line = scanner.nextLine();
//把录入的数据写到Buffer中
writeBuffer.put(line.getBytes("UTF-8"));
//重置缓存游标
writeBuffer.flip();
channel.write(writeBuffer);
channel.register(this.selector, SelectionKey.OP_READ);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Client客户端
package com.lagou.nio.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
public class NIOClient {
public static void main(String[] args) {
//创建远程地址
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
SocketChannel channel = null;
//定义缓存
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
//开启通道
channel = SocketChannel.open();
//连接远程远程服务器
channel.connect(address);
Scanner sc = new Scanner(System.in);
while(true) {
System.out.println("客户端即将给 服务器发送数据..");
String line = sc.nextLine();
if(line.equals("exit")) {
break;
}
//控制台输入数据写到缓存
buffer.put(line.getBytes("UTF-8"));
//重置buffer 游标
buffer.flip();
//数据发送到数据
channel.write(buffer);
//清空缓存数据
buffer.clear();
//读取服务器返回的数据
int readLen = channel.read(buffer);
if(readLen == -1) {
break;
}
//重置buffer游标
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
//读取数据到字节数组
buffer.get(bytes);
System.out.println("收到了服务器发送的数据 : " + new String(bytes, "UTF-8"));
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if(null != channel) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
效果
# 4.5 AIO
异步非阻塞IO。A代表asynchronize
当有流可以读时,操作系统会将可以读的流传入read方法的缓冲区,并通知应用程序,对于写操作,OS将write方法的流写入完毕是操作系统会主动通知应用程序。因此read和write都是异步 的,完成后会调用回调函数。
使用场景:连接数目多且连接比较长(重操作)的架构,比如相册服务器。重点调用了OS参与并发操作,编程比较复杂。Java7开始支持
# 5 Netty
# 5.1 Netty认识
Netty 是由 JBOSS 提供一个异步的、 基于事件驱动的网络编程框架。
Netty 可以帮助你快速、 简单的开发出一 个网络应用, 相当于简化和流程化了 NIO 的开发过程。 作为当前最流行的 NIO 框架, Netty 在互联网领域、 大数据分布式计算领域、 游戏行业、 通信行业等获得了广泛的应用, 知名的Elasticsearch 、 Dubbo 框架内部都采用了 Netty。
为什么使用Netty
NIO缺点
NIO 的类库和 API 繁杂,使用麻烦。你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuwer 等.
可靠性不强,开发工作量和难度都非常大
NIO 的 Bug。例如 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。
Netty优点
对各种传输协议提供统一的 API
高度可定制的线程模型——单线程、一个或多个线程池
更好的吞吐量,更低的等待延迟
更少的资源消耗
最小化不必要的内存拷贝
# 5.2 线程模型
1 单线程模型
2 线程池模型
3 Netty 模型
Netty 抽象出两组线程池, BossGroup 专门负责接收客 户端连接, WorkerGroup 专门负责网络读写操作。
NioEventLoop 表示一个不断循环执行处理 任务的线程, 每个 NioEventLoop 都有一个 selector, 用于监听绑定在其上的 socket 网络通道。 NioEventLoop 内部采用串行化设计, 从消息的读取->解码->处理->编码->发送, 始终由 IO线 程 NioEventLoop 负责。
# 5.3 Netty核心组件
ChannelHandler及其实现类
ChannelHandler 接口定义了许多事件处理的方法, 我们可以通过重写这些方法去实现具 体的业务逻辑
我们经常需要自定义一个 Handler 类去继承 ChannelInboundHandlerAdapter, 然后通过 重写相应方法实现业务逻辑, 我们接下来看看一般都需要重写哪些方法:
- public void channelActive(ChannelHandlerContext ctx), 通道就绪事件
- public void channelRead(ChannelHandlerContext ctx, Object msg), 通道读取数据事件
- public void channelReadComplete(ChannelHandlerContext ctx) , 数据读取完毕事件
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause), 通道发生异常事件
ChannelPipeline
ChannelPipeline 是一个 Handler 的集合, 它负责处理和拦截 inbound 或者 outbound 的事 件和操作, 相当于一个贯穿 Netty 的链。
- ChannelPipeline addFirst(ChannelHandler... handlers), 把一个业务处理类(handler) 添加到链中的第一个位置
- ChannelPipeline addLast(ChannelHandler... handlers), 把一个业务处理类(handler) 添加到链中的最后一个位置
ChannelHandlerContext
这 是 事 件 处 理 器 上 下 文 对 象 , Pipeline 链 中 的 实 际 处 理 节 点 。 每 个 处 理 节 点ChannelHandlerContext中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler , 同 时 ChannelHandlerContext 中也绑定了对应的 pipeline和 Channel 的信息,方便对 ChannelHandler 进行调用。 常用方法如下所示
- ChannelFuture close(), 关闭通道
- ChannelOutboundInvoker flush(), 刷新
- ChannelFuture writeAndFlush(Object msg) , 将 数 据 写 到 ChannelPipeline 中 当 前
- ChannelHandler 的下一个 ChannelHandler 开始处理(出站)
ChannelFuture
表示 Channel 中异步 I/O 操作的结果, 在 Netty 中所有的 I/O 操作都是异步的, I/O 的调 用会直接返回, 调用者并不能立刻获得结果, 但是可以通过 ChannelFuture 来获取 I/O 操作 的处理状态。 常用方法如下所示:
Channel channel(), 返回当前正在进行 IO 操作的通道
ChannelFuture sync(), 等待异步操作执行完毕
EventLoopGroup和其实现类NioEventLoopGroup
EventLoopGroup 是一组 EventLoop 的抽象, Netty 为了更好的利用多核 CPU 资源, 一般 会有多个 EventLoop 同时工作, 每个 EventLoop 维护着一个 Selector 实例。 EventLoopGroup 提供 next 接口, 可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。 在 Netty 服务器端编程中, 我们一般都需要提供两个 EventLoopGroup, 例如: BossEventLoopGroup 和 WorkerEventLoopGroup。
- public NioEventLoopGroup(), 构造方法
- public Future<?> shutdownGracefully(), 断开连接, 关闭线程
ServerBootstrap和Bootstrap
ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置; Bootstrap 是 Netty 中的客户端启动助手, 通过它可以完成客户端的各种配置。 常用方法如下 所示:
- public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup),该方法用于服务器端, 用来设置两个 EventLoop
- public B group(EventLoopGroup group) , 该方法用于客户端, 用来设置一个 EventLoop
- public B channel(Class<? extends C> channelClass), 该方法用来设置一个服务器端的通道实现
- public <T> B option(ChannelOption<T> option, T value), 用来给 ServerChannel 添加配置
- public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value), 用来给接收到的 通道添加配置
- public ServerBootstrap childHandler(ChannelHandler childHandler), 该方法用来设置业务处理类(自定义的 handler)
- public ChannelFuture bind(int inetPort) , 该方法用于服务器端, 用来设置占用的端口号
- public ChannelFuture connect(String inetHost, int inetPort) 该方法用于客户端, 用来连接服务器端
# 5.4 Netty版案例实现
目标: 使用netty客户端给服务端发送数据,服务端接收消息打印.
首先引入Maven依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
然后,下面是服务端实现部分
NettyServer.java
package com.lagou.Netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//1.创建 NioEventLoopGroup的两个实例:bossGroup workerGroup
// 当前这两个实例代表两个线程池,默认线程数为CPU核心数乘2
// bossGroup接收客户端传过来的请求
// workerGroup处理请求
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
//2、创建服务启动辅助类:组装一些必要的组件
ServerBootstrap serverBootstrap = new ServerBootstrap();
//设置组,第一个bossGroup负责连接, workerGroup负责连接之后的io处理
serverBootstrap.group(bossGroup, workerGroup)
//channel方法指定服务器监听的通道类型
.channel(NioServerSocketChannel.class)
//设置channel handler , 每一个客户端连接后,给定一个监听器进行处理
.childHandler(new ChannelInitializer < NioSocketChannel > () {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//传输通道
ChannelPipeline pipeline = ch.pipeline();
//在通道上添加对通道的处理器 , 该处理器可能还是一个监听器
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
//监听器队列上添加我们自己的处理方式..
pipeline.addLast(new SimpleChannelInboundHandler < String > () {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s);
}
});
}
});
//bind监听端口
ChannelFuture f = serverBootstrap.bind(8000).sync();
System.out.println("tcp server start success..");
//会阻塞等待直到服务器的channel关闭
f.channel().closeFuture().sync();
}
}
在编写 Netty 程序时,一开始都会生成 NioEventLoopGroup 的两个实例,分别是 bossGroup 和 workerGroup,也可以称为 parentGroup 和 childGroup,为什么创建这两个实例,作用是什么?可以这么理解,bossGroup 和workerGroup 是两个线程池, 它们默认线程数为 CPU 核心数乘以 2,bossGroup 用于接收客户端传过来的请求,接收到请求后将后续操作交由 workerGroup 处理。
接下来我们生成了一个服务启动辅助类的实例 bootstrap,boostrap 用来为 Netty 程序的启动组装配置一些必须要组件,例如上面的创建的两个线程组。channel 方法用于指定服务器端监听套接字通道 NioServerSocketChannel,其内部管理了一个 Java NIO 中的ServerSocketChannel实例。
channelHandler 方法用于设置业务职责链,责任链是我们下面要编写的,责任链具体是什么,它其实就是由一个个的 ChannelHandler 串联而成,形成的链式结构。正是这一个个的 ChannelHandler 帮我们完成了要处理的事情。
ChannelInitializer 继承 ChannelInboundHandlerAdapter,用于初始化 Channel 的 ChannelPipeline。通过 initChannel方法参数 sc 得到 ChannelPipeline 的一个实例。
当一个新的连接被接受时, 一个新的 Channel 将被创建,同时它会被自动地分配到它专属的 ChannelPipeline
通过 addLast 方法将一个一个的 ChannelHandler 添加到责任链上并给它们取个名称(不取也可以,Netty 会给它个默认名称),这样就形成了链式结构。在请求进来或者响应出去时都会经过链上这些 ChannelHandler 的处理。
最后再向链上加入我们自定义的 ChannelHandler 组件,处理自定义的业务逻辑
接着我们调用了 bootstrap 的 bind 方法将服务绑定到 8080 端口上,bind 方法内部会执行端口绑定等一系列操,使得前面的配置都各就各位各司其职,sync 方法用于阻塞当前 Thread,一直到端口绑定操作完成。最后是应用程序将会阻塞等待直到服务器的 Channel 关闭。
客户端NIO的实现部分
NettyClient.java
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端的启动辅助类
Bootstrap bootstrap = new Bootstrap();
//线程池的实例
NioEventLoopGroup group = new NioEventLoopGroup();
//添加到组中
bootstrap.group(group)
//channel方法指定通道类型
.channel(NioSocketChannel.class)
//通道初始化了
.handler(new ChannelInitializer < Channel > () {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
});
Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();
while(true) {
channel.writeAndFlush(new Date() + ": hello world!");
Thread.sleep(2000);
}
}
}
使用Netty之后,一方面Netty对NIO封装得如此完美,写出来的代码非常优雅,另外一方面,使用Netty之后,网络通信这块的性能问题几乎不用操心
# 6 基于Netty自定义RPC
RPC又称远程过程调用,我们所知的远程调用分为两种,现在在服务间通信的方式也基本以这两种为主
1.是基于HTTP的restful形式的广义远程调用,以spring could的feign和restTemplate为代表,采用的协议是HTTP的7层调用协议,并且协议的参数和响应序列化基本以JSON格式和XML格式为主。
2.是基于TCP的狭义的RPC远程调用,以阿里的Dubbo为代表,主要通过netty来实现4层网络协议,NIO来异步传输,序列化也可以是JSON或者hessian2以及java自带的序列化等,可以配置。
接下来我们主要以第二种的RPC远程调用来自己实现
需求:
模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty
步骤
- 创建一个公共的接口项目以及创建接口及方法,用于消费者和提供者之间的约定。
- 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
- 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据
# 6.1 公共模块
首先,在公共模块中添加netty的maven依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
提供者及消费者工程都需依赖公共模块,这样提供者来实现接口并且提供网络调用,消费者直接通过接口来进行TCP通信及一定的协议定制获取提供者的实现返回值
接口的定义
public interface UserService {
String sayHello(String word);
}
只是一个普通的接口,参数是支持序列化的String类型,返回值同理
# 6.2 提供者的实现
首先是接口的实现,这一点和普通接口实现是一样的
public class UserServiceImpl implements UserService {
@Override
public String sayHello(String word) {
System.out.println("调用成功--参数:" + word);
return "调用成功--参数:" + word;
}
public static void startServer(String hostName, int port) {
try {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer < SocketChannel > () {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(new UserServerHandler());
}
});
bootstrap.bind(hostName, port).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在实现中加入了netty的服务器启动程序,上面的代码中添加了 String类型的编解码 handler,添加了一个自定义handler
自定义 handler 逻辑如下:
public class UserServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 如何符合约定,则调用本地方法,返回数据
if(msg.toString().startsWith("UserService")) {
String result = new UserServiceImpl().sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(result);
}
}
}
这里显示判断了是否符合约定(并没有使用复杂的协议,只是一个字符串判断),然后创建一个具体实现类,并调用方法写回客户端。
还需要一个启动类:
public class ServerBootstrap {
public static void main(String[] args) {
UserServiceImpl.startServer("localhost", 8990);
}
}
关于提供者的代码就写完了,主要就是创建一个 netty 服务端,实现一个自定义的 handler,自定义 handler 判断是否符合之间的约定(协议),如果符合,就创建一个接口的实现类,并调用他的方法返回字符串。
# 6.3 消费者相关实现
消费者有一个需要注意的地方,就是调用需要透明,也就是说,框架使用者不用关心底层的网络实现。这里我们可以使用 JDK 的动态代理来实现这个目的。
思路:客户端调用代理方法,返回一个实现了 HelloService 接口的代理对象,调用代理对象的方法,返回结果。
我们需要在代理中做手脚,当调用代理方法的时候,我们需要初始化 Netty 客户端,还需要向服务端请求数据,并返回数据。
首先创建代理相关的类:
public class RpcConsumer {
private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static UserClientHandler client;
/**
* 创建一个代理对象
*/
public Object createProxy(final Class < ? > serviceClass, final String providerName) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class < ? > [] {
serviceClass
}, (proxy, method, args) - > {
if(client == null) {
initClient();
}
// 设置参数
client.setPara(providerName + args[0]);
return executor.submit(client).get();
});
}
/**
* 初始化客户端
*/
private static void initClient() {
client = new UserClientHandler();
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer < SocketChannel > () {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(client);
}
});
try {
b.connect("localhost", 8990).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
该类有 2 个方法,创建代理和初始化客户端。
创建代理逻辑:使用 JDK 的动态代理技术,代理对象中的 invoke 方法实现如下: 如果 client 没有初始化,则初始化 client,这个 client 既是 handler ,也是一个 Callback。将参数设置进 client ,使用线程池调用 client 的 call 方法并阻塞等待数据返回
初始化客户端逻辑: 创建一个 Netty 的客户端,并连接提供者,并设置一个自定义 handler,和一些 String 类型的序列化方式。
UserClientHandler 的实现:
public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context;
private String result;
private String para;
@Override
public void channelActive(ChannelHandlerContext ctx) {
context = ctx;
}
/**
\* 收到服务端数据,唤醒等待线程
*/
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) {
result = msg.toString();
notify();
}
/**
* 写出数据,开始等待唤醒
*/
@Override
public synchronized Object call() throws InterruptedException {
context.writeAndFlush(para);
wait();
return result;
}
public void setPara(String para) {
this.para = para;
}
}
该类缓存了 ChannelHandlerContext,用于下次使用,有两个属性:返回结果和请求参数。
当成功连接后,缓存 ChannelHandlerContext,当调用 call 方法的时候,将请求参数发送到服务端,等待。当服务端收到并返回数据后,调用 channelRead 方法,将返回值赋值个 result,并唤醒等待在 call 方法上的线程。此时,代理对象返回数据。
再看看消费者调用方式,一般的TCP的RPC只需要这样调用即可,无需关心具体的协议和通信方式:
public class ClientBootstrap {
public static final String providerName = "UserService#sayHello#";
public static void main(String[] args) throws InterruptedException {
RpcConsumer consumer = new RpcConsumer();
// 创建一个代理对象
UserService service = (UserService) consumer.createProxy(UserService.class, providerName);
for(;;) {
Thread.sleep(1000);
System.out.println(service.sayHello("are you ok ?"));
}
}
}
调用者首先创建了一个代理对象,然后每隔一秒钟调用代理的 sayHello 方法,并打印服务端返回的结果
可以看到,消费者无需通过jar包的形式引入具体的实现项目,而是通过远程TCP通信的形式,以一定的协议和代理通过接口直接调用了方法,实现远程service间的调用,是分布式服务的基础