美文网首页
dubbo源码6-transporter-netty4

dubbo源码6-transporter-netty4

作者: modou1618 | 来源:发表于2019-01-05 21:57 被阅读0次

transporter层支持netty,mina,http等协议。本文介绍基于netty4的实现。

一 NettyClient

1.1 类关系图

NettyClient类关系图.png

1.2 初始化

1.2.1 NettyClient初始化

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        super(url, wrapChannelHandler(url, handler));
    }

    protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
        url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
        url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
        return ChannelHandlers.wrap(handler, url);
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
  • 设置threadpool属性默认为cache
  • handler封装

1.2.2 AbstractClient初始化

  • send.reconnect配置支持发送时重连断开的连接
  • 初始化netty client,配置channel属性,配置编解码和收包函数,配置通信事件分发器
  • 保存收包异步处理线程池,停机时做销毁处理
protected void doOpen() throws Throwable {
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap = new Bootstrap();
        bootstrap.group(nioEventLoopGroup)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                .channel(NioSocketChannel.class);

        if (getConnectTimeout() < 3000) {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        } else {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
        }

        bootstrap.handler(new ChannelInitializer() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())
                        .addLast("handler", nettyClientHandler);
            }
        });
    }
  • 启动线程池定时调度任务检查channel连接状态,对断开的连接进行重连
  • 和服务端建立tcp连接
    ChannelFuture future = bootstrap.connect(getConnectAddress());

1.2.3 AbstractEndpoint初始化

  • 保存连接超时和通信超时

1.2.4 AbstractPeer初始化

  • 保存MultiMessageHandler,netty通信时调用handler处理对应通信事件

1.3 收包handler介绍

1.3.1 MultiMessageHandler

  • 如果一次收包包含多个消息时,会解码成MultiMessage批量消息类型。
  • 对批量消息,遍历依次调用下层收包处理函数。
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            for (Object obj : list) {
                handler.received(channel, obj);
            }
        } else {
            handler.received(channel, message);
        }
    }

1.3.2 HeartbeatHandler

心跳报文处理,见exchanger层介绍

1.3.3 异步线程池收包处理

  • 根据dispather配置选择异步收包处理函数。
配置值 说明
all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,⼼跳等。
direct 所有消息都不派发到线程池,全部在 IO 线程上直接执⾏。
message 只有请求响应消息派发到线程池。其它连接断开事件,⼼跳等消息,直接在 IO 线程上执⾏。
execution 只有请求消息派发到线程池。响应和其它连接断开事件,⼼跳等消息,直接在 IO 线程上执⾏。
connection 一个单线程线程池,处理连接断开事件,connect.queue.capacity配置指定可缓存的连接请求最大数量。其它消息派发到另一个线程池。
  • WrappedChannelHandler根据threadpool配置创建线程池。queues属性值可设置等待blockqueue的长度。
配置值 说明
fixed 固定⼤⼩线程池,new ThreadPoolExecutor(200, 200, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url))
cached 缓存线程池。new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60*1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url))
limited 最大线程数量线程池。new ThreadPoolExecutor(0, 200, Long.MAX_VALUE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url))
  • 异步线程执行任务ChannelEventRunnable
    根据channel的状态调用handler的不同接口函数,这里调用的exchange层的DecodeHandler。
public enum ChannelState {
        CONNECTED,
        DISCONNECTED,
        SENT,[图片上传中...(image.png-b4f324-1546695374507-0)]

        RECEIVED,
        CAUGHT
    }

1.3.4 收包handler完整层次图

收包handler.png

二 NettyServer

2.1 类关系图

NettyServer类.png

2.2 初始化

2.2.1 NettyServer初始化

  • 封装收包处理函数
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

2.2.2 AbstractServer初始化

  • 配置连接请求事件分发器bossGroup,通信事件分发器workerGroup
  • 配置channel属性
  • 配置编解码函数和收包处理函数
  • 创建netty server
protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

2.3 收包处理

收包处理函数与NettyClient流程相同。

相关文章

网友评论

      本文标题:dubbo源码6-transporter-netty4

      本文链接:https://www.haomeiwen.com/subject/yctrrqtx.html