美文网首页
netty网络编程-7.RPC

netty网络编程-7.RPC

作者: 笨鸡 | 来源:发表于2020-04-01 03:07 被阅读0次

    NettyRPCServer

    package com.ctgu.netty.rpc.serverStub;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    
    /**
     * RPC server built on Netty.
     *
     * <p>Every accepted connection gets a pipeline of {@link ObjectEncoder}, {@link ObjectDecoder}
     * and an {@link InvokeHandler}, so each decoded request object is handed to the
     * {@link InvokeHandler} and whatever it writes back is serialized to the peer.
     */
    public class NettyRPCServer {

        /** TCP port the server listens on. */
        private final int port;

        /**
         * @param port TCP port to bind when {@link #start()} is called
         */
        public NettyRPCServer(int port) {
            this.port = port;
        }

        /**
         * Binds the configured port and blocks the calling thread until the server channel
         * is closed. Both event loop groups are shut down before this method returns,
         * whether startup succeeded or not. Failures are printed, not rethrown.
         */
        public void start() {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                // Outbound: serialize response objects.
                                pipeline.addLast("encoder", new ObjectEncoder());
                                // Inbound: deserialize request objects.
                                // SECURITY NOTE(review): this is Java-native deserialization of
                                // whatever the peer sends, with the maximum object size set to
                                // Integer.MAX_VALUE. Only expose this port to trusted clients,
                                // or switch to a safer wire format and a real size limit.
                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,
                                        ClassResolvers.cacheDisabled(null)));
                                pipeline.addLast(new InvokeHandler());
                            }
                        });
                // The port is supplied here only; it is not also set via localAddress().
                ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
                System.out.println("-------server is ready------");
                // Park this thread until the listening channel is closed.
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }

        /** Starts a server on port 9999. */
        public static void main(String[] args) {
            NettyRPCServer nettyRPCServer = new NettyRPCServer(9999);
            nettyRPCServer.start();
        }
    }
    

    InvokeHandler

    package com.ctgu.netty.rpc.serverStub;
    
    import com.ctgu.netty.rpc.entity.ClassInfo;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import org.reflections.Reflections;
    
    import java.lang.reflect.Method;
    import java.util.Set;
    
    /**
     * Server-side handler that executes one RPC request per inbound message.
     *
     * <p>The inbound message is cast to {@link ClassInfo}. The handler resolves the single
     * implementation of the requested interface inside the service package, invokes the
     * requested method reflectively and writes the return value back to the channel.
     *
     * <p>When no reply object can be written (no unique implementation, a {@code null} or
     * {@code void} result, or any exception), the channel is closed instead, so that a
     * peer waiting for the channel to close is not left blocked.
     */
    public class InvokeHandler extends ChannelInboundHandlerAdapter {

        /** Package that holds the service interfaces and their implementation classes. */
        private static final String SERVICE_PACKAGE = "com.ctgu.netty.rpc.server";

        /**
         * Finds the name of the one implementation class of the requested interface.
         *
         * <p>Only the simple name of {@code classInfo.getClassName()} is used; the interface
         * is always looked up inside the service package.
         *
         * @param classInfo request describing the interface being called
         * @return fully qualified name of the implementation class, or {@code null} when
         *         there is no implementation or more than one
         * @throws Exception if the interface class cannot be loaded
         */
        @SuppressWarnings({"rawtypes", "unchecked"}) // raw Class is unavoidable: the type is only known at runtime
        private String getImplClassName(ClassInfo classInfo) throws Exception {
            String requestedName = classInfo.getClassName();
            // lastIndexOf returns -1 for a name without a package, so "+ 1" keeps the whole name.
            String simpleName = requestedName.substring(requestedName.lastIndexOf('.') + 1);
            Class superaClass = Class.forName(SERVICE_PACKAGE + "." + simpleName);
            // Collect every subtype of the interface found in the service package.
            Reflections reflections = new Reflections(SERVICE_PACKAGE);
            Set<Class> implClassSet = reflections.getSubTypesOf(superaClass);
            if (implClassSet.isEmpty()) {
                System.out.println("未找到实现类");
                return null;
            }
            if (implClassSet.size() > 1) {
                System.out.println("找到多个实现类,未明确使用哪一个");
                return null;
            }
            return implClassSet.iterator().next().getName();
        }

        /**
         * Executes the request carried by {@code msg} and writes the result to the channel.
         *
         * @param ctx channel context used to reply or close
         * @param msg inbound message; expected to be a {@link ClassInfo}
         * @throws Exception on any reflection or invocation failure; handled by
         *                   {@link #exceptionCaught(ChannelHandlerContext, Throwable)}
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ClassInfo classInfo = (ClassInfo) msg;
            String implClassName = getImplClassName(classInfo);
            if (implClassName == null) {
                // Nothing to invoke; close so the caller does not wait forever.
                ctx.close();
                return;
            }
            Object object = Class.forName(implClassName).getDeclaredConstructor().newInstance();
            Method method = object.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
            // Invoke the implementation method reflectively.
            Object result = method.invoke(object, classInfo.getObjects());
            if (result == null) {
                // A null message cannot be written to the channel; closing tells the peer
                // the call is finished with no value.
                ctx.close();
                return;
            }
            ctx.writeAndFlush(result);
        }

        /**
         * Prints the failure and closes the connection so the remote caller is released.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    NettyRPCProxy

    package com.ctgu.netty.rpc.clientStub;
    
    import com.ctgu.netty.rpc.entity.ClassInfo;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    
    /**
     * Client-side stub factory: builds a dynamic proxy for a service interface whose
     * method calls are sent to the RPC server over Netty.
     *
     * <p>Each remote call opens a new connection, sends one {@link ClassInfo}, waits for
     * the channel to close and returns what the {@link ResultHandler} received.
     */
    public class NettyRPCProxy {

        /** Server host used by {@link #create(Class)}. */
        private static final String DEFAULT_HOST = "127.0.0.1";

        /** Server port used by {@link #create(Class)}. */
        private static final int DEFAULT_PORT = 9999;

        /**
         * Creates a proxy for the given interface that talks to 127.0.0.1:9999.
         *
         * @param target service interface to proxy
         * @return proxy instance implementing {@code target}
         */
        public static Object create(Class<?> target) {
            return create(target, DEFAULT_HOST, DEFAULT_PORT);
        }

        /**
         * Creates a proxy for the given interface that talks to the given server.
         *
         * @param target service interface to proxy
         * @param host   server host name or address
         * @param port   server TCP port
         * @return proxy instance implementing {@code target}
         */
        public static Object create(final Class<?> target, final String host, final int port) {
            return Proxy.newProxyInstance(target.getClassLoader(), new Class<?>[]{target}, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    // equals/hashCode/toString are answered locally, not over the network.
                    if (method.getDeclaringClass() == Object.class) {
                        return invokeObjectMethod(proxy, method, args);
                    }

                    // Describe the call for the server.
                    ClassInfo classInfo = new ClassInfo();
                    classInfo.setClassName(target.getName());
                    classInfo.setMethodName(method.getName());
                    classInfo.setObjects(args);
                    classInfo.setTypes(method.getParameterTypes());

                    return sendRequest(classInfo, host, port);
                }
            });
        }

        /** Implements the {@link Object} methods of a proxy using identity semantics. */
        private static Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
            String name = method.getName();
            if ("equals".equals(name)) {
                return proxy == args[0];
            }
            if ("hashCode".equals(name)) {
                return System.identityHashCode(proxy);
            }
            return proxy.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(proxy));
        }

        /**
         * Sends one request over a fresh connection and blocks until the channel closes.
         *
         * @return the object received in reply, or {@code null} if nothing was received
         *         (including when the connection or the send failed; the failure is printed)
         */
        private static Object sendRequest(ClassInfo classInfo, String host, int port) {
            EventLoopGroup group = new NioEventLoopGroup();
            final ResultHandler resultHandler = new ResultHandler();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {

                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                // Outbound: serialize the request object.
                                pipeline.addLast("encoder", new ObjectEncoder());
                                // Inbound: deserialize the reply object.
                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,
                                        ClassResolvers.cacheDisabled(null)));
                                pipeline.addLast(resultHandler);
                            }
                        });
                ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
                channelFuture.channel().writeAndFlush(classInfo).sync();
                // The channel is closed once a reply has been read, or when the peer closes it.
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag for the calling thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
            return resultHandler.getResponse();
        }
    }
    

    ResultHandler

    package com.ctgu.netty.rpc.clientStub;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * Inbound handler that keeps the message read from the channel so it can be
     * fetched later as the result of the remote call.
     */
    public class ResultHandler extends ChannelInboundHandlerAdapter {

        // Last message read from the channel; stays null until a message arrives.
        private Object response;

        /**
         * Stores the incoming message (the remote call's result) and then closes the channel.
         *
         * @param ctx context of the channel the message arrived on
         * @param msg the received message
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            this.response = msg;
            ctx.close();
        }

        /**
         * @return the stored message, or {@code null} if none has been read
         */
        public Object getResponse() {
            return this.response;
        }
    }
    

    相关文章

      网友评论

          本文标题:netty网络编程-7.RPC

          本文链接:https://www.haomeiwen.com/subject/vrgnuhtx.html