美文网首页
使用netty手写一个简单的RPC框架

使用netty手写一个简单的RPC框架

作者: 守住阳光 | 来源:发表于2018-08-15 15:48 被阅读0次

            RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。

            RPC 可基于 HTTP 或 TCP 协议,Web Service 就是基于 HTTP 协议的 RPC,它具有良好的跨平台性,但其性能却不如基于 TCP 协议的 RPC。会两方面会直接影响 RPC 的性能,一是传输方式,二是序列化。

            众所周知,TCP 是传输层协议,HTTP 是应用层协议,而传输层较应用层更加底层,在数据传输方面,越底层越快,因此,在一般情况下,TCP 一定比 HTTP 快。就序列化而言,Java 提供了默认的序列化方式,但在高并发的情况下,这种方式将会带来一些性能上的瓶颈,于是市面上出现了一系列优秀的序列化框架,比如:Protobuf、Kryo、Hessian、Jackson 等,它们可以取代 Java 默认的序列化,从而提供更高效的性能。 

            下面是简单实现的基于netty的RPC调用。

    一、首先定义消息传递的实体类

    public class ClassInfo  implements  Serializable{

            private static final long serialVersionUID = -8970942815543515064L; 

            private String className;//类名     

            private String methodName;//函数名称      

            private Class[] types;//参数类型        

            private Object[] objects;//参数列表        

            public String getClassName() { 

                    return className; 

            } 

            public void setClassName(String className) { 

                    this.className = className; 

            } 

            public String getMethodName() { 

                    return methodName; 

            } 

            public void setMethodName(String methodName) { 

                    this.methodName = methodName; 

             } 

             public Class[] getTypes() { 

                    return types; 

              } 

             public void setTypes(Class[] types) { 

                   this.types = types; 

            } 

            public Object[] getObjects() { 

                    return objects; 

            } 

            public void setObjects(Object[] objects) { 

                 this.objects = objects; 

            } 

    }

    二、创建Netty操作的服务端,以及具体操作 

    1. 服务端

    public class RPCServer {

             private int port; 

             public RPCServer(int port){ 

                     this.port = port; 

             } 

             public void start(){ 

                      EventLoopGroup bossGroup = new NioEventLoopGroup();

                      EventLoopGroup workerGroup = new NioEventLoopGroup();

                     try { 

                         ServerBootstrap serverBootstrap = new ServerBootstrap().

                        group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .localAddress(port).

                        childHandler(new ChannelInitializer() {

                                @Override

                                protected void initChannel(SocketChannel ch) throws Exception { 

                                    ChannelPipeline pipeline = ch.pipeline();   

                                    pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));   

                                    pipeline.addLast(new LengthFieldPrepender(4));   

                                    pipeline.addLast("encoder", new ObjectEncoder());     

                                    pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));   

                                    pipeline.addLast(new InvokerHandler()); 

                            } 

                        }).option(ChannelOption.SO_BACKLOG, 128)     

                        .childOption(ChannelOption.SO_KEEPALIVE, true); 

                        ChannelFuture future = serverBootstrap.bind(port).sync();     

                         System.out.println("Server start listen at " + port );   

                        future.channel().closeFuture().sync();   

            } catch (Exception e) { 

                bossGroup.shutdownGracefully();   

                workerGroup.shutdownGracefully(); 

            } 

        } 

        public static void main(String[] args) throws Exception {   

            int port;   

            if (args.length > 0) {   

                port = Integer.parseInt(args[0]);   

            } else {   

                port = 8899;   

            }   

            new RPCServer(port).start();   

        }   

    }

    2、服务端操作

            由服务端我们看到具体的数据传输操作是进行序列化的,具体的操作还是比较简单的,就是获取发送过来的信息,这样就可以通过反射获得类名,根据函数名和参数值,执行具体的操作,将执行结果发送给客户端。

    public class InvokerHandler extends ChannelInboundHandlerAdapter {

         public static ConcurrentHashMapclassMap = new ConcurrentHashMap();

         @Override   

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {   

            ClassInfo classInfo = (ClassInfo)msg; 

            Object claszz = null; 

            if(!classMap.containsKey(classInfo.getClassName())){ 

                try { 

                    claszz = Class.forName(classInfo.getClassName()).newInstance(); 

                    classMap.put(classInfo.getClassName(), claszz); 

                } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { 

                    e.printStackTrace(); 

                } 

            }else { 

                claszz = classMap.get(classInfo.getClassName()); 

            } 

            Method method = claszz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());   

            Object result = method.invoke(claszz, classInfo.getObjects()); 

            ctx.write(result); 

            ctx.flush();   

            ctx.close(); 

        } 

        @Override   

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {   

            cause.printStackTrace();   

            ctx.close();   

        }   

    }

    三、客户端,通过代理机制来触发远程调用

    1、客户端

            当执行具体的函数时会调用远程操作,将具体操作的类、函数及参数信息发送到服务端

    public class RPCProxy{ 

         @SuppressWarnings("unchecked")

         public staticT create(final Object target){

                 return (T) Proxy.newProxyInstance(target.getClass().getClassLoader(),target.getClass().getInterfaces(), 

                        new InvocationHandler(){ 

                             @Override

                             public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

                                 ClassInfo classInfo = new ClassInfo(); 

                                 classInfo.setClassName(target.getClass().getName());

                                 classInfo.setMethodName(method.getName()); 

                                 classInfo.setObjects(args); 

                                 classInfo.setTypes(method.getParameterTypes());

                                 final ResultHandler resultHandler = new ResultHandler();

                                 EventLoopGroup group = new NioEventLoopGroup(); 

                                 try {

                                         Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class)                                     .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() {

                                                 @Override   

                                                  public void initChannel(SocketChannel ch) throws Exception {   

                                                ChannelPipeline pipeline = ch.pipeline();   

                                                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));   

                                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));   

                                                pipeline.addLast("encoder", new ObjectEncoder());     

                                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));   

                                                pipeline.addLast("handler",resultHandler); 

                                   }   

                        });   

                        ChannelFuture future = b.connect("localhost", 8899).sync();   

                        future.channel().writeAndFlush(classInfo).sync(); 

                        future.channel().closeFuture().sync();   

                    } finally {   

                        group.shutdownGracefully();   

                    } 

                    return resultHandler.getResponse(); 

                } 

            }); 

        } 

    }

    2、获取远程调用返回的结果值

    public  class  ResultHandler  extends  ChannelInboundHandlerAdapter{

            private  Object response;

            public   Object  getResponse() {

                 return  response;    

            }

            @Override

            public  void  channelRead(ChannelHandlerContext ctx, Object msg)throwsException {           

                     response=msg;            

                    System.out.println("client接收到服务器返回的消息:"+ msg);       

             }

            @Override

            public  void  exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {  

                  System.out.println("client exception is general");       

            }    

    }

    四、 接口、实现类及Main操作

    public interface HelloRpc { 

         String hello(String name);

     }

    public  class  HelloRpcImpl  implements  HelloRpc{

            @Override

            public  String  hello(String name) {

                return"hello "+name;     

             }  

    }

    public class NettyRpcMain {

        public static void main(String[] args){

            HelloRpc helloRpc = new HelloRpcImpl();

            HelloRpc echo = RPCProxy.create(helloRpc);

            System.out.println(echo.hello("这是我的第一个手写rpc!"));

        }

    }

    相关文章

      网友评论

          本文标题:使用netty手写一个简单的RPC框架

          本文链接:https://www.haomeiwen.com/subject/rzdwbftx.html