项目结构:
api:对外提供的服务接口。
protocol:自定义传输协议的内容。
registry:主要负责保存所有可用的服务名称和服务地址。
provider:服务接口实现类。
consumer:客户端调用。
![](https://img.haomeiwen.com/i16701032/8f94d39d205bcd01.png)
写一个最简单的api接口
package com.wei.rpc.api;
public interface IRpcHelloService {
String hello(String name);
}
写对应的provider实现类
package com.wei.rpc.provider;
import com.wei.rpc.api.IRpcHelloService;
public class RpcHelloServiceImpl implements IRpcHelloService {
public String hello(String name) {
return "Hello " + name + "!";
}
}
如果目前直接new 对象调用,肯定是没问题,目标是模拟rpc的方式调用
package com.wei.rpc.consumer;
import com.wei.rpc.api.IRpcHelloService;
import com.wei.rpc.consumer.proxy.RpcProxy;
public class RpcConsumer {
public static void main(String [] args){
// IRpcHelloService rpcHello = new RpcHelloServiceImpl(); 不能直接new
IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class); //用rpc的方式,调用到远程的服务
System.out.println(rpcHello.hello("wei"));
}
}
我们知道RPC的方式肯定要进行网络传输,所有需要定义一个自定义传输协议类
package com.wei.rpc.protocol;
import java.io.Serializable;
/**
* 自定义传输协议
*/
public class InvokerProtocol implements Serializable {
private String className;//类名
private String methodName;//函数名称
private Class<?>[] parames;//形参列表
private Object[] values;//实参列表
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParames() {
return parames;
}
public void setParames(Class<?>[] parames) {
this.parames = parames;
}
public Object[] getValues() {
return values;
}
public void setValues(Object[] values) {
this.values = values;
}
}
协议中主要包含的信息有类名、函数名、形参列表和实参列表,通过这些信息就可以定位到一
个具体的方法调用。
Registry 注册中心主要功能就是负责将所有 Provider 的服务名称和服务引用地址注册到一个容器中,并对外发布。
Registry 应该要启动一个对外的服务,很显然应该作为服务端,并提供一个对外可以访问的端口。先启动一个 Netty服务,创建 RpcRegistry 类,具体代码如下:
package com.wei.rpc.registry;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class RpcRegistry {
private int port;
public RpcRegistry(int port){
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//自定义协议解码器
/** 入参有5个,分别解释如下
maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8)
lengthAdjustment:要添加到长度字段值的补偿值
initialBytesToStrip:从解码帧中去除的第一个字节数
*/
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
//自定义协议编码器
pipeline.addLast(new LengthFieldPrepender(4));
//对象参数类型编码器
pipeline.addLast("encoder",new ObjectEncoder());
//对象参数类型解码器
pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)));
//这个handler就是具体执行的注册实现类,主要实现服务注册和服务调用的功能
pipeline.addLast(new RegistryHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = b.bind(port).sync();
System.out.println("RPC Registry start listen at " + port );
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new RpcRegistry(8080).start();
}
}
RegistryHandler的代码如下,主要实现服务注册和服务调用的功能,服务注册很简单,构造方法里面扫描本地class,放到map中保存,然后反射调用接口,真正dubbo当然是通过配置文件获取,
服务调用必须要继承ChannelInboundHandlerAdapter类,重写channelRead方法,这个方法就是客户端远程调用时触发的方法
package com.wei.rpc.registry;
import com.wei.rpc.protocol.InvokerProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
public class RegistryHandler extends ChannelInboundHandlerAdapter {
//用保存所有可用的服务
public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<String,Object>();
//保存所有相关的服务类
private List<String> classNames = new ArrayList<String>();
public RegistryHandler(){
//完成递归扫描
scannerClass("com.wei.rpc.provider");
doRegister();
}
/**
* 客户端远程调用时触发
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Object result = new Object();
InvokerProtocol request = (InvokerProtocol)msg;
//当客户端建立连接时,需要从自定义协议中获取信息,拿到具体的服务和实参
//使用反射调用
if(registryMap.containsKey(request.getClassName())){
Object clazz = registryMap.get(request.getClassName());
Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParames());
result = method.invoke(clazz, request.getValues());
}
ctx.write(result);
ctx.flush();
ctx.close();
}
/**
* 调用异常时触发
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/*
* 递归扫描
*/
private void scannerClass(String packageName){
URL url = this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/"));
File dir = new File(url.getFile());
for (File file : dir.listFiles()) {
//如果是一个文件夹,继续递归
if(file.isDirectory()){
scannerClass(packageName + "." + file.getName());
}else{
classNames.add(packageName + "." + file.getName().replace(".class", "").trim());
}
}
}
/**
* 完成注册
*/
private void doRegister(){
if(classNames.size() == 0){ return; }
for (String className : classNames) {
try {
Class<?> clazz = Class.forName(className);
Class<?> i = clazz.getInterfaces()[0];
registryMap.put(i.getName(), clazz.newInstance());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
以上,注册中心的基本功能就已完成,下面来写客户端的代码实现。
梳理一下基本的实现思路,主要完成一个这样的功能:API 模块中的接口功能在服务端实现(并没有在客户端实现)。
因此,客户端调用 API 中定义的某一个接口方法时,实际上是要发起一次网络请求去调用服务端的某一个服务。而这个网络请求首先被注册中心接收,由注册中心先确定需要调用的服务的位置,再将请求转发至真实的服务实现,最终调用服务端代码,将返回值通过网络传输给客户端。整个过程对于客户端而言是完全无感知的,就像调用本地方法一样。具体调用过程如下图所示:
![](https://img.haomeiwen.com/i16701032/0577215f6f1d863c.png)
所以先要创建的代理类,目前就用jdk的动态代理实现,代码如下:
package com.wei.rpc.consumer.proxy;
import com.wei.rpc.protocol.InvokerProtocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
public class RpcProxy {
public static <T> T create(Class<?> clazz){
//clazz传进来本身就是interface
MethodProxy proxy = new MethodProxy(clazz);
Class<?> [] interfaces = clazz.isInterface() ?
new Class[]{clazz} :
clazz.getInterfaces();
T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,proxy);
return result;
}
private static class MethodProxy implements InvocationHandler {
private Class<?> clazz;
public MethodProxy(Class<?> clazz){
this.clazz = clazz;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//如果传进来是一个已实现的具体类(本次演示略过此逻辑)
if (Object.class.equals(method.getDeclaringClass())) {
try {
return method.invoke(this, args);
} catch (Throwable t) {
t.printStackTrace();
}
//如果传进来的是一个接口(核心)
} else {
return rpcInvoke(proxy,method, args);
}
return null;
}
/**
* 实现接口的核心方法
* @param method
* @param args
* @return
*/
public Object rpcInvoke(Object proxy,Method method,Object[] args){
//传输协议封装
InvokerProtocol msg = new InvokerProtocol();
msg.setClassName(this.clazz.getName());
msg.setMethodName(method.getName());
msg.setValues(args);
msg.setParames(method.getParameterTypes());
//接受服务端的数据处理类
final RpcProxyHandler consumerHandler = new RpcProxyHandler();
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//自定义协议解码器
/** 入参有5个,分别解释如下
maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
lengthFieldLength:长度字段的长度:如:长度字段是int型表示,那么这个值就是4(long型就是8)
lengthAdjustment:要添加到长度字段值的补偿值
initialBytesToStrip:从解码帧中去除的第一个字节数
*/
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
//自定义协议编码器
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
//对象参数类型编码器
pipeline.addLast("encoder", new ObjectEncoder());
//对象参数类型解码器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast("handler",consumerHandler);
}
});
ChannelFuture future = b.connect("localhost", 8080).sync();
future.channel().writeAndFlush(msg).sync();
future.channel().closeFuture().sync();
} catch(Exception e){
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
return consumerHandler.getResponse();
}
}
}
你会看到netty的客户端代码和服务端代码差不多,也是有一个核心处理类consumerHandler,进去看下也是一样继承ChannelInboundHandlerAdapter类,实现两个方法,不过我们为了拿到返回值,所以多加一个getResponse方法
package com.wei.rpc.consumer.proxy;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class RpcProxyHandler extends ChannelInboundHandlerAdapter {
private Object response;
public Object getResponse() {
return response;
}
/**
* 服务端数据处理后,会调用此方法
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = msg;
}
/**
* 调用异常时触发
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exception is general");
}
}
代码全部完成,先运行下注册中心 RpcRegistry类,运行结果:
![](https://img.haomeiwen.com/i16701032/94428fb55cbe2e32.png)
再运行客户端调用代码:
package com.wei.rpc.consumer;
import com.wei.rpc.api.IRpcHelloService;
import com.wei.rpc.consumer.proxy.RpcProxy;
public class RpcConsumer {
public static void main(String[] args) {
// IRpcHelloService rpcHello = new RpcHelloServiceImpl(); 不能直接new
IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class); //用rpc的方式,调用到远程的服务
System.out.println(rpcHello.hello("wei"));
}
}
运行结果如下,大功告成
![](https://img.haomeiwen.com/i16701032/69e713c3812dce58.png)
——学自咕泡学院
网友评论