本文将用netty实现一个简单的RPC框架。
一、什么叫RPC?
RPC,远程调用,就是A程序部署在1号机器上,B程序部署在2号机器上,A可以像调本地方法一样地去调用B程序,而不需要程序员额外地编写这个交互过程,这就叫RPC远程调用。dubbo、Ribbon、openfeign都是RPC框架。
二、RPC调用过程
- 请求:服务消费者发送请求 ---> 编码 ---> 通过网络发送 ---> 服务提供者接收请求 ---> 解码 ---> 处理请求
- 响应:服务提供者处理完请求进行响应 ---> 编码 ---> 通过网络发送 ---> 消费者接收响应 ---> 解码 ---> 处理响应
这两个流程中,我们只需要关心发送请求和处理响应即可,中间那些过程对程序员来说都是透明的。所以,要实现RPC框架,要处理的就是封装中间那些步骤。
欢迎大家关注我的公众号 javawebkf,目前正在慢慢地将简书文章搬到公众号,以后简书和公众号文章将同步更新,且简书上的付费文章在公众号上将免费。
三、实现思路
假如现在我们需要调用服务提供者HelloServiceImpl的hello方法,调用流程如下:
- 客户端创建代理对象,通过Netty,远程连接服务端,将参数发送过去。
- 服务端收到参数,传给hello方法,接收到返回值,再发送给客户端。
四、代码实操
1、HelloService接口:
public interface HelloService {
String hello(String msg);
}
2、服务端:
- HelloServiceImpl:
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String msg) {
if (StringUtils.isNotBlank(msg)) {
return"收到客户端消息:[" + msg + "]";
}
else return "收到客户端消息";
}
}
- NettyServerHandler:
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("msg = " + msg);
if (msg.toString().startsWith("HelloService#hello#")){
String param = msg.toString().substring(msg.toString().lastIndexOf("#") + 1);
String result = new HelloServiceImpl().hello(param);
ctx.writeAndFlush(result);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
- NettyServer:
public class NettyServer {
public static void startService(String host, int port){
startServer0(host, port);
}
private static void startServer0(String host, int port){
EventLoopGroup bossgroup = new NioEventLoopGroup(1);
EventLoopGroup workergroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossgroup, workergroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();
System.out.println("provider is in active");
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
bossgroup.shutdownGracefully();
workergroup.shutdownGracefully();
}
}
}
- ServerBootStrap
public class ServerBootStrap {
public static void main(String[] args){
NettyServer.startService("127.0.0.1", 8848);
}
}
3、客户端:
- NettyClientHandler:
/**
* 流程:业务代码远程调用某个方法,被代理对象拦截,进而执行call方法,
* call方法请求服务端,然后在waiting着,服务器处理完请求将响应返回给另外一个方法,
* 即read方法,read方法收到响应后,所以就应该唤醒call方法中等待的线程,继续往下执行。
* @author: zhu
* @date: 2020/8/22 16:09
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context;
private String result;
private String param;
/**
* 被代理对象调用,发送数据给服务器,然后等待被唤醒
* @return
* @throws Exception
*/
@Override
public synchronized Object call() throws Exception {
// 将参数发送给服务端
context.writeAndFlush(param);
// 发送之后就等待被唤醒
wait();
// 被唤醒的时候read方法已经拿到result了,这里直接return即可
return result;
}
/**
* 与服务器连接创建成功后被调用
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
context = ctx;
}
/**
* 收到服务器数据后被调用
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
notify(); // 唤醒call方法
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
public void setParam(String param) {
this.param = param;
}
}
- NettyClient :
public class NettyClient {
private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler clientHandler;
/**
* 初始化
*/
private static void initClient(){
clientHandler = new NettyClientHandler();
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(clientHandler);
}
});
try {
bootstrap.connect("127.0.0.1", 8848).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Object getBean(final Class<?> serviceClass, final String providerName){
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[] {serviceClass}, (proxy, method, args) -> {
if (clientHandler == null){
initClient();
}
// 设置要发给服务器端的参数
clientHandler.setParam(providerName + args[0]);
return executorService.submit(clientHandler).get();
});
}
}
- ClientBootStrap:
public class ClientBootStrap {
// 定义协议头
private static final String providerName = "HelloService#hello#";
public static void main(String[] args){
// 创建消费者
NettyClient customer = new NettyClient();
HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
String result = service.hello("扣你吉瓦");
System.out.println(result);
}
}
先启动ServerBootStrap,然后启动ClientBootStrap去调用服务端的hello方法,最后可以成功拿到返回结果。
网友评论