本篇将从TcpSocketReceiver接收消息、解码消费开始来分析Cat服务端消息分发。
备注:
Cat版本:v3.0.0
github:https://github.com/dianping/cat
public final class TcpSocketReceiver implements LogEnabled {
public void init() {
try {
startServer(m_port);
} catch (Throwable e) {
m_logger.error(e.getMessage(), e);
}
}
public synchronized void startServer(int port) throws InterruptedException {
boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
int threads = 24;
ServerBootstrap bootstrap = new ServerBootstrap();
m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
bootstrap.group(m_bossGroup, m_workerGroup);
bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decode", new MessageDecoder());
pipeline.addLast("encode", new ClientMessageEncoder());
}
});
bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
try {
m_future = bootstrap.bind(port).sync();
m_logger.info("start netty server!");
} catch (Exception e) {
m_logger.error("Started Netty Server Failed:" + port, e);
}
}
}
MessageDecoder的祖先类实现了ChannelInboundHandler接口,是网络IO事件具体处理类,当客户端将日志数据上传到服务器之后,会交给MessageDecoder 解码数据,然后进行后续处理。
class MessageDecoder {
@Inject
private MessageHandler m_handler;
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) {
int length = buffer.readInt();
ByteBuf readBytes = buffer.readBytes(length + 4);
DefaultMessageTree tree = (DefaultMessageTree) CodecHandler.decode(readBytes);
tree.setBuffer(readBytes);
m_handler.handle(tree);
}
}
class DefaultMessageHandler {
public void handle(MessageTree tree) {
if (m_consumer == null) {
m_consumer = lookup(MessageConsumer.class);
}
m_consumer.consume(tree);
}
}
消息解码完成后最终调用MessageConsumer.consume方法传递给消费方。
class RealtimeConsumer {
@Inject
private MessageAnalyzerManager m_analyzerManager;
@Override
public void consume(MessageTree tree) {
long timestamp = tree.getMessage().getTimestamp();
//找到对应周期
Period period = m_periodManager.findPeriod(timestamp);
if (period != null) {
//分发
period.distribute(tree);
} else {
m_serverStateManager.addNetworkTimeError(1);
}
}
@Override
public void initialize() throws InitializationException {
//初始化周期管理器
m_periodManager = new PeriodManager(HOUR, m_analyzerManager, m_serverStateManager, m_logger);
m_periodManager.init();
Threads.forGroup("cat").start(m_periodManager);
}
}
consumer()根据消息的时间戳找到对应的周期开始消息的分发。以下为消息分发流程图:
image.png
从图上可以看出:
1、每个周期为整点开始的一个小时,一个周期对应一系列周期任务;
2、每个周期任务对应一个线程,周期任务、线程、分析器一一对应;
3、每个类别的分析器数量可配,TransactionAnalyzer默认两个分析器。
首先看注入的分析管理器MessageAnalyzerManager的初始化逻辑:
class DefaultMessageAnalyzerManager {
//存放每个周期对应的分析器
private Map<Long, Map<String, List<MessageAnalyzer>>> m_analyzers = new HashMap<Long, Map<String, List<MessageAnalyzer>>>();
//根据周期时间和分析器类别,获取当前周期、该类别分析器所有实例
@Override
public List<MessageAnalyzer> getAnalyzer(String name, long startTime) {
、、、
//获取当前周期分析器
Map<String, List<MessageAnalyzer>> map = m_analyzers.get(startTime);
if (map == null) {
synchronized (m_analyzers) {
map = m_analyzers.get(startTime);
if (map == null) {
map = new HashMap<String, List<MessageAnalyzer>>();
m_analyzers.put(startTime, map);
}
}
}
//该类别分析器所有实例
List<MessageAnalyzer> analyzers = map.get(name);
if (analyzers == null) {
synchronized (map) {
analyzers = map.get(name);
if (analyzers == null) {
analyzers = new ArrayList<MessageAnalyzer>();
MessageAnalyzer analyzer = lookup(MessageAnalyzer.class, name);
analyzer.setIndex(0);
analyzer.initialize(startTime, m_duration, m_extraTime);
analyzers.add(analyzer);
//该分析器的实例数(除了TransactionAnalyzer默认为2,其他都是1)
int count = analyzer.getAnanlyzerCount(name);
for (int i = 1; i < count; i++) {
MessageAnalyzer tempAnalyzer = lookup(MessageAnalyzer.class, name);
tempAnalyzer.setIndex(i);
tempAnalyzer.initialize(startTime, m_duration, m_extraTime);
analyzers.add(tempAnalyzer);
}
map.put(name, analyzers);
}
}
}
return analyzers;
、、、
}
//所有有效的分析器
@Override
public void initialize() throws InitializationException {
Map<String, MessageAnalyzer> map = lookupMap(MessageAnalyzer.class);
m_analyzerNames = new ArrayList<String>(map.keySet());
、、、
//移除禁用的分析器
、、、
}
}
接下来来看initialize()方法中周期管理器的初始化逻辑。
class PeriodManager {
public void init() {
long startTime = m_strategy.next(System.currentTimeMillis());
startPeriod(startTime);
}
private void startPeriod(long startTime) {
long endTime = startTime + m_strategy.getDuration();
Period period = new Period(startTime, endTime, m_analyzerManager, m_serverStateManager, m_logger);
m_periods.add(period);
//启动一个周期的所有周期任务
period.start();
}
}
startPeriod()初始化当前周期,并启动当前周期的所有周期任务。
class Period {
public Period(long startTime, long endTime, MessageAnalyzerManager analyzerManager,
ServerStatisticManager serverStateManager, Logger logger) {
//每一种类别的分析器,都会有至少一个MessageAnalyzer的实例,每个MessageAnalyzer都由一个对应的PeriodTask来分配任务,MessageAnalyzer与PeriodTask是1对1的关系
List<MessageAnalyzer> messageAnalyzers = m_analyzerManager.getAnalyzer(name, startTime);
for (MessageAnalyzer analyzer : messageAnalyzers) {
//队列大小30000
MessageQueue queue = new DefaultMessageQueue(QUEUE_SIZE);
PeriodTask task = new PeriodTask(analyzer, queue, startTime);
List<PeriodTask> analyzerTasks = m_tasks.get(name);
if (analyzerTasks == null) {
analyzerTasks = new ArrayList<PeriodTask>();
m_tasks.put(name, analyzerTasks);
}
analyzerTasks.add(task);
}
}
//启动一个周期的所有周期任务
public void start() {
for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
List<PeriodTask> taskList = tasks.getValue();
for (int i = 0; i < taskList.size(); i++) {
PeriodTask task = taskList.get(i);
task.setIndex(i);
Threads.forGroup("Cat-RealtimeConsumer").start(task);
}
}
}
}
最后回过头来看RealTimeConsumer.consume()方法中Period.distribute(messageTree)消息分发逻辑:
public void distribute(MessageTree tree) {
for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) {
List<PeriodTask> tasks = entry.getValue();
int length = tasks.size();
int index = 0;
boolean manyTasks = length > 1;
if (manyTasks) {
index = Math.abs(domain.hashCode()) % length;
}
PeriodTask task = tasks.get(index);
boolean enqueue = task.enqueue(tree);
if (!enqueue) {
if (manyTasks) {
task = tasks.get((index + 1) % length);
enqueue = task.enqueue(tree);
if (!enqueue) {
success = false;
}
} else {
success = false;
}
}
}
}
遍历当前周期的所有周期任务,异步入队。
网友评论