美文网首页Dubbo
Netty在dubbo中的应用浅析

Netty在dubbo中的应用浅析

作者: 南宋临安府 | 来源:发表于2018-03-10 20:10 被阅读0次

    “上善若水,水善利万物而不争。
    处众人所恶,故几於道。
    居善地,心善渊,与善仁,言善信,正善治,事善能,动善时。
    夫唯不争,故无尤。”[1]

    Netty是一款高性能的网络传输框架,在各类中间件和分布式框架中都能见到它的身影。今天就从dubbo的源码中说一说netty,netty本质上是负责网络传输,网络传输自然离不开socket,socket是端到端的连接。dobbo采用的是无中心化,每个client端都能与server端连接,每个client端同时也可以作为server端。
    Dubbo的client端主要实现AbstractClient,NettyClient扩展继承了它。一般来说对于同一个server端来说(ip和port相同),只有一个client实例对应,也就是dubbo所说的共享连接。
    从DubboProtocol类实现可以找到:

        private ExchangeClient[] getClients(URL url) {
            // whether to share connection
            boolean service_share_connect = false;
            int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
            // if not configured, connection is shared, otherwise, one connection for one service
            if (connections == 0) {
                service_share_connect = true;
                connections = 1;
            }
    
            ExchangeClient[] clients = new ExchangeClient[connections];
            for (int i = 0; i < clients.length; i++) {
                if (service_share_connect) {
                    clients[i] = getSharedClient(url);
                } else {
                    clients[i] = initClient(url);
                }
            }
            return clients;
        }
    

    NettyClient 打开连接:

      protected void doOpen() throws Throwable {
           NettyHelper.setNettyLoggerFactory();
           final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
           bootstrap = new Bootstrap();
           bootstrap.group(nioEventLoopGroup)
                   .option(ChannelOption.SO_KEEPALIVE, true)
                   .option(ChannelOption.TCP_NODELAY, true)
                   .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                   //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                   .channel(NioSocketChannel.class);
    
           if (getTimeout() < 3000) {
               bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
           } else {
               bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
           }
    
           bootstrap.handler(new ChannelInitializer() {
    
               protected void initChannel(Channel ch) throws Exception {
                   NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                   ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                           .addLast("decoder", adapter.getDecoder())
                           .addLast("encoder", adapter.getEncoder())
                           .addLast("handler", nettyClientHandler);
               }
           });
       }
    

    dubbo为了实现对Channel的抽象,不依赖Netty的实现,自己设计了Channel类,而NettyChannel只不过是dubbo的Channel其中一种实现而已。NettyChannel类保存了一个静态变量channelMap,这个是map型变量。原生的Channel和dubbo定制化的NettyChannel一对一对应绑定起来。
    NettyChannel:

    private static final ConcurrentMap<Channel, NettyChannel> channelMap = new ConcurrentHashMap<Channel, NettyChannel>();
    

    一对一绑定实现:

    static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
            if (ch == null) {
                return null;
            }
            NettyChannel ret = channelMap.get(ch);
            if (ret == null) {
                NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
                if (ch.isActive()) {
                    ret = channelMap.putIfAbsent(ch, nettyChannel);
                }
                if (ret == null) {
                    ret = nettyChannel;
                }
            }
            return ret;
        }
    

    NettyHandler是对ChannelHandler一层封装。ChannelHandler大量采用装饰器模式和委托模式,这类似Java中的IO中Stream。通过装饰器模式使得ChannelHandler具有解码,统计,分发等等功能。最里层ExchangeHandler是DubboProtocol类中的内部类。reply方法看起来不来,主要做了2件事:获取对应的Invoker,执行invoke调用。

    
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    
            public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    Invocation inv = (Invocation) message;
                    Invoker<?> invoker = getInvoker(channel, inv);
                    // need to consider backward-compatibility if it's a callback
                    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                        String methodsStr = invoker.getUrl().getParameters().get("methods");
                        boolean hasMethod = false;
                        if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                            hasMethod = inv.getMethodName().equals(methodsStr);
                        } else {
                            String[] methods = methodsStr.split(",");
                            for (String method : methods) {
                                if (inv.getMethodName().equals(method)) {
                                    hasMethod = true;
                                    break;
                                }
                            }
                        }
                        if (!hasMethod) {
                            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                    + " not found in callback service interface ,invoke will be ignored."
                                    + " please update the api interface. url is:"
                                    + invoker.getUrl()) + " ,invocation is :" + inv);
                            return null;
                        }
                    }
                    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                    return invoker.invoke(inv);
                }
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }
    
            @Override
            public void received(Channel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    reply((ExchangeChannel) channel, message);
                } else {
                    super.received(channel, message);
                }
            }
    
            @Override
            public void connected(Channel channel) throws RemotingException {
                invoke(channel, Constants.ON_CONNECT_KEY);
            }
    
            @Override
            public void disconnected(Channel channel) throws RemotingException {
                if (logger.isInfoEnabled()) {
                    logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
                }
                invoke(channel, Constants.ON_DISCONNECT_KEY);
            }
    
            private void invoke(Channel channel, String methodKey) {
                Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
                if (invocation != null) {
                    try {
                        received(channel, invocation);
                    } catch (Throwable t) {
                        logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                    }
                }
            }
    
            private Invocation createInvocation(Channel channel, URL url, String methodKey) {
                String method = url.getParameter(methodKey);
                if (method == null || method.length() == 0) {
                    return null;
                }
                RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
                invocation.setAttachment(Constants.PATH_KEY, url.getPath());
                invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
                invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
                invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
                if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                    invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
                }
                return invocation;
            }
        };
    

    NettyHandler继承了SimpleChannelHandler,是我们最需要关注和设计的类,因为它是Netty提供开发者最有控制权的类。任何依赖Netty的框架都需要定制化NettyHandler类。dubbo也不例外,对NettyHandler进行了大量抽象和封装,使其能满足自身功能的需要。

    接下来会在下一篇中详细解读dubbo源码来分析发布服务时如何启动socket监听,启动netty服务。


    1. 老子《道德经》第八章,老子故里,中国鹿邑。

    相关文章

      网友评论

        本文标题:Netty在dubbo中的应用浅析

        本文链接:https://www.haomeiwen.com/subject/uxubfftx.html