最近对公司的监控系统Cat从源码角度去做了一下了解。为方便后续学习Cat的同学做一下借鉴,现尝试从Java客户端的初始化流程、Java客户端消息发送、消息协议、服务端消息分发等几个方面来浅析Cat源码。有不对的地方,望大家多多指正!
备注:
Cat版本:v3.0.0
github:https://github.com/dianping/cat
本系列博客图片主要来自官方说明文档。ps: 人家图画的太清晰了
Java客户端的初始化流程
一、消息的组织
Cat使用消息树(MessageTree)组织日志,下面为消息树的类定义
image.png
我们每次操作的实体都是消息树,其中有个domain字段,这是cat中一个非常重要的概念,一个domain可以对应成一个project( 比如bluewhale-app),每个消息树拥有一个唯一的MessageId, 不同的消息树(比如微服务中A服务调用B服务,A,B都会生成消息树) 通过 parenMessageId、rootMessageId 串联起来,消息树下的所有实体都是Message,一共有5种类型的Message, 分别是Transaction, Event, Trace, Metric和Heartbeat。
二、Java客户端的初始化
客户端操作对象Cat封装了所有的接口。下面通过上报一个Transaction类型的消息来了解客户端的初始化流程。
class Cat{
public static Transaction newTransaction(String type, String name) {
return Cat.getProducer().newTransaction(type, name);
}
public static MessageProducer getProducer() {
checkAndInitialize();
return s_instance.m_producer;
}
private static void checkAndInitialize() {
initialize(new File(getCatHome(), "client.xml"));
}
public static void initialize(File configFile) {
//IOC容器
PlexusContainer container = ContainerLoader.getDefaultContainer();
ModuleContext ctx = new DefaultModuleContext(container);
//主要逻辑在CatClientModule.class
Module module = ctx.lookup(Module.class, CatClientModule.ID);
if (!module.isInitialized()) {
ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);
ctx.setAttribute("cat-client-config-file", configFile);
initializer.execute(ctx, module);
}
}
}
创建transaction首先会通过getProducer函数获取消息生产者MessageProducer对象,在返回MessageProducer对象之前,函数会对客户端进行初始化。初始化plexus容器,调用CatClientModule的excute方法。
class CatClientModule {
@Override
protected void execute(final ModuleContext ctx) throws Exception {
...
// bring up TransportManager
ctx.lookup(TransportManager.class);
ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);
if (clientConfigManager.isCatEnabled()) {
// start status update task
StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);
Threads.forGroup("cat").start(statusUpdateTask);
}
}
}
上面代码主要需要关注两个点:
1、DefaultTransportManager的实例化,最终启动Netty Tcp客户端。
2、启动一个StatusUpdateTask线程每隔一段时间发送一个HeartBeatMessage,其中包括了客户端能拿到的各种信息,包括CPU,Memory,Disk等等,通过访问者模式采集这些信息到StatusInfo。之后将status封装到HeartBeatMessage中,按照一般对于message的处理流程,flush到消息传输层中。
接下来我们来看Netty Tcp客户端的启动。
class DefaultTransportManager {
@Inject
private TcpSocketSender m_tcpSocketSender;
public void initialize() {
...
List<Server> servers = m_configManager.getServers();
List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
for (Server server : servers) {
addresses.add(new InetSocketAddress(server.getIp(), server.getPort()));
}
m_tcpSocketSender.initialize(addresses);
...
}
}
上面代码根据server配置信息去初始化TcpSocketSender。
class TcpSocketSender {
private MessageQueue m_queue = new DefaultMessageQueue(SIZE);
public void initialize(List<InetSocketAddress> addresses) {
//初始化netty客户端,建立TCP连接。
m_channelManager = new ChannelManager(m_logger, addresses, m_configManager, m_factory);
//启动一个线程从消息队列获取消息并上报
Threads.forGroup("cat").start(this);
//监听server配置信息变化,重连
Threads.forGroup("cat").start(m_channelManager);
}
//从消息队列获取消息并上报
public void run() {
m_active = true;
while (true) {
MessageTree tree = m_queue.poll();
if (tree != null) {
ChannelFuture channel = m_channelManager.channel();
if (channel != null) {
sendInternal(channel, tree);
} else {
offer(tree);
}
} else {
break;
}
}
}
}
上面代码主要干了两件事,实例化ChannelManager,初始化netty客户端,建立TCP长连接。启动一个线程执行run()函数从消息队列获取消息并上报。
class ChannelManager {
public ChannelManager(List<InetSocketAddress>serverAddresses) {
EventLoopGroup group = new NioEventLoopGroup(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
}
});
m_bootstrap = bootstrap;
...
initChannel(configedAddresses, routerConfig);
...
}
private ChannelFuture createChannel(InetSocketAddress address) {
...
ChannelFuture future = m_bootstrap.connect(address);
...
}
}
最后,来一张图。
image.png
网友评论