Pitfalls of Writing Logback Logs to Kafka

Author: 9c0ddf06559c | Published 2018-01-05 19:53, read 1,439 times

    While learning Storm real-time streaming recently, I needed to write logback logs into Kafka. I ran into quite a few pitfalls along the way, so I'm recording the problems and their fixes here for anyone who hits the same things.

    Pitfall 1: wrong Kafka dependency and wrong import

    Since this was my first time using Kafka, past experience suggested the client dependency was the one to pull in, so I added:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.9.0.0</version>
            </dependency>
    

    and imported the Producer class as org.apache.kafka.clients.producer.Producer.
    The result: calls to producer.send blocked forever without reporting any error, and
    properties.put("serializer.class","kafka.serializer.StringEncoder"); kept failing with a class-not-found error, because kafka.serializer.StringEncoder lives in the Scala kafka jar, not in kafka-clients.

    After much back and forth it turned out the dependency itself was wrong; what I actually needed was:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>${kafka.version}</version>
            </dependency>
    

    And because Kafka itself is written in Scala, the Scala standard library must also be on the classpath:

            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
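
    (${kafka.version} and ${scala.version} are Maven properties defined elsewhere in the pom. As an assumption consistent with the 0.9.0.0 kafka-clients version above, they might look like:)

            <properties>
                <kafka.version>0.9.0.0</kafka.version>
                <scala.version>2.11.7</scala.version>
            </properties>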
    

    and the import must come from the old Scala-producer API:
    import kafka.javaapi.producer.Producer;
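
    For reference, here is a minimal, self-contained sketch of the old Scala-producer API that this import belongs to (the class name OldProducerDemo is mine; localhost:9092 and mytopic match the configuration later in this post):

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class OldProducerDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092"); // the old producer uses this key, not bootstrap.servers
            props.put("serializer.class", "kafka.serializer.StringEncoder"); // resolves once kafka_2.11 is on the classpath
            Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, String>("mytopic", "hello kafka"));
            producer.close();
        }
    }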

    Pitfall 2: with pitfall 1 out of the way I assumed everything would work, but the class that writes the logs kept crashing on startup:

    Exception in thread "main" java.lang.ExceptionInInitializerError
        at org.apache.log4j.LogManager.getLogger(LogManager.java:44)
        at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:66)
        at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:270)
        at org.apache.commons.logging.impl.SLF4JLogFactory.getInstance(SLF4JLogFactory.java:155)
        at org.apache.commons.logging.impl.SLF4JLogFactory.getInstance(SLF4JLogFactory.java:132)
        at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:657)
        at org.apache.hadoop.conf.Configuration.<clinit>(Configuration.java:173)
    
    Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
        at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49)
        ... 8 more
    

    A quick search turned up the cause: log4j-over-slf4j and slf4j-log4j12 are two jars that bridge between log4j and slf4j in opposite directions. When both are on the classpath, each delegates logging calls to the other, and the resulting loop would overflow the stack, so slf4j fails fast with this exception instead.

    The fix is simple: exclude slf4j-log4j12 when declaring the Kafka dependency:

                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
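
    For clarity, the exclusion goes inside the kafka dependency itself, so the full declaration becomes:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>${kafka.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>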
    

    With that, the problem was solved.

    To wrap up, here is the complete code for writing logback logs to Kafka.

    logback.xml: the logback configuration file

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
        <!-- configure the custom appender that ships logs to Kafka -->
        <appender name="KAFKA" class="com.gwf.log.KafkaAppender">
            <topic>mytopic</topic>
            <zookeeperHost>localhost:2181</zookeeperHost>
            <brokerList>localhost:9092</brokerList>
            <formatter class="com.gwf.log.formatter.JsonFormatter">
                <expectJson>false</expectJson>
            </formatter>
        </appender>
    
        <!-- route debug-level and above through the KAFKA appender -->
        <root level="debug">
            <appender-ref ref="KAFKA"/>
        </root>
    </configuration>
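
    A note on how the configuration is wired up: logback matches each child element of the appender (<topic>, <zookeeperHost>, <brokerList>, <formatter>) to a JavaBean setter on KafkaAppender by name. The class below has no hand-written setters because lombok's @Data generates them; roughly, it expands to the equivalent of:

    public void setTopic(String topic) { this.topic = topic; }
    public void setBrokerList(String brokerList) { this.brokerList = brokerList; }
    // ...plus matching getters and setters for the remaining fields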
    

    JsonFormatter: formats a logging event as a one-line JSON string

    package com.gwf.log.formatter;

    import ch.qos.logback.classic.spi.ILoggingEvent;

    // Serializes a logging event into a one-line JSON object with
    // level, logger, timestamp and message fields.
    public class JsonFormatter implements Formatter {
        private static final String QUOTE = "\"";
        private static final String COLON = ":";
        private static final String COMMA = ",";

        // when true, the log message is assumed to already be valid JSON
        // and is embedded as-is instead of being quoted as a string
        private boolean expectJson = false;

        @Override
        public String format(ILoggingEvent event) {
            StringBuilder sb = new StringBuilder();
            sb.append("{");

            fieldName("level", sb);
            quote(event.getLevel().levelStr, sb);
            sb.append(COMMA);

            fieldName("logger", sb);
            quote(event.getLoggerName(), sb);
            sb.append(COMMA);

            fieldName("timestamp", sb);
            sb.append(event.getTimeStamp());
            sb.append(COMMA);

            fieldName("message", sb);
            if (this.expectJson) {
                sb.append(event.getFormattedMessage());
            } else {
                quote(event.getFormattedMessage(), sb);
            }

            sb.append("}");
            return sb.toString();
        }

        // writes "name": into the buffer
        private static void fieldName(String name, StringBuilder sb) {
            quote(name, sb);
            sb.append(COLON);
        }

        // note: does not escape quotes or backslashes inside value
        private static void quote(String value, StringBuilder sb) {
            sb.append(QUOTE);
            sb.append(value);
            sb.append(QUOTE);
        }

        public boolean isExpectJson() {
            return expectJson;
        }

        public void setExpectJson(boolean expectJson) {
            this.expectJson = expectJson;
        }
    }
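
    The Formatter interface and the default MessageFormatter are referenced by KafkaAppender below but were missing from the original listing; here is a minimal reconstruction consistent with how they are used (my sketch, not necessarily the author's exact code):

    package com.gwf.log.formatter;

    import ch.qos.logback.classic.spi.ILoggingEvent;

    // Contract for turning a logback event into the string payload sent to Kafka.
    public interface Formatter {
        String format(ILoggingEvent event);
    }

    // In a separate file, same package. Default used by KafkaAppender when no
    // <formatter> is configured: the raw formatted message, no JSON wrapping.
    public class MessageFormatter implements Formatter {
        @Override
        public String format(ILoggingEvent event) {
            return event.getFormattedMessage();
        }
    }

    With the JsonFormatter above, a WARN event from the RogueApplication below serializes to a line like {"level":"WARN","logger":"com.gwf.log.RogueApplication","timestamp":1515152000000,"message":"This is a warning (slow state)"}.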
    

    KafkaAppender: the appender that writes logback events to Kafka; it extends AppenderBase<ILoggingEvent> and overrides the append method to customize how each log event is sent

    package com.gwf.log;

    import ch.qos.logback.classic.spi.ILoggingEvent;
    import ch.qos.logback.core.AppenderBase;
    import com.gwf.log.formatter.Formatter;
    import com.gwf.log.formatter.MessageFormatter;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import lombok.Data;

    import java.util.Properties;

    // @Data generates the getters/setters that logback uses to inject
    // the <topic>, <brokerList>, ... values from logback.xml
    @Data
    public class KafkaAppender extends AppenderBase<ILoggingEvent> {

        private String topic;
        private String zookeeperHost; // configurable, but not used by the producer
        private String brokerList;
        private Producer<String, String> producer;
        private Formatter formatter;

        @Override
        public void start() {
            // fall back to the plain message formatter if none is configured
            if (null == this.formatter) {
                this.formatter = new MessageFormatter();
            }
            super.start();
            Properties properties = new Properties();
            properties.put("metadata.broker.list", brokerList);
            properties.put("serializer.class", "kafka.serializer.StringEncoder");
            properties.put("request.required.acks", "1");
            ProducerConfig config = new ProducerConfig(properties);
            this.producer = new Producer<String, String>(config);
        }

        @Override
        public void stop() {
            super.stop();
            this.producer.close();
        }

        @Override
        protected void append(ILoggingEvent iLoggingEvent) {
            // convert the log event to a string payload and ship it to the topic
            String payload = this.formatter.format(iLoggingEvent);
            producer.send(new KeyedMessage<String, String>(topic, payload));
        }
    }
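
    One caveat worth noting (my addition, not from the original post): with the settings above the old producer sends synchronously, so every log statement blocks until the broker acknowledges the message. The old producer also supports an async mode that batches sends on a background thread, at the cost of possible message loss on a crash; to try it, add this next to the other properties.put calls in start():

    // optional: batch sends on a background thread instead of blocking the caller
    properties.put("producer.type", "async");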
    

    RogueApplication: a driver that simulates an application writing logs at varying rates

    package com.gwf.log;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class RogueApplication {
        private static final Logger LOG = LoggerFactory.getLogger(RogueApplication.class);
    
        public static void main(String[] args) throws InterruptedException {
            int slowCount = 6;
            int fastCount = 15;
            //slow state
            for (int i=0;i<slowCount;i++){
                LOG.warn("This is a warning (slow state)");
                Thread.sleep(5000);
            }
    
            //enter rapid state
            for(int i=0;i<fastCount;i++){
                LOG.warn("This is a warning (rapid state)");
                Thread.sleep(1000);
            }
    
            //return to slow state
            for(int i=0;i<slowCount;i++){
                LOG.warn("This is a warning (slow state)");
                Thread.sleep(5000);
            }
        }
    }
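
    To verify the pipeline end to end, run RogueApplication and watch the topic with the console consumer that ships with Kafka (the flags below assume a 0.8/0.9-era distribution and the localhost setup from logback.xml):

        bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning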
    
    
