美文网首页
dubbo源码6-transporter-netty4

dubbo源码6-transporter-netty4

作者: modou1618 | 来源:发表于2019-01-05 21:57 被阅读0次

    transporter层支持netty,mina,http等协议。本文介绍基于netty4的实现。

    一 NettyClient

    1.1 类关系图

    NettyClient类关系图.png

    1.2 初始化

    1.2.1 NettyClient初始化

        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyClient(url, listener);
        }
    
        public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
            super(url, wrapChannelHandler(url, handler));
        }
    
        protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
            url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
            url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
            return ChannelHandlers.wrap(handler, url);
        }
    
        protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
            return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                    .getAdaptiveExtension().dispatch(handler, url)));
        }
    
    • 设置threadpool属性默认为cache
    • handler封装

    1.2.2 AbstractClient初始化

    • send.reconnect配置支持发送时重连断开的连接
    • 初始化netty client,配置channel属性,配置编解码和收包函数,配置通信事件分发器
    • 保存收包异步处理线程池,停机时做销毁处理
    protected void doOpen() throws Throwable {
            final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
            bootstrap = new Bootstrap();
            bootstrap.group(nioEventLoopGroup)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                    .channel(NioSocketChannel.class);
    
            if (getConnectTimeout() < 3000) {
                bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
            } else {
                bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
            }
    
            bootstrap.handler(new ChannelInitializer() {
    
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("handler", nettyClientHandler);
                }
            });
        }
    
    • 启动线程池定时调度任务检查channel连接状态,对断开的连接进行重连
    • 和服务端建立tcp连接
      ChannelFuture future = bootstrap.connect(getConnectAddress());

    1.2.3 AbstractEndpoint初始化

    • 保存连接超时和通信超时

    1.2.4 AbstractPeer初始化

    • 保存MultiMessageHandler,netty通信时调用handler处理对应通信事件

    1.3 收包handler介绍

    1.3.1 MultiMessageHandler

    • 如果一次收包包含多个消息时,会解码成MultiMessage批量消息类型。
    • 对批量消息,遍历依次调用下层收包处理函数。
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof MultiMessage) {
                MultiMessage list = (MultiMessage) message;
                for (Object obj : list) {
                    handler.received(channel, obj);
                }
            } else {
                handler.received(channel, message);
            }
        }
    

    1.3.2 HeartbeatHandler

    心跳报文处理,见exchanger层介绍

    1.3.3 异步线程池收包处理

    • 根据dispather配置选择异步收包处理函数。
    配置值 说明
    all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,⼼跳等。
    direct 所有消息都不派发到线程池,全部在 IO 线程上直接执⾏。
    message 只有请求响应消息派发到线程池。其它连接断开事件,⼼跳等消息,直接在 IO 线程上执⾏。
    execution 只有请求消息派发到线程池。响应和其它连接断开事件,⼼跳等消息,直接在 IO 线程上执⾏。
    connection 一个单线程线程池,处理连接断开事件,connect.queue.capacity配置指定可缓存的连接请求最大数量。其它消息派发到另一个线程池。
    • WrappedChannelHandler根据threadpool配置创建线程池。queues属性值可设置等待blockqueue的长度。
    配置值 说明
    fixed 固定⼤⼩线程池,new ThreadPoolExecutor(200, 200, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url))
    cached 缓存线程池。new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60*1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url))
    limited 最大线程数量线程池。new ThreadPoolExecutor(0, 200, Long.MAX_VALUE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url))
    • 异步线程执行任务ChannelEventRunnable
      根据channel的状态调用handler的不同接口函数,这里调用的exchange层的DecodeHandler。
    public enum ChannelState {
            CONNECTED,
            DISCONNECTED,
            SENT,[图片上传中...(image.png-b4f324-1546695374507-0)]
    
            RECEIVED,
            CAUGHT
        }
    

    1.3.4 收包handler完整层次图

    收包handler.png

    二 NettyServer

    2.1 类关系图

    NettyServer类.png

    2.2 初始化

    2.2.1 NettyServer初始化

    • 封装收包处理函数
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyServer(url, listener);
        }
    
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
        }
    

    2.2.2 AbstractServer初始化

    • 配置连接请求事件分发器bossGroup,通信事件分发器workerGroup
    • 配置channel属性
    • 配置编解码函数和收包处理函数
    • 创建netty server
    protected void doOpen() throws Throwable {
            bootstrap = new ServerBootstrap();
    
            bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
            workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                    new DefaultThreadFactory("NettyServerWorker", true));
    
            final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
            channels = nettyServerHandler.getChannels();
    
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                    .addLast("decoder", adapter.getDecoder())
                                    .addLast("encoder", adapter.getEncoder())
                                    .addLast("handler", nettyServerHandler);
                        }
                    });
            // bind
            ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
            channelFuture.syncUninterruptibly();
            channel = channelFuture.channel();
    
        }
    

    2.3 收包处理

    收包处理函数与NettyClient流程相同。

    相关文章

      网友评论

          本文标题:dubbo源码6-transporter-netty4

          本文链接:https://www.haomeiwen.com/subject/yctrrqtx.html