美文网首页Elastic StackJAVA架构师实验室
基于Logback底层原理实现ELK日志分析系统

基于Logback底层原理实现ELK日志分析系统

作者: Michael孟良 | 来源:发表于2018-03-07 17:47 被阅读75次

    看了很多ELK日子分析系统的实现,多数是写怎么搭建,至于怎么在不影响主项目的情况下、异步去记录日志没有过多说明,为此我以Logback为列,通过Logback底层原理,在不影响主代码的情况下,实现ELK日志分析系统。
    PS:至于安装ELK,和ELK的说明的,可以参考一下连接:
    https://www.cnblogs.com/kevingrace/p/5919021.html
    https://www.cnblogs.com/yuhuLin/p/7018858.html
    https://my.oschina.net/itblog/blog/547250

    Logback:
    Logback是由log4j创始人设计的另一个开源日志组件,官方网站: http://logback.qos.ch

    看看(最)简单的Logback配置logback.xml:

        <configuration>
        
            <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
                <!-- encoder 默认配置为PatternLayoutEncoder -->
                <encoder>
                    <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
                </encoder>
            </appender>
        
            <root level="INFO">
                <appender-ref ref="STDOUT" />
            </root>
        
        </configuration>  
    

    我们配置一个ConsoleAppender就可以在控制台上打印出日志。

    实现思路:
    Logback(我用的是logback-classic-1.2.3版本)有个DBAppender,功能就是将日志插入数据库,这样的话我们可以仿照它,实现将日志插入到elasticsearch的功能。
    我们看看DBAppender所需要加的配置:

    <appender name="db-classic-mysql" class="ch.qos.logback.classic.db.DBAppender">
    <connectionSource class="ch.qos.logback.core.db.DataSourceConnectionSource">
        <dataSource class="com.mchange.v2.c3p0.ComboPooledDataSource">
            <driverClass>com.mysql.jdbc.Driver</driverClass>
            <jdbcUrl>jdbc:mysql://{$server ip}:3306/{$dbname}</jdbcUrl>
            <user>{$user}</user>
            <password>{$password}</password>
        </dataSource>
    </connectionSource>
    </appender>
    

    从配置上看就是把参数传到DBAppender里面,我们ES的Appender只需仿照它的做法就可以。

    目标:找出DBAppender的sql在哪里commit
    我们再看看DBAppender:

    image.png

    其初始化参数:

    protected String insertPropertiesSQL;
    protected String insertExceptionSQL;
    protected String insertSQL;
    protected static final Method GET_GENERATED_KEYS_METHOD;
    private DBNameResolver dbNameResolver;
    static final int TIMESTMP_INDEX = 1;
    static final int FORMATTED_MESSAGE_INDEX = 2;
    static final int LOGGER_NAME_INDEX = 3;
    static final int LEVEL_STRING_INDEX = 4;
    static final int THREAD_NAME_INDEX = 5;
    static final int REFERENCE_FLAG_INDEX = 6;
    static final int ARG0_INDEX = 7;
    static final int ARG1_INDEX = 8;
    static final int ARG2_INDEX = 9;
    static final int ARG3_INDEX = 10;
    static final int CALLER_FILENAME_INDEX = 11;
    static final int CALLER_CLASS_INDEX = 12;
    static final int CALLER_METHOD_INDEX = 13;
    static final int CALLER_LINE_INDEX = 14;
    static final int EVENT_ID_INDEX = 15;
    static final StackTraceElement EMPTY_CALLER_DATA = CallerData.naInstance();
    

    涉及sql的就只有
    insertPropertiesSQL,insertExceptionSQL,insertSQL。而我们顺藤摸瓜,发现最后他们都做这样的操作:

    super.start();
    

    我们看看他的类继承关系图(idea自带这个插件,eclipse可以安装类似的插件):

    image.png

    看看DBAppender继承的DBAppenderBase:
    protected abstract的就有getGeneratedKeysMethod();getInsertSQL();subAppend(E var1, Connection var2, PreparedStatement var3);secondarySubAppend(E var1, Connection var2, long var3);
    就说我们仿照DBAppender继承DBAppenderBase就要实现上面4个方法;

    看看DBAppenderBase继承的UnsynchronizedAppenderBase:
    protected abstract的就只有有append(E var1);

    再看回DBAppenderBase的append(E eventObject):

    connection.commit();
    

    !!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    !!!!!!!!!!!!就是在这里!!!!!!!!!!!!
    !!!!!!!!!!!!!!!!!!!!!!!!!!!!!

    这样的话我们仿照DBAppenderBase写一个ESAppender,继承UnsynchronizedAppenderBase,实现append方法,将日志插入ES里!

    下面就是贴代码、贴图片时间:
    项目结构图:


    image.png

    主类ESAppender:

    package com.elk.log.appender;
    
    import ch.qos.logback.classic.spi.ILoggingEvent;
    import ch.qos.logback.classic.spi.IThrowableProxy;
    import ch.qos.logback.classic.spi.ThrowableProxyUtil;
    import ch.qos.logback.core.UnsynchronizedAppenderBase;
    import com.alibaba.fastjson.JSON;
    import com.elk.log.utils.InitES;
    import com.elk.log.vo.EsLogVo;
    import com.elk.log.vo.Location;
    import io.searchbox.client.JestClient;
    import io.searchbox.client.JestResult;
    import io.searchbox.core.Index;
    
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Map;
    import java.util.Properties;
    
    public class ESAppender  extends UnsynchronizedAppenderBase<ILoggingEvent> {
    
        private static JestClient jestClient ;
    
        //索引名称
        String esIndex = "java-log-#date#";
        //索引类型
        String esType = "java-log";
        //是否打印行号
        boolean isLocationInfo = true;
        //运行环境
        String env = "";
        //es地址
        String esAddress = "";
    
        public String getEsIndex() {
            return esIndex;
        }
    
        public void setEsIndex(String esIndex) {
            this.esIndex = esIndex;
        }
    
        public String getEsType() {
            return esType;
        }
    
        public void setEsType(String esType) {
            this.esType = esType;
        }
    
        public boolean isLocationInfo() {
            return isLocationInfo;
        }
    
        public void setLocationInfo(boolean locationInfo) {
            isLocationInfo = locationInfo;
        }
    
        public String getEnv() {
            return env;
        }
    
        public void setEnv(String env) {
            this.env = env;
        }
    
        public String getEsAddress() {
            return esAddress;
        }
    
        public void setEsAddress(String esAddress) {
            this.esAddress = esAddress;
        }
    
        @Override
        protected void append(ILoggingEvent event) {
            EsLogVo esLogVo = new EsLogVo();
            esLogVo.setHost("HostName");
            esLogVo.setIp("127.0.0.1");
            esLogVo.setEnv(this.env);
            esLogVo.setLevel(event.getLevel().toString());
            Location location = new Location();
            StackTraceElement[] callerDataArray = event.getCallerData();
            if(callerDataArray != null && callerDataArray.length >0){
                StackTraceElement immediateCallerData = callerDataArray[0];
                location.setClassName(immediateCallerData.getClassName());
                location.setMethod(immediateCallerData.getMethodName());
                location.setFile(immediateCallerData.getFileName());
                location.setLine(Integer.toString(immediateCallerData.getLineNumber()));
            }
            IThrowableProxy tp = event.getThrowableProxy();
            if (tp != null){
                String throwable = ThrowableProxyUtil.asString(tp);
                esLogVo.setThrowable(throwable);
            }
            esLogVo.setLocation(location);
            esLogVo.setLogger(event.getLoggerName());
            esLogVo.setMessage(event.getFormattedMessage());
    
            SimpleDateFormat df = new SimpleDateFormat("yyyy.MM.dd");
            SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd");
            esLogVo.setTimestamp(df2.format(new Date(event.getTimeStamp())));
            esLogVo.setThread(event.getThreadName());
    
            Map<String ,String > mdcPropertyMap = event.getMDCPropertyMap();
            esLogVo.setTraceId(mdcPropertyMap.get("traceId"));
            esLogVo.setRpcId(mdcPropertyMap.get("rpcId"));
    
    
            String jsonString = JSON.toJSONString(esLogVo);
            String esIndex_format = esIndex.replace("#date#",df.format(new Date(event.getTimeStamp())));
            Index index = new Index.Builder(esLogVo).index(esIndex_format).type(esType).build();
            try{
                JestResult result = jestClient.execute(index);
                System.out.println(result);
            }catch (Exception e){
                e.printStackTrace();
            }
    
        }
    
        @Override
        public void start() {
            super.start();
            Properties properties = new Properties();
            properties.put("es.hosts",esAddress);
            properties.put("es.username","zkpk");
            properties.put("es.password","123456");
            jestClient = InitES.jestClient(properties);
        }
        
        @Override
        public void stop() {
            super.stop();
            jestClient.shutdownClient();
        }
    }
    

    我们主要实现append方法,重写start(),stop()。
    append主要是插入es索引,start和stop主要用来打开和关闭ES的链接。

    工具类InitES(用于初始化es连接用的):

        package com.elk.log.utils;
    
    import io.searchbox.client.JestClientFactory;
    import io.searchbox.client.config.HttpClientConfig;
    import io.searchbox.client.JestClient;
    
    
    import java.util.ArrayList;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Properties;
    public class InitES {
    
        private static io.searchbox.client.JestClient JestClient;
    
        public static JestClient jestClient(Properties properties) {
            JestClientFactory factory = new JestClientFactory();
            String userName = properties.getProperty("es.username");
            String password = properties.getProperty("es.password");
            String esHosts = properties.getProperty("es.hosts");
            List<String> serverList = new ArrayList <String>();
            for (String address:esHosts.split(",")) {
                serverList.add("http://"+address);
            }
    
            HttpClientConfig.Builder builder = new HttpClientConfig.Builder(serverList);
            builder.maxTotalConnection(20);
            builder.defaultMaxTotalConnectionPerRoute(5);
            builder.defaultCredentials(userName,password);
            builder.multiThreaded(true);
    
            factory.setHttpClientConfig(builder.build());
    
            if (JestClient == null) {
                JestClient = factory.getObject();
            }
            return JestClient;
        }
    }
    

    这里,我们用Jest方式连接ES。

    两个Javabean:EsLogVo,Location:

    public  class EsLogVo {
    
        private String host;
        private String ip;
        private String env;
        private String message;
        private String timestamp;
        private String logger;
        private String level;
        private String thread;
        private String throwable;
        private Location location;
        private String traceId;
        private String rpcId;
      ...set and get...
        
    }
    

    public class Location {
        private String className;
        private String method;
        private String file;
        private String line;
    
      ...set and get...
        
    }
    

    测试类ESLogTest:

    package com.elk.log;
    
    import org.junit.Test;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.TimeUnit;
    
    public class ESLogTest {
        private static final Logger logger = LoggerFactory.getLogger(ESLogTest.class);
    
        @Test
        public  void testLog() throws InterruptedException{
        logger.info("我是正常信息 test message info");
        logger.error("我一条异常的信息",new Exception("项目报错了,加班吧!!!!"));
        logger.debug("debug消息 debug hello hi");
        logger.warn("警告警告");
        TimeUnit.SECONDS.sleep(10);
        }
    
    }
    

    配置文件logback.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <contextName>logback-api</contextName>
        <property name="logback.system" value="logback-system"/>
        <property name="logback.path" value="../logs/logback-system"/>
        <property name="logback.level" value="DEBUG"/>
        <property name="logback.pattern" value="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p - %m%n"/>
        <property name="logback.env" value="dev"/>
        <property name="logback.isLocation" value="true"/>
        <property name="logback.esAddress" value="192.168.0.128:9200,192.168.0.129:9200,192.168.0.130:9200"/>
    
    
        <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>[%-5p %d{yy-MM-dd HH:mm:ss} %m] %caller{1}</pattern>
            </encoder>
        </appender>
    
    
        <appender name="ES" class="com.elk.log.appender.ESAppender">
            <!--索引名字 date为通配符 ,会自动替换为yyyy.MM.dd格式-->
            <esIndex>java-log-#date#</esIndex>
            <!--索引类型-->
            <esType>${logback.system}</esType>
            <!--运行环境-->
            <env>${logback.env}</env>
            <!--ES地址-->
            <esAddress>${logback.esAddress}</esAddress>
        </appender>
    
        <appender name="ASYNC_ES" class="ch.qos.logback.classic.AsyncAppender">
            <!--默认情况下,当BlockingQueue还有20%容量,他将丢弃TRACE,DEBUG和INFO级别的event,只保留warn和error-->
            <discardingThreshold>0</discardingThreshold>
            <!--BlockingQueue的最大容量,默认情况下,大小为256-->
            <queueSize>256</queueSize>
            <appender-ref ref="ES"/>
            <!--要是保留行号,需要开启为true-->
            <includeCallerData>true</includeCallerData>
        </appender>
    
        <logger name="com.elk.log" additivity="true">
            <level value="${logback.level}"></level>
            <appender-ref ref="ASYNC_ES" />
        </logger>
    
        <root level="${logback.level}">
            <appender-ref ref="stdout" />
        </root>
    </configuration>
    

    这里我们定义自己的ESAppender,取名未ES,并将所需参数传到ESAppender。
    然后我们再把它放到AsyncAppender里面进行异步处理,防止ELK报错影响到主程序。

    OK,测试:
    我们启动ELK,在Kibana的Dev Tools上创建一个java的动态日志模板:

        PUT _template/java-log
        {
            "template" : "java-log-*",
                "order" : 0,
                "settings" : {
            "index":{
                "refresh_interval": "5s"
            }
        },
            "mappings": {
            "_default_": {
                "dynamic_templates": [
                {
                    "message_field": {
                    "match_mapping_type": "string",
                            "path_match": "message",
                            "mapping": {
                        "norms": false,
                                "type": "text",
                                "analyzer": "ik_max_word",
                                "search_analyzer": "ik_max_word"
                    }
                }
                },
                {
                    "throwable_fields": {
                    "match_mapping_type": "string",
                            "path_match": "throwable",
                            "mapping": {
                        "norms": false,
                                "type": "text",
                                "analyzer": "ik_max_word",
                                "search_analyzer": "ik_max_word"
                    }
                }
                },
                {
                    "string_fields": {
                    "match_mapping_type": "string",
                            "match": "*",
                            "mapping": {
                        "norms": false,
                                "type": "text",
                                "analyzer": "ik_max_word",
                                "search_analyzer": "ik_max_word",
                                "fields": {
                            "keyword": {
                                "type": "keyword"
                            }
                        }
                    }
                }
                }
      ],
                "_all": {
                    "enabled": false
                },
                "properties": {
                    "env": {
                        "type": "keyword"
                    },
                    "host": {
                        "type": "keyword"
                    },
                    "ip": {
                        "type": "ip"
                    },
                    "level": {
                        "type": "keyword"
                    },
                    "location": {
                        "properties": {
                            "line": {
                                "type": "integer"
                            }
                        }
                    },
                    "timestamp": {
                        "type": "date"
                    }
                }
            }
        }
        }
    

    用的是IK分词器,refresh时间定为5s用于生产上调高效率,定义了几个参数作为关键搜索。

    然后将java-log-*作为主要观察对象,回到Discover

    回到idea,在ESLogTest上跑testLog:

            [INFO  18-03-07 16:56:49 我是正常信息 test message info] Caller+0  at com.elk.log.ESLogTest.testLog(ESLogTest.java:14)
    [ERROR 18-03-07 16:56:49 我一条异常的信息] Caller+0  at com.elk.log.ESLogTest.testLog(ESLogTest.java:15)
            java.lang.Exception: 项目报错了,加班吧!!!!
            at com.elk.log.ESLogTest.testLog(ESLogTest.java:15)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:59)
            at org.junit.internal.runners.MethodRoadie.runTestMethod(MethodRoadie.java:98)
            at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:79)
            at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:87)
            at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:77)
            at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:42)
            at org.junit.internal.runners.JUnit4ClassRunner.invokeTestMethod(JUnit4ClassRunner.java:88)
            at org.junit.internal.runners.JUnit4ClassRunner.runMethods(JUnit4ClassRunner.java:51)
            at org.junit.internal.runners.JUnit4ClassRunner$1.run(JUnit4ClassRunner.java:44)
            at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:27)
            at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:37)
            at org.junit.internal.runners.JUnit4ClassRunner.run(JUnit4ClassRunner.java:42)
            at org.junit.runner.JUnitCore.run(JUnitCore.java:130)
            at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
            at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
            at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
            at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
    [DEBUG 18-03-07 16:56:49 debug消息 debug hello hi] Caller+0    at com.elk.log.ESLogTest.testLog(ESLogTest.java:16)
    [WARN  18-03-07 16:56:49 警告警告] Caller+0  at com.elk.log.ESLogTest.testLog(ESLogTest.java:17)
    [DEBUG 18-03-07 16:56:50 Request and operation succeeded] Caller+0   at io.searchbox.action.AbstractAction.createNewElasticSearchResult(AbstractAction.java:75)
            io.searchbox.core.DocumentResult@4421d75d
                    [DEBUG 18-03-07 16:56:51 Request and operation succeeded] Caller+0   at io.searchbox.action.AbstractAction.createNewElasticSearchResult(AbstractAction.java:75)
            io.searchbox.core.DocumentResult@25a5426c
                    [DEBUG 18-03-07 16:56:51 Request and operation succeeded] Caller+0   at io.searchbox.action.AbstractAction.createNewElasticSearchResult(AbstractAction.java:75)
            io.searchbox.core.DocumentResult@5fd33fbc
                    [DEBUG 18-03-07 16:56:51 Request and operation succeeded] Caller+0   at io.searchbox.action.AbstractAction.createNewElasticSearchResult(AbstractAction.java:75)
            io.searchbox.core.DocumentResult@25a52369
    

    可以说都是我们想要的信息。

    我们再回到Kibana:

    image.png

    可以看到,测试上的4条日志信息已经可以插到ES上。

    然后再看看错误的那条日志信息:

    image.png

    报错的类,方法,行数,报错内容。。。都一一列出来。

    任务完成,到此,我们可以在不影响其他代码的情况下,将日志插入到ES里,在以后的编程中可以快速定位问题。


    扩展:
    我们看看除了DBAppender实现Appender这条线外,还有哪些类走这条线:


    image.png

    我们看到实现Appender接口的就有两个类:AppenderBase和UnsynchronizedAppenderBase。
    从字面上的意思就是一个同步的,一个异步的,我们用对比工具对比一下,发现这两个类几乎没什么区别,最大不同就是AppenderBase的doAppend方法多了synchronized的锁。
    像插到数据库日志的DBAppender,控制台打印日志的ConsoleAppender。。。就会走异步的UnsynchronizedAppenderBase这条线。
    像SMTPAppender,JMSAppenderBase,SocketAppenderBase。。。就会走AppenderBase这条线。
    所以像DBAppender和DBAppenderBase,这两个类可以合成一个整体,直接继承UnsynchronizedAppenderBase,走异步这条线。

    -----------------------谢谢观看,希望本文对你有帮助----------------------------

    相关文章

      网友评论

        本文标题:基于Logback底层原理实现ELK日志分析系统

        本文链接:https://www.haomeiwen.com/subject/lhrrfftx.html