美文网首页
dubbo笔记-remoting(2)Netty实现

dubbo笔记-remoting(2)Netty实现

作者: 兴浩 | 来源:发表于2018-08-07 21:27 被阅读6次

    接上篇
    dubbo笔记-remoting(1)

    1.Channel

    1.1 AbstractChannel

    AbstractChannel基本没做什么:send方法只检查通道是否已关闭(已关闭则抛出RemotingException),toString方法返回"本地地址 -> 远端地址"

    /**
     * Base class for {@link Channel} implementations.
     *
     * <p>It only contributes a closed-channel check in {@link #send(Object, boolean)} and a
     * {@code "local -> remote"} string form; everything else comes from {@link AbstractPeer}.
     */
    public abstract class AbstractChannel extends AbstractPeer implements Channel {

        /**
         * @param url     passed through to {@link AbstractPeer}
         * @param handler passed through to {@link AbstractPeer}
         */
        public AbstractChannel(URL url, ChannelHandler handler) {
            super(url, handler);
        }

        /**
         * Rejects the message when this channel is closed; otherwise does nothing here.
         *
         * @param message the object to send; may be {@code null}
         * @param sent    not used by this base implementation
         * @throws RemotingException if {@code isClosed()} returns {@code true}
         */
        @Override
        public void send(Object message, boolean sent) throws RemotingException {
            if (!isClosed()) {
                return;
            }
            final String messageType = (message == null) ? "" : message.getClass().getName();
            throw new RemotingException(this, "Failed to send message "
                    + messageType + ":" + message
                    + ", cause: Channel closed. channel: " + localToRemote());
        }

        /** Returns the local and remote address joined by {@code " -> "}. */
        @Override
        public String toString() {
            return localToRemote();
        }

        // Builds the "local -> remote" text shared by toString() and the send() error message.
        private String localToRemote() {
            return getLocalAddress() + " -> " + getRemoteAddress();
        }

    }
    

    1.2 NettyChannel

    NettyChannel的内部实现委托给io.netty.channel.Channel完成,属性(attribute)则保存在名为attributes的ConcurrentHashMap中

    /**
     * {@link AbstractChannel} that wraps a Netty {@code Channel} and keeps its own attribute map.
     * (Only the fields and the constructor are shown in this excerpt.)
     */
    final class NettyChannel extends AbstractChannel {

        private static final Logger logger = LoggerFactory.getLogger(NettyChannel.class);

        /** Attribute storage for this channel, keyed by attribute name. */
        private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();

        /** The wrapped Netty channel; the constructor guarantees it is non-null. */
        private final Channel channel;

        /**
         * @param channel the Netty channel to wrap; must not be {@code null}
         * @param url     passed through to {@link AbstractChannel}
         * @param handler passed through to {@link AbstractChannel}
         * @throws IllegalArgumentException if {@code channel} is {@code null}
         */
        private NettyChannel(Channel channel, URL url, ChannelHandler handler) {
            super(url, handler);
            final boolean missing = (channel == null);
            if (missing) {
                throw new IllegalArgumentException("netty channel == null;");
            }
            this.channel = channel;
        }
    }
    

    2.Server

    2.1 AbstractServer

    AbstractServer是Server接口的默认(抽象)实现,它从URL中读取绑定地址、accepts、idleTimeout等必要的信息;构造函数就是一个模板方法,将doOpen方法留给子类实现

    /**
     * Skeleton for {@link Server} implementations.
     *
     * <p>The constructor acts as a template method: it derives the local and bind addresses and a
     * few settings from the {@link URL}, calls {@code doOpen()} (overridden by concrete servers,
     * e.g. {@code NettyServer}), and finally looks up the executor from the default
     * {@code DataStore} extension.
     *
     * <p>Because {@code doOpen()} is invoked from this constructor, it runs before the field
     * initializers and constructor body of the subclass.
     */
    public abstract class AbstractServer extends AbstractEndpoint implements Server {
    
        protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
        private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
        // Assigned at the very end of the constructor from the default DataStore extension.
        ExecutorService executor;
        // Address taken from the URL via toInetSocketAddress().
        private InetSocketAddress localAddress;
        // Address built from the bind ip/port parameters (falling back to the URL host/port).
        private InetSocketAddress bindAddress;
        // Value of the ACCEPTS_KEY URL parameter (default: Constants.DEFAULT_ACCEPTS).
        private int accepts;
        // Initial value 600 ("600 seconds" per the original comment); overwritten in the constructor
        // with the IDLE_TIMEOUT_KEY URL parameter.
        // NOTE(review): the unit of Constants.DEFAULT_IDLE_TIMEOUT is not visible here - confirm it is seconds too.
        private int idleTimeout = 600;
    
        /**
         * Reads the server settings from {@code url} and opens the server via {@code doOpen()}.
         *
         * @param url     source of the bind ip/port, anyhost flag, accepts and idle timeout parameters
         * @param handler passed through to {@link AbstractEndpoint}
         * @throws RemotingException if {@code doOpen()} throws anything; the original throwable is the cause
         */
        public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, handler);
            localAddress = getUrl().toInetSocketAddress();
    
            // Bind ip/port default to the URL's own host/port when the parameters are absent.
            String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
            int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
            // Use NetUtils.ANYHOST when the anyhost flag is set or the bind ip is not a valid local host.
            if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
                bindIp = NetUtils.ANYHOST;
            }
            bindAddress = new InetSocketAddress(bindIp, bindPort);
            this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
            this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
            try {
                // Template step: the concrete server starts its transport here.
                doOpen();
                if (logger.isInfoEnabled()) {
                    logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
                }
            } catch (Throwable t) {
                // NOTE(review): the message reports getLocalAddress() although the info log above
                // distinguishes the bind address from the exported one - confirm this is intended.
                throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                        + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
            }
            //fixme replace this with better method
            // The executor is fetched from the DataStore under EXECUTOR_SERVICE_COMPONENT_KEY, keyed by the URL port.
            DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
            executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
        }
    }
    

    2.2 NettyServer

    NettyServer负责真正启动Netty的Server端:在doOpen方法中创建ServerBootstrap,配置boss/worker线程组和编解码pipeline,最后绑定监听地址

    /**
     * Netty-backed {@link AbstractServer}: {@link #doOpen()} builds a {@code ServerBootstrap},
     * installs the decoder/encoder/handler pipeline on every accepted channel and binds to
     * {@code getBindAddress()}.
     *
     * <p>{@code doOpen()} is called from the {@code AbstractServer} constructor, i.e. before this
     * class's own field initializers would run, so the fields below must stay without initializers.
     */
    public class NettyServer extends AbstractServer implements Server {

        private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

        /** Channels tracked by the server handler, keyed by {@code ip:port}. */
        private Map<String, Channel> channels;

        private ServerBootstrap bootstrap;

        /** The bound server channel returned by the bind future. */
        private io.netty.channel.Channel channel;

        private EventLoopGroup bossGroup;
        private EventLoopGroup workerGroup;

        /**
         * @param url     server URL; also used to name the handler thread pool
         * @param handler application handler, wrapped via {@code ChannelHandlers.wrap} before use
         * @throws RemotingException propagated from the {@code AbstractServer} constructor
         */
        public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
        }

        /**
         * Configures and binds the Netty server.
         *
         * @throws Throwable anything raised while configuring or binding
         */
        @Override
        protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();

            bootstrap = new ServerBootstrap();

            // One boss thread; the worker count comes from the IO_THREADS_KEY URL parameter.
            bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
            workerGroup = new NioEventLoopGroup(
                    getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                    new DefaultThreadFactory("NettyServerWorker", true));

            // A single handler instance is added to the pipeline of every accepted channel.
            final NettyServerHandler serverHandler = new NettyServerHandler(getUrl(), this);
            channels = serverHandler.getChannels();

            final ChannelInitializer<NioSocketChannel> childInitializer = new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    NettyCodecAdapter codecAdapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    // for debug: .addLast("logging",new LoggingHandler(LogLevel.INFO))
                    ch.pipeline()
                            .addLast("decoder", codecAdapter.getDecoder())
                            .addLast("encoder", codecAdapter.getEncoder())
                            .addLast("handler", serverHandler);
                }
            };

            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childHandler(childInitializer);

            // Bind and wait for the bind to complete before keeping the server channel.
            ChannelFuture bindFuture = bootstrap.bind(getBindAddress());
            bindFuture.syncUninterruptibly();
            channel = bindFuture.channel();
        }
    }
    

    3.Client

    3.1 AbstractClient

    与AbstractServer类似,AbstractClient也是一个模板类:构造函数先调用doOpen方法(留给子类实现),再调用connect方法建立连接

    /**
     * Skeleton for {@link Client} implementations.
     *
     * <p>The constructor acts as a template method with three phases: read settings from the
     * {@link URL}, call {@code doOpen()} (overridden by concrete clients, e.g. {@code NettyClient}),
     * then call {@code connect()}. Afterwards the executor is taken from the default
     * {@code DataStore} extension and removed from it.
     *
     * <p>Because {@code doOpen()} is invoked from this constructor, it runs before the field
     * initializers and constructor body of the subclass.
     */
    public abstract class AbstractClient extends AbstractEndpoint implements Client {
    
        protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";
        private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
        private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger();
        // Scheduler shared by all clients (static), created with a core pool size of 2.
        private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true));
        private final Lock connectLock = new ReentrantLock();
        // Value of the SEND_RECONNECT_KEY URL parameter (default false).
        private final boolean send_reconnect;
        private final AtomicInteger reconnect_count = new AtomicInteger(0);
        // Whether the reconnection error has already been logged.
        private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false);
        // Reconnect warning interval: log a warning after this many reconnect attempts (configurable for tests).
        private final int reconnect_warning_period;
        // Value of the SHUTDOWN_TIMEOUT_KEY URL parameter (default Constants.DEFAULT_SHUTDOWN_TIMEOUT).
        private final long shutdown_timeout;
        // Assigned at the end of the constructor from the default DataStore extension.
        protected volatile ExecutorService executor;
        private volatile ScheduledFuture<?> reconnectExecutorFuture = null;
        // Time of the last successful connection; initialised to the construction time.
        private long lastConnectedTime = System.currentTimeMillis();
    
    
        /**
         * Opens the client and connects to the server.
         *
         * @param url     source of the reconnect, shutdown-timeout and check parameters
         * @param handler passed through to {@link AbstractEndpoint}
         * @throws RemotingException if {@code doOpen()} fails, if {@code connect()} throws a
         *                           {@code RemotingException} while the check parameter is true
         *                           (its default), or if {@code connect()} throws any other throwable
         */
        public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
            super(url, handler);
    
            send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    
            shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
    
            // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
            // NOTE: the parameter key really is spelled "waring"; it is a runtime key, so correcting
            // the spelling would change which URL parameter is read.
            reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
    
            // Phase 1: let the concrete client prepare its transport; any failure closes the client.
            try {
                doOpen();
            } catch (Throwable t) {
                close();
                throw new RemotingException(url.toInetSocketAddress(), null,
                        "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                                + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
            }
            // Phase 2: connect to the server.
            try {
                // connect.
                connect();
                if (logger.isInfoEnabled()) {
                    logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
                }
            } catch (RemotingException t) {
                // With check == true (the default) a failed connect is fatal; with check == false
                // it is only logged and construction continues.
                if (url.getParameter(Constants.CHECK_KEY, true)) {
                    close();
                    throw t;
                } else {
                    logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
                }
            } catch (Throwable t) {
                // Any non-RemotingException failure is always fatal, regardless of the check flag.
                close();
                throw new RemotingException(url.toInetSocketAddress(), null,
                        "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                                + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
            }
    
            // Phase 3: take the executor stored under CONSUMER_SIDE / <port> and then remove that entry.
            executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
                    .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
            ExtensionLoader.getExtensionLoader(DataStore.class)
                    .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
        }
    }
    

    3.2 NettyClient

    同样,NettyClient是基于Netty的Client实现

    /**
     * Netty-backed {@link AbstractClient}: {@link #doOpen()} prepares a {@code Bootstrap} with the
     * decoder/encoder/handler pipeline.
     *
     * <p>{@code doOpen()} is called from the {@code AbstractClient} constructor, i.e. before this
     * class's own instance field initializers would run, so {@code bootstrap} must stay without an
     * initializer.
     */
    public class NettyClient extends AbstractClient {

        private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

        /** Event loop group shared by every {@code NettyClient} instance. */
        private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));

        private Bootstrap bootstrap;

        private volatile Channel channel; // volatile, please copy reference to use

        /**
         * @param url     client URL
         * @param handler application handler, wrapped via {@code wrapChannelHandler} before use
         * @throws RemotingException propagated from the {@code AbstractClient} constructor
         */
        public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
            super(url, wrapChannelHandler(url, handler));
        }

        /**
         * Creates and configures the {@code Bootstrap}; it does not connect.
         *
         * @throws Throwable anything raised while configuring the bootstrap
         */
        @Override
        protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();
            final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
            bootstrap = new Bootstrap();
            bootstrap.group(nioEventLoopGroup)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .channel(NioSocketChannel.class);

            // The connect timeout is the configured timeout, but never less than 3000 ms.
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getTimeout()));

            // Parameterized (previously a raw ChannelInitializer) so initChannel is type-checked.
            bootstrap.handler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel ch) throws Exception {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                    // for debug: .addLast("logging",new LoggingHandler(LogLevel.INFO))
                    ch.pipeline()
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("handler", nettyClientHandler);
                }
            });
        }
    }
    

    4.相关适配器

    dubbo定义了很多的适配器(Delegate类),都是通过构造函数传入被包装的对象,内部方法均委托给该对象实现,例如ClientDelegate

    /**
     * {@link Client} wrapper that forwards every call shown here to the wrapped client.
     * (The article elides the remaining delegating methods.)
     */
    public class ClientDelegate implements Client {

        /** The wrapped client; stays {@code null} until {@link #setClient(Client)} is called. */
        private transient Client client;

        /** Creates a delegate without a target; call {@link #setClient(Client)} before use. */
        public ClientDelegate() {
        }

        /**
         * @param client the client to delegate to; must not be {@code null}
         * @throws IllegalArgumentException if {@code client} is {@code null}
         */
        public ClientDelegate(Client client) {
            setClient(client);
        }

        /** @return the wrapped client, or {@code null} if none has been set */
        public Client getClient() {
            return this.client;
        }

        /**
         * Replaces the wrapped client.
         *
         * @param client the new delegate target; must not be {@code null}
         * @throws IllegalArgumentException if {@code client} is {@code null}
         */
        public void setClient(Client client) {
            final boolean missing = (client == null);
            if (missing) {
                throw new IllegalArgumentException("client == null");
            }
            this.client = client;
        }

        /** Forwards to the wrapped client's {@code reset(URL)}. */
        @Override
        public void reset(URL url) {
            this.client.reset(url);
        }

        /**
         * Adds the given parameters to the current URL and resets with the result.
         *
         * @param parameters the parameters to add to the current URL
         */
        @Override
        @Deprecated
        public void reset(com.alibaba.dubbo.common.Parameters parameters) {
            final URL merged = getUrl().addParameters(parameters.getParameters());
            reset(merged);
        }

        /** @return the wrapped client's URL */
        @Override
        public URL getUrl() {
            return this.client.getUrl();
        }

        /** @return the wrapped client's remote address */
        @Override
        public InetSocketAddress getRemoteAddress() {
            return this.client.getRemoteAddress();
        }

        /**
         * Forwards to the wrapped client's {@code reconnect()}.
         *
         * @throws RemotingException propagated from the wrapped client
         */
        @Override
        public void reconnect() throws RemotingException {
            this.client.reconnect();
        }
    }
    ...
    

    相关文章

      网友评论

          本文标题:dubbo笔记-remoting(2)Netty实现

          本文链接:https://www.haomeiwen.com/subject/tjczvftx.html