美文网首页
Netty高级篇

Netty高级篇

作者: 贪挽懒月 | 来源:发表于2020-08-22 17:29 被阅读0次

    本文将用netty实现一个简单的RPC框架。

    一、什么叫RPC?

    RPC,远程调用,就是A程序部署在1号机器上,B程序部署在2号机器上,A可以像调本地方法一样地去调用B程序,而不需要程序员额外地编写这个交互过程,这就叫RPC远程调用。dubbo、Ribbon、openfeign都是RPC框架。

    二、RPC调用过程

    • 请求:服务消费者发送请求 ---> 编码 ---> 通过网络发送 ---> 服务提供者接收请求 ---> 解码 ---> 处理请求
    • 响应:服务提供者处理完请求进行响应 ---> 编码 ---> 通过网络发送 ---> 消费者接收响应 ---> 解码 ---> 处理响应

    这两个流程中,我们只需要关心发送请求和处理响应即可,中间那些过程对程序员来说都是透明的。所以,要实现RPC框架,要处理的就是封装中间那些步骤。


    欢迎大家关注我的公众号 javawebkf,目前正在慢慢地将简书文章搬到公众号,以后简书和公众号文章将同步更新,且简书上的付费文章在公众号上将免费。


    三、实现思路

    假如现在我们需要调用服务提供者HelloServiceImpl的hello方法,调用流程如下:

    • 客户端创建代理对象,通过Netty,远程连接服务端,将参数发送过去。
    • 服务端收到参数,传给hello方法,接收到返回值,再发送给客户端。

    四、代码实操

    1、HelloService接口:

    public interface HelloService {
        String hello(String msg);
    }
    

    2、服务端:

    • HelloServiceImpl:
    public class HelloServiceImpl implements HelloService {
        @Override
        public String hello(String msg) {
            if (StringUtils.isNotBlank(msg)) {
                return"收到客户端消息:[" + msg + "]";
            }
            else return "收到客户端消息";
        }
    }
    
    • NettyServerHandler:
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("msg = " + msg);
            if (msg.toString().startsWith("HelloService#hello#")){
                String param = msg.toString().substring(msg.toString().lastIndexOf("#") + 1);
                String result = new HelloServiceImpl().hello(param);
                ctx.writeAndFlush(result);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);
        }
    }
    
    • NettyServer:
    public class NettyServer {
    
        public static void startService(String host, int port){
            startServer0(host, port);
        }
    
        private static void startServer0(String host, int port){
            EventLoopGroup bossgroup = new NioEventLoopGroup(1);
            EventLoopGroup workergroup = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossgroup, workergroup)
                               .channel(NioServerSocketChannel.class)
                               .childHandler(new ChannelInitializer<SocketChannel>() {
                                   @Override
                                   protected void initChannel(SocketChannel socketChannel) throws Exception {
                                       ChannelPipeline pipeline = socketChannel.pipeline();
                                       pipeline.addLast(new StringDecoder());
                                       pipeline.addLast(new StringEncoder());
                                       pipeline.addLast(new NettyServerHandler());
                                   }
                               });
                ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();
                System.out.println("provider is in active");
                channelFuture.channel().closeFuture().sync();
            } catch (Exception e){
                e.printStackTrace();
            } finally {
                bossgroup.shutdownGracefully();
                workergroup.shutdownGracefully();
            }
        }
    }
    
    • ServerBootStrap
    public class ServerBootStrap {
        public static void main(String[] args){
            NettyServer.startService("127.0.0.1", 8848);
        }
    }
    

    3、客户端:

    • NettyClientHandler:
    /**
     * 流程:业务代码远程调用某个方法,被代理对象拦截,进而执行call方法,
     * call方法请求服务端,然后在waiting着,服务器处理完请求将响应返回给另外一个方法,
     * 即read方法,read方法收到响应后,所以就应该唤醒call方法中等待的线程,继续往下执行。
     * @author: zhu
     * @date: 2020/8/22 16:09
     */
    public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    
        private ChannelHandlerContext context;
        private String result;
        private String param;
    
        /**
         * 被代理对象调用,发送数据给服务器,然后等待被唤醒
         * @return
         * @throws Exception
         */
        @Override
        public synchronized Object call() throws Exception {
            // 将参数发送给服务端
            context.writeAndFlush(param);
            // 发送之后就等待被唤醒
            wait();
            // 被唤醒的时候read方法已经拿到result了,这里直接return即可
            return result;
        }
    
        /**
         * 与服务器连接创建成功后被调用
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            context = ctx;
        }
    
        /**
         * 收到服务器数据后被调用
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            result = msg.toString();
            notify(); // 唤醒call方法
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
           ctx.close();
        }
    
        public void setParam(String param) {
            this.param = param;
        }
    }
    
    • NettyClient :
    public class NettyClient {
    
        private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
        private static NettyClientHandler clientHandler;
    
        /**
         * 初始化
         */
        private static void initClient(){
            clientHandler = new NettyClientHandler();
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(clientHandler);
                        }
                    });
            try {
                bootstrap.connect("127.0.0.1", 8848).sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public Object getBean(final Class<?> serviceClass, final String providerName){
            return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                    new Class<?>[] {serviceClass}, (proxy, method, args) -> {
               if (clientHandler == null){
                   initClient();
               }
               // 设置要发给服务器端的参数
               clientHandler.setParam(providerName + args[0]);
               return executorService.submit(clientHandler).get();
            });
        }
    }
    
    • ClientBootStrap:
    public class ClientBootStrap {
    
        // 定义协议头
        private static final String providerName = "HelloService#hello#";
    
        public static void main(String[] args){
            // 创建消费者
            NettyClient customer = new NettyClient();
            HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
            String result = service.hello("扣你吉瓦");
            System.out.println(result);
        }
    }
    

    先启动ServerBootStrap,然后启动ClientBootStrap去调用服务端的hello方法,最后可以成功拿到返回结果。

    相关文章

      网友评论

          本文标题:Netty高级篇

          本文链接:https://www.haomeiwen.com/subject/sfevjktx.html