美文网首页
fescar源码分析-AbstractRpcRemotingCl

fescar源码分析-AbstractRpcRemotingCl

作者: do_young | 来源:发表于2019-02-12 15:59 被阅读43次

    fescar源码分析-AbstractRpcRemotingClient中主要从代码结构上介绍了AbstractRpcRemotingClient,但从功能上来说AbstractRpcRemotingClient类主要实现了RPC客户端的实例创建。

    • 1.通过构造函数初始化RPC客户端的相关配置。
    • 2.通过init初始化创建RPC客户端的必要属性。
    • 3.start配置RPC客户端并启动服务。
    • 4.shutdown关闭RPC客户端服务。

    AbstractRpcRemotingClient是使用Netty的Bootstrap类引导创建一个RPC客户端,利用无连接协议和在调用 bind() 或 connect() 之后。下图展示了如何工作

    Figure%209
    1. 当 bind() 调用时,Bootstrap 将创建一个新的管道, 当 connect() 调用在 Channel 来建立连接
    2. Bootstrap 将创建一个新的管道, 当 connect() 调用时
    3. 新的 Channel

    要使用Netty创建PRC的客户端,需要实现以下逻辑:

    Bootstrap bootstrap = new Bootstrap(); //1
    bootstrap.group(new NioEventLoopGroup()) //2
        .channel(NioSocketChannel.class) //3
        .option(ChannelOption.TCP_NODELAY, true)//4
        .handler(new SimpleChannelInboundHandler<ByteBuf>() { //5
            @Override
            protected void channeRead0(
                ChannelHandlerContext channelHandlerContext,
                ByteBuf byteBuf) throws Exception {
                    System.out.println("Received data");
                    byteBuf.clear();
                }
            });
    ChannelFuture future = bootstrap.connect(
        new InetSocketAddress("www.manning.com", 80)); //6
    
    • 1.创建一个新的 Bootstrap 来创建和连接到新的客户端管道
    • 2.指定 EventLoopGroup
    • 3.指定 Channel 实现来使用
    • 4.设置 客户端属性
    • 5.设置Handler初始化Channel 的Pipline,设置事件中对数据的处理逻辑。
    • 6.连接到远端主机

    下面看一下AbstractRpcRemotingClient的具体实现:

    创建Bootstrap

    AbstractRpcRemotingClient类中定义了一个Bootstrap成员属性,并在子类创建实例对象的时候及创建实例。

    private final Bootstrap bootstrap = new Bootstrap();
    

    指定 EventLoopGroup

    在构造函数中,根据配置文件,初始化EventLoopGroup。
    其中wrokerGroupSelector线程数大小及名称可以通过配置文数的参数配置。

    this.eventLoopGroupWorker = new NioEventLoopGroup(
        selectorThreadSizeThreadSize, 
        new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), 
            selectorThreadSizeThreadSize));
    

    在start()方法中将EventLoopGroup实例设置在Bootstrap属性中

    this.bootstrap.group(this.eventLoopGroupWorker)
    

    指定Channel

    在start()方法中将根据配置文件及默认参数设置客户端的通道类。

    .channel(nettyClientConfig.getClientChannelClazz())//
    

    设置客户端属性

    .option(ChannelOption.TCP_NODELAY, true)//
    .option(ChannelOption.SO_KEEPALIVE, true)//
    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
    .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
    .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
    
    ......
      bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
             .option(EpollChannelOption.TCP_QUICKACK, true);
    

    设置Handler

    首先根据配置参数判断是否使用通道池,如果使用则创建一个通道池,并初始化对应的Hadnler

            if (nettyClientConfig.isUseConnPool()) {
                clientChannelPool = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {//1
                    @Override
                    protected FixedChannelPool newPool(InetSocketAddress key) {//2
                        FixedChannelPool fixedClientChannelPool = new FixedChannelPool(bootstrap.remoteAddress(key),//3
                                new DefaultChannelPoolHandler() {
                                    @Override
                                    public void channelCreated(Channel ch) throws Exception {//4
                                        super.channelCreated(ch);
                                        final ChannelPipeline pipeline = ch.pipeline();
                                        pipeline.addLast(defaultEventExecutorGroup,//5
                                                new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                                                        nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                                                        nettyClientConfig.getChannelMaxAllIdleSeconds()));
                                        pipeline.addLast(defaultEventExecutorGroup, new RpcClientHandler());
                                    }
                                }, ChannelHealthChecker.ACTIVE, AcquireTimeoutAction.FAIL,//6
                                nettyClientConfig.getMaxAcquireConnMills(), nettyClientConfig.getPerHostMaxConn(),
                                nettyClientConfig.getPendingConnSize(), false);
                        return fixedClientChannelPool;
    
                    }
                };
            }
    
    • 1.继承AbstractChannelPoolMap,创建一个匿名类。
    • 2.重载newPool方法,创建一个固定大小的通道池。
    • 3.根据端口,即通道池处理器(ChannelPoolHandler)实例创建通道池。
    • 4.创建通道池处理器实例,重载channelCreated方法,实现创建通道实例的逻辑。
    • 5.在新创建的通道实例中设置Handler(重点是RpcClientHandler),并用初始化的线程池来执行pipline中的业务逻辑。
    • 6.设置其它参数
      大致情况如下图所示:


      image.png

    否则按下面逻辑初始化Handler,

     else {
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {//1
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();//2
                        pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                                nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                                nettyClientConfig.getChannelMaxAllIdleSeconds()))
                        .addLast(new MessageCodecHandler());//添加消息加解码器
                        if (null != channelHandlers) {
                            addChannelPipelineLast(ch, channelHandlers);
                        }
                    }
                });
            }
    
    • 1.创建ChannelInitializer实例
    • 2.在pipline中添加Handler,主要是MessageCodecHandler
      1. 添加子类配置的其它Handler

    相关文章

      网友评论

          本文标题:fescar源码分析-AbstractRpcRemotingCl

          本文链接:https://www.haomeiwen.com/subject/pyksjqtx.html