美文网首页Java技术升华分布式&高可用
分布式系统监控(五)- 日志分析

分布式系统监控(五)- 日志分析

作者: do_young | 来源:发表于2018-08-23 15:52 被阅读175次

背景

前面一章节介绍了如何使用rocketmq来归集数据信息,下面将介绍如何使用storm从rocketmq中获取日志信息,并将消息转换为流信息进行统计分析。

storm简介

Storm集群和Hadoop集群表面上看很类似。但是Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology),这两者之间是非常不一样的。一个关键的区别是: 一个MapReduce job最终会结束, 而一个topology永远会运行(除非你手动kill掉)。
在Storm的集群里面有两种节点: 控制节点(master node)和工作节点(worker node)。控制节点上面运行一个叫Nimbus后台程序,它的作用类似Hadoop里面的JobTracker。Nimbus负责在集群里面分发代码,分配计算任务给机器,并且监控状态。
每一个工作节点上面运行一个叫做Supervisor的节点。Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程worker。每一个工作进程执行一个topology的一个子集;一个运行的topology由运行在很多机器上的很多工作进程worker组成。


storm-run-map.png

Nimbus和Supervisor之间的所有协调工作都是通过Zookeeper集群完成。另外,Nimbus进程和Supervisor进程都是快速失败(fail-fast)和无状态的。所有的状态要么在zookeeper里面, 要么在本地磁盘上。这也就意味着你可以用kill -9来杀死Nimbus和Supervisor进程, 然后再重启它们,就好像什么都没有发生过。这个设计使得Storm异常的稳定。

Topology

计算任务Topology是由不同的Spouts和Bolts,通过数据流(Stream)连接起来的图。下面是一个Topology的结构示意图:


storm-base.png

其中包含有:

Spout

Storm中的消息源,用于为Topology生产消息(数据),一般是从外部数据源(如Message Queue、RDBMS、NoSQL、Realtime Log)不间断地读取数据并发送给Topology消息(tuple元组)。Spout可以是可靠的,也可以是不可靠的。如果这个tuple没有被Storm完全处理,可靠的消息源可以重新发射一个tuple,但是不可靠的消息源一旦发出一个tuple就不能重发了。(可靠性会在下面介绍)
Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回(如果已经没有新的tuple)。要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。
另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。

Bolt

Storm中的消息处理者,用于为Topology进行消息的处理,Bolt可以执行过滤, 聚合, 查询数据库等操作,而且可以一级一级的进行处理。

storm安装及开发

下载的包解压即可。

storm开发

开发环境搭建。

创建一个常规的java工程或maven工程就可以,并引入storm包的依赖。

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <scope>provided</scope>
        </dependency>

spout开发

由于我们的日志归集组件使用的RocketMq,所以我们需要使用RocketMq开发spout类,在storm运行环境中运行RocketMq的消费端,获取指定主题的消息队列数据。由于storm发行发布的稳定版默认没有集成RocketMq组件,所以需要将RocketMq集成到storm中,使用RocketMq定义的spout来获取rocketmq的中消息信息。
由于storm最新开发的主干上已经集成了RocketMq,但中因为与1.2.2版本的核心接口不一致。所以需要将工程源码下载下来进行修改一下。
修改步骤为:
1.导入\external\storm-rocketmq工程
2.修改工程的pom.xml文件,将storm-core的依赖版本修改为1.2.2
3.修复编译异常,主是是将接口中Map<String,Object>修改为Map。
4.为了能在Spout中输出的Tuple中获取RocketMq的tags和keys属性信息,将org.apache.storm.rocketmq.RocketMqUtils.java进行了重构。

    /**
     * Generate Storm tuple values by Message and Scheme.
     * @param msg RocketMQ Message
     * @param scheme Scheme for deserializing
     * @return tuple values
     */
    public static List<Object> generateTuples(Message msg, Scheme scheme) {
        List<Object> tup;
        String rawKey = msg.getKeys();
        ByteBuffer body = ByteBuffer.wrap(msg.getBody());
        if (rawKey != null && scheme instanceof KeyValueScheme) {
            ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes(StandardCharsets.UTF_8));
            tup = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, body);
//ADD BEGIN
        } if (scheme instanceof MessageScheme) {
            tup = ((MessageScheme)scheme).deserializeValue(msg);
//ADD END
        }  else {
            tup = scheme.deserialize(body);
        }
        return tup;
    }

并添加一个接口MessageScheme:

package org.apache.storm.rocketmq.spout.scheme;

import java.util.List;

import org.apache.rocketmq.common.message.Message;
import org.apache.storm.spout.Scheme;

public interface MessageScheme extends Scheme {
    List<Object> deserializeValue(Message msg);
}

和一个实现类 DefaultMessageScheme:

package org.apache.storm.rocketmq.spout.scheme;

import java.nio.ByteBuffer;
import java.util.List;

import org.apache.rocketmq.common.message.Message;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class DefaultMessageScheme extends StringScheme implements MessageScheme {
    public static final String FIELD_TAGS = "Tags";
    public static final String FIELD_TEYS = "Keys";
    public static final String FIELD_BODY = "Body";
    public static final Fields DEFAULT_FIELDS = new Fields("Tags", "Keys", "Body");

    @Override
    public List<Object> deserializeValue(Message msg) {
        ByteBuffer body = ByteBuffer.wrap(msg.getBody());
        String bodyStr = deserializeString(body);
        return new Values(msg.getTags(), msg.getKeys(), bodyStr);
    }

    @Override
    public Fields getOutputFields() {
        return DEFAULT_FIELDS;
    }
}

最终将工程编译的jar包放在storm的lib目录下,并且在工程中引入该组件包。

        <dependency>
                     <!-- 由storm最新主干的\external\storm-rocketmq工程改造而来  -->
            <groupId>com.going.saas</groupId>
            <artifactId>going-storm-rocketmq</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.6.2.Final</version>
        </dependency>

到这一步就可以直接使用RocketMqSpout了,首先创建一个Properties类,设置创建RocketMq消息端所需要的参数。

        Properties properties = new Properties();
        properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, "10.209.8.126:9876");
        properties.setProperty(SpoutConfig.CONSUMER_GROUP, "RESOURCE_INFOS_CONSUMER_GRP");
        properties.setProperty(SpoutConfig.CONSUMER_TOPIC, "RESOURCES_INFO_TOPIC");
        properties.setProperty(SpoutConfig.SCHEME,SpoutConfig.MESSAGE_SCHEME);

再根据参数创建RocketMqSpout对象即可。

new RocketMqSpout(properties)

Bolt开发

该例子中开发一个Bolt,处理从Spout中获取转为Tuple对象的消息信息。将信息存在Redis中。

import org.apache.storm.redis.bolt.AbstractRedisBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.rocketmq.spout.scheme.DefaultMessageScheme;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

import redis.clients.jedis.JedisCommands;

/**
 * 获取终端发来的资源利用信息, 将新获取的资源信息替换旧的资源信息。
 * 
 * @author Administrator
 *
 */
public class ResourcesInfoSplitBolt extends AbstractRedisBolt {

    private static final String REDIS_KEY_PREFIX = "RESOURCE_INFO_";
    
    public ResourcesInfoSplitBolt(JedisPoolConfig config) {
        super(config);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    protected void process(Tuple tuple) {
        JedisCommands jedisCommands = null;
        try {
            String body = tuple.getStringByField(DefaultMessageScheme.FIELD_BODY);
            String tags = tuple.getStringByField(DefaultMessageScheme.FIELD_TAGS).trim();
            
            jedisCommands = getInstance();
            jedisCommands.lpush(REDIS_KEY_PREFIX+tags, body);

        } finally {
            if (jedisCommands != null) {
                returnInstance(jedisCommands);
            }
            this.collector.ack(tuple);
        }
    }

Topology开发

Spout和Bolt开发完成以后,就需要开发Topology将它们串连起来,具体示例代码如下:

import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.rocketmq.SpoutConfig;
import org.apache.storm.rocketmq.spout.RocketMqSpout;
import org.apache.storm.topology.TopologyBuilder;
import com.going.storm.bolt.ResourcesInfoSplitBolt;

public class ResourceInfosTopology {

    public static void main(String[] args) throws InterruptedException {
        

        Properties properties = new Properties();
        properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, "10.209.8.126:9876");
        properties.setProperty(SpoutConfig.CONSUMER_GROUP, "RESOURCE_INFOS_CONSUMER_GRP");
        properties.setProperty(SpoutConfig.CONSUMER_TOPIC, "RESOURCES_INFO_TOPIC");
        properties.setProperty(SpoutConfig.SCHEME,SpoutConfig.MESSAGE_SCHEME);
        // RocketMqSpout spout = new RocketMqSpout(properties);
        
        properties.setProperty("REDIS_HOST", "10.209.8.126");
        properties.setProperty("REDIS_PORT", "6379");

        // 定义拓扑
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("resouece-info-reader", new RocketMqSpout(properties));
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()//
                .setHost(properties.getProperty("REDIS_HOST"))//
                .setPort(Integer.valueOf(properties.getProperty("REDIS_PORT")))//
                .build();
         builder.setBolt("word-normalizer", new  ResourcesInfoSplitBolt(poolConfig)).shuffleGrouping("resouece-info-reader");
         
        // builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));

        // 配置
        Config conf = new Config();
        // conf.put("wordsFile", args[0]);
        conf.setDebug(false);

        // 运行拓扑
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Resource-Infos-Topologie", conf, builder.createTopology());
        Thread.sleep(1000000000);
        cluster.shutdown();
    }

}

storm部署运行

工程开发完成以后就需要将工程进行部署运行。我这里以docker为例对开发的功能进行部署。
这里基于storm官方的docker镜像进行部署。
我们基于storm的1.2.2版本的镜像(与本地开发的storm版本保持 一致就好),将工程依赖的第三方包加入到镜像中storm组件的extlib目录下。

FROM storm:1.2.2

ADD  extlib/dubbo-2.5.8.jar  extlib/dubbo-2.5.8.jar 
ADD  extlib/fastjson-1.2.29.jar  extlib/fastjson-1.2.29.jar 
ADD  extlib/going-storm-rocketmq-0.0.1-SNAPSHOT.jar  extlib/going-storm-rocketmq-0.0.1-SNAPSHOT.jar 
ADD  extlib/guava-13.0.1.jar  extlib/guava-13.0.1.jar 
ADD  extlib/jackson-annotations-2.9.0.jar  extlib/jackson-annotations-2.9.0.jar 
ADD  extlib/jackson-core-2.9.4.jar  extlib/jackson-core-2.9.4.jar 
ADD  extlib/jackson-databind-2.9.4.jar  extlib/jackson-databind-2.9.4.jar 
ADD  extlib/javassist-3.20.0-GA.jar  extlib/javassist-3.20.0-GA.jar 
ADD  extlib/jedis-2.9.0.jar  extlib/jedis-2.9.0.jar 
ADD  extlib/netty-3.2.5.Final.jar  extlib/netty-3.2.5.Final.jar 
ADD  extlib/netty-all-4.0.42.Final.jar  extlib/netty-all-4.0.42.Final.jar 
ADD  extlib/rocketmq-client-3.6.2.Final.jar  extlib/rocketmq-client-3.6.2.Final.jar 
ADD  extlib/rocketmq-client-4.2.0.jar  extlib/rocketmq-client-4.2.0.jar 
ADD  extlib/rocketmq-common-4.2.0.jar  extlib/rocketmq-common-4.2.0.jar 
ADD  extlib/rocketmq-remoting-4.2.0.jar  extlib/rocketmq-remoting-4.2.0.jar 
ADD  extlib/storm-redis-1.2.2.jar extlib/storm-redis-1.2.2.jar

然后通过生成好的镜像ab/storm生成实例

docker run -d --name storm-resources -it -v ${LOCAL_PATH}/going-storm-0.0.1-SNAPSHOT.jar:/topology.jar ab/storm storm jar /topology.jar com.going.storm.topology.ResourceInfosTopology

到这一步日志统计功能即开发完成了,对应的分析功能也可以通过Bolt来完成。

相关文章

  • 使用ELK构建分布式日志分析系统

    分布式系统的日志散落在各个服务器上,对于监控和排错非常不利,我们基于ELK构建了整套日志收集,分析,展示系统。 架...

  • 分布式系统监控(五)- 日志分析

    背景 前面一章节介绍了如何使用rocketmq来归集数据信息,下面将介绍如何使用storm从rocketmq中获取...

  • 11.elasticsearch介绍和安装

    什么是elasticsearch?一个开源的分布式搜索引擎,可以用来实现搜索、日志统计、分析、系统监控等功能 什么...

  • 从零开始入门 K8s | 可观测性:监控与日志

    一、背景 监控和日志是大型分布式系统的重要基础设施,监控可以帮助开发者查看系统的运行状态,而日志可以协助问题的排查...

  • 分分钟拯救监控知识体系

    分分钟拯救监控知识体系5.1 硬件监控5.2 系统监控5.3 应用监控5.4 网络监控5.5 流量分析5.6 日志...

  • 后端架构的套路

    完善的监控系统流量预估与压测日志收集与分析分布式计划任务性能优化的思路服务治理的实现轻巧的发布系统 从前端请求到后...

  • ELK前端日志分析、监控系统

    ELK前端日志分析、监控系统 前端日志与后端日志不同,具有很强的自定义特性,不像后端的接口日志、服务器日志格式比较...

  • 从一份定义文件详解ELK中Logstash插件结构

    概述 当下分布式系统的 日志收集、日志分析、日志处理、可视化 的热门技术栈方案当然非 ELK(ElasticSea...

  • Graylog收集Nginx日志

    Graylog 日志监控系统 Graylog是一个开源的日志聚合、分析、审计、展现和预警工具。功能上和ELK类似,...

  • Pinpoint安装详解

    pinpoint是java的apm工具,可以监控系统性能,分析访问情况,可以适用于分布式系统,同时对spring ...

网友评论

    本文标题:分布式系统监控(五)- 日志分析

    本文链接:https://www.haomeiwen.com/subject/fpppiftx.html