学习目标
- 什么是RPC
- RPC实现原理
- 借助netty 实现网络通信
- java 反射
- 代码实现
什么是RPC
RPC(Remote Procedure Call),即远程过程调用,它是一种通过网络从远程计算机程序 上请求服务,而不需要了解底层网络实现的技术。常见的 RPC 框架有: 源自阿里的 Dubbo, Spring 旗下的 Spring Cloud,Google 出品的 grpc 等等。
RPC原理
![](https://img.haomeiwen.com/i25041675/5f66607ed3a9bc23.png)
-
服务消费方(client)以本地调用方式调用服务
-
client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
-
client stub 将消息进行编码并发送到服务端
-
server stub 收到消息后进行解码
-
server stub 根据解码结果调用本地的服务
-
本地服务执行并将结果返回给 server stub
-
server stub 将返回导入结果进行编码并发送至消费
-
client stub 接收到消息并进行解码
-
服务消费方(client)得到结果
RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地 方法一样即可完成远程服务调用。接下来我们基于 Netty 自己动手搞定一个 RPC。
借助netty 实现网络通信
![](https://img.haomeiwen.com/i25041675/5263e4d6e52b3229.png)
client(服务的调用方): 两个接口 + 一个包含 main 方法的测试类
Client Stub: 一个客户端代理类 + 一个客户端业务处理类
Server(服务的提供方): 两个接口 + 两个实现类
Server Stub: 一个网络处理服务器 + 一个服务器业务处理类
注意:服务调用方的接口必须跟服务提供方的接口保持一致(包路径可以不一致) 最终要实现的目标是:在 TestNettyRPC 中远程调用 HelloRPCImpl 或 HelloNettyImpl 中的方法
java 反射实现
- Class.forName("com.example.Person")
- obj.getClass()
- MyClass.class
- Reflections
代码实现
- 消费者应用
@RestController
@RequestMapping("/api")
public class ConsumerController {
@Resource
private NettyRPCProxy nettyRPCProxy;
@GetMapping("/comsumer")
private String comsumer(String str) {
Provider provider = (Provider)nettyRPCProxy.create(Provider.class);
String res = provider.provider(str);
System.out.println(res);// 打印 12312_provider123
return res;
}
}
public interface Provider {
String provider(String str);
}
-
生产者应用
public class ProviderImpl implements Provider { @Override public String provider(String str) { return str + "_provider123"; } } public interface Provider { String provider(String str); }
-
rpc核心代码
clientsub
public class NettyRPCProxy { //根据接口创建代理对象 public Object create(Class target) { return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //封装ClassInfo ClassInfo classInfo = new ClassInfo(); classInfo.setClassName(target.getName()); classInfo.setMethodName(method.getName()); classInfo.setObjects(args); classInfo.setTypes(method.getParameterTypes()); //开始用Netty发送数据 EventLoopGroup group = new NioEventLoopGroup(); ResultHandler resultHandler = new ResultHandler(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //编码器 pipeline.addLast("encoder", new ObjectEncoder()); //解码器 构造方法第一个参数设置二进制数据的最大字节数 第二个参数设置具体使用哪个类解析器 pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); //客户端业务处理类 pipeline.addLast("handler", resultHandler); } }); ChannelFuture future = b.connect("127.0.0.1", 9999).sync(); future.channel().writeAndFlush(classInfo).sync(); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } return resultHandler.getResponse(); } }); } }
public class ResultHandler extends ChannelInboundHandlerAdapter {
private Object response;
public Object getResponse() {
return response;
}
@Override //读取服务器端返回的数据(远程调用的结果)
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = msg;
ctx.close();
}
}
serverstub
public class NettyRPCServer {
private int port;
public NettyRPCServer(int port) {
this.port = port;
}
public void start() throws InterruptedException {
//主线程池-负责处理客户端连接
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
//主线程池-负责处理网络IO读写
NioEventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)//设置线程队列的连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动连接状态
.localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//编码
pipeline.addLast("encoder", new ObjectEncoder());
//解码
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
//服务器端业务处理类
pipeline.addLast(new InvokeHandler());
}
});
/*异步绑定到服务器,sync()会阻塞到完成*/
ChannelFuture f = server.bind().sync();
System.out.println("......server is ready......");
/*阻塞当前线程,直到服务器的ServerChannel被关闭*/
f.channel().closeFuture().sync();
}
public static void main(String[] args) throws Exception {
new NettyRPCServer(9999).start();
}
}
public class InvokeHandler extends ChannelInboundHandlerAdapter {
//得到某接口下某个实现类的名字
private String getImplClassName(ClassInfo classInfo) throws Exception{
String interfacePath="org.example.netty.rpc.provider.service";//服务提供者的 包路径
int lastDot = classInfo.getClassName().lastIndexOf(".");
String interfaceName=classInfo.getClassName().substring(lastDot);
Class superClass= null;//拼接 全限定类 是服务提供者的
try {
superClass = Class.forName(interfacePath + interfaceName);
} catch (ClassNotFoundException e) {
e.printStackTrace();
throw e;
}
List<Class> classes2 = getClassesForPackage(interfacePath, superClass);
if (classes2 != null && classes2.size() > 0) {
return classes2.get(0).getName();
}
Reflections reflections = new Reflections(interfacePath);
// 得到某接口下的所有实现类
Set<Class> ImplClassSet=reflections.getSubTypesOf(superClass);
if(ImplClassSet.size()==0){
System.out.println("未找到实现类");
return null;
}else if(ImplClassSet.size()>1){
System.out.println("找到多个实现类,未明确使用哪一个");
return null;
}else {
//把集合转换为数组
Class[] classes=ImplClassSet.toArray(new Class[0]);
return classes[0].getName(); //得到实现类的名字
}
}
@Override //读取客户端发来的数据并通过反射调用实现类的方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ClassInfo classInfo = (ClassInfo) msg;
Object clazz = Class.forName(getImplClassName(classInfo)).newInstance();
Method method = clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
//通过反射调用实现类的方法
Object result = method.invoke(clazz, classInfo.getObjects());
ctx.writeAndFlush(result);
}
private List<Class> getClassesForPackage(String packageName, Class<?> targetInterface) {
List<Class> classes = new ArrayList<>();
String path = packageName.replace('.', '/');
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
Enumeration<URL> resources = classLoader.getResources(path);
while (resources.hasMoreElements()) {
File file = new File(resources.nextElement().getFile());
if (file.isDirectory()) {
classes.addAll(findClasses(file, packageName, targetInterface));
}
}
} catch (IOException e) {
e.printStackTrace();
}
return classes;
}
// private static Collection<Class> findClasses2(File file, String packageName) {
//
// return null;
// }
private List<Class> findClasses(File directory, String packageName, Class<?> targetInterface) {
List<Class> classes = new ArrayList<>();
File[] files = directory.listFiles();
for (File file : files) {
if (file.isDirectory()) {
classes.addAll(findClasses(file, packageName + "." + file.getName(), targetInterface));
} else if (file.getName().endsWith(".class")) {
String className = packageName + '.' + file.getName().substring(0, file.getName().length() - 6);
try {
Class<?> clzz = Class.forName(className);
//判断 targetInterface 是 当前clzz 父类或接口, 排除接口类
if (targetInterface.isAssignableFrom(clzz) && !clzz.isInterface()) {
classes.add(Class.forName(className));
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
return classes;
}
}
![](https://img.haomeiwen.com/i25041675/d6656ef4ef2fa16c.png)
网友评论