一起学RPC(一)

作者: MR丿VINCENT | 来源:发表于2018-08-02 16:43 被阅读17次

    在上一篇中废话了很多“大概”不相关的东西。而这篇就要认认真真的开始讲干货了(也有可能是水货,谁知道呢)。

    上一篇文章主要介绍了与rpc中间件相关的但联系不是很大的spring xml标签的自定义实现。可以说是没有太多核心的东西,全文中的关键字就是“抄”。没错,只要有官方文档,什么都能照着抄。实在不行,对着源码的实现也能抄一把。联系到目前的工作中,也是复制粘贴一把梭。不得不说现在的编码要求是越来越低了。

    总所周知,rpc顾名思义是远程过程调用,所谓的远程就是不在一个机器上。因此机器与机器之间的可靠通信可以说是rpc的基础设施了。那么本文的重点就是深入剖析这个基础设施的具体实现(的其中一部分,其余的还没认真看)。

    在jupiter中,对这些基础设施的设计可算是下了一番功夫的。至少我看明白花了一点时间的。在jupiter的代码组织中,将网络传输这一块单独整成一个模块。很多开源项目也是这样做的,算是中规中矩了。

    structure-2018-08-01-15-55-21.jpg

    同时,为了以后的拓展,传输层还定义了一个高层次的抽象模块api。然后根据自己的喜好可以自由去切换传输层实现。这里默认只有基于Netty的实现。如果想添加Mina的实现也很容易,添加Mina依赖然后实现api中的接口就行了。

    接下来就仔细探索一下基于Netty的服务端的实现细节。

    抽象接口

    jupiter的服务端层次结构十分简单。继承关系也很清晰。这张图很清晰的描述了继承关系:


    acceptor-2018-08-01-15-50-40.jpg

    通过命名可以体现出来这些抽象类或接口的含义。我想写代码的最高境界就是能做到变量名能恰如其分的表达其功用。

    public interface Transporter {
    
        /**
         * Returns the transport protocol
         */
        Protocol protocol();
    
        /**
         * 传输层协议.
         */
        enum Protocol {
            TCP,
            DOMAIN  // Unix domain socket
        }
    }
    
    

    最高层次的接口仅仅只定义了一个方法,返回到底使用的是什么协议。这里可选的只有TCP或者DOMAIN。关于tcp无需多言,但是这个unix domain socket就不是那么常见了。简单来讲就是用于机器内的通信,不是机器间的通信。具体使用场景我问了一下作者feng.jc,他回复了一个词:service mesh.然后就没有然后了。对此咱暂且不管。

    接下来就是比较细化的一个接口了。

    public interface JAcceptor extends Transporter {
    
        /**
         * 绑定的地址
         */
        SocketAddress localAddress();
    
        /**
         * 绑定的端口.
         */
        int boundPort();
    
        /**
         * Acceptor options [parent, child].
         */
        JConfigGroup configGroup();
    
        /**
         * 返回rpc处理器 
         */
        ProviderProcessor processor();
    
        /**
         * 设置ProviderProcessor 也就是实际的业务逻辑全部由这个东西处理
         */
        void withProcessor(ProviderProcessor processor);
    
        /**
         * Start the server and wait until the server socket is closed.
         * 默认调用start(true)
         */
        void start() throws InterruptedException;
    
        /**
         * Start the server.
         */
        void start(boolean sync) throws InterruptedException;
    
        /**
         * Shutdown the server gracefully.
         */
        void shutdownGracefully();
    }
    
    

    这个接口也很清晰简单。符合一般的思路。接下来就是这些接口的抽象实现。

    抽象实现

    在走读抽象实现逻辑之前,有必要看看如果要直接启动这个transporter该怎么做。

    public static void main(String[] args) throws InterruptedException {
            JAcceptor acceptor = new JNettyTcpAcceptor(9999);
            acceptor.start();
        }
    

    不得不说是非常简单。但是背后的工作可谓是非常多。

    JNettyTcpAcceptor是最底层的实现类。在实例化的时候会传入参数端口号,这点无可厚非毫无争议。不传也是可以的,因为构造器有重载,会传入默认端口号18090。而实际上是去调用的父类的构造器。父类构造器的重载方法很多,就贴出一个全参数的重载实现,其余的请自行脑补。

    public NettyTcpAcceptor(SocketAddress localAddress, int nBosses, int nWorkers, boolean isNative) {
            super(Protocol.TCP, localAddress, nBosses, nWorkers);
            this.isNative = isNative;
            init();
        }
    

    然而恶心心的是这个构造器也去调用父类的构造函数。对于聪明的人来说这都不是事儿。

    public NettyAcceptor(Protocol protocol, SocketAddress localAddress, int nBosses, int nWorkers) {
            this.protocol = protocol;
            this.localAddress = localAddress;
            this.nBosses = nBosses;
            this.nWorkers = nWorkers;
        }
    

    值得一提的仅仅只有后面两个参数。顾名思义代表的是boss的线程数和worker的线程数。如果对netty很熟悉这点就不需要解释太多。然后就是init()方法了。这个init方法的核心实现实际上是在顶层父类中完成的。

    protected void init() {
            ThreadFactory bossFactory = bossThreadFactory("jupiter.acceptor.boss");
            ThreadFactory workerFactory = workerThreadFactory("jupiter.acceptor.worker");
            boss = initEventLoopGroup(nBosses, bossFactory);
            worker = initEventLoopGroup(nWorkers, workerFactory);
    
            bootstrap = new ServerBootstrap().group(boss, worker);
    
            // parent options
            JConfig parent = configGroup().parent();
            parent.setOption(JOption.IO_RATIO, 100);
    
            // child options
            JConfig child = configGroup().child();
            child.setOption(JOption.IO_RATIO, 100);
        }
    

    这段代码做了3件事。创建了boss和worker;实例化了ServerBootstrap;把参数配置起来了。仅仅只做了这些事情,很符合抽象类的风格。需要细化的操作请继承,然后自定义实现,爱咋咋地。反正最后肯定会去调用的子类实现,前提是别把我全部覆盖掉,增量去拓展就行。

    说了这么多,实际上抽象实现就是对server的“大致”实现。具体的定制得交给子类完成。

    具体实现

    在上面的demo中实例化的一定是一个具体子类。子类通过一系列父类中的初始化方法完成了前期的准备工作:tcp参数设置、boss和worker的设置等。而正真开启一个server的方法是start()

        @Override
        public void start() throws InterruptedException {
            start(true);
        }
    
        @Override
        public void start(boolean sync) throws InterruptedException {
            // wait until the server socket is bind succeed.
            ChannelFuture future = bind(localAddress).sync();
    
            if (logger.isInfoEnabled()) {
                logger.info("Jupiter TCP server start" + (sync ? ", and waits until the server socket closed." : ".")
                        + JConstants.NEWLINE + " {}.", toString());
            }
    
            if (sync) {
                // wait until the server socket is closed.
                future.channel().closeFuture().sync();
            }
        }
    

    start()方法只是入口,核心是bind().

        @Override
        public ChannelFuture bind(SocketAddress localAddress) {
            ServerBootstrap boot = bootstrap();
    
            initChannelFactory();
    
            boot.childHandler(new ChannelInitializer<Channel>() {
    
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(
                            new IdleStateChecker(timer, JConstants.READER_IDLE_TIME_SECONDS, 0, 0),
                            idleStateTrigger,
                            CodecConfig.isCodecLowCopy() ? new LowCopyProtocolDecoder() : new ProtocolDecoder(),
                            encoder,
                            handler);
                }
            });
    
            setOptions();
    
            return boot.bind(localAddress);
        }
        
        protected void initChannelFactory() {
            SocketChannelProvider.SocketType socketType = socketType();
            switch (socketType) {
                case NATIVE_EPOLL:
                    bootstrap().channelFactory(SocketChannelProvider.NATIVE_EPOLL_ACCEPTOR);
                    break;
                case NATIVE_KQUEUE:
                    bootstrap().channelFactory(SocketChannelProvider.NATIVE_KQUEUE_ACCEPTOR);
                    break;
                case JAVA_NIO:
                    bootstrap().channelFactory(SocketChannelProvider.JAVA_NIO_ACCEPTOR);
                    break;
                default:
                    throw new IllegalStateException("Invalid socket type: " + socketType);
            }
        }
    

    不得不说,bind方法层次也很清晰。其中调用了一个initChannelFactory()方法,其实没有什么高深莫测的地方。简单理解为和下面的代码类似:

    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
    

    到此为止,整个server的启动流程就结束了。整个流程十分干净,没有任何涉及到业务的地方。可能稍微有一点和业务沾边的地方就是编解码器。这个的确是完全耦合到这个acceptor中去的,也就是说如果你想单纯的去用这个acceptor是不行的。因为只能针对特定的网络数据格式进行处理。但是针对这个项目而言是没有任何问题的,我想也没有人会仅仅去用其中的acceptor,再说也不是提供给开发者用的,这是给自己用的。

    当然,其中的比较核心的东西没有去分析。因为实在是很复杂。我打算采用抽丝剥茧的方式将其逐步细化,毕竟害怕贪多嚼不烂。接下来要讨论的是jupiter的业务编解码器的实现。

    相关文章

      网友评论

        本文标题:一起学RPC(一)

        本文链接:https://www.haomeiwen.com/subject/qedsvftx.html