美文网首页
美团Cat源码浅析(一)Java客户端初始化流程

美团Cat源码浅析(一)Java客户端初始化流程

作者: 李亚林1990 | 来源:发表于2018-11-23 11:06 被阅读64次

    最近对公司的监控系统Cat从源码角度去做了一下了解。为方便后续学习Cat的同学做一下借鉴,现尝试从Java客户端的初始化流程、Java客户端消息发送、消息协议、服务端消息分发等几个方面来浅析Cat源码。有不对的地方,望大家多多指正!
    备注:
    Cat版本:v3.0.0
    github:https://github.com/dianping/cat
    本系列博客图片主要来自官方说明文档。ps: 人家图画的太清晰了

    Java客户端的初始化流程
    一、消息的组织
    Cat使用消息树(MessageTree)组织日志,下面为消息树的类定义


    image.png

    我们每次操作的实体都是消息树,其中有个domain字段,这是cat中一个非常重要的概念,一个domain可以对应成一个project( 比如bluewhale-app),每个消息树拥有一个唯一的MessageId, 不同的消息树(比如微服务中A服务调用B服务,A,B都会生成消息树) 通过 parenMessageId、rootMessageId 串联起来,消息树下的所有实体都是Message,一共有5种类型的Message, 分别是Transaction, Event, Trace, Metric和Heartbeat。
    二、Java客户端的初始化
    客户端操作对象Cat封装了所有的接口。下面通过上报一个Transaction类型的消息来了解客户端的初始化流程。

    class Cat{
        public static Transaction newTransaction(String type, String name) {
            return Cat.getProducer().newTransaction(type, name);
        }
    
        public static MessageProducer getProducer() {
            checkAndInitialize();
            return s_instance.m_producer;
        }
    
        private static void checkAndInitialize() {
            initialize(new File(getCatHome(), "client.xml"));
        }
    
        public static void initialize(File configFile) {
            //IOC容器
            PlexusContainer container = ContainerLoader.getDefaultContainer();
            ModuleContext ctx = new DefaultModuleContext(container);
            //主要逻辑在CatClientModule.class
            Module module = ctx.lookup(Module.class, CatClientModule.ID);
    
            if (!module.isInitialized()) {
                ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);
                ctx.setAttribute("cat-client-config-file", configFile);
                initializer.execute(ctx, module);
            }
        }
    }
    

    创建transaction首先会通过getProducer函数获取消息生产者MessageProducer对象,在返回MessageProducer对象之前,函数会对客户端进行初始化。初始化plexus容器,调用CatClientModule的excute方法。

    class CatClientModule {
        @Override
        protected void execute(final ModuleContext ctx) throws Exception {
        ...
            // bring up TransportManager
            ctx.lookup(TransportManager.class);
    
            ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);
    
            if (clientConfigManager.isCatEnabled()) {
                // start status update task
                StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);
                Threads.forGroup("cat").start(statusUpdateTask);
            }
        }
    }
    

    上面代码主要需要关注两个点:
    1、DefaultTransportManager的实例化,最终启动Netty Tcp客户端。
    2、启动一个StatusUpdateTask线程每隔一段时间发送一个HeartBeatMessage,其中包括了客户端能拿到的各种信息,包括CPU,Memory,Disk等等,通过访问者模式采集这些信息到StatusInfo。之后将status封装到HeartBeatMessage中,按照一般对于message的处理流程,flush到消息传输层中。
    接下来我们来看Netty Tcp客户端的启动。

    class DefaultTransportManager {
        @Inject
        private TcpSocketSender m_tcpSocketSender;
        public void initialize() {
            ...
            List<Server> servers = m_configManager.getServers();
            List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
            for (Server server : servers) {
                addresses.add(new InetSocketAddress(server.getIp(), server.getPort()));
            }
            m_tcpSocketSender.initialize(addresses);
            ...
        }
    }
    

    上面代码根据server配置信息去初始化TcpSocketSender。

    class TcpSocketSender {
        private MessageQueue m_queue = new DefaultMessageQueue(SIZE);
        public void initialize(List<InetSocketAddress> addresses) {
            //初始化netty客户端,建立TCP连接。
            m_channelManager = new ChannelManager(m_logger, addresses, m_configManager, m_factory);
            //启动一个线程从消息队列获取消息并上报
            Threads.forGroup("cat").start(this);
            //监听server配置信息变化,重连
            Threads.forGroup("cat").start(m_channelManager);
        }
        //从消息队列获取消息并上报
        public void run() {
            m_active = true;
            while (true) {
                MessageTree tree = m_queue.poll();
                if (tree != null) {
                    ChannelFuture channel = m_channelManager.channel();
                    if (channel != null) {
                        sendInternal(channel, tree);
                    } else {
                        offer(tree);
                    }
                } else {
                    break;
                }
            }
        }
    }
    

    上面代码主要干了两件事,实例化ChannelManager,初始化netty客户端,建立TCP长连接。启动一个线程执行run()函数从消息队列获取消息并上报。

    class ChannelManager {
        public ChannelManager(List<InetSocketAddress>serverAddresses) {
            EventLoopGroup group = new NioEventLoopGroup(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setDaemon(true);
                    return t;
                }
            });
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                }
            });
            m_bootstrap = bootstrap;
            ...
            initChannel(configedAddresses, routerConfig);
            ...
        }
        private ChannelFuture createChannel(InetSocketAddress address) {
              ...
            ChannelFuture future = m_bootstrap.connect(address);
            ...
        }
    }
    

    最后,来一张图。


    image.png

    相关文章

      网友评论

          本文标题:美团Cat源码浅析(一)Java客户端初始化流程

          本文链接:https://www.haomeiwen.com/subject/nqzdqqtx.html