美文网首页
美团Cat源码浅析(四)服务端消息分发

美团Cat源码浅析(四)服务端消息分发

作者: 李亚林1990 | 来源:发表于2018-11-23 11:35 被阅读39次

本篇将从TcpSocketReceiver接收消息、解码消费开始来分析Cat服务端消息分发。
备注:
Cat版本:v3.0.0
github:https://github.com/dianping/cat

public final class TcpSocketReceiver implements LogEnabled {
    public void init() {
        try {
            startServer(m_port);
        } catch (Throwable e) {
            m_logger.error(e.getMessage(), e);
        }
    }

    public synchronized void startServer(int port) throws InterruptedException {
        boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
        int threads = 24;
        ServerBootstrap bootstrap = new ServerBootstrap();

        m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
        m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
        bootstrap.group(m_bossGroup, m_workerGroup);
        bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);

        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast("decode", new MessageDecoder());
                pipeline.addLast("encode", new ClientMessageEncoder());
            }
        });

        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

        try {
            m_future = bootstrap.bind(port).sync();
            m_logger.info("start netty server!");
        } catch (Exception e) {
            m_logger.error("Started Netty Server Failed:" + port, e);
        }
    }
}

MessageDecoder的祖先类实现了ChannelInboundHandler接口,是网络IO事件具体处理类,当客户端将日志数据上传到服务器之后,会交给MessageDecoder 解码数据,然后进行后续处理。

class MessageDecoder {
    @Inject
    private MessageHandler m_handler;
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) {
        int length = buffer.readInt();
        ByteBuf readBytes = buffer.readBytes(length + 4);
        DefaultMessageTree tree = (DefaultMessageTree) CodecHandler.decode(readBytes);
        tree.setBuffer(readBytes);
        m_handler.handle(tree);
    }
}
class DefaultMessageHandler {
    public void handle(MessageTree tree) {
        if (m_consumer == null) {
            m_consumer = lookup(MessageConsumer.class);
        }
        m_consumer.consume(tree);
    }
}

消息解码完成后最终调用MessageConsumer.consume方法传递给消费方。

class RealtimeConsumer {
    @Inject
    private MessageAnalyzerManager m_analyzerManager;
    @Override
    public void consume(MessageTree tree) {
        long timestamp = tree.getMessage().getTimestamp();
        //找到对应周期
        Period period = m_periodManager.findPeriod(timestamp);
        if (period != null) {
            //分发
            period.distribute(tree);
        } else {
            m_serverStateManager.addNetworkTimeError(1);
        }
    }
    @Override
    public void initialize() throws InitializationException {
        //初始化周期管理器
        m_periodManager = new PeriodManager(HOUR, m_analyzerManager, m_serverStateManager, m_logger);
        m_periodManager.init();
        Threads.forGroup("cat").start(m_periodManager);
    }
}

consumer()根据消息的时间戳找到对应的周期开始消息的分发。以下为消息分发流程图:


image.png

从图上可以看出:
1、每个周期为整点开始的一个小时,一个周期对应一系列周期任务;
2、每个周期任务对应一个线程,周期任务、线程、分析器一一对应;
3、每个类别的分析器数量可配,TransactionAnalyzer默认两个分析器。
首先看注入的分析管理器MessageAnalyzerManager的初始化逻辑:

class DefaultMessageAnalyzerManager {
    //存放每个周期对应的分析器
    private Map<Long, Map<String, List<MessageAnalyzer>>> m_analyzers = new HashMap<Long, Map<String, List<MessageAnalyzer>>>();
    //根据周期时间和分析器类别,获取当前周期、该类别分析器所有实例
    @Override
    public List<MessageAnalyzer> getAnalyzer(String name, long startTime) {
        、、、
        //获取当前周期分析器
        Map<String, List<MessageAnalyzer>> map = m_analyzers.get(startTime);
        if (map == null) {
            synchronized (m_analyzers) {
                map = m_analyzers.get(startTime);
                if (map == null) {
                    map = new HashMap<String, List<MessageAnalyzer>>();
                    m_analyzers.put(startTime, map);
                }
            }
        }
        //该类别分析器所有实例
        List<MessageAnalyzer> analyzers = map.get(name);

        if (analyzers == null) {
            synchronized (map) {
                analyzers = map.get(name);
                if (analyzers == null) {
                    analyzers = new ArrayList<MessageAnalyzer>();
                    MessageAnalyzer analyzer = lookup(MessageAnalyzer.class, name);
                    analyzer.setIndex(0);
                    analyzer.initialize(startTime, m_duration, m_extraTime);
                    analyzers.add(analyzer);
                    //该分析器的实例数(除了TransactionAnalyzer默认为2,其他都是1)
                    int count = analyzer.getAnanlyzerCount(name);
                    for (int i = 1; i < count; i++) {
                        MessageAnalyzer tempAnalyzer = lookup(MessageAnalyzer.class, name);
                        tempAnalyzer.setIndex(i);
                        tempAnalyzer.initialize(startTime, m_duration, m_extraTime);
                        analyzers.add(tempAnalyzer);
                    }
                    map.put(name, analyzers);
                }
            }
        }
        return analyzers;
        、、、
    }
    //所有有效的分析器
    @Override
    public void initialize() throws InitializationException {
        Map<String, MessageAnalyzer> map = lookupMap(MessageAnalyzer.class);
        m_analyzerNames = new ArrayList<String>(map.keySet());
        、、、
        //移除禁用的分析器
        、、、
    }
}

接下来来看initialize()方法中周期管理器的初始化逻辑。

class PeriodManager {
    public void init() {
        long startTime = m_strategy.next(System.currentTimeMillis());
        startPeriod(startTime);
    }

    private void startPeriod(long startTime) {
        long endTime = startTime + m_strategy.getDuration();
        Period period = new Period(startTime, endTime, m_analyzerManager, m_serverStateManager, m_logger);
        m_periods.add(period);
//启动一个周期的所有周期任务
        period.start();
    }
}

startPeriod()初始化当前周期,并启动当前周期的所有周期任务。

class Period {
    public Period(long startTime, long endTime, MessageAnalyzerManager analyzerManager,
                  ServerStatisticManager serverStateManager, Logger logger) {
        //每一种类别的分析器,都会有至少一个MessageAnalyzer的实例,每个MessageAnalyzer都由一个对应的PeriodTask来分配任务,MessageAnalyzer与PeriodTask是1对1的关系
        List<MessageAnalyzer> messageAnalyzers = m_analyzerManager.getAnalyzer(name, startTime);
        for (MessageAnalyzer analyzer : messageAnalyzers) {
            //队列大小30000
            MessageQueue queue = new DefaultMessageQueue(QUEUE_SIZE);
            PeriodTask task = new PeriodTask(analyzer, queue, startTime);
            List<PeriodTask> analyzerTasks = m_tasks.get(name);
            if (analyzerTasks == null) {
                analyzerTasks = new ArrayList<PeriodTask>();
                m_tasks.put(name, analyzerTasks);
            }
            analyzerTasks.add(task);
        }
    }
    //启动一个周期的所有周期任务
    public void start() {
        for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
            List<PeriodTask> taskList = tasks.getValue();
            for (int i = 0; i < taskList.size(); i++) {
                PeriodTask task = taskList.get(i);
                task.setIndex(i);
                Threads.forGroup("Cat-RealtimeConsumer").start(task);
            }
        }
    }
}

最后回过头来看RealTimeConsumer.consume()方法中Period.distribute(messageTree)消息分发逻辑:

    public void distribute(MessageTree tree) {
        for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) {
            List<PeriodTask> tasks = entry.getValue();
            int length = tasks.size();
            int index = 0;
            boolean manyTasks = length > 1;
            if (manyTasks) {
                index = Math.abs(domain.hashCode()) % length;
            }
            PeriodTask task = tasks.get(index);
            boolean enqueue = task.enqueue(tree);
            if (!enqueue) {
                if (manyTasks) {
                    task = tasks.get((index + 1) % length);
                    enqueue = task.enqueue(tree);
                    if (!enqueue) {
                        success = false;
                    }
                } else {
                    success = false;
                }
            }
        }
    }

遍历当前周期的所有周期任务,异步入队。

相关文章

网友评论

      本文标题:美团Cat源码浅析(四)服务端消息分发

      本文链接:https://www.haomeiwen.com/subject/buxjqqtx.html