美文网首页
Netty源码系列2--基于 Netty 手写DUBBO 框架

Netty源码系列2--基于 Netty 手写DUBBO 框架

作者: WEIJAVA | 来源:发表于2019-06-20 11:28 被阅读0次

项目结构:
api:对外提供的服务接口。
protocol:自定义传输协议的内容。
registry:主要负责保存所有可用的服务名称和服务地址。
provider:服务接口实现类。
consumer:客户端调用。

image.png

写一个最简单的api接口

package com.wei.rpc.api;

public interface IRpcHelloService {
    String hello(String name);  
}  

写对应的provider实现类

package com.wei.rpc.provider;

import com.wei.rpc.api.IRpcHelloService;

public class RpcHelloServiceImpl implements IRpcHelloService {

    public String hello(String name) {  
        return "Hello " + name + "!";  
    }  
  
}  

如果目前直接new 对象调用,肯定是没问题,目标是模拟rpc的方式调用

package com.wei.rpc.consumer;

import com.wei.rpc.api.IRpcHelloService;
import com.wei.rpc.consumer.proxy.RpcProxy;

public class RpcConsumer {
    
    public static void main(String [] args){
      //  IRpcHelloService rpcHello = new RpcHelloServiceImpl(); 不能直接new
        IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class); //用rpc的方式,调用到远程的服务
        System.out.println(rpcHello.hello("wei"));
    }
}

我们知道RPC的方式肯定要进行网络传输,所有需要定义一个自定义传输协议类

package com.wei.rpc.protocol;

import java.io.Serializable;

/**
 * 自定义传输协议
 */
public class InvokerProtocol implements Serializable {

    private String className;//类名
    private String methodName;//函数名称 
    private Class<?>[] parames;//形参列表
    private Object[] values;//实参列表

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getParames() {
        return parames;
    }

    public void setParames(Class<?>[] parames) {
        this.parames = parames;
    }

    public Object[] getValues() {
        return values;
    }

    public void setValues(Object[] values) {
        this.values = values;
    }
}

协议中主要包含的信息有类名、函数名、形参列表和实参列表,通过这些信息就可以定位到一
个具体的方法调用。

Registry 注册中心主要功能就是负责将所有 Provider 的服务名称和服务引用地址注册到一个容器中,并对外发布。
Registry 应该要启动一个对外的服务,很显然应该作为服务端,并提供一个对外可以访问的端口。先启动一个 Netty服务,创建 RpcRegistry 类,具体代码如下:

package com.wei.rpc.registry;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class RpcRegistry {  
    private int port;  
    public RpcRegistry(int port){  
        this.port = port;  
    }  
    public void start(){  
        EventLoopGroup bossGroup = new NioEventLoopGroup();  
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)  
                    .childHandler(new ChannelInitializer<SocketChannel>() {
  
                        @Override  
                        protected void initChannel(SocketChannel ch) throws Exception {  
                            ChannelPipeline pipeline = ch.pipeline();
                            //自定义协议解码器
                            /** 入参有5个,分别解释如下
                             maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
                             lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
                             lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8)
                             lengthAdjustment:要添加到长度字段值的补偿值
                             initialBytesToStrip:从解码帧中去除的第一个字节数
                             */
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            //自定义协议编码器
                            pipeline.addLast(new LengthFieldPrepender(4));
                            //对象参数类型编码器
                            pipeline.addLast("encoder",new ObjectEncoder());
                            //对象参数类型解码器
                            pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)));
                            //这个handler就是具体执行的注册实现类,主要实现服务注册和服务调用的功能
                            pipeline.addLast(new RegistryHandler());
                        }  
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)       
                    .childOption(ChannelOption.SO_KEEPALIVE, true);  
            ChannelFuture future = b.bind(port).sync();      
            System.out.println("RPC Registry start listen at " + port );
            future.channel().closeFuture().sync();    
        } catch (Exception e) {  
             bossGroup.shutdownGracefully();    
             workerGroup.shutdownGracefully();  
        }  
    }
    
    
    public static void main(String[] args) throws Exception {    
        new RpcRegistry(8080).start();    
    }    
}  

RegistryHandler的代码如下,主要实现服务注册和服务调用的功能,服务注册很简单,构造方法里面扫描本地class,放到map中保存,然后反射调用接口,真正dubbo当然是通过配置文件获取,
服务调用必须要继承ChannelInboundHandlerAdapter类,重写channelRead方法,这个方法就是客户端远程调用时触发的方法

package com.wei.rpc.registry;

import com.wei.rpc.protocol.InvokerProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class RegistryHandler  extends ChannelInboundHandlerAdapter {

    //用保存所有可用的服务
    public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<String,Object>();

    //保存所有相关的服务类
    private List<String> classNames = new ArrayList<String>();
    
    public RegistryHandler(){
        //完成递归扫描
        scannerClass("com.wei.rpc.provider");
        doRegister();
    }

    /**
     * 客户端远程调用时触发
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Object result = new Object();
        InvokerProtocol request = (InvokerProtocol)msg;

        //当客户端建立连接时,需要从自定义协议中获取信息,拿到具体的服务和实参
        //使用反射调用
        if(registryMap.containsKey(request.getClassName())){ 
            Object clazz = registryMap.get(request.getClassName());
            Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParames());    
            result = method.invoke(clazz, request.getValues());   
        }
        ctx.write(result);  
        ctx.flush();    
        ctx.close();  
    }

    /**
     * 调用异常时触发
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {    
         cause.printStackTrace();    
         ctx.close();    
    }
    

    /*
     * 递归扫描
     */
    private void scannerClass(String packageName){
        URL url = this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/"));
        File dir = new File(url.getFile());
        for (File file : dir.listFiles()) {
            //如果是一个文件夹,继续递归
            if(file.isDirectory()){
                scannerClass(packageName + "." + file.getName());
            }else{
                classNames.add(packageName + "." + file.getName().replace(".class", "").trim());
            }
        }
    }

    /**
     * 完成注册
     */
    private void doRegister(){
        if(classNames.size() == 0){ return; }
        for (String className : classNames) {
            try {
                Class<?> clazz = Class.forName(className);
                Class<?> i = clazz.getInterfaces()[0];
                registryMap.put(i.getName(), clazz.newInstance());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
  
}

以上,注册中心的基本功能就已完成,下面来写客户端的代码实现。

梳理一下基本的实现思路,主要完成一个这样的功能:API 模块中的接口功能在服务端实现(并没有在客户端实现)。
因此,客户端调用 API 中定义的某一个接口方法时,实际上是要发起一次网络请求去调用服务端的某一个服务。而这个网络请求首先被注册中心接收,由注册中心先确定需要调用的服务的位置,再将请求转发至真实的服务实现,最终调用服务端代码,将返回值通过网络传输给客户端。整个过程对于客户端而言是完全无感知的,就像调用本地方法一样。具体调用过程如下图所示:


image.png

所以先要创建的代理类,目前就用jdk的动态代理实现,代码如下:

package com.wei.rpc.consumer.proxy;

import com.wei.rpc.protocol.InvokerProtocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class RpcProxy {  
    
    public static <T> T create(Class<?> clazz){
        //clazz传进来本身就是interface
        MethodProxy proxy = new MethodProxy(clazz);
        Class<?> [] interfaces = clazz.isInterface() ?
                                new Class[]{clazz} :
                                clazz.getInterfaces();
        T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,proxy);
        return result;
    }

    private static class MethodProxy implements InvocationHandler {
        private Class<?> clazz;
        public MethodProxy(Class<?> clazz){
            this.clazz = clazz;
        }

        public Object invoke(Object proxy, Method method, Object[] args)  throws Throwable {
            //如果传进来是一个已实现的具体类(本次演示略过此逻辑)
            if (Object.class.equals(method.getDeclaringClass())) {
                try {
                    return method.invoke(this, args);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
                //如果传进来的是一个接口(核心)
            } else {
                return rpcInvoke(proxy,method, args);
            }
            return null;
        }


        /**
         * 实现接口的核心方法
         * @param method
         * @param args
         * @return
         */
        public Object rpcInvoke(Object proxy,Method method,Object[] args){

            //传输协议封装
            InvokerProtocol msg = new InvokerProtocol();
            msg.setClassName(this.clazz.getName());
            msg.setMethodName(method.getName());
            msg.setValues(args);
            msg.setParames(method.getParameterTypes());

            //接受服务端的数据处理类
            final RpcProxyHandler consumerHandler = new RpcProxyHandler();
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                //自定义协议解码器
                                /** 入参有5个,分别解释如下
                                 maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
                                 lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
                                 lengthFieldLength:长度字段的长度:如:长度字段是int型表示,那么这个值就是4(long型就是8)
                                 lengthAdjustment:要添加到长度字段值的补偿值
                                 initialBytesToStrip:从解码帧中去除的第一个字节数
                                 */
                                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                                //自定义协议编码器
                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                                //对象参数类型编码器
                                pipeline.addLast("encoder", new ObjectEncoder());
                                //对象参数类型解码器
                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                pipeline.addLast("handler",consumerHandler);
                            }
                        });

                ChannelFuture future = b.connect("localhost", 8080).sync();
                future.channel().writeAndFlush(msg).sync();
                future.channel().closeFuture().sync();
            } catch(Exception e){
                e.printStackTrace();
            }finally {
                group.shutdownGracefully();
            }
            return consumerHandler.getResponse();
        }

    }
}

你会看到netty的客户端代码和服务端代码差不多,也是有一个核心处理类consumerHandler,进去看下也是一样继承ChannelInboundHandlerAdapter类,实现两个方法,不过我们为了拿到返回值,所以多加一个getResponse方法

package com.wei.rpc.consumer.proxy;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class RpcProxyHandler extends ChannelInboundHandlerAdapter {  
      
    private Object response;    
      
    public Object getResponse() {    
        return response;    
    }

    /**
     * 服务端数据处理后,会调用此方法
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override    
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    
        response = msg;
    }

    /**
     * 调用异常时触发
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override    
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {    
        System.out.println("client exception is general");    
    }    
} 

代码全部完成,先运行下注册中心 RpcRegistry类,运行结果:


image.png

再运行客户端调用代码:

package com.wei.rpc.consumer;

import com.wei.rpc.api.IRpcHelloService;
import com.wei.rpc.consumer.proxy.RpcProxy;

public class RpcConsumer {

    public static void main(String[] args) {
        //  IRpcHelloService rpcHello = new RpcHelloServiceImpl(); 不能直接new
        IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class); //用rpc的方式,调用到远程的服务
        System.out.println(rpcHello.hello("wei"));
    }

}

运行结果如下,大功告成


image.png

——学自咕泡学院

相关文章

网友评论

      本文标题:Netty源码系列2--基于 Netty 手写DUBBO 框架

      本文链接:https://www.haomeiwen.com/subject/zxkdqctx.html